From 8f1de1c3b702a07307e172b8745ece9c116f5f05 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 24 May 2020 13:58:50 +0530 Subject: [PATCH] BAEL3889 - Kafka Mock Producer --- .../baeldung/kafka/EvenOddPartitioner.java | 16 +++ .../com/baeldung/kafka/KafkaProducer.java | 40 ++++++ .../baeldung/kafka/KafkaProducerUnitTest.java | 114 ++++++++++++++++++ 3 files changed, 170 insertions(+) create mode 100644 libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java create mode 100644 libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java create mode 100644 libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java diff --git a/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java b/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java new file mode 100644 index 0000000000..12c86828dd --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java @@ -0,0 +1,16 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; + +public class EvenOddPartitioner extends DefaultPartitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + + if (((String)key).length() % 2 == 0) + return 0; + + return 1; + } +} diff --git a/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java b/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java new file mode 100644 index 0000000000..8574cd1c40 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java @@ -0,0 +1,40 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.concurrent.Future; + +public class KafkaProducer { + + private final Producer producer; + + public KafkaProducer(Producer producer) { + this.producer = producer; + } + + public Future send(String key, String value) { + ProducerRecord record = new ProducerRecord("topic_sports_news", + key, value); + return producer.send(record); + } + + public void flush() { + producer.flush(); + } + + public void beginTransaction() { + producer.beginTransaction(); + } + + public void initTransaction() { + producer.initTransactions(); + } + + public void commitTransaction() { + producer.commitTransaction(); + } + + +} diff --git a/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java b/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java new file mode 100644 index 0000000000..ce3803322c --- /dev/null +++ b/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java @@ -0,0 +1,114 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static java.util.Collections.emptySet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class KafkaProducerUnitTest { + + private final String TOPIC_NAME = "topic_sports_news"; + + private KafkaProducer kafkaProducer; + private MockProducer mockProducer; + + private void buildMockProducer(boolean autoComplete) { + this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer()); + } + + @Test + void givenKeyValue_whenSend_thenVerifyHistory() throws ExecutionException, InterruptedException { + + buildMockProducer(true); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future recordMetadataFuture = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(mockProducer.history().size() == 1); + assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data")); + assertTrue(recordMetadataFuture.get().partition() == 0); + + } + + @Test + void givenKeyValue_whenSend_thenSendOnlyAfterFlush() { + + buildMockProducer(false); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + assertFalse(record.isDone()); + + //then + kafkaProducer.flush(); + assertTrue(record.isDone()); + } + + @Test + void givenKeyValue_whenSend_thenReturnException() { + + buildMockProducer(false); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}"); + RuntimeException e = new RuntimeException(); + mockProducer.errorNext(e); + //then + try { + record.get(); + } catch (ExecutionException | InterruptedException ex) { + assertEquals(e, ex.getCause()); + } + assertTrue(record.isDone()); + } + + @Test + void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() { + + buildMockProducer(true); + //when + kafkaProducer = new KafkaProducer(mockProducer); + kafkaProducer.initTransaction(); + kafkaProducer.beginTransaction(); + Future record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(mockProducer.history().isEmpty()); + kafkaProducer.commitTransaction(); + assertTrue(mockProducer.history().size() == 1); + } + + @Test + void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException { + + PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null); + PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null); + List list = new ArrayList<>(); + list.add(partitionInfo0); + list.add(partitionInfo1); + Cluster cluster = new Cluster("kafkab", new ArrayList(), list, emptySet(), emptySet()); + this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer()); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(recordMetadataFuture.get().partition() == 1); + + } + +} \ No newline at end of file