From 3b5e91b0dd38bce9708a6cebf272af2d9d205db4 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 19:34:06 -0500 Subject: [PATCH] BAEL-7374 - Code clean Up --- .../StartStopConsumerUnitTest.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index fb557ec2b9..20a78f0251 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -1,16 +1,15 @@ package com.baeldung.spring.kafka.startstopconsumer; import com.baeldung.spring.kafka.start.stop.consumer.*; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.LongSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,8 +21,10 @@ import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import org.testcontainers.utility.DockerImageName; + +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofSeconds; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -55,21 +56,14 @@ public class StartStopConsumerUnitTest { } @BeforeAll - static void setup() throws ExecutionException, InterruptedException { - KAFKA_CONTAINER.addExposedPort(9092); - - Properties adminProperties = new Properties(); - adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - + static void beforeAll() { Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); - Admin admin = Admin.create(adminProperties); producer = new KafkaProducer<>(producerProperties); - admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR))) - .all() - .get(); + Awaitility.setDefaultTimeout(ofSeconds(5)); + Awaitility.setDefaultPollInterval(ofMillis(50)); } @AfterAll @@ -77,6 +71,11 @@ public class StartStopConsumerUnitTest { KAFKA_CONTAINER.stop(); } + @BeforeEach + void beforeEach() { + this.userEventStore.clearUserEvents(); + } + @Test void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { kafkaListenerControlService.startListener(Constants.LISTENER_ID);