diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/HandleInstanceAlreadyExistsException.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/HandleInstanceAlreadyExistsException.java new file mode 100644 index 0000000000..1de39987b8 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/HandleInstanceAlreadyExistsException.java @@ -0,0 +1,56 @@ +package com.baeldung.spring.kafka.kafkaexception; + +import java.lang.management.ManagementFactory; +import java.util.Properties; +import java.util.UUID; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringSerializer; + +public class HandleInstanceAlreadyExistsException { + + public static void generateUniqueClientIDUsingUUIDRandom() { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + + String clientId = "my-producer-" + UUID.randomUUID(); + props.setProperty("client.id", clientId); + KafkaProducer producer1 = new KafkaProducer<>(props); + + clientId = "my-producer-" + UUID.randomUUID(); + props.setProperty("client.id", clientId); + KafkaProducer producer2 = new KafkaProducer<>(props); + } + + public static void closeProducerProperlyBeforeReinstantiate() { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "my-producer"); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + + KafkaProducer producer1 = new KafkaProducer<>(props); + producer1.close(); + + producer1 = new KafkaProducer<>(props); + } + + public static void useUniqueObjectName() throws Exception { + MBeanServer mBeanServer1 = ManagementFactory.getPlatformMBeanServer(); + MBeanServer mBeanServer2 = ManagementFactory.getPlatformMBeanServer(); + + ObjectName objectName1 = new ObjectName("kafka.server:type=KafkaMetrics,id=metric1"); + ObjectName objectName2 = new ObjectName("kafka.server:type=KafkaMetrics,id=metric2"); + + MyMBean mBean1 = new MyMBean(); + mBeanServer1.registerMBean(mBean1, objectName1); + + MyMBean mBean2 = new MyMBean(); + mBeanServer2.registerMBean(mBean2, objectName2); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/KafkaAppMain.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/KafkaAppMain.java new file mode 100644 index 0000000000..14d6b71b10 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/KafkaAppMain.java @@ -0,0 +1,12 @@ +package com.baeldung.spring.kafka.kafkaexception; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaAppMain { + + public static void main(String[] args) { + SpringApplication.run(KafkaAppMain.class, args); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/SimulateInstanceAlreadyExistsException.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/SimulateInstanceAlreadyExistsException.java new file mode 100644 index 0000000000..d454d317f2 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/kafkaexception/SimulateInstanceAlreadyExistsException.java @@ -0,0 +1,123 @@ +package com.baeldung.spring.kafka.kafkaexception; + +import java.lang.management.ManagementFactory; +import java.util.Properties; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.DynamicMBean; +import javax.management.InvalidAttributeValueException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanConstructorInfo; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanNotificationInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.ReflectionException; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.stereotype.Service; + +@Service +public class SimulateInstanceAlreadyExistsException { + + public static void jmxRegistrationConflicts() throws Exception { + // Create two instances of MBeanServer + MBeanServer mBeanServer1 = ManagementFactory.getPlatformMBeanServer(); + MBeanServer mBeanServer2 = ManagementFactory.getPlatformMBeanServer(); + + // Define the same ObjectName for both MBeans + ObjectName objectName = new ObjectName("kafka.server:type=KafkaMetrics"); + + // Create and register the first MBean + MyMBean mBean1 = new MyMBean(); + mBeanServer1.registerMBean(mBean1, objectName); + + // Attempt to register the second MBean with the same ObjectName + MyMBean mBean2 = new MyMBean(); + mBeanServer2.registerMBean(mBean2, objectName); + } + + public static void duplicateConsumerClientID() { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "my-consumer"); + props.put("group.id", "test-group"); + props.put("key.deserializer", StringDeserializer.class); + props.put("value.deserializer", StringDeserializer.class); + + // Simulating concurrent client creation by multiple threads + for (int i = 0; i < 3; i++) { + new Thread(() -> { + KafkaConsumer consumer = new KafkaConsumer<>(props); + }).start(); + } + } + + public void duplicateProducerClientID() throws Exception { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "my-producer"); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + + KafkaProducer producer1 = new KafkaProducer<>(props); + // Attempting to create another producer using same client.id + KafkaProducer producer2 = new KafkaProducer<>(props); + } + + public static void unclosedProducerAndReinitialize() { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "my-producer"); + props.put("key.serializer", StringSerializer.class); + props.put("value.serializer", StringSerializer.class); + + KafkaProducer producer1 = new KafkaProducer<>(props); + // Attempting to reinitialize without proper close + producer1 = new KafkaProducer<>(props); + } +} + +class MyMBean implements DynamicMBean { + + @Override + public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException { + return null; + } + + @Override + public void setAttribute(Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException { + + } + + @Override + public AttributeList getAttributes(String[] attributes) { + return null; + } + + @Override + public AttributeList setAttributes(AttributeList attributes) { + return null; + } + + @Override + public Object invoke(String actionName, Object[] params, String[] signature) throws MBeanException, ReflectionException { + return null; + } + + @Override + public MBeanInfo getMBeanInfo() { + MBeanAttributeInfo[] attributes = new MBeanAttributeInfo[0]; + MBeanConstructorInfo[] constructors = new MBeanConstructorInfo[0]; + MBeanOperationInfo[] operations = new MBeanOperationInfo[0]; + MBeanNotificationInfo[] notifications = new MBeanNotificationInfo[0]; + return new MBeanInfo(MyMBean.class.getName(), "My MBean", attributes, constructors, operations, notifications); + } +}