Spring Kafka - Spring Boot에서 Kafka 사용하기
개요
이직에 성공했다. 이직하는 회사에서는 Kafka를 사용한다. 나는 Kafka를 사용해 본 적이 없기 때문에 입사일이 오기 전에 미리 Kafka에 대해 알아보고 Spring을 통해 간단하게 Kafka를 사용해 보자
Apache Kafka는 대용량 데이터를 빠르고 안정적으로 주고받을 수 있도록 도와주는 실시간 분산 메시지 큐이자 이벤트 스트리밍 플랫폼이다.(주문, 결제, 로그, 알림 등 다양한 이벤트를 빠르게 여러 서비스에 전달해 준다.) 쉽게 말해서, 여러 시스템이나 서비스 사이에서 데이터를 주고받을 때 중간에서 데이터를 안전하게 전달하고, 필요할 때 여러 곳에서 동시에 꺼내 사용할 수 있도록 해주는 우체국 같은 역할을 한다. 넷플릭스, 배달의 민족, 우버 등 실시간, 대용량 데이터 처리가 필요한 서비스에서 널리 쓰이고 있다.
Spring Kafka는 @KafkaListener(메시지 받기), KafkaTemplate(메시지 보내기), application.yml(설정) 등 Spring의 익숙한 방식으로 Kafka의 복잡한 설정을 쉽게 다룰 수 있도록 해준다.
Kafka 주요 개념
Component | Description |
Producer | - 데이터를 생성해서 Kafka로 보내는 역할. 메시지를 Topic에 발행하고, 각 메시지는 특정 Partition에 저장한다. |
Consumer | - Kafka에서 데이터를 읽어오는 역할. - 여러 Consumer가 그룹을 이루면, 같은 Topic의 Partition을 나눠서 병렬로 처리한다. - Consumer Group 안에서는 각 Partition의 메시지를 한 Consumer만 읽으므로 중복 없이 효율적으로 데이터를 처리한다. |
Topic | - Kafka에서 데이터를 분류하는 우편함 같은 역할. - Producer는 Topic에 메시지를 발행, Consumer는 Topic에서 메시지를 읽는다. - "order", "payment", "log" 등 주제별로 데이터를 구분해서 저장할 수 있다. |
Partition | - Topic을 여러 조각으로 나누어 저장한다. - 각 Partition은 메시지의 순서를 보장하고, 데이터가 많아져도 여러 서버에 분산 저장 가능하다. |
Offset | - Partition 안에서 각 메시지의 고유 번호. - Consumer는 Offset을 기억해 어디까지 읽었는지 추적 가능하다. |
Replication | - Partition의 데이터를 여러 브로커에 복제하여 장애 발생 시 데이터 손실 방지한다. - 각 Partition에는 리더(Leader)와 팔로워(Follower)가 있으며, 리더가 쓰기/읽기를 담당하고 팔로워가 데이터를 복제한다. |
Zookeeper | - Kafka 클러스터의 브로커, 파티션 등 메타데이터를 관리하는 중앙 관리 시스템. - 최근에는 Zookeeper 없이도 동작하는 Kafka 모드가 도입되고 있다. |
개발 환경
- Java 21
- Spring Boot 3.4.x
- Spring Kafka 3.3.4
- Gradle
- IntelliJ
Dependencies
build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka' // Kafka
testImplementation 'org.springframework.kafka:spring-kafka-test' // Kafka 테스트
}
Configuration
application.yml
Kafka 속성 구성을 클래스 단에서 할 수도 있고 application.yml/properties에서 설정해둘 수도 있다. 예시를 위해 servers 속성만 application.yml에 작성해두었다.
spring:
kafka:
bootstrap-servers: localhost:9092
KafkaProducerConfig
Kafka 프로듀서를 설정하기 위한 Configuration 클래스이다. Kafka 메시지를 발행하기 위해 필요한 ProducerFactory와 KafkaTemplate를 정의한다. JSON 형식의 메시지를 발행하기 위해 JsonSerializer를 사용하고, 메시지 Key는 문자열로 직렬화된다. KafkaTemplate를 사용해 쉽게 발행할 수 있다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, DemoViewDTO> productFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, DemoViewDTO> kafkaTemplate() {
return new KafkaTemplate<>(productFactory());
}
}
- @Configuration: Spring에서 이 클래스가 설정 클래스임을 나타낸다.
- @Value: application.yml 파일에서 spring.kafka.bootstrap-servers 값을 주입받아 Kafka 브로커 주소를 설정한다.
- ProducerFactory: Kafka 프로듀서를 생성하는 팩토리
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커 주소를 설정한다.
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: 메시지 Key를 문자열로 직렬화하는 클래스(StringSerializer)를 설정한다.
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: 메시지 Value를 JSON으로 직렬화하는 클래스(JsonSerializer)를 설정한다.
- KafkaTemplate: Kafka 메시지를 발행하기 위해 필요한 템플릿이다. ProducerFactory를 사용해 생성한다.
KafkaConsumerConfig
KafkaConsumerConfig 클래스는 Kafka 컨슈머를 설정하기 위한 Spring Configuration 클래스이다. Kafka 메시지를 수신하기 위해 필요한 ConsumerFactory와 ConcurrentKafkaListenerContainerFactory를 정의한다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, DemoViewDTO> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "chanbinme-groupId");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
JsonDeserializer<DemoViewDTO> deserializer = new JsonDeserializer<>(DemoViewDTO.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
- @EnableKafka: Spring Kafka를 활성화하기 위한 어노테이션이다. Kafka Listener를 사용할 수 있도록 설정한다.
- ConsumerFactory: Kafka 컨슈머를 생성하는 팩토리이다.
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 브로커 주소를 설정한다.
- ConsumerConfig.GROUP_ID_CONFIG: 컨슈머 그룹 ID를 설정한다.
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG: 메시지 key를 문자열로 역직렬화하는 클래스(StringDeserializer)를 설정한다.
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: 메시지 value을 JSON으로 역직렬화하는 클래스(JsonDeserializer)를 설정한다.
- ConcurrentKafkaListenerContainerFactory: Kafka Listener 컨테이너를 생성하는 팩토리 클래스
- setConsumerFactory: 컨슈머 팩토리를 설정한다.
- setRecordFilterStrategy: 메시지를 필터링하는 전략을 사용할 수 있다.
DemoViewDTO
Kafka 메시지의 페이로드로 사용하기 위한 객체. Java17에서 도입된 record를 사용하여 DTO로 만들었다.
@Builder
public record DemoViewDTO(String name, int age, String id) {
}
메시지 발행
KafkaTemplate 클래스를 사용해서 메시지를 발행할 수 있다. send 메서드에서 반환되는 CompletableFuture 객체를 통해 메시지 발행 상태를 관리할 수 있다.
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, DemoViewDTO> kafkaTemplate;
public void sendMessage(String topic, DemoViewDTO payload) {
CompletableFuture<SendResult<String, DemoViewDTO>> future = kafkaTemplate.send(topic, payload);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.info("Error sending message: {}", ex.getMessage());
} else {
log.info("Message sent successfully: {}", result.getRecordMetadata().offset());
}
});
}
}
메시지 수신
@KafkaListener를 사용해서 특정 Topic에서 메시지를 수신하며, 수신된 메시지를 로깅하고 내부 리스트에 저장한다.
현재 클래스에서는 Kafka Topic에서 메시지를 수신하면 receive 메서드를 호출하게 되고, 메시지의 페이로드를 로그로 출력한 후 payloads 리스트에 저장하도록 했다.
@Component
@Slf4j
@Getter
public class KafkaConsumer {
private CountDownLatch latch = new CountDownLatch(10);
private List<DemoViewDTO> payloads = new ArrayList<>();
private DemoViewDTO payload;
// record를 수신하기 위한 consumer 설정
@KafkaListener(topics = "chanbinme-topic")
public void receive(ConsumerRecord<String, DemoViewDTO> consumerRecord) {
payload = consumerRecord.value();
log.info("Received message: {}", payload);
payloads.add(payload);
latch.countDown();
}
}
- @KafkaListener: Kafka Topic에서 메시지를 수신한다.
- CountDownLatch:
- 다른 스레드들의 작업이 완료될 때까지 대기하도록 동기화해주는 클래스이다.
- 주어진 count로 초기화되며, countDown 메서드 호출로 인해 count가 0이 될 때까지 대기한다.
- 여기서는 테스트 목적으로 메시지 수신을 동기화하기 위해 사용했다. 메시지를 수신할 때마다 countDown 메서드를 호출하도록 했고 10개의 메시지를 수신하면 해제된다.
- payloads: 수신된 메시지를 저장하는 리스트이다.
테스트
@SpringBootTest
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
ports = 9092)
class KafkaConsumerTest {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private KafkaProducer kafkaProducer;
@Test
void giveEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String topic = "chanbinme-topic";
DemoViewDTO payload = DemoViewDTO.builder()
.id("id_001")
.name("김찬빈")
.age(32)
.build();
DemoViewDTO payload2 = DemoViewDTO.builder()
.id("id_002")
.name("김찬갱")
.age(30)
.build();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
kafkaProducer.sendMessage(topic, payload);
} else {
kafkaProducer.sendMessage(topic, payload2);
}
}
// 모든 메시지를 수신할 때까지 기다림
kafkaConsumer.getLatch().await(10, TimeUnit.SECONDS);
// 검증 로직 추가
assertEquals(10, kafkaConsumer.getPayloads().size(), "수신된 메시지 개수가 일치하지 않습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload), "payload가 수신되지 않았습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload2), "payload2가 수신되지 않았습니다.");
}
}
- @EmbeddedKafka: Spring Kafka 테스트 환경에서 실제 Kafka 브로커를 띄우지 않고, 테스트 코드 실행 시 임베디드(내장) Kafka 브로커를 자동으로 구동하기 위해 사용한다.
- partitions = 1: 테스트용 Kafka Topic을 1개의 Partiton으로 생성한다. Partition 수를 조절해서 병렬 처리 테스트도 가능하다.
- brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}: 임베디드 Kafka 브로커가 localhost:9092에서 리스닝하도록 설정한다. 테스트 코드에서 Kafka에 접속할 때 이 주소를 사용하게 된다.
- ports = 9092: 임베디드 Kafka 브로커가 사용할 포트를 지정한다. 테스트 환경에서 포트 충돌을 방지하거나 명확한 포트 사용을 위해 지정할 수 있다.
2025-04-15T11:45:36.594+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.596+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.596+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.596+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.596+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.596+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.597+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.597+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.597+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.597+09:00 INFO 62189 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.696+09:00 INFO 62189 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2025-04-15T11:45:36.705+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 0
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 1
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 2
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 3
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 4
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 5
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 6
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 7
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 8
2025-04-15T11:45:36.706+09:00 INFO 62189 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 9
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T11:45:36.720+09:00 INFO 62189 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
메시지 필터링
Consumer에서 Filter를 걸어 특정 조건에 맞는 데이터만 수신하도록 할 수 있다. DemoViewDTO의 age값이 30이 넘는 데이터는 제외하도록 해보자. 필터링 설정을 추가한 ConcurrentKafkaListenerContainerFactory를 반환하는 메서드를 생성한다.
KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
...
// 수신하는 consumer에서 record 필터링
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> filteredKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DemoViewDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().age() > 30);
return factory;
}
}
- factory.setRecordFilterStrategy: Spring Kafka에서 Consumer가 메시지를 실제로 처리하기 전에 특정 조건에 따라 메시지를 필터링하는 전략을 설정하는 메서드이다. 위 코드에서는 age값이 30을 넘으면 제외되도록 작성되어 있다.
KafkaConsumer
@Component
@Slf4j
@Getter
public class KafkaConsumer {
private CountDownLatch latch = new CountDownLatch(10);
private List<DemoViewDTO> payloads = new ArrayList<>();
private DemoViewDTO payload;
// record를 수신하기 위한 consumer 설정
@KafkaListener(topics = "chanbinme-topic", containerFactory = "filteredKafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, DemoViewDTO> consumerRecord) {
payload = consumerRecord.value();
log.info("Received message: {}", payload);
payloads.add(payload);
latch.countDown();
}
public void resetLatch() {
latch = new CountDownLatch(1);
}
}
- containerFactory: @KafkaListener 애너테이션에 containerFactory 속성을 추가해서 특정 리스너 컨테이너 팩토리를 지정해야 한다.
테스트
payload의 age는 30을 넘기 때문에 수신되지 않아야 하고 payload2의 객체만 수신되어야 한다.
그렇기 때문에 모든 동작이 끝난 후 payloads에는 payload2 객체가 5개만 들어있고 payload 객체는 포함되지 않아야 한다.
정말 필터링 되었는지 테스트해보자
@SpringBootTest
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
ports = 9092)
class KafkaConsumerTest {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private KafkaProducer kafkaProducer;
@Test
void giveEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String topic = "chanbinme-topic";
DemoViewDTO payload = DemoViewDTO.builder()
.id("id_001")
.name("김찬빈")
.age(32)
.build();
DemoViewDTO payload2 = DemoViewDTO.builder()
.id("id_002")
.name("김찬갱")
.age(30)
.build();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
kafkaProducer.sendMessage(topic, payload);
} else {
kafkaProducer.sendMessage(topic, payload2);
}
}
// 모든 메시지를 수신할 때까지 기다림
kafkaConsumer.getLatch().await(10, TimeUnit.SECONDS);
// 검증 로직 추가
assertEquals(10, kafkaConsumer.getPayloads().size(), "수신된 메시지 개수가 일치하지 않습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload), "payload가 수신되지 않았습니다.");
assertTrue(kafkaConsumer.getPayloads().contains(payload2), "payload2가 수신되지 않았습니다.");
}
}
2025-04-15T14:32:36.583+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T14:32:36.585+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.585+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T14:32:36.585+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬빈, age=32, id=id_001]
2025-04-15T14:32:36.586+09:00 INFO 63772 --- [ Test worker] c.c.p.kafka.KafkaProducer : Sending message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.681+09:00 INFO 63772 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 0
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 1
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 2
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 3
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 4
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 5
2025-04-15T14:32:36.692+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 6
2025-04-15T14:32:36.693+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 7
2025-04-15T14:32:36.693+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 8
2025-04-15T14:32:36.693+09:00 INFO 63772 --- [ad | producer-1] c.c.p.kafka.KafkaProducer : Message sent successfully: 9
2025-04-15T14:32:36.709+09:00 INFO 63772 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.709+09:00 INFO 63772 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.709+09:00 INFO 63772 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.709+09:00 INFO 63772 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
2025-04-15T14:32:36.709+09:00 INFO 63772 --- [ntainer#0-0-C-1] c.c.p.kafka.KafkaConsumer : Received message: DemoViewDTO[name=김찬갱, age=25, id=id_002]
5개의 payload2 객체만 수신되는 것을 log를 통해 확인할 수 있다.
참조
https://www.baeldung.com/spring-kafka
https://www.instaclustr.com/blog/apache-kafka-architecture/
Apache Kafka® Architecture: A Complete Guide [2025]
Apache Kafka is a distributed streaming platform with plenty to offer—from redundant storage of massive data volumes, to a message bus capable of throughput reaching millions of messages each second. This is your complete guide to Apache Kafka Architetur
www.instaclustr.com
https://docs.spring.io/spring-kafka/reference/testing.html#embedded-kafka-annotation
Testing Applications :: Spring Kafka
When using @EmbeddedKafka with @SpringJUnitConfig, it is recommended to use @DirtiesContext on the test class. This is to prevent potential race conditions occurring during the JVM shutdown after running multiple tests in a test suite. For example, without
docs.spring.io