Resilience4j, CircuitBreaker & TimeLimiter 적용에 대한 기록

Resilience4j, CircuitBreaker & TimeLimiter 적용에 대한 기록
Photo by Randy Tarampi / Unsplash

Resillience4j 의 Circuit Breaker 와 Time Limiter 에 대한 기록을 남긴다.

1. 개요

일반적으로 서비스를 운영하다 보면 특정 이벤트나 시간대에 트래픽이 몰리는 경우가 있을 수 있고 부분적으로 DB 나 Cache 에 장애가 발생 할 수 있는 상황이 있다.

이런 경우 어플리케이션 레벨에서 대응 할 수 있는 PlanB 를 준비 할 수 있는데 이때 Resillience4j의 Circuit Breaker 를 사용할 수 있겠다.

위 이미지는 리뷰서비스에 장애가 생겼을때의 상황을 그려보았고 설명하자면 정상적으로 서비스를 할때는 Circuit Breaker 가 닫혀(CLOSE)있다가 장애가 감지 되면 Circuit Breaker 가 열리(OPEN)면서 미리 정의한 Fallback 함수로 정상적으로 서비스 할때의 기능 그대로 응답 하게 된다. 이로서 앞단에 있는 다른 서비스에 영향을 줄이면서 서비스의 복구를 OPEN, HALF_OPEN 상태를 오가면서 점검하다가 복구가 되면 다시 정상 서비스(CLOSE)를 하게되는 과정을 거치게 된다.

2. 구현

1) Dependency

공통적으로 사용해야 하는 라이브러리가 있고 Resillience4j 에서 사용하고 싶은 기능만 선택적으로 가져다 쓰면 된다.

    // 공통
    implementation 'org.springframework.boot:spring-boot-starter-actuator:3.1.2'
    implementation 'org.springframework.boot:spring-boot-starter-aop:3.1.3'
    implementation 'io.github.resilience4j:resilience4j-spring-boot3:2.1.0'
    implementation 'io.github.resilience4j:resilience4j-reactor:2.1.0'
    // 선택
    implementation 'io.github.resilience4j:resilience4j-circuitbreaker:2.1.0'
    //    implementation 'io.github.resilience4j:resilience4j-retry:2.1.0'
    //    implementation 'io.github.resilience4j:resilience4j-ratelimiter:2.1.0'
    implementation 'io.github.resilience4j:resilience4j-timelimiter:2.1.0'
    //    implementation 'io.github.resilience4j:resilience4j-bulkhead:2.1.0'
    //    implementation 'io.github.resilience4j:resilience4j-cache:2.1.0'

build.gradle

2) Configure

instances 뒤에 write-fallback 이나 write-limiter 의 경우 해당 기능의 이름을 지정하고 해당 이름의 설정을 각기 따로 정의 할 수 있다.

resilience4j 설정은 application.properties 이외에도 CircuitBreakerConfig 에서도 설정 가능하다.

# resilience4j 설정
resilience4j.circuitbreaker.instances.write-fallback.failure-rate-threshold=50
resilience4j.circuitbreaker.instances.write-fallback.sliding-window-size=10
resilience4j.circuitbreaker.instances.write-fallback.sliding-window-type=COUNT_BASED
resilience4j.circuitbreaker.instances.write-fallback.wait-duration-in-open-state=10s
resilience4j.timelimiter.instances.write-limiter.timeout-duration=250ms

# actuator 설정
management.endpoints.web.base-path=/mng/actuator
management.endpoints.web.exposure.include=*
management.endpoint.circuitbreakerevents.enabled=true

application.properties

3-1) Annotation 구현 방식

CreateItem 는 Redis 와 MongoDB 에 각각 데이터를 생성하는 역할을 하고 있고 서버나 기능에 문제가 생겼을 경우 Circuit Breaker 을 실행하여 데이터를 Kafka 에 전달하는 Fallback 을 수행 하도록 하였다. 추가로 서비스의 성능을 보장하기 위해 TimeLimiter 를 추가 하였고 TimeLimiter 에서 설정한 시간 보다 오래 걸려도 서비스가 정상이 아닌 것으로 간주하고 데이터를 Kafka 에 전달하는 Fallback 을 수행하도록 한다.

CreateItemFallback 의 경우 timeout을 설정 했는데 Kafka 서버가 내려갔을 경우 Kafka 설정의 delivery.timeout.ms (기본값이 120,000 ms)만큼 대기하는 상황을 막기 위해 설정했다. (기본값이 2분이고 여러 요청이 들어오는 상황이면 어플리케이션 서버 자체가 멈추는 상황까지 갈 것이다)

