BAEL3889 - Kafka Mock Producer
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
|
||||
public class EvenOddPartitioner extends DefaultPartitioner {
|
||||
|
||||
@Override
|
||||
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
||||
|
||||
if (((String)key).length() % 2 == 0)
|
||||
return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class KafkaProducer {
|
||||
|
||||
private final Producer<String, String> producer;
|
||||
|
||||
public KafkaProducer(Producer<String, String> producer) {
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
public Future<RecordMetadata> send(String key, String value) {
|
||||
ProducerRecord record = new ProducerRecord("topic_sports_news",
|
||||
key, value);
|
||||
return producer.send(record);
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
producer.flush();
|
||||
}
|
||||
|
||||
public void beginTransaction() {
|
||||
producer.beginTransaction();
|
||||
}
|
||||
|
||||
public void initTransaction() {
|
||||
producer.initTransactions();
|
||||
}
|
||||
|
||||
public void commitTransaction() {
|
||||
producer.commitTransaction();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user