BAEL-5680: Guide to Check if Apache Kafka Server is Running (#12600)
Co-authored-by: Tapan Avasthi <tavasthi@Tapans-MacBook-Air.local>
This commit is contained in:
@@ -0,0 +1,35 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.common.Node;
|
||||
|
||||
public class KafkaAdminClient {
|
||||
private final AdminClient client;
|
||||
|
||||
public KafkaAdminClient(String bootstrap) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", bootstrap);
|
||||
props.put("request.timeout.ms", 3000);
|
||||
props.put("connections.max.idle.ms", 5000);
|
||||
|
||||
this.client = AdminClient.create(props);
|
||||
}
|
||||
|
||||
public boolean verifyConnection() throws ExecutionException, InterruptedException {
|
||||
Collection<Node> nodes = this.client.describeCluster()
|
||||
.nodes()
|
||||
.get();
|
||||
return nodes != null && nodes.size() > 0;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
String defaultBootStrapServer = "localhost:9092";
|
||||
KafkaAdminClient kafkaAdminClient = new KafkaAdminClient(defaultBootStrapServer);
|
||||
System.out.println(kafkaAdminClient.verifyConnection());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
// This live test needs a running Docker instance so that a kafka container can be created
|
||||
|
||||
@Testcontainers
|
||||
class KafkaConnectionLiveTest {
|
||||
|
||||
@Container
|
||||
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
|
||||
private KafkaAdminClient kafkaAdminClient;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
KAFKA_CONTAINER.addExposedPort(9092);
|
||||
this.kafkaAdminClient = new KafkaAdminClient(KAFKA_CONTAINER.getBootstrapServers());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void destroy() {
|
||||
KAFKA_CONTAINER.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
|
||||
boolean alive = kafkaAdminClient.verifyConnection();
|
||||
assertThat(alive).isTrue();
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user