diff --git a/spring-kafka-2/pom.xml b/spring-kafka-2/pom.xml index b2d9d01b8e..8c714a163c 100644 --- a/spring-kafka-2/pom.xml +++ b/spring-kafka-2/pom.xml @@ -26,7 +26,10 @@ org.springframework.kafka spring-kafka - ${spring-kafka.version} + + + org.springframework.kafka + spring-kafka-test org.apache.kafka @@ -46,11 +49,6 @@ org.apache.commons commons-lang3 - - org.springframework.kafka - spring-kafka-test - test - org.testcontainers kafka @@ -70,9 +68,8 @@ - 1.19.3 - 3.1.2 - 3.6.1 + 1.19.7 + 3.7.0 com.baeldung.spring.kafka.dlt.KafkaDltApplication diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/managingkafkaconsumergroups/ManagingConsumerGroupsIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/managingkafkaconsumergroups/ManagingConsumerGroupsIntegrationTest.java index 8fa07e4901..8d806d3809 100644 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/managingkafkaconsumergroups/ManagingConsumerGroupsIntegrationTest.java +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/managingkafkaconsumergroups/ManagingConsumerGroupsIntegrationTest.java @@ -1,5 +1,11 @@ package com.baeldung.spring.kafka.managingkafkaconsumergroups; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Objects; +import java.util.Set; + import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -8,22 +14,19 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.util.CollectionUtils; - -import java.util.Objects; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.annotation.DirtiesContext.ClassMode; import org.springframework.test.context.ActiveProfiles; -import java.util.Set; +import org.springframework.util.CollectionUtils; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import lombok.extern.slf4j.Slf4j; @SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class) @EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9098", "port=9098"}, topics = {"topic1"}) @DirtiesContext(classMode = ClassMode.BEFORE_CLASS) @ActiveProfiles("managed") -public class ManagingConsumerGroupsIntegrationTest { +@Slf4j +class ManagingConsumerGroupsIntegrationTest { private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1"; private static final int TOTAL_PRODUCED_MESSAGES = 5000; @@ -39,11 +42,11 @@ public class ManagingConsumerGroupsIntegrationTest { MessageConsumerService consumerService; @Test - public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException { + void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException { int currentMessage = 0; - do { kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0)); + Thread.sleep(0,100); // Waiting to let the embedded kafka consume the event. currentMessage++; if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) { @@ -53,12 +56,13 @@ public class ManagingConsumerGroupsIntegrationTest { .findFirst() .orElse(""); MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId); - Thread.sleep(2000); Objects.requireNonNull(container).stop(); kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId); } + if(currentMessage % 1000 == 0){ + log.info("Processed {} of {}", currentMessage, TOTAL_PRODUCED_MESSAGES); + } } while (currentMessage != TOTAL_PRODUCED_MESSAGES); - Thread.sleep(2000); Set partitionsConsumedBy1 = consumerService.consumedPartitions.get("consumer-1"); Set partitionsConsumedBy0 = consumerService.consumedPartitions.get("consumer-0");