논외로 kafka config 에서 delivery.timeout.ms 를 적게 조절하면 요청에 대한 응답을 짧게 끊을 수 있겠지만, delivery.timeout.ms 값은 request.timeout.ms 와 linger.ms 의 합보다 커야 하고 request.timeout.ms 의 경우 record 를 보내고 ack 를 받는 시간을 나타내는데 이 시간을 최소화 해도 하나의 API 요청이 응답까지 이루어 지는 시간으로는 적절하지 않다고 판단했다.

    @CircuitBreaker(name = "write-fallback", fallbackMethod = "CreateItemFallback")
    @TimeLimiter(name = "write-limiter", fallbackMethod = "CreateItemFallback")
    public Mono<BoardDocumentDTO.Create> CreateItem(BoardDocumentDTO.Create dto) {
        return redisHash.increment("counter", "board", 1)
                .flatMap((Long counter) -> {
                    LocalDateTime now = LocalDateTime.now();
                    dto.setScore(counter);
                    dto.setModifiedAt(now);
                    dto.setCreatedAt(now);
                    dto.setCreatedTs(System.currentTimeMillis());
                    return boardRepository.save(boardMapper.ItemToEntity(dto));
                })
                .flatMap(board -> {
                    dto.setId(board.getId());

                    Object encode;
                    try {
                        encode = objectMapper.writeValueAsBytes(board);
                    } catch (JsonProcessingException e) {
                        return Mono.error(new RuntimeException(e));
                    }
                    return redisZSet.add("board", encode, dto.getScore())
                            .flatMap(bool -> {
                                if (bool) {
                                    return Mono.just(dto);
                                } else {
                                    return Mono.error(new RuntimeException("redis zset add error"));
                                }
                            });
                })
                // Time Limiter 동작확인
                // .delayElement(java.time.Duration.ofMillis(1000))
                ;
    }

    @Override
    public Mono<BoardDocumentDTO.Create> CreateItemFallback(BoardDocumentDTO.Create dto, Throwable throwable) {
        return kafkaSender.send("board-failure-queue", dto)
                .timeout(java.time.Duration.ofMillis(800), Mono.error(new RuntimeException("Kafka Send Timeout")))
                .flatMap(result -> {
                    Mono<BoardDocumentDTO.Create> res;
                    if (result.exception() == null) {
                        res = Mono.just(dto);
                    } else {
                        res = Mono.error(new RuntimeException("Kafka Send Error"));
                    }
                    return res;
                });
    }
3-2) 함수형 구현 방식

아래와 같이 선언하는 클래스 내 생성자에 정의해도 되고 기능별로 상세설정이 가능하다.

    @Autowired
    public BoardServiceDODB(
            CircuitBreakerRegistry circuitBreakerRegistry,
            TimeLimiterRegistry timeLimiterRegistry,
    ) {
        this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("write-fallback", CircuitBreakerConfig.ofDefaults());
        this.timeLimiter = timeLimiterRegistry.timeLimiter("write-limiter", TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(300)).build());
    }

어노테이션 대신 아래와 같이 transformDeferred 와 CallNotPermittedException.class 예외로 fallback 을 지정할 수 있다.

    public Mono<BoardDocumentDTO.Create> CreateItem(BoardDocumentDTO.Create dto) {

        return redisHash.increment("counter", "board", 1)
                .flatMap((Long counter) -> {
                    LocalDateTime now = LocalDateTime.now();
                    dto.setScore(counter);
                    dto.setModifiedAt(now);
                    dto.setCreatedAt(now);
                    dto.setCreatedTs(System.currentTimeMillis());
                    return boardRepository.save(boardMapper.ItemToEntity(dto));
                })
                .flatMap(board -> {
                    dto.setId(board.getId());

                    Object encode;
                    try {
                        encode = objectMapper.writeValueAsBytes(board);
                    } catch (JsonProcessingException e) {
                        return Mono.error(new RuntimeException(e));
                    }
                    return redisZSet.add("board", encode, dto.getScore())
                            .flatMap(bool -> {
                                if (bool) {
                                    return Mono.just(dto);
                                } else {
                                    return Mono.error(new RuntimeException("redis zset add error"));
                                }
                            })
                            // Time Limiter 동작확인
                            .delayElement(java.time.Duration.ofMillis(2000))
                            ;
                })
                .transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
                .transformDeferred(TimeLimiterOperator.of(this.timeLimiter))
                .onErrorResume(CallNotPermittedException.class, e -> {
                    log.error("CallNotPermittedException");
                    return CreateItemFallback(dto, e);
                });
    }
CircuitBreaker
Getting started with resilience4j-circuitbreaker