From bf5b3045a1f6fe710a1ae63e935504a6139f05ba Mon Sep 17 00:00:00 2001 From: Eric Goebelbecker Date: Sat, 3 Feb 2018 22:49:11 -0500 Subject: [PATCH] BAEL-1486 - sample code for JGroups. Fixed a typo in InfluxDB. (#3578) --- influxdb/README.md | 1 - .../influxdb/InfluxDBConnectionLiveTest.java | 3 +- jgroups/README.md | 15 ++ jgroups/pom.xml | 36 +++ .../baeldung/jgroups/JGroupsMessenger.java | 222 ++++++++++++++++++ jgroups/src/main/resources/udp.xml | 48 ++++ pom.xml | 1 + 7 files changed, 323 insertions(+), 3 deletions(-) create mode 100644 jgroups/README.md create mode 100644 jgroups/pom.xml create mode 100644 jgroups/src/main/java/com/baeldung/jgroups/JGroupsMessenger.java create mode 100644 jgroups/src/main/resources/udp.xml diff --git a/influxdb/README.md b/influxdb/README.md index 7d1684688d..f2c421580e 100644 --- a/influxdb/README.md +++ b/influxdb/README.md @@ -2,7 +2,6 @@ ### Relevant Article: - [Introduction to using InfluxDB with Java](http://www.baeldung.com/using-influxdb-with-java/) -- [Using InfluxDB with Java](http://www.baeldung.com/java-influxdb) ### Overview This Maven project contains the Java code for the article linked above. diff --git a/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java b/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java index 6c7a03cb70..858903a676 100644 --- a/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java +++ b/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java @@ -8,7 +8,6 @@ import org.influxdb.dto.*; import org.influxdb.impl.InfluxDBResultMapper; import org.junit.Test; -import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -103,7 +102,7 @@ public class InfluxDBConnectionLiveTest { // another brief pause. Thread.sleep(10); - List memoryPointList = getPoints(connection, "Select * from memory", "baeldung"); + List memoryPointList = getPoints(connection, "Select * from memory", "baeldung"); assertEquals(10, memoryPointList.size()); diff --git a/jgroups/README.md b/jgroups/README.md new file mode 100644 index 0000000000..bb2813c3d6 --- /dev/null +++ b/jgroups/README.md @@ -0,0 +1,15 @@ +## Reliable Messaging with JGroups Tutorial Project + +### Relevant Article: +- [Reliable Messaging with JGroups](http://www.baeldung.com/reliable-messaging-with-jgroups/) + +### Overview +This Maven project contains the Java code for the article linked above. + +### Package Organization +Java classes for the intro tutorial are in the org.baeldung.jgroups package. + + +### Running the tests + +``` diff --git a/jgroups/pom.xml b/jgroups/pom.xml new file mode 100644 index 0000000000..0e5971875e --- /dev/null +++ b/jgroups/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + jgroups + 0.1-SNAPSHOT + jar + jgroups + Reliable Messaging with JGroups Tutorial + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.jgroups + jgroups + 4.0.10.Final + + + + commons-cli + commons-cli + 1.4 + + + + + 1.8 + UTF-8 + + + diff --git a/jgroups/src/main/java/com/baeldung/jgroups/JGroupsMessenger.java b/jgroups/src/main/java/com/baeldung/jgroups/JGroupsMessenger.java new file mode 100644 index 0000000000..2a8df8a9ab --- /dev/null +++ b/jgroups/src/main/java/com/baeldung/jgroups/JGroupsMessenger.java @@ -0,0 +1,222 @@ +package com.baeldung.jgroups; + +import org.apache.commons.cli.*; +import org.jgroups.*; +import org.jgroups.util.Util; + +import java.io.*; +import java.util.List; +import java.util.Optional; + +public class JGroupsMessenger extends ReceiverAdapter { + + private JChannel channel; + private String userName; + private String clusterName; + private View lastView; + private boolean running = true; + + // Our shared state + private Integer messageCount = 0; + + /** + * Connect to a JGroups cluster using command line options + * @param args command line arguments + * @throws Exception + */ + private void start(String[] args) throws Exception { + processCommandline(args); + + // Create the channel + // This file could be moved, or made a command line option. + channel = new JChannel("src/main/resources/udp.xml"); + + // Set a name + channel.name(userName); + + // Register for callbacks + channel.setReceiver(this); + + // Ignore our messages + channel.setDiscardOwnMessages(true); + + // Connect + channel.connect(clusterName); + + // Start state transfer + channel.getState(null, 0); + + // Do the things + processInput(); + + // Clean up + channel.close(); + + } + + /** + * Quick and dirty implementaton of commons cli for command line args + * @param args the command line args + * @throws ParseException + */ + private void processCommandline(String[] args) throws ParseException { + + // Options, parser, friendly help + Options options = new Options(); + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = new HelpFormatter(); + + options.addOption("u", "user", true, "User name") + .addOption("c", "cluster", true, "Cluster name"); + + CommandLine line = parser.parse(options, args); + + if (line.hasOption("user")) { + userName = line.getOptionValue("user"); + } else { + formatter.printHelp("JGroupsMessenger: need a user name.\n", options); + System.exit(-1); + } + + if (line.hasOption("cluster")) { + clusterName = line.getOptionValue("cluster"); + } else { + formatter.printHelp("JGroupsMessenger: need a cluster name.\n", options); + System.exit(-1); + } + } + + // Start it up + public static void main(String[] args) throws Exception { + new JGroupsMessenger().start(args); + } + + + @Override + public void viewAccepted(View newView) { + + // Save view if this is the first + if (lastView == null) { + System.out.println("Received initial view:"); + newView.forEach(System.out::println); + } else { + + // Compare to last view + System.out.println("Received new view."); + + List
newMembers = View.newMembers(lastView, newView); + if (newMembers.size() > 0) { + System.out.println("New members: "); + newMembers.forEach(System.out::println); + } + + List
exMembers = View.leftMembers(lastView, newView); + if (exMembers.size() > 0) { + System.out.println("Exited members:"); + exMembers.forEach(System.out::println); + } + } + lastView = newView; + } + + /** + * Loop on console input until we see 'x' to exit + */ + private void processInput() { + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + while (running) { + try { + + // Get a destination, means broadcast + Address destination = null; + System.out.print("Enter a destination: "); + System.out.flush(); + String destinationName = in.readLine().toLowerCase(); + + if (destinationName.equals("x")) { + running = false; + continue; + } else if (!destinationName.isEmpty()) { + Optional
optDestination = getAddress(destinationName); + if (optDestination.isPresent()) { + destination = optDestination.get(); + } else { + System.out.println("Destination not found, try again."); + continue; + } + } + + // Accept a string to send + System.out.print("Enter a message: "); + System.out.flush(); + String line = in.readLine().toLowerCase(); + sendMessage(destination, line); + } catch (IOException ioe) { + running = false; + } + } + System.out.println("Exiting."); + } + + /** + * Send message from here + * @param destination the destination + * @param messageString the message + */ + private void sendMessage(Address destination, String messageString) { + try { + System.out.println("Sending " + messageString + " to " + destination); + Message message = new Message(destination, messageString); + channel.send(message); + } catch (Exception exception) { + System.err.println("Exception sending message: " + exception.getMessage()); + running = false; + } + } + + @Override + public void receive(Message message) { + // Print source and dest with message + String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); + + // Only track the count of broadcast messages + // Tracking direct message would make for a pointless state + if (message.getDest() == null) { + messageCount++; + System.out.println("Message count: " + messageCount); + } + + System.out.println(line); + } + + @Override + public void getState(OutputStream output) throws Exception { + // Serialize into the stream + Util.objectToStream(messageCount, new DataOutputStream(output)); + } + + @Override + public void setState(InputStream input) { + + // NOTE: since we know that incrementing the count and transferring the state + // is done inside the JChannel's thread, we don't have to worry about synchronizing + // messageCount. For production code it should be synchronized! + try { + // Deserialize + messageCount = Util.objectFromStream(new DataInputStream(input)); + } catch (Exception e) { + System.out.println("Error deserialing state!"); + } + System.out.println(messageCount + " is the current messagecount."); + } + + + private Optional
getAddress(String name) { + View view = channel.view(); + return view.getMembers().stream().filter(address -> name.equals(address.toString())).findFirst(); + } + +} + + diff --git a/jgroups/src/main/resources/udp.xml b/jgroups/src/main/resources/udp.xml new file mode 100644 index 0000000000..5a9bfd0851 --- /dev/null +++ b/jgroups/src/main/resources/udp.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 582ee6696e..ca6d4afe82 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ javax-servlets javaxval jaxb + jgroups jee-7 jjwt