Spring Boot, WebFlux 에서 Reactor Kafka 기본 연동구현

Spring Boot, WebFlux 환경에서 Reactive Kafka 를 이용해서 Producer 와 Consumer 를 구성한 기록을 남긴다.

Spring Boot, WebFlux 에서 Reactor Kafka 기본 연동구현
Photo by Efe Kurnaz / Unsplash

이전에 작성한 글의 프로젝트를 기본으로 하며, Member API 에서 Create 실행시 DB 에 전달된 오브젝트를 Producer 로 Kafka에 전달하고, 별도 프로젝트로 Consumer 를 구성해서 Reactive Kafka 환경에서의 기본적인 연동을 목표로 한다.

Kafka Cluster 가 구성 되어 있지 않은 환경이라면 전에 작성한 글을 참고하여 Docker Compose 로 간단히 구성하도록 한다.

TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner

1. Dependency

Spring Boot 에서 Reactive Kafka 를 사용하기 위해서 build.gradle 에 다음 의존성을 추가 해준다.

implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
implementation 'org.springframework.kafka:spring-kafka:3.0.9'

2. Producer

1) Properties

Producer 를 구성하기 위한 기본적인 설정을 application.properties 에서 설정해주도록 한다.

  • spring.kafka.bootstrap-server
    카프카 클러스터 연결에 필요한 주소값 설정
  • spring.kafka.producer.key-serializer
    프로듀서가 메시지의 Key 값을 바이트로 변환하는데 사용하는 클래스를 지정하는 설정
  • spring.kafka.producer.value-serializer
    프로듀서가 메시지의 Value 값을 바이트로 변환하는데 사용하는 클래스를 지정하는 설정
# Kafka
spring.kafka.bootstrap-servers=127.0.0.1:10000,127.0.0.1:10001,127.0.0.1:10002

# Kafka Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

application.properties

2) Configure

ReactiveKafkaProducerTemplate은 리액터 카프카를 사용하여 카프카에 메시지를 전송하고 결과를 받을 수 있는 클래스이며, KafkaSender와 MessageConverter를 인자로 받아서 생성하고, send 메서드를 통해 다양한 형태의 메시지를 전송할 수 있다.

이러한 ReactiveKafkaProducerTemplate 클래스를 스프링 컨테이너에 Bean으로 등록하여 사용하기로 한다.

package com.example.webfluxapi.config;

import com.example.webfluxapi.dto.MemberDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;

@Configuration
public class KafkaConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, MemberDTO.Item> reactiveKafkaProducerTemplate(
            KafkaProperties props
    ) {
        return new ReactiveKafkaProducerTemplate<>(
                SenderOptions.create(props.buildProducerProperties())
        );
    }

}

config/KafkaConfig.java

3) Service

Member Service 에 Kafka 에 메시지를 보내기 위한 메소드를 추가한다.

package com.example.webfluxapi.service;

import com.example.webfluxapi.dto.MemberDTO;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;

public interface MemberService {
    Mono<MemberDTO.Item> create(MemberDTO.Create dto);

    Mono<MemberDTO.Item> item(Integer id);

    Flux<MemberDTO.Item> list(Integer page, Integer limit);

	// Kafka 에 메시지를 전송하고자 하는 메소드 정의
    Mono<SenderResult<Void>> KafkaSendMessage(String topic, String key, MemberDTO.Item message);
}

service/MemberService.java

package com.example.webfluxapi.service.impl;

import com.example.webfluxapi.dto.MemberDTO;
import com.example.webfluxapi.mapper.MemberMapper;
import com.example.webfluxapi.repository.MemberRepository;
import com.example.webfluxapi.service.MemberService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;

import java.time.LocalDateTime;

@Service
public class MemberServiceDB implements MemberService {
    private final MemberRepository memberRepository;
    private final MemberMapper memberMapper;
    //
    private final ReactiveKafkaProducerTemplate<String, MemberDTO.Item> kafkaSender;

    @Autowired
    public MemberServiceDB(MemberRepository memberRepository, MemberMapper memberMapper, ReactiveKafkaProducerTemplate<String, MemberDTO.Item> kafkaSender) {
        this.memberRepository = memberRepository;
        this.memberMapper = memberMapper;
        // ReactiveKafkaConsumerTemplate 을 kafkaSender 변수에 주입
        this.kafkaSender = kafkaSender;
    }

