Spring Kafka - 여러 타입의 메시지를 하나의 Topic에 발행하고 수신하는 방법 (MessageConverter)
개요
Spring Kafka는 @KafkaListener(메시지 받기), KafkaTemplate(메시지 보내기), application.yml(설정) 등 Spring의 익숙한 방식으로 Kafka의 복잡한 설정을 쉽게 다룰 수 있도록 해준다.
Kafka의 기본 개념과 Spring Kafka 사용 방법은 이전 글을 참고바란다.
Spring Kafka - Spring Boot에서 Kafka 사용하기
개요이직에 성공했다. 이직하는 회사에서는 Kafka를 사용한다. 나는 Kafka를 사용해 본 적이 없기 때문에 입사일이 오기 전에 미리 Kafka에 대해 알아보고 Spring을 통해 간단하게 Kafka를 사용해 보자 A
green-bin.tistory.com
개발 환경
- Java 21
- Spring Boot 3.4.x
- Spring Kafka 3.3.4
- Gradle
- IntelliJ
Dependencies
build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka' // Kafka
testImplementation 'org.springframework.kafka:spring-kafka-test' // Kafka 테스트
}
Configuration
application.yml
Kafka 속성 구성을 클래스 단에서 할 수도 있고 application.yml/properties에서 설정해둘 수도 있다. 예시를 위해 servers 속성만 application.yml에 작성해두었다.
spring:
kafka:
bootstrap-servers: localhost:9092
KafkaProducerConfig
Kafka 프로듀서를 설정하기 위한 Configuration 클래스이다. Kafka 메시지를 발행하기 위해 필요한 ProducerFactory와 KafkaTemplate를 정의한다. JSON 형식의 메시지를 발행하기 위해 JsonSerializer를 사용하고, 메시지 Key는 문자열로 직렬화된다. KafkaTemplate를 사용해 쉽게 발행할 수 있다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS,
"demoViewDTO1:com.chanbinme.practice02.kafka.dto.DemoViewDTO1, "
+ "demoViewDTO2:com.chanbinme.practice02.kafka.dto.DemoViewDTO2");
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}
}
- ProducerFactory: Kafka 프로듀서를 생성하는 팩토리
- JsonSerializer.TYPE_MAPPINGS: Kafka 브로커 주소를 설정한다.
KafkaConsumerConfig
KafkaConsumerConfig 클래스는 Kafka 컨슈머를 설정하기 위한 Spring Configuration 클래스이다.
MessageConverter를 통해 메시지 타입에 따라 자동으로 알맞은 DTO로 변환하여 서비스 로직에서 편리하게 사용할 수 있다.
이전에 다뤘던 내용은 제외하고 멀티 타입을 위해 추가된 내용에 대해서만 다루겠다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.chanbinme.practice02.kafka.dto");
props.put(JsonDeserializer.TYPE_MAPPINGS,
"demoViewDTO1:com.chanbinme.practice02.kafka.dto.DemoViewDTO1, "
+ "demoViewDTO2:com.chanbinme.practice02.kafka.dto.DemoViewDTO2");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setRecordMessageConverter(multiTypeConverter());
return factory;
}
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("demoViewDTO1", DemoViewDTO1.class);
mappings.put("demoViewDTO2", DemoViewDTO2.class);
typeMapper.setIdClassMapping(mappings);
typeMapper.addTrustedPackages("com.chanbinme.practice02.kafka.dto");
converter.setTypeMapper(typeMapper);
return converter;
}
}
내용이 길기 때문에 메서드별로 분리해서 확인해보자
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.chanbinme.practice02.kafka.dto");
props.put(JsonDeserializer.TYPE_MAPPINGS,
"demoViewDTO1:com.chanbinme.practice02.kafka.dto.DemoViewDTO1, "
+ "demoViewDTO2:com.chanbinme.practice02.kafka.dto.DemoViewDTO2");
return new DefaultKafkaConsumerFactory<>(props);
}
- ConsumerFactory: Kafka 컨슈머를 생성하는 팩토리이다.
- JsonDeserializer.TRUSTED_PACKAGES: 지정한 패키지의 클래스만 역직렬화를 허용한다.
- JsonDeserializer.TYPE_MAPPINGS: Kafka 메시지 Type ID(demoViewDTO1)와 실제 Java 클래스(DemoViewDTO1)를 매핑한다.
구조: "{Kafka 메시지 Type ID}:{Java 클래스}"
예시: "demoViewDTO1:com.chanbinme.practice02.kafka.dto.DemoViewDTO1"
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setRecordMessageConverter(multiTypeConverter());
return factory;
}
- factory.setRecordMessageConverter(multiTypeConverter()): factory에 커스텀한 MessageConverter를 설정한다.
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("demoViewDTO1", DemoViewDTO1.class);
mappings.put("demoViewDTO2", DemoViewDTO2.class);
typeMapper.setIdClassMapping(mappings);
typeMapper.addTrustedPackages("com.chanbinme.practice02.kafka.dto");
converter.setTypeMapper(typeMapper);
return converter;
}
- RecordMessageConverter: Kafka 메시지와 Spring의 Message 객체 간 변환을 담당한다. 주로 메시지 직렬화/역직렬화를 커스터마이징할 때 사용된다.
- StringJsonMessageConverter: Kafka 메시지를 JSON 문자열과 Java 객체 간에 변환해주는 메시지 컨버터
- converter.setTypeMapper(typeMapper): 어떤 타입으로 변환할지 결정하는 타입 매퍼를 지정할 수 있다.
- DefaultJackson2JavaTypeMapper: Kafka 메시지의 타입 정보를 기반으로, JSON 데이터를 적절한 Java 객체로 변환(역직렬화)할 때 어떤 클래스 타입으로 변환할지 결정하는 역할을 한다. Kafka 메시지의 헤더에서 타입 정보(__TypeId__)를 읽어와, 미리 매핑해둔 Type ID(demoViewDTO1)와 Java 클래스(DemoViewDTO1) 간의 매핑 정보를 참조해 해당 타입으로 변환한다.
- typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID): 메시지의 타입 정보를 해석할 때, 헤더의 Type ID를 우선적으로 사용하도록 설정한다.
- typeMapper.setIdClassMapping(mappings): 타입 토큰과 Java 클래스의 매핑 정보를 설정한다.
DemoViewDTO
Kafka 메시지의 페이로드로 사용하기 위한 객체. Java17에서 도입된 record를 사용하여 DTO로 만들었다.
@Builder
public record DemoViewDTO1(String name, int age, String id) {
}
@Builder
public record DemoViewDTO2(String name, int age, String id) {
}
메시지 발행
메시지 발행은 이전과 달라지는 부분은 없다.
@Component
@Slf4j
@RequiredConstructor
public class KafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendPayload(String topic, Object payload) {
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, payload);
log.info("Sending message: {}", payload);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.info("Error sending message: {}", ex.getMessage());
} else {
log.info("Message sent successfully: {}", result.getRecordMetadata().offset());
}
});
}
}
메시지 수신
KafkaConsumerConfig에서 설정해두었던 구성을 바탕으로 헤더의 __TypeId__을 확인하고 매핑한다.(예: demoViewDTO1 → DemoViewDTO1.class로 매핑)
매핑된 클래스와 일치하는 파라미터를 가진 메서드를 호출하게 된다. (DemoViewDTO1 → handleDTO1() 호출)
@Slf4j
@Component
@Getter
@KafkaListener(groupId = "chanbinme-groupId", topics = "chanbinme-topic")
public class KafkaConsumer {
private CountDownLatch latch = new CountDownLatch(10);
private List<Object> payloads = new ArrayList<>();
@KafkaHandler
public void handleDTO1(DemoViewDTO1 dto, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Received message from topic {}: {}", topic, dto);
payloads.add(dto);
latch.countDown();
}
@KafkaHandler
public void handleDTO2(DemoViewDTO2 dto, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Received message from topic {}: {}", topic, dto);
payloads.add(dto);
latch.countDown();
}
@KafkaHandler(isDefault = true)
public void handleDefault(Object dto) {
log.info("Received unknown type: {}", dto);
payloads.add(dto);
latch.countDown();
}
}
- @KafkaListener: 이 클래스가 chanbinme-groupId 그룹으로 chanbinme-topic 토픽의 메시지를 구독함을 선언한다.
- @KafkaHandler: @KafkaListener가 클래스 단에 설정되어 있는 경우 메서드에 @KafkaHandler를 추가해주어야 한다.
- @KafkaHandler(isDefault = true): 페이로드가 일치하는 메서드가 없으면 사용할 기본 메서드를 지정해준다.
- @Header(KafkaHeaders.RECEIVE_TOPIC): Spring Kafka에서 Kafka 메시지를 처리할 때, 해당 메시지가 어느 Topic에서 수신되었는지 정보를 메서드 파라미터로 전달해준다. 토픽을 로그로 확인하고 싶어서 추가했다.(없어도 된다.)
Topic 외에도 Partition, Offset 등 다양한 정보를 확인할 수 있다.
테스트
@SpringBootTest
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
ports = 9092)
class KafkaConsumerTest {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private KafkaProducer kafkaProducer;
@Test
void giveEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String topic = "chanbinme-topic";
DemoViewDTO1 payload = DemoViewDTO1.builder()
.id("id_001")
.name("김찬빈")
.age(32)
.build();
DemoViewDTO2 payload2 = DemoViewDTO2.builder()
.id("id_002")
.name("김찬갱")
.age(25)
.build();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
kafkaProducer.sendMessage(topic, payload);
} else {
kafkaProducer.sendMessage(topic, payload2);
}
}
// 모든 메시지를 수신할 때까지 기다림
kafkaConsumer.getLatch().await(10, TimeUnit.SECONDS);
// 검증 로직 추가
assertEquals(10, kafkaConsumer.getPayloads().size(), "수신된 메시지 개수가 일치하지 않습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload), "payload가 수신되지 않았습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload2), "payload2가 수신되지 않았습니다.");
}
}
2025-04-15T18:19:18.118+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T18:19:18.120+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T18:19:18.121+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T18:19:18.122+09:00 INFO 67242 --- [ Test worker] c.c.practice02.kafka.KafkaProducer : Sending message: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.696+09:00 INFO 62189 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2025-04-15T11:45:36.705+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 0
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 1
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 2
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 3
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 4
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 5
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 6
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 7
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 8
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 9
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO1[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message from topic chanbinme-topic: DemoViewDTO2[name=김찬갱, age=25, id=id_002]
참조
@KafkaListener on a Class :: Spring Kafka
When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so: Startin
docs.spring.io
https://www.baeldung.com/spring-kafka
Intro to Apache Kafka with Spring | Baeldung
A quick and practical guide to using Apache Kafka with Spring.
www.baeldung.com