From f4feab9212a4bce79c79530960a139eced155fd9 Mon Sep 17 00:00:00 2001 From: Denis Zhdanov Date: Mon, 2 Apr 2018 00:36:53 +0300 Subject: [PATCH 1/3] BAEL-1423 Java Concurrency Utility with JCTools Library --- jctools/README.md | 11 ++ jctools/pom.xml | 108 ++++++++++++++++++ .../com/baeldung/jctools/MpmcBenchmark.java | 98 ++++++++++++++++ .../com/baeldung/jctools/JCToolsUnitTest.java | 86 ++++++++++++++ 4 files changed, 303 insertions(+) create mode 100644 jctools/README.md create mode 100644 jctools/pom.xml create mode 100644 jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java create mode 100644 jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java diff --git a/jctools/README.md b/jctools/README.md new file mode 100644 index 0000000000..181e45c1ca --- /dev/null +++ b/jctools/README.md @@ -0,0 +1,11 @@ +## Overview + +This project holds a [couple of tests](./src/test/java/com/baeldung/jctools/JCToolsUnitTest.java) which illustrate JCTools specifics and a [benchmark](./src/main/java/com/baeldung/jctools/MpmcBenchmark.java) in the [JMH](http://openjdk.java.net/projects/code-tools/jmh/) format. + +## How to build and run the JMH benchmark + +Execute the following from the project's root: +```bash +mvn clean install +java -jar ./target/benchmarks.jar MpmcBenchmark -si true +``` \ No newline at end of file diff --git a/jctools/pom.xml b/jctools/pom.xml new file mode 100644 index 0000000000..8a21c610de --- /dev/null +++ b/jctools/pom.xml @@ -0,0 +1,108 @@ + + 4.0.0 + + com.baeldung + jctools + 0.0.1-SNAPSHOT + + jctools + + + + org.jctools + jctools-core + ${jctools.version} + + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + provided + + + + junit + junit + 4.12 + test + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + 2.1.2 + 3.9.1 + 1.20 + + 1.8 + + benchmarks + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${javac.target} + ${javac.target} + ${javac.target} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + package + + shade + + + ${uberjar.name} + + + org.openjdk.jmh.Main + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + \ No newline at end of file diff --git a/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java new file mode 100644 index 0000000000..0e543fa609 --- /dev/null +++ b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java @@ -0,0 +1,98 @@ +package com.baeldung.jctools; + +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Control; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +public class MpmcBenchmark { + + public static final String GROUP_UNSAFE = "MpmcArrayQueue"; + public static final String GROUP_AFU = "MpmcAtomicArrayQueue"; + public static final String GROUP_JDK = "ArrayBlockingQueue"; + + public static final int PRODUCER_THREADS_NUMBER = 32; + public static final int CONSUMER_THREADS_NUMBER = 32; + + public static final int CAPACITY = 128; + + @State(Scope.Group) + public static class Mpmc { + public final Queue queue = new MpmcArrayQueue<>(CAPACITY); + } + + @State(Scope.Group) + public static class MpmcAtomic { + public final Queue queue = new MpmcAtomicArrayQueue<>(CAPACITY); + } + + @State(Scope.Group) + public static class Jdk { + public final Queue queue = new ArrayBlockingQueue<>(CAPACITY); + } + + @Benchmark + @Group(GROUP_UNSAFE) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void mpmcWrite(Control control, Mpmc state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_UNSAFE) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void mpmcRead(Control control, Mpmc state) { + read(control, state.queue); + } + + @Benchmark + @Group(GROUP_AFU) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void mpmcAtomicWrite(Control control, MpmcAtomic state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_AFU) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void mpmcAtomicRead(Control control, MpmcAtomic state) { + read(control, state.queue); + } + + @Benchmark + @Group(GROUP_JDK) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void jdkWrite(Control control, Jdk state) { + write(control, state.queue); + } + + @Benchmark + @Group(GROUP_JDK) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void jdkRead(Control control, Jdk state) { + read(control, state.queue); + } + + private void write(Control control, Queue queue) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && !queue.offer(1L)) { + // Is intentionally left blank + } + } + + private void read(Control control, Queue queue) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && queue.poll() == null) { + // Is intentionally left blank + } + } +} diff --git a/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java b/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java new file mode 100644 index 0000000000..4a9d0fadb2 --- /dev/null +++ b/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java @@ -0,0 +1,86 @@ +package com.baeldung.jctools; + +import org.jctools.queues.SpscArrayQueue; +import org.jctools.queues.SpscChunkedArrayQueue; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class JCToolsUnitTest { + + @Test + public void givenMultipleProducers_whenSpscQueueUsed_thenNoWarningOccurs() throws InterruptedException { + SpscArrayQueue queue = new SpscArrayQueue(2); + + Thread producer1 = new Thread(() -> { + queue.offer(1); + }); + producer1.start(); + producer1.join(); + + Thread producer2 = new Thread(() -> { + queue.offer(2); + }); + producer2.start(); + producer2.join(); + + Set fromQueue = new HashSet<>(); + Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); + consumer.start(); + consumer.join(); + + assertThat(fromQueue).containsOnly(1, 2); + } + + @Test + public void whenQueueIsFull_thenNoMoreElementsCanBeAdded() throws InterruptedException { + SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue<>(8, 16); + assertThat(queue.capacity()).isEqualTo(16); + + CountDownLatch startConsuming = new CountDownLatch(1); + CountDownLatch awakeProducer = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Thread producer = new Thread(() -> { + IntStream.range(0, queue.capacity()).forEach(i -> { + assertThat(queue.offer(i)).isTrue(); + }); + assertThat(queue.offer(queue.capacity())).isFalse(); + startConsuming.countDown(); + try { + awakeProducer.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertThat(queue.offer(queue.capacity())).isTrue(); + }); + producer.setUncaughtExceptionHandler((t, e) -> { + error.set(e); + startConsuming.countDown(); + }); + producer.start(); + + startConsuming.await(); + + if (error.get() != null) { + fail("Producer's assertion failed", error.get()); + } + + Set fromQueue = new HashSet<>(); + queue.drain(fromQueue::add); + awakeProducer.countDown(); + producer.join(); + queue.drain(fromQueue::add); + + assertThat(fromQueue).containsAll(IntStream.range(0, 17).boxed().collect(Collectors.toSet())); + } +} From b296e9a4962fe598913c23945a45c181f29fb4d2 Mon Sep 17 00:00:00 2001 From: Denis Zhdanov Date: Mon, 2 Apr 2018 04:32:05 +0300 Subject: [PATCH 2/3] BAEL-1423 Java Concurrency Utility with JCTools Library Refactored JMH benchmark after digging deeper into its features --- .../com/baeldung/jctools/MpmcBenchmark.java | 89 +++++++------------ 1 file changed, 32 insertions(+), 57 deletions(-) diff --git a/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java index 0e543fa609..7b754bf709 100644 --- a/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java +++ b/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java @@ -14,82 +14,57 @@ import java.util.concurrent.TimeUnit; @Fork(1) @Warmup(iterations = 1) @Measurement(iterations = 3) +@State(Scope.Group) public class MpmcBenchmark { - public static final String GROUP_UNSAFE = "MpmcArrayQueue"; - public static final String GROUP_AFU = "MpmcAtomicArrayQueue"; - public static final String GROUP_JDK = "ArrayBlockingQueue"; + public static final String PARAM_UNSAFE = "MpmcArrayQueue"; + public static final String PARAM_AFU = "MpmcAtomicArrayQueue"; + public static final String PARAM_JDK = "ArrayBlockingQueue"; public static final int PRODUCER_THREADS_NUMBER = 32; public static final int CONSUMER_THREADS_NUMBER = 32; + public static final String GROUP_NAME = "MyGroup"; + public static final int CAPACITY = 128; - @State(Scope.Group) - public static class Mpmc { - public final Queue queue = new MpmcArrayQueue<>(CAPACITY); + @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) + public volatile String implementation; + + public volatile Queue queue; + + @Setup(Level.Trial) + public void setUp() { + switch (implementation) { + case PARAM_UNSAFE: + queue = new MpmcArrayQueue<>(CAPACITY); + break; + case PARAM_AFU: + queue = new MpmcAtomicArrayQueue<>(CAPACITY); + break; + case PARAM_JDK: + queue = new ArrayBlockingQueue<>(CAPACITY); + break; + default: + throw new UnsupportedOperationException("Unsupported implementation " + implementation); + } } - @State(Scope.Group) - public static class MpmcAtomic { - public final Queue queue = new MpmcAtomicArrayQueue<>(CAPACITY); - } - - @State(Scope.Group) - public static class Jdk { - public final Queue queue = new ArrayBlockingQueue<>(CAPACITY); - } @Benchmark - @Group(GROUP_UNSAFE) + @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) - public void mpmcWrite(Control control, Mpmc state) { - write(control, state.queue); - } - - @Benchmark - @Group(GROUP_UNSAFE) - @GroupThreads(CONSUMER_THREADS_NUMBER) - public void mpmcRead(Control control, Mpmc state) { - read(control, state.queue); - } - - @Benchmark - @Group(GROUP_AFU) - @GroupThreads(PRODUCER_THREADS_NUMBER) - public void mpmcAtomicWrite(Control control, MpmcAtomic state) { - write(control, state.queue); - } - - @Benchmark - @Group(GROUP_AFU) - @GroupThreads(CONSUMER_THREADS_NUMBER) - public void mpmcAtomicRead(Control control, MpmcAtomic state) { - read(control, state.queue); - } - - @Benchmark - @Group(GROUP_JDK) - @GroupThreads(PRODUCER_THREADS_NUMBER) - public void jdkWrite(Control control, Jdk state) { - write(control, state.queue); - } - - @Benchmark - @Group(GROUP_JDK) - @GroupThreads(CONSUMER_THREADS_NUMBER) - public void jdkRead(Control control, Jdk state) { - read(control, state.queue); - } - - private void write(Control control, Queue queue) { + public void write(Control control) { //noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // Is intentionally left blank } } - private void read(Control control, Queue queue) { + @Benchmark + @Group(GROUP_NAME) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void read(Control control) { //noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // Is intentionally left blank From 4073b10bbecef1feda641f291d312071d6675f17 Mon Sep 17 00:00:00 2001 From: Denis Zhdanov Date: Mon, 16 Apr 2018 01:39:16 +0300 Subject: [PATCH 3/3] BAEL-1423 Java Concurrency Utility with JCTools library Addressed editor's comments --- jctools/README.md | 11 -- jctools/pom.xml | 108 ------------------ libraries/pom.xml | 45 ++++++++ .../com/baeldung/jctools/MpmcBenchmark.java | 0 .../main/java/com/baeldung/jctools/README.md | 7 ++ .../com/baeldung/jctools/JCToolsUnitTest.java | 0 6 files changed, 52 insertions(+), 119 deletions(-) delete mode 100644 jctools/README.md delete mode 100644 jctools/pom.xml rename {jctools => libraries}/src/main/java/com/baeldung/jctools/MpmcBenchmark.java (100%) create mode 100644 libraries/src/main/java/com/baeldung/jctools/README.md rename {jctools => libraries}/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java (100%) diff --git a/jctools/README.md b/jctools/README.md deleted file mode 100644 index 181e45c1ca..0000000000 --- a/jctools/README.md +++ /dev/null @@ -1,11 +0,0 @@ -## Overview - -This project holds a [couple of tests](./src/test/java/com/baeldung/jctools/JCToolsUnitTest.java) which illustrate JCTools specifics and a [benchmark](./src/main/java/com/baeldung/jctools/MpmcBenchmark.java) in the [JMH](http://openjdk.java.net/projects/code-tools/jmh/) format. - -## How to build and run the JMH benchmark - -Execute the following from the project's root: -```bash -mvn clean install -java -jar ./target/benchmarks.jar MpmcBenchmark -si true -``` \ No newline at end of file diff --git a/jctools/pom.xml b/jctools/pom.xml deleted file mode 100644 index 8a21c610de..0000000000 --- a/jctools/pom.xml +++ /dev/null @@ -1,108 +0,0 @@ - - 4.0.0 - - com.baeldung - jctools - 0.0.1-SNAPSHOT - - jctools - - - - org.jctools - jctools-core - ${jctools.version} - - - - org.openjdk.jmh - jmh-core - ${jmh.version} - - - - org.openjdk.jmh - jmh-generator-annprocess - ${jmh.version} - provided - - - - junit - junit - 4.12 - test - - - - org.assertj - assertj-core - ${assertj.version} - test - - - - - 2.1.2 - 3.9.1 - 1.20 - - 1.8 - - benchmarks - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - ${javac.target} - ${javac.target} - ${javac.target} - - - - - - org.apache.maven.plugins - maven-shade-plugin - 2.2 - - - package - - shade - - - ${uberjar.name} - - - org.openjdk.jmh.Main - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - - - - - \ No newline at end of file diff --git a/libraries/pom.xml b/libraries/pom.xml index 6ff06d7285..8a3a27cec4 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -661,6 +661,12 @@ fugue 4.5.1 + + + org.jctools + jctools-core + ${jctools.version} + @@ -780,6 +786,44 @@ 1.8 + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + package + + shade + + + benchmarks + + + org.openjdk.jmh.Main + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + @@ -850,6 +894,7 @@ 2.2.0 9.1.5.Final 1.4.9 + 2.1.2 diff --git a/jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java b/libraries/src/main/java/com/baeldung/jctools/MpmcBenchmark.java similarity index 100% rename from jctools/src/main/java/com/baeldung/jctools/MpmcBenchmark.java rename to libraries/src/main/java/com/baeldung/jctools/MpmcBenchmark.java diff --git a/libraries/src/main/java/com/baeldung/jctools/README.md b/libraries/src/main/java/com/baeldung/jctools/README.md new file mode 100644 index 0000000000..3c1b3c1c1e --- /dev/null +++ b/libraries/src/main/java/com/baeldung/jctools/README.md @@ -0,0 +1,7 @@ +## How to build and run the JMH benchmark + +Execute the following from the project's root: +```bash +mvn clean install +java -jar ./target/benchmarks.jar MpmcBenchmark -si true +``` \ No newline at end of file diff --git a/jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java b/libraries/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java similarity index 100% rename from jctools/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java rename to libraries/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java