    public Mono<SenderResult<Void>> KafkaSendMessage(String topic, String key, MemberDTO.Item message) {
        return kafkaSender.send(topic, key, message);
    }

    @Override
    public Mono<MemberDTO.Item> create(MemberDTO.Create dto) {
        dto.setCreatedAt(LocalDateTime.now());
        return memberRepository.save(memberMapper.ToCreateEntity(dto)).map(memberMapper::ToDTOItem);
    }
}

service/impl/MemberServiceDB.java

4) Controller

Member API 에 Post 로 사용자 정보를 DB 에 저장할때 Kafka 에도 메시지를 전달 하도록 구현 하였고 가급적 Kafka 서버에 부하가 있거나 접속이 안되는 경우(버퍼에 쌓아두었다가 추후에 일괄 발송된다)에도 지연이 없도록 처리하는 것을 목표로 한다.

아래 코드에서 주요 부분만 요약 해본다.

  • Mono<MemberDTO.Item> dto = memberService.create(memberDTO);
    처음 데이터를 전달받아 memberService 의 create 메소드를 호출해서 DB 에 데이터를 저장하고 Member.Item 객체를 Mono 스트림으로 전달한다.
  • return dto.doOnSuccess(member -> {...})
    Mono 스트림이 성공적으로 완료 될때 실행할 콜백함수를 등록하고 그 콜백함수의 내용은 아래 설명으로 이어진다.
  • 콜백함수 내 memberService.KafkaSendMessage()
    콜백 함수 내에서는 전송결과 SenderResult 를 Mono 스트림으로 리턴하는 KafkaSendMessage 를 Topic, Key, Value 를 인자로 묶어서 호출하고 있으며, .publishOn(Schedulers.boundedElastic()) 코드로 Kafka 에 메시지를 보내는 작업을 별도의 스레드에서 수행 하도록 처리하고 있다.
    또한 KafkaSendMessage 도 성공여부에 따라 SenderResult 값을 받아서 메타 데이터를 로깅하거나, 에러여부에 따라 에러 메시지를 노출하도록 한다.
package com.example.webfluxapi.controller;

import com.example.webfluxapi.common.ApiResponse;
import com.example.webfluxapi.dto.MemberDTO;
import com.example.webfluxapi.service.MemberService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Slf4j
@RestController
@RequestMapping("/v1/api")
public class MemberController {

    private final MemberService memberService;

	// application.properties 에 정의한 토픽을 가져온다
    @Value("${config.producer.member.topic:default}")
    private String memberProducerTopic;

    @Autowired
    public MemberController(MemberService memberService) {
        this.memberService = memberService;
    }

    @PostMapping("/member")
    public Mono<ApiResponse> create(
            @Validated
            @RequestBody
            MemberDTO.Create memberDTO
    ) {
        Mono<MemberDTO.Item> dto = memberService.create(memberDTO);
        return dto.doOnSuccess(member -> {
                    log.info("db insert success");

                    memberService.KafkaSendMessage(memberProducerTopic, member.getId().toString(), member)
                            .publishOn(Schedulers.boundedElastic())
                            .doOnSuccess(result -> {
                                RecordMetadata meta = result.recordMetadata();
                                log.info("kafka send success : topic {} / {}", meta.topic(), meta.offset());
                            })
                            .doOnError(error -> {
                                log.info("kafka send error");
                                log.info(error.toString());
                            })
                            .subscribe();
                })
                .doOnError(error -> {
                    log.info("db insert error");
                    log.info(error.toString());
                })
                .map(member -> {
                    log.info("member reponse");
                    return ApiResponse.builder()
                            .code(200)
                            .message("ok")
                            .data(member)
                            .build();
                });
    }

}

controller/MemberController.java

TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner

3. Consumer

1) Properties

