From c3b97034ed99705a982490d476a90bd739696291 Mon Sep 17 00:00:00 2001 From: SGWebFreelancer Date: Sun, 24 Dec 2023 09:43:23 +0800 Subject: [PATCH 1/4] First version --- .../CustomPartitioner.java | 31 ++++ .../KafkaApplication.java | 49 ++++++ .../KafkaMessageConsumer.java | 30 ++++ .../partitioningstrategy/KafkaProducer.java | 22 +++ .../partitioningstrategy/ReceivedMessage.java | 30 ++++ .../KafkaApplicationUnitTesting.java | 164 ++++++++++++++++++ 6 files changed, 326 insertions(+) create mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java create mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java create mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java create mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java create mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/ReceivedMessage.java create mode 100644 spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java new file mode 100644 index 0000000000..63db2e1b2d --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java @@ -0,0 +1,31 @@ +package com.baeldung.partitioningstrategy; + +import java.util.Map; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +public class CustomPartitioner implements Partitioner { + private static final int PREMIUM_PARTITION = 0; + private static final int NORMAL_PARTITION = 1; + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + String customerType = extractCustomerType(key.toString()); + return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION; + } + + private String extractCustomerType(String key) { + String[] parts = key.split("_"); + return parts.length > 1 ? parts[1] : "normal"; + } + + @Override + public void configure(Map configs) { + + } + + @Override + public void close() { + + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java new file mode 100644 index 0000000000..ddde9ede3a --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java @@ -0,0 +1,49 @@ +package com.baeldung.partitioningstrategy; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@SpringBootApplication +public class KafkaApplication { + + @Bean + public KafkaTemplate kafkaTemplate() + { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() + { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaConsumer kafkaConsumer() { + Map configProps = new HashMap<>(); + configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Set a unique group ID + return new KafkaConsumer<>(configProps); + } + + public static void main(String[] args) { + SpringApplication.run(KafkaApplication.class, args); + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java new file mode 100644 index 0000000000..308f34be56 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java @@ -0,0 +1,30 @@ +package com.baeldung.partitioningstrategy; + +import java.util.ArrayList; +import java.util.List; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; +import org.springframework.kafka.support.KafkaHeaders; +import jakarta.annotation.Nullable; + +@Service +public class KafkaMessageConsumer { + private final List receivedMessages = new ArrayList<>(); + + @KafkaListener(topics = {"order-topic", "default-topic"}, groupId = "test-group") + public void listen(@Payload String message, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { + ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition); + System.out.println("Received message: " + receivedMessage); + receivedMessages.add(receivedMessage); + } + + public List getReceivedMessages() { + return receivedMessages; + } + + public void clearReceivedMessages() { receivedMessages.clear(); } +} diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java new file mode 100644 index 0000000000..380292c5b8 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java @@ -0,0 +1,22 @@ +package com.baeldung.partitioningstrategy; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void send(String topic, String key, String message) { + kafkaTemplate.send(topic, key, message); + } + + public void send(String topic, String message) { + kafkaTemplate.send(topic, message); + } +} \ No newline at end of file diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/ReceivedMessage.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/ReceivedMessage.java new file mode 100644 index 0000000000..a262f62e39 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/ReceivedMessage.java @@ -0,0 +1,30 @@ +package com.baeldung.partitioningstrategy; + +public class ReceivedMessage { + private final String key; + private final String message; + private final int partition; + + public ReceivedMessage(String key, String message, int partition) { + this.key = key; + this.message = message; + this.partition = partition; + } + + @Override + public String toString() { + return "Key: " + key + " - Message: " + message + " - Partition: " + partition; + } + + public String getKey() { + return key; + } + + public String getMessage() { + return message; + } + + public int getPartition() { + return partition; + } +} diff --git a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java new file mode 100644 index 0000000000..1ae9670deb --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java @@ -0,0 +1,164 @@ +package com.baeldung.partitioningstrategy; + +import static org.junit.Assert.assertEquals; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +@SpringBootTest +@EmbeddedKafka(partitions = 3, brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}) +public class KafkaApplicationUnitTesting { + @Autowired + private KafkaProducer kafkaProducer; + + @Autowired + private KafkaMessageConsumer kafkaMessageConsumer; + + @Autowired + private EmbeddedKafkaBroker embeddedKafkaBroker; + + @Autowired + private Consumer consumer; + + @BeforeEach + public void clearMessages() { + kafkaMessageConsumer.clearReceivedMessages(); + } + + @Test + public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException { + kafkaProducer.send("default-topic", "message1"); + kafkaProducer.send("default-topic","message2"); + kafkaProducer.send("default-topic","message3"); + + Thread.sleep(2000); + + List records = kafkaMessageConsumer.getReceivedMessages(); + int expectedPartition = records.get(0).getPartition(); + + for (ReceivedMessage record : records) { + if (record.getKey() == null) { + Assert.assertEquals("Message without key should be in the same partition", expectedPartition, record.getPartition()); + } + } + } + + @Test + void givenProducerWithSameKeyMessages_whenSendingMessages_shouldRouteToSamePartition() throws InterruptedException { + kafkaProducer.send("order-topic", "partitionA", "critical data"); + kafkaProducer.send("order-topic", "partitionA", "more critical data"); + kafkaProducer.send("order-topic", "partitionB", "another critical message"); + kafkaProducer.send("order-topic", "partitionA", "another more critical data"); + + Thread.sleep(1000); + + List records = kafkaMessageConsumer.getReceivedMessages(); + Map> messagesByKey = groupMessagesByKey(records); + + messagesByKey.forEach((key, messages) -> { + int expectedPartition = messages.get(0).getPartition(); + for (ReceivedMessage message : messages) { + assertEquals("Messages with key '" + key + "' should be in the same partition", + message.getPartition(), + expectedPartition); + } + }); + } + + @Test + public void givenProducerWithSameKeyMessages_whenSendingMessages_shouldReceiveInProducedOrder() throws InterruptedException { + kafkaProducer.send("order-topic", "partitionA", "message1"); + kafkaProducer.send("order-topic", "partitionA", "message3"); + kafkaProducer.send("order-topic", "partitionA", "message4"); + + Thread.sleep(1000); + + List records = kafkaMessageConsumer.getReceivedMessages(); + + StringBuilder resultMessage = new StringBuilder(); + records.forEach(record -> resultMessage.append(record.getMessage())); + String expectedMessage = "message1message3message4"; + + assertEquals( + "Messages with the same key should be received in the order they were produced within a partition", + expectedMessage, + resultMessage.toString() + ); + } + + @Test + public void givenCustomPartitioner_whenSendingMessages_shouldRouteToCorrectPartition() throws InterruptedException { + // Configure the producer with the custom partitioner + KafkaTemplate kafkaTemplate = setProducerToUseCustomPartitioner(); + + kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); + kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); + + Thread.sleep(1000); + + List records = kafkaMessageConsumer.getReceivedMessages(); + + // Validate that messages are routed to the correct partition based on customer type + for (ReceivedMessage record : records) { + if ("123_premium".equals(record.getKey())) { + assertEquals("Premium order message should be in partition 0", 0, record.getPartition()); + } else if ("456_normal".equals(record.getKey())) { + assertEquals("Normal order message should be in partition 1", 1, record.getPartition()); + } + } + } + + @Test + public void givenCustomPartitioner_whenSendingMessages_shouldConsumeOnlyFromSpecificPartition() throws InterruptedException { + KafkaTemplate kafkaTemplate = setProducerToUseCustomPartitioner(); + + kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); + kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); + + Thread.sleep(1000); + + consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0))); + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + + for (ConsumerRecord record : records) { + assertEquals("Premium order message should be in partition 0", 0, record.partition()); + assertEquals("123_premium", record.key()); + } + + consumer.close(); + } + + private KafkaTemplate setProducerToUseCustomPartitioner() { + Map producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); + DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerProps); + + return new KafkaTemplate<>(producerFactory); + } + + private Map> groupMessagesByKey(List messages) { + return messages.stream() + .filter(message -> message.getKey() != null) + .collect(Collectors.groupingBy(ReceivedMessage::getKey)); + } +} From d10d29e0dc598d8f8dae47c2d59141785faeabac Mon Sep 17 00:00:00 2001 From: SGWebFreelancer Date: Mon, 25 Dec 2023 11:25:53 +0800 Subject: [PATCH 2/4] Fix comment --- .../CustomPartitioner.java | 1 + .../KafkaApplication.java | 7 ++- .../KafkaMessageConsumer.java | 17 ++++--- .../partitioningstrategy/KafkaProducer.java | 2 +- ...> KafkaApplicationIntegrationTesting.java} | 51 +++++++++++-------- 5 files changed, 44 insertions(+), 34 deletions(-) rename spring-kafka/src/test/java/com/baeldung/partitioningstrategy/{KafkaApplicationUnitTesting.java => KafkaApplicationIntegrationTesting.java} (81%) diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java index 63db2e1b2d..f02a91f6fc 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/CustomPartitioner.java @@ -1,6 +1,7 @@ package com.baeldung.partitioningstrategy; import java.util.Map; + import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java index ddde9ede3a..57ecc5e187 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaApplication.java @@ -2,6 +2,7 @@ package com.baeldung.partitioningstrategy; import java.util.HashMap; import java.util.Map; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -18,14 +19,12 @@ import org.springframework.kafka.core.ProducerFactory; public class KafkaApplication { @Bean - public KafkaTemplate kafkaTemplate() - { + public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean - public ProducerFactory producerFactory() - { + public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java index 308f34be56..131a112731 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java @@ -1,24 +1,23 @@ package com.baeldung.partitioningstrategy; -import java.util.ArrayList; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.List; + import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import org.springframework.kafka.support.KafkaHeaders; + import jakarta.annotation.Nullable; @Service public class KafkaMessageConsumer { - private final List receivedMessages = new ArrayList<>(); + private final List receivedMessages = new CopyOnWriteArrayList<>(); - @KafkaListener(topics = {"order-topic", "default-topic"}, groupId = "test-group") - public void listen(@Payload String message, - @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, - @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { + @KafkaListener(topics = { "order-topic", "default-topic" }, groupId = "test-group") + public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition); - System.out.println("Received message: " + receivedMessage); receivedMessages.add(receivedMessage); } @@ -26,5 +25,7 @@ public class KafkaMessageConsumer { return receivedMessages; } - public void clearReceivedMessages() { receivedMessages.clear(); } + public void clearReceivedMessages() { + receivedMessages.clear(); + } } diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java index 380292c5b8..c28f855976 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java @@ -16,7 +16,7 @@ public class KafkaProducer { kafkaTemplate.send(topic, key, message); } - public void send(String topic, String message) { + public void send(String topic, String message) { kafkaTemplate.send(topic, message); } } \ No newline at end of file diff --git a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java similarity index 81% rename from spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java rename to spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java index 1ae9670deb..1e44ae1714 100644 --- a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationUnitTesting.java +++ b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java @@ -1,11 +1,13 @@ package com.baeldung.partitioningstrategy; import static org.junit.Assert.assertEquals; + import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -23,9 +25,12 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; +import static org.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.SECONDS; + @SpringBootTest -@EmbeddedKafka(partitions = 3, brokerProperties = {"listeners=PLAINTEXT://localhost:9092"}) -public class KafkaApplicationUnitTesting { +@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }) +public class KafkaApplicationIntegrationTesting { @Autowired private KafkaProducer kafkaProducer; @@ -46,13 +51,16 @@ public class KafkaApplicationUnitTesting { @Test public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException { kafkaProducer.send("default-topic", "message1"); - kafkaProducer.send("default-topic","message2"); - kafkaProducer.send("default-topic","message3"); + kafkaProducer.send("default-topic", "message2"); + kafkaProducer.send("default-topic", "message3"); - Thread.sleep(2000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 3); List records = kafkaMessageConsumer.getReceivedMessages(); - int expectedPartition = records.get(0).getPartition(); + int expectedPartition = records.get(0) + .getPartition(); for (ReceivedMessage record : records) { if (record.getKey() == null) { @@ -68,17 +76,18 @@ public class KafkaApplicationUnitTesting { kafkaProducer.send("order-topic", "partitionB", "another critical message"); kafkaProducer.send("order-topic", "partitionA", "another more critical data"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 4); List records = kafkaMessageConsumer.getReceivedMessages(); Map> messagesByKey = groupMessagesByKey(records); messagesByKey.forEach((key, messages) -> { - int expectedPartition = messages.get(0).getPartition(); + int expectedPartition = messages.get(0) + .getPartition(); for (ReceivedMessage message : messages) { - assertEquals("Messages with key '" + key + "' should be in the same partition", - message.getPartition(), - expectedPartition); + assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition); } }); } @@ -89,7 +98,9 @@ public class KafkaApplicationUnitTesting { kafkaProducer.send("order-topic", "partitionA", "message3"); kafkaProducer.send("order-topic", "partitionA", "message4"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 3); List records = kafkaMessageConsumer.getReceivedMessages(); @@ -97,11 +108,7 @@ public class KafkaApplicationUnitTesting { records.forEach(record -> resultMessage.append(record.getMessage())); String expectedMessage = "message1message3message4"; - assertEquals( - "Messages with the same key should be received in the order they were produced within a partition", - expectedMessage, - resultMessage.toString() - ); + assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, resultMessage.toString()); } @Test @@ -112,7 +119,9 @@ public class KafkaApplicationUnitTesting { kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 2); List records = kafkaMessageConsumer.getReceivedMessages(); @@ -133,7 +142,9 @@ public class KafkaApplicationUnitTesting { kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message"); kafkaTemplate.send("order-topic", "456_normal", "Normal order message"); - Thread.sleep(1000); + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 2); consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0))); ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); @@ -142,8 +153,6 @@ public class KafkaApplicationUnitTesting { assertEquals("Premium order message should be in partition 0", 0, record.partition()); assertEquals("123_premium", record.key()); } - - consumer.close(); } private KafkaTemplate setProducerToUseCustomPartitioner() { From 4a698374066bc6dae7c7f9c0149b6f01cc16e6bd Mon Sep 17 00:00:00 2001 From: SGWebFreelancer Date: Mon, 25 Dec 2023 13:33:22 +0800 Subject: [PATCH 3/4] Fix formatting and added in direct assignment partition --- .../KafkaMessageConsumer.java | 1 + .../partitioningstrategy/KafkaProducer.java | 4 +++ .../KafkaApplicationIntegrationTesting.java | 34 +++++++++++++++---- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java index 131a112731..a500e04737 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaMessageConsumer.java @@ -18,6 +18,7 @@ public class KafkaMessageConsumer { @KafkaListener(topics = { "order-topic", "default-topic" }, groupId = "test-group") public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.RECEIVED_KEY) @Nullable String key) { ReceivedMessage receivedMessage = new ReceivedMessage(key, message, partition); + System.out.println("Received message: " + receivedMessage); receivedMessages.add(receivedMessage); } diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java index c28f855976..acca373a6e 100644 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java +++ b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java @@ -16,6 +16,10 @@ public class KafkaProducer { kafkaTemplate.send(topic, key, message); } + public void send(String topic, Integer partition, String key, String message) { + kafkaTemplate.send(topic, partition, key, message); + } + public void send(String topic, String message) { kafkaTemplate.send(topic, message); } diff --git a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java index 1e44ae1714..55c6c4e4c0 100644 --- a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java +++ b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java @@ -6,12 +6,14 @@ import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Assert; @@ -59,14 +61,12 @@ public class KafkaApplicationIntegrationTesting { .size() >= 3); List records = kafkaMessageConsumer.getReceivedMessages(); - int expectedPartition = records.get(0) - .getPartition(); - for (ReceivedMessage record : records) { - if (record.getKey() == null) { - Assert.assertEquals("Message without key should be in the same partition", expectedPartition, record.getPartition()); - } - } + Set uniquePartitions = records.stream() + .map(ReceivedMessage::getPartition) + .collect(Collectors.toSet()); + + Assert.assertEquals(1, uniquePartitions.size()); } @Test @@ -135,6 +135,26 @@ public class KafkaApplicationIntegrationTesting { } } + @Test + public void givenDirectPartitionAssignment_whenSendingMessages_shouldRouteToSpecifiedPartitions() throws Exception { + kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message"); + kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message"); + + await().atMost(2, SECONDS) + .until(() -> kafkaMessageConsumer.getReceivedMessages() + .size() >= 2); + + List records = kafkaMessageConsumer.getReceivedMessages(); + + for (ReceivedMessage record : records) { + if ("123_premium".equals(record.getKey())) { + assertEquals("Premium order message should be in partition 0", 0, record.getPartition()); + } else if ("456_normal".equals(record.getKey())) { + assertEquals("Normal order message should be in partition 1", 1, record.getPartition()); + } + } + } + @Test public void givenCustomPartitioner_whenSendingMessages_shouldConsumeOnlyFromSpecificPartition() throws InterruptedException { KafkaTemplate kafkaTemplate = setProducerToUseCustomPartitioner(); From f58b136ff9e8f2c27566cf72ad2e61556e3dd9e3 Mon Sep 17 00:00:00 2001 From: SGWebFreelancer Date: Wed, 27 Dec 2023 09:29:52 +0800 Subject: [PATCH 4/4] Remove the Kafka Producer wrapper, fix the class name --- .../partitioningstrategy/KafkaProducer.java | 26 -------------- ...a => KafkaApplicationIntegrationTest.java} | 36 +++++++++---------- 2 files changed, 18 insertions(+), 44 deletions(-) delete mode 100644 spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java rename spring-kafka/src/test/java/com/baeldung/partitioningstrategy/{KafkaApplicationIntegrationTesting.java => KafkaApplicationIntegrationTest.java} (88%) diff --git a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java deleted file mode 100644 index acca373a6e..0000000000 --- a/spring-kafka/src/main/java/com/baeldung/partitioningstrategy/KafkaProducer.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.baeldung.partitioningstrategy; - -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Service; - -@Service -public class KafkaProducer { - - private final KafkaTemplate kafkaTemplate; - - public KafkaProducer(KafkaTemplate kafkaTemplate) { - this.kafkaTemplate = kafkaTemplate; - } - - public void send(String topic, String key, String message) { - kafkaTemplate.send(topic, key, message); - } - - public void send(String topic, Integer partition, String key, String message) { - kafkaTemplate.send(topic, partition, key, message); - } - - public void send(String topic, String message) { - kafkaTemplate.send(topic, message); - } -} \ No newline at end of file diff --git a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTest.java similarity index 88% rename from spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java rename to spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTest.java index 55c6c4e4c0..2f2cccbb12 100644 --- a/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTesting.java +++ b/spring-kafka/src/test/java/com/baeldung/partitioningstrategy/KafkaApplicationIntegrationTest.java @@ -13,10 +13,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -32,9 +30,10 @@ import static java.util.concurrent.TimeUnit.SECONDS; @SpringBootTest @EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }) -public class KafkaApplicationIntegrationTesting { +public class KafkaApplicationIntegrationTest { + @Autowired - private KafkaProducer kafkaProducer; + private KafkaTemplate kafkaTemplate; @Autowired private KafkaMessageConsumer kafkaMessageConsumer; @@ -52,9 +51,9 @@ public class KafkaApplicationIntegrationTesting { @Test public void givenDefaultPartitioner_whenSendingMessagesWithoutKey_shouldUseStickyDistribution() throws InterruptedException { - kafkaProducer.send("default-topic", "message1"); - kafkaProducer.send("default-topic", "message2"); - kafkaProducer.send("default-topic", "message3"); + kafkaTemplate.send("default-topic", "message1"); + kafkaTemplate.send("default-topic", "message2"); + kafkaTemplate.send("default-topic", "message3"); await().atMost(2, SECONDS) .until(() -> kafkaMessageConsumer.getReceivedMessages() @@ -66,15 +65,15 @@ public class KafkaApplicationIntegrationTesting { .map(ReceivedMessage::getPartition) .collect(Collectors.toSet()); - Assert.assertEquals(1, uniquePartitions.size()); + assertEquals(1, uniquePartitions.size()); } @Test void givenProducerWithSameKeyMessages_whenSendingMessages_shouldRouteToSamePartition() throws InterruptedException { - kafkaProducer.send("order-topic", "partitionA", "critical data"); - kafkaProducer.send("order-topic", "partitionA", "more critical data"); - kafkaProducer.send("order-topic", "partitionB", "another critical message"); - kafkaProducer.send("order-topic", "partitionA", "another more critical data"); + kafkaTemplate.send("order-topic", "partitionA", "critical data"); + kafkaTemplate.send("order-topic", "partitionA", "more critical data"); + kafkaTemplate.send("order-topic", "partitionB", "another critical message"); + kafkaTemplate.send("order-topic", "partitionA", "another more critical data"); await().atMost(2, SECONDS) .until(() -> kafkaMessageConsumer.getReceivedMessages() @@ -94,9 +93,9 @@ public class KafkaApplicationIntegrationTesting { @Test public void givenProducerWithSameKeyMessages_whenSendingMessages_shouldReceiveInProducedOrder() throws InterruptedException { - kafkaProducer.send("order-topic", "partitionA", "message1"); - kafkaProducer.send("order-topic", "partitionA", "message3"); - kafkaProducer.send("order-topic", "partitionA", "message4"); + kafkaTemplate.send("order-topic", "partitionA", "message1"); + kafkaTemplate.send("order-topic", "partitionA", "message3"); + kafkaTemplate.send("order-topic", "partitionA", "message4"); await().atMost(2, SECONDS) .until(() -> kafkaMessageConsumer.getReceivedMessages() @@ -108,7 +107,8 @@ public class KafkaApplicationIntegrationTesting { records.forEach(record -> resultMessage.append(record.getMessage())); String expectedMessage = "message1message3message4"; - assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, resultMessage.toString()); + assertEquals("Messages with the same key should be received in the order they were produced within a partition", expectedMessage, + resultMessage.toString()); } @Test @@ -137,8 +137,8 @@ public class KafkaApplicationIntegrationTesting { @Test public void givenDirectPartitionAssignment_whenSendingMessages_shouldRouteToSpecifiedPartitions() throws Exception { - kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message"); - kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message"); + kafkaTemplate.send("order-topic", 0, "123_premium", "Premium order message"); + kafkaTemplate.send("order-topic", 1, "456_normal", "Normal order message"); await().atMost(2, SECONDS) .until(() -> kafkaMessageConsumer.getReceivedMessages()