[BAEL-3890] Using Kafka MockConsumer

This commit is contained in:
Bogdan Feraru
2020-05-10 22:30:54 +03:00
parent 1fe51ed7c0
commit bddea5222b
4 changed files with 194 additions and 0 deletions
@@ -0,0 +1,28 @@
package com.baeldung.kafka.consumer;
class CountryPopulation {
private String country;
private Integer population;
public CountryPopulation(String country, Integer population) {
this.country = country;
this.population = population;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Integer getPopulation() {
return population;
}
public void setPopulation(Integer population) {
this.population = population;
}
}
@@ -0,0 +1,60 @@
package com.baeldung.kafka.consumer;
import java.time.Duration;
import java.util.Collections;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CountryPopulationConsumer {
private static Logger logger = LoggerFactory.getLogger(CountryPopulationConsumer.class);
private Consumer<String, Integer> consumer;
private java.util.function.Consumer<Throwable> exceptionConsumer;
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
public CountryPopulationConsumer(
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
this.consumer = consumer;
this.exceptionConsumer = exceptionConsumer;
this.countryPopulationConsumer = countryPopulationConsumer;
}
void startBySubscribing(String topic) {
consume(() -> consumer.subscribe(Collections.singleton(topic)));
}
void startByAssigning(String topic, int partition) {
consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
}
private void consume(Runnable beforePollingTask) {
try {
beforePollingTask.run();
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
StreamSupport.stream(records.spliterator(), false)
.map(record -> new CountryPopulation(record.key(), record.value()))
.forEach(countryPopulationConsumer);
consumer.commitSync();
}
} catch (WakeupException e) {
logger.info("Shutting down...");
} catch (RuntimeException ex) {
exceptionConsumer.accept(ex);
} finally {
consumer.close();
}
}
public void stop() {
consumer.wakeup();
}
}