diff --git a/libraries/pom.xml b/libraries/pom.xml index 43c8239e96..c036c591ca 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -686,6 +686,12 @@ fugue 4.5.1 + + + org.jctools + jctools-core + ${jctools.version} + @@ -785,6 +791,52 @@ + + + maven-compiler-plugin + 3.7.0 + + 1.8 + 1.8 + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + package + + shade + + + benchmarks + + + org.openjdk.jmh.Main + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + @@ -856,6 +908,7 @@ 9.1.5.Final 4.1 1.4.9 + 2.1.2 1.10.L001 0.9.4.0006L diff --git a/libraries/src/main/java/com/baeldung/jctools/MpmcBenchmark.java b/libraries/src/main/java/com/baeldung/jctools/MpmcBenchmark.java new file mode 100644 index 0000000000..7b754bf709 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/jctools/MpmcBenchmark.java @@ -0,0 +1,73 @@ +package com.baeldung.jctools; + +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Control; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +@Warmup(iterations = 1) +@Measurement(iterations = 3) +@State(Scope.Group) +public class MpmcBenchmark { + + public static final String PARAM_UNSAFE = "MpmcArrayQueue"; + public static final String PARAM_AFU = "MpmcAtomicArrayQueue"; + public static final String PARAM_JDK = "ArrayBlockingQueue"; + + public static final int PRODUCER_THREADS_NUMBER = 32; + public static final int CONSUMER_THREADS_NUMBER = 32; + + public static final String GROUP_NAME = "MyGroup"; + + public static final int CAPACITY = 128; + + @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) + public volatile String implementation; + + public volatile Queue queue; + + @Setup(Level.Trial) + public void setUp() { + switch (implementation) { + case PARAM_UNSAFE: + queue = new MpmcArrayQueue<>(CAPACITY); + break; + case PARAM_AFU: + queue = new MpmcAtomicArrayQueue<>(CAPACITY); + break; + case PARAM_JDK: + queue = new ArrayBlockingQueue<>(CAPACITY); + break; + default: + throw new UnsupportedOperationException("Unsupported implementation " + implementation); + } + } + + + @Benchmark + @Group(GROUP_NAME) + @GroupThreads(PRODUCER_THREADS_NUMBER) + public void write(Control control) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && !queue.offer(1L)) { + // Is intentionally left blank + } + } + + @Benchmark + @Group(GROUP_NAME) + @GroupThreads(CONSUMER_THREADS_NUMBER) + public void read(Control control) { + //noinspection StatementWithEmptyBody + while (!control.stopMeasurement && queue.poll() == null) { + // Is intentionally left blank + } + } +} diff --git a/libraries/src/main/java/com/baeldung/jctools/README.md b/libraries/src/main/java/com/baeldung/jctools/README.md new file mode 100644 index 0000000000..3c1b3c1c1e --- /dev/null +++ b/libraries/src/main/java/com/baeldung/jctools/README.md @@ -0,0 +1,7 @@ +## How to build and run the JMH benchmark + +Execute the following from the project's root: +```bash +mvn clean install +java -jar ./target/benchmarks.jar MpmcBenchmark -si true +``` \ No newline at end of file diff --git a/libraries/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java b/libraries/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java new file mode 100644 index 0000000000..4a9d0fadb2 --- /dev/null +++ b/libraries/src/test/java/com/baeldung/jctools/JCToolsUnitTest.java @@ -0,0 +1,86 @@ +package com.baeldung.jctools; + +import org.jctools.queues.SpscArrayQueue; +import org.jctools.queues.SpscChunkedArrayQueue; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class JCToolsUnitTest { + + @Test + public void givenMultipleProducers_whenSpscQueueUsed_thenNoWarningOccurs() throws InterruptedException { + SpscArrayQueue queue = new SpscArrayQueue(2); + + Thread producer1 = new Thread(() -> { + queue.offer(1); + }); + producer1.start(); + producer1.join(); + + Thread producer2 = new Thread(() -> { + queue.offer(2); + }); + producer2.start(); + producer2.join(); + + Set fromQueue = new HashSet<>(); + Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); + consumer.start(); + consumer.join(); + + assertThat(fromQueue).containsOnly(1, 2); + } + + @Test + public void whenQueueIsFull_thenNoMoreElementsCanBeAdded() throws InterruptedException { + SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue<>(8, 16); + assertThat(queue.capacity()).isEqualTo(16); + + CountDownLatch startConsuming = new CountDownLatch(1); + CountDownLatch awakeProducer = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Thread producer = new Thread(() -> { + IntStream.range(0, queue.capacity()).forEach(i -> { + assertThat(queue.offer(i)).isTrue(); + }); + assertThat(queue.offer(queue.capacity())).isFalse(); + startConsuming.countDown(); + try { + awakeProducer.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + assertThat(queue.offer(queue.capacity())).isTrue(); + }); + producer.setUncaughtExceptionHandler((t, e) -> { + error.set(e); + startConsuming.countDown(); + }); + producer.start(); + + startConsuming.await(); + + if (error.get() != null) { + fail("Producer's assertion failed", error.get()); + } + + Set fromQueue = new HashSet<>(); + queue.drain(fromQueue::add); + awakeProducer.countDown(); + producer.join(); + queue.drain(fromQueue::add); + + assertThat(fromQueue).containsAll(IntStream.range(0, 17).boxed().collect(Collectors.toSet())); + } +}