diff --git a/libraries/pom.xml b/libraries/pom.xml index 0f73324386..6ff06d7285 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -114,6 +114,13 @@ jetty-webapp ${jetty.version} + + + io.nats + jnats + ${jnats.version} + + rome rome @@ -797,6 +804,8 @@ 1.6 1.4.196 9.4.8.v20171121 + 1.0 + 4.5.3 2.5 1.2.0 diff --git a/libraries/src/main/java/com/baeldung/jnats/NatsClient.java b/libraries/src/main/java/com/baeldung/jnats/NatsClient.java new file mode 100644 index 0000000000..cdd1e764ef --- /dev/null +++ b/libraries/src/main/java/com/baeldung/jnats/NatsClient.java @@ -0,0 +1,150 @@ +package com.baeldung.jnats; + +import io.nats.client.*; + +import java.io.IOException; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NatsClient { + + private String serverURI; + + private Connection natsConnection; + + private Map subscriptions = new HashMap<>(); + + private final static Logger log = LoggerFactory.getLogger(NatsClient.class); + + public NatsClient() { + this.serverURI = "jnats://localhost:4222"; + natsConnection = initConnection(serverURI); + } + + + public NatsClient(String serverURI) { + + if ((serverURI != null) && (!serverURI.isEmpty())) { + this.serverURI = serverURI; + } else { + this.serverURI = "jnats://localhost:4222"; + } + + natsConnection = initConnection(serverURI); + } + + public void closeConnection() { + // Close connection + natsConnection.close(); + } + + + private Connection initConnection(String uri) { + try { + + Options options = new Options.Builder() + .errorCb(new ExceptionHandler() { + @Override + public void onException(NATSException ex) { + log.error("Connection Exception: ", ex); + } + }) + .disconnectedCb(new DisconnectedCallback() { + @Override + public void onDisconnect(ConnectionEvent event) { + log.error("Channel disconnected: {}", event.getConnection()); + } + }) + .reconnectedCb(new ReconnectedCallback() { + @Override + public void onReconnect(ConnectionEvent event) { + log.error("Reconnected to server: {}", event.getConnection()); + } + }) + .build(); + + return Nats.connect(uri, options); + + } catch (IOException ioe) { + log.error("Error connecting to NATs! ", ioe); + return null; + } + } + + + public void publishMessage(String topic, String replyTo, String message) { + try { + // Simple Publisher + natsConnection.publish(topic, replyTo, message.getBytes()); + } catch (IOException ioe) { + log.error("Error publishing message: {} to {} ", message, topic, ioe); + } + } + + + public void subscribeAsync(String topic) { + + // Simple Async Subscriber + AsyncSubscription subscription = natsConnection.subscribe(topic, new MessageHandler() { + @Override + public void onMessage(Message msg) { + log.info("Received message on {}", msg.getSubject()); + } + }); + + if (subscription == null) { + log.error("Error subscribing to {}", topic); + } else { + subscriptions.put(topic, subscription); + } + } + + public SyncSubscription subscribeSync(String topic) { + // Simple Sync Subscriber + return natsConnection.subscribe(topic); + } + + public void unsubscribe(String topic) { + + try { + Subscription subscription = subscriptions.get(topic); + + if (subscription != null) { + subscription.unsubscribe(); + } else { + log.error("{} not found. Unable to unsubscribe.", topic); + } + + } catch (IOException ioe) { + log.error("Error unsubscribing from {} ", topic, ioe); + } + } + + + public Message makeRequest(String topic, String request) { + + try { + return natsConnection.request(topic, request.getBytes(), 100); + } catch (IOException | InterruptedException ioe) { + log.error("Error making request {} to {} ", topic, request, ioe); + return null; + } + } + + public void installReply(String topic, String reply) { + natsConnection.subscribe(topic, message -> { + try { + natsConnection.publish(message.getReplyTo(), reply.getBytes()); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + public SyncSubscription joinQueueGroup(String topic, String queue) { + return natsConnection.subscribe(topic, queue); + } + +} diff --git a/libraries/src/test/java/com/baeldung/jnats/NatsClientLiveTest.java b/libraries/src/test/java/com/baeldung/jnats/NatsClientLiveTest.java new file mode 100644 index 0000000000..2a6756ed55 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/jnats/NatsClientLiveTest.java @@ -0,0 +1,114 @@ +package com.baeldung.jnats; + +import io.nats.client.Message; +import io.nats.client.SyncSubscription; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class NatsClientLiveTest { + + @Test + public void givenMessageExchange_MessagesReceived() throws Exception { + + NatsClient client = connectClient(); + + SyncSubscription fooSubscription = client.subscribeSync("foo.bar"); + SyncSubscription barSubscription = client.subscribeSync("bar.foo"); + client.publishMessage("foo.bar", "bar.foo", "hello there"); + + Message message = fooSubscription.nextMessage(200); + assertNotNull("No message!", message); + assertEquals("hello there", new String(message.getData())); + + client.publishMessage(message.getReplyTo(), message.getSubject(), "hello back"); + + message = barSubscription.nextMessage(200); + assertNotNull("No message!", message); + assertEquals("hello back", new String(message.getData())); + } + + + private NatsClient connectClient() { + return new NatsClient(); + } + + @Test + public void whenWildCardSubscription_andMatchTopic_MessageReceived() throws Exception { + + NatsClient client = connectClient(); + + SyncSubscription fooSubscription = client.subscribeSync("foo.*"); + + client.publishMessage("foo.bar", "bar.foo", "hello there"); + + Message message = fooSubscription.nextMessage(200); + assertNotNull("No message!", message); + assertEquals("hello there", new String(message.getData())); + } + + @Test + public void whenWildCardSubscription_andNotMatchTopic_NoMessageReceived() throws Exception { + + NatsClient client = connectClient(); + + SyncSubscription fooSubscription = client.subscribeSync("foo.*"); + + client.publishMessage("foo.bar.plop", "bar.foo", "hello there"); + + Message message = fooSubscription.nextMessage(200); + assertNull("Got message!", message); + + + SyncSubscription barSubscription = client.subscribeSync("foo.>"); + + client.publishMessage("foo.bar.plop", "bar.foo", "hello there"); + + message = barSubscription.nextMessage(200); + assertNotNull("No message!", message); + assertEquals("hello there", new String(message.getData())); + + + } + + + @Test + public void givenRequest_ReplyReceived() { + + NatsClient client = connectClient(); + client.installReply("salary.requests", "denied!"); + + Message reply = client.makeRequest("salary.requests", "I need a raise."); + assertNotNull("No message!", reply); + assertEquals("denied!", new String(reply.getData())); + + } + + @Test + public void givenQueueMessage_OnlyOneReceived() throws Exception { + + NatsClient client = connectClient(); + + SyncSubscription queue1 = client.joinQueueGroup("foo.bar.requests", "queue1"); + SyncSubscription queue2 = client.joinQueueGroup("foo.bar.requests", "queue1"); + + client.publishMessage("foo.bar.requests", "queuerequestor", "foobar"); + + List messages = new ArrayList<>(); + + Message message = queue1.nextMessage(200); + + if (message != null) messages.add(message); + message = queue2.nextMessage(200); + + if (message != null) messages.add(message); + assertEquals(1, messages.size()); + + } + +}