Consumer 를 구성하기 위한 기본적인 설정을 application.properties 에서 설정해주도록 한다.

  • spring.kafka.bootstrap-server
    카프카 클러스터 연결에 필요한 주소값 설정
  • spring.kafka.consumer.key-deserializer
    컨슈머가 메시지의 Key 를 바이트에서 객체로 변환하는데 사용하는 클래스를 지정하는 설정
  • spring.kafka.consumer.value-deserializer
    컨슈머가 메시지의 Value 를 바이트에서 객체로 변환하는데 사용하는 클래스를 지정하는 설정
  • spring.kafka.properties.spring.json.trusted.packages=*
    컨슈머가 JSON 형식으로 deserialize 할때, 신뢰할 수 있는 패키지 목록을 지정하는 설정이며 * 는 모든 패키지를 신뢰한다는 의미다.
  • spring.kafka.consumer.properties.spring.json.use.type.headers=false
    컨슈머가 JSON 형식으로 deserialize 할때, 헤더에 있는 타입정보를 무시하고 리스너 메서드의 타입을 사용하도록 지정하는 설정이다.
# Kafka
spring.kafka.bootstrap-servers=127.0.0.1:10000,127.0.0.1:10001,127.0.0.1:10002

# Kafka Consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

# Kafka Consumer Deserializer
spring.kafka.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.use.type.headers=false

# Custom Properties
config.consumer.member.topic=tester
config.consumer.member.group-id=member-consumer-group

application.properties

2) Configure

ReactiveKafkaConsumerTemplate 메소드는 카프카 컨슈머를 생성하고 리턴하는 메소드이며 이를 스프링 컨테이너에 Bean 으로 등록하여 사용하기로 한다.

추가로 application.properties 에 설정한 config.consumer.member.topic 부분과 config.consumer.member.group-id 부분이 Consumer 별로 구독하는 topic 이 다르고 group-id 도 달라질 듯해서 서비스별로 따로 구분해서 생성하는 걸로 했다.

package com.example.consumer.config;

import com.example.consumer.dto.MemberDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.Map;

@Configuration
public class KafkaConfig {
    @Value("${config.consumer.member.topic:default}")
    private String memberConsumerTopic;
    @Value("${config.consumer.member.group-id:consumer-group}")
    private String memberGroupId;

    @Bean
    public ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> reactiveMemberConsumerTemplate(
            KafkaProperties props
    ) {
        Map<String, Object> consumerProperties = props.buildConsumerProperties();
        consumerProperties.putAll(Map.of(
                "group.id", memberGroupId,
				// value 값을 deserialize 할때 타입을 참조할 dto 을 정의
                "spring.json.value.default.type", MemberDTO.Item.class
        ));

        ReceiverOptions<String, MemberDTO.Item> cfg = ReceiverOptions.create(consumerProperties);
        cfg = cfg.subscription(Collections.singletonList(memberConsumerTopic));
        return new ReactiveKafkaConsumerTemplate<>(cfg);
    }

}

config/KafkaConfig.java

3) Service

KafkaService 에서는 ApplicationRunner 를 구현 후 run 메소드를 오버라이딩 해서 어플리케이션이 시작될때 실행 되도록 하였으며, 해당 메소드에서는 memberConsumerTemplate 으로 메시지를 자동 커밋하는 스트림을 생성하여 Value 값을 MemberDTO.Item 으로 변환하고 로그를 찍어 주도록 했다.

package com.example.consumer.service;

import com.example.consumer.dto.MemberDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaService implements ApplicationRunner {

    private final ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> memberConsumerTemplate;

    @Autowired
    public KafkaService(ReactiveKafkaConsumerTemplate<String, MemberDTO.Item> reactiveMemberConsumerTemplate) {
        this.memberConsumerTemplate = reactiveMemberConsumerTemplate;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.memberConsumerTemplate
                .receiveAutoAck()
                .doOnNext(r -> {
                    MemberDTO.Item member = r.value();
                    log.info("Member: {} {}", member.getId(), member.getName());
                })
                .doOnError(e -> {
                    System.out.println("Error receiving: " + e);
                })
                .subscribe();
    }
}

service/KafkaService.java

4) Application
package com.example.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

ConsumerApplication.java

TaskVibes: Daily Planner - Apps on Google Play
daily planner, todo list, check list, day planner
Spring Boot, Reactor Kafka 에서의 순서보장과 중복방지에 대한 기록 (feat. redis)
Spring Boot, Reactor Kafka 에서의 순서보장과 Redis 를 이용한 중복방지에 대한 기록