From f979a4075d829af95eec40aaafcfc39e9eac243c Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Wed, 26 Apr 2017 12:57:16 +0200 Subject: [PATCH] BAEL-856 code for long adder and accumulator (#1723) * BAEL-856 code for long adder and accumulator * BAEL-856 rearange packages * BAEL-856 Formatting --- .../accumulator/LongAccumulatorTest.java | 41 ++++++++++++ .../concurrent/adder/LongAdderTest.java | 65 +++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 core-java/src/test/java/com/baeldung/concurrent/accumulator/LongAccumulatorTest.java create mode 100644 core-java/src/test/java/com/baeldung/concurrent/adder/LongAdderTest.java diff --git a/core-java/src/test/java/com/baeldung/concurrent/accumulator/LongAccumulatorTest.java b/core-java/src/test/java/com/baeldung/concurrent/accumulator/LongAccumulatorTest.java new file mode 100644 index 0000000000..46e107b607 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/accumulator/LongAccumulatorTest.java @@ -0,0 +1,41 @@ +package com.baeldung.concurrent.accumulator; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.function.LongBinaryOperator; +import java.util.stream.IntStream; + +import static junit.framework.TestCase.assertEquals; + +public class LongAccumulatorTest { + + @Test + public void givenLongAccumulator_whenApplyActionOnItFromMultipleThrads_thenShouldProduceProperResult() throws InterruptedException { + //given + ExecutorService executorService = Executors.newFixedThreadPool(8); + LongBinaryOperator higherValueFinder = (currentValue, previousValue) -> currentValue > previousValue ? currentValue : previousValue; + LongAccumulator accumulator = new LongAccumulator(higherValueFinder, 0L); + + int numberOfThreads = 4; + int numberOfIncrements = 100; + + //when + Runnable accumulateAction = () -> IntStream + .rangeClosed(0, numberOfIncrements) + .forEach(accumulator::accumulate); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.execute(accumulateAction); + } + + //then + executorService.awaitTermination(500, TimeUnit.MILLISECONDS); + executorService.shutdown(); + + assertEquals(accumulator.get(), 100); + } +} diff --git a/core-java/src/test/java/com/baeldung/concurrent/adder/LongAdderTest.java b/core-java/src/test/java/com/baeldung/concurrent/adder/LongAdderTest.java new file mode 100644 index 0000000000..854db5d9b9 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/adder/LongAdderTest.java @@ -0,0 +1,65 @@ +package com.baeldung.concurrent.adder; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream; + +import static junit.framework.TestCase.assertEquals; + +public class LongAdderTest { + @Test + public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThem() throws InterruptedException { + //given + LongAdder counter = new LongAdder(); + ExecutorService executorService = Executors.newFixedThreadPool(8); + + int numberOfThreads = 4; + int numberOfIncrements = 100; + + //when + Runnable incrementAction = () -> IntStream + .range(0, numberOfIncrements) + .forEach((i) -> counter.increment()); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.execute(incrementAction); + } + + //then + executorService.awaitTermination(500, TimeUnit.MILLISECONDS); + executorService.shutdown(); + + assertEquals(counter.sum(), numberOfIncrements * numberOfThreads); + assertEquals(counter.sum(), numberOfIncrements * numberOfThreads); + } + + @Test + public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThemAndResetAdderAfterward() throws InterruptedException { + //given + LongAdder counter = new LongAdder(); + ExecutorService executorService = Executors.newFixedThreadPool(8); + + int numberOfThreads = 4; + int numberOfIncrements = 100; + + //when + Runnable incrementAction = () -> IntStream + .range(0, numberOfIncrements) + .forEach((i) -> counter.increment()); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.execute(incrementAction); + } + + //then + executorService.awaitTermination(500, TimeUnit.MILLISECONDS); + executorService.shutdown(); + + assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads); + assertEquals(counter.sum(), 0); + } +}