diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java new file mode 100644 index 0000000000..b03c1e1adc --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/MessageWithKey.java @@ -0,0 +1,106 @@ +package com.baeldung.kafka.message; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageWithKey { + + private static Logger logger = LoggerFactory.getLogger(MessageWithKey.class); + + private static String TOPIC = "baeldung"; + private static int PARTITIONS = 5; + private static short REPLICATION_FACTOR = 1; + + private static String MESSAGE_KEY = "message-key"; + + private static Admin admin; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + public static void main(String[] args) throws ExecutionException, InterruptedException { + setup(); + + publishMessagesWithoutKey(); + + consumeMessages(); + + publishMessagesWithKey(); + + consumeMessages(); + } + + private static void consumeMessages() { + consumer.subscribe(Arrays.asList(TOPIC)); + + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + for (ConsumerRecord record : records) { + logger.info("Key : {}, Value : {}", record.key(), record.value()); + } + } + + private static void publishMessagesWithKey() throws ExecutionException, InterruptedException { + for (int i = 1; i <= 10; i++) { + ProducerRecord record = new ProducerRecord<>(TOPIC, MESSAGE_KEY, String.valueOf(i)); + Future future = producer.send(record); + RecordMetadata metadata = future.get(); + + logger.info(String.valueOf(metadata.partition())); + } + } + + private static void publishMessagesWithoutKey() throws ExecutionException, InterruptedException { + for (int i = 1; i <= 10; i++) { + ProducerRecord record = new ProducerRecord<>(TOPIC, String.valueOf(i)); + Future future = producer.send(record); + RecordMetadata metadata = future.get(); + + logger.info(String.valueOf(metadata.partition())); + } + } + + private static void setup() { + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID() + .toString()); + + admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + + admin.createTopics(Collections.singleton(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR))); + } + +} \ No newline at end of file diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java new file mode 100644 index 0000000000..093dc629cb --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/message/MessageWithKeyLiveTest.java @@ -0,0 +1,130 @@ +package com.baeldung.kafka.message; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +// This live test needs a Docker Daemon running so that a kafka container can be created + +@Testcontainers +public class MessageWithKeyLiveTest { + + private static String TOPIC = "baeldung"; + private static int PARTITIONS = 5; + private static short REPLICATION_FACTOR = 1; + + private static String MESSAGE_KEY = "message-key"; + private static String MESSAGE_VALUE = "Hello World"; + + private static Admin admin; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID() + .toString()); + + admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + + admin.createTopics(Collections.singleton(new NewTopic(TOPIC, PARTITIONS, REPLICATION_FACTOR))); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenAMessageWithKey_whenPublishedToKafkaAndConsumed_thenCheckForKey() throws ExecutionException, InterruptedException { + + ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, MESSAGE_KEY, MESSAGE_VALUE); + Future future = producer.send(producerRecord); + + RecordMetadata metadata = future.get(); + + assertNotNull(metadata); + + consumer.subscribe(Arrays.asList(TOPIC)); + + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + for (ConsumerRecord consumerRecord : records) { + assertEquals(MESSAGE_KEY, consumerRecord.key()); + assertEquals(MESSAGE_VALUE, consumerRecord.value()); + } + } + + @Test + void givenAListOfMessageWithKeys_whenPublishedToKafka_thenCheckedIfPublishedToSamePartition() throws ExecutionException, InterruptedException { + + boolean isSamePartition = true; + int partition = 0; + + for (int i = 1; i <= 10; i++) { + ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, MESSAGE_KEY, MESSAGE_VALUE); + Future future = producer.send(producerRecord); + + RecordMetadata metadata = future.get(); + + assertNotNull(metadata); + if (i == 1) { + partition = metadata.partition(); + } else { + if (partition != metadata.partition()) { + isSamePartition = false; + } + } + } + + assertTrue(isSamePartition); + } +}