java concurrency

This commit is contained in:
DOHA
2017-08-25 17:13:33 +02:00
parent 3f1d94fe3c
commit c0b64d416b
123 changed files with 179601 additions and 22 deletions
+26
View File
@@ -0,0 +1,26 @@
*.class
0.*
#folders#
/target
/neoDb*
/data
/src/main/webapp/WEB-INF/classes
*/META-INF/*
.resourceCache
# Packaged files #
*.jar
*.war
*.ear
# Files generated by integration tests
*.txt
backup-pom.xml
/bin/
/temp
#IntelliJ specific
.idea/
*.iml
+32
View File
@@ -0,0 +1,32 @@
=========
## Core Java Concurrency Examples
### Relevant Articles:
- [Guide To CompletableFuture](http://www.baeldung.com/java-completablefuture)
- [A Guide to the Java ExecutorService](http://www.baeldung.com/java-executor-service-tutorial)
- [Introduction to Thread Pools in Java](http://www.baeldung.com/thread-pool-java-and-guava)
- [Guide to java.util.concurrent.Future](http://www.baeldung.com/java-future)
- [Guide to java.util.concurrent.BlockingQueue](http://www.baeldung.com/java-blocking-queue)
- [Guide to CountDownLatch in Java](http://www.baeldung.com/java-countdown-latch)
- [A Guide to ConcurrentMap](http://www.baeldung.com/java-concurrent-map)
- [Guide to PriorityBlockingQueue in Java](http://www.baeldung.com/java-priority-blocking-queue)
- [Avoiding the ConcurrentModificationException in Java](http://www.baeldung.com/java-concurrentmodificationexception)
- [Custom Thread Pools In Java 8 Parallel Streams](http://www.baeldung.com/java-8-parallel-streams-custom-threadpool)
- [Guide to java.util.concurrent.Locks](http://www.baeldung.com/java-concurrent-locks)
- [An Introduction to ThreadLocal in Java](http://www.baeldung.com/java-threadlocal)
- [Guide to DelayQueue](http://www.baeldung.com/java-delay-queue)
- [A Guide to Java SynchronousQueue](http://www.baeldung.com/java-synchronous-queue)
- [Guide to the Java TransferQueue](http://www.baeldung.com/java-transfer-queue)
- [Guide to the ConcurrentSkipListMap](http://www.baeldung.com/java-concurrent-skip-list-map)
- [Difference Between Wait and Sleep in Java](http://www.baeldung.com/java-wait-and-sleep)
- [LongAdder and LongAccumulator in Java](http://www.baeldung.com/java-longadder-and-longaccumulator)
- [The Dining Philosophers Problem in Java](http://www.baeldung.com/java-dining-philoshophers)
- [Guide to CopyOnWriteArrayList](http://www.baeldung.com/java-copy-on-write-arraylist)
- [Guide to the Java Phaser](http://www.baeldung.com/java-phaser)
- [Guide to Synchronized Keyword in Java](http://www.baeldung.com/java-synchronized)
- [An Introduction to Atomic Variables in Java](http://www.baeldung.com/java-atomic-variables)
- [CyclicBarrier in Java](http://www.baeldung.com/java-cyclic-barrier)
- [Guide to Volatile Keyword in Java](http://www.baeldung.com/java-volatile)
- [Overview of the java.util.concurrent](http://www.baeldung.com/java-util-concurrent)
- [Semaphores in Java](http://www.baeldung.com/java-semaphore)
+237
View File
@@ -0,0 +1,237 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>core-java-concurrency</artifactId>
<version>0.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>core-java-concurrency</name>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<!-- utils -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>${commons-math3.version}</version>
</dependency>
<!-- test scoped -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${avaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>core-java-concurrency</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/libs</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>libs/</classpathPrefix>
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archiveBaseDirectory>${project.basedir}</archiveBaseDirectory>
<archive>
<manifest>
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.jolira</groupId>
<artifactId>onejar-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
<attachToBuild>true</attachToBuild>
<filename>${project.build.finalName}-onejar.${project.packaging}</filename>
</configuration>
<goals>
<goal>one-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>spring-boot</classifier>
<mainClass>org.baeldung.executable.ExecutableMavenJar</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>integration</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<excludes>
<exclude>**/*ManualTest.java</exclude>
</excludes>
<includes>
<include>**/*IntegrationTest.java</include>
</includes>
</configuration>
</execution>
</executions>
<configuration>
<systemPropertyVariables>
<test.mime>json</test.mime>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<properties>
<!-- util -->
<guava.version>21.0</guava.version>
<commons-lang3.version>3.5</commons-lang3.version>
<commons-math3.version>3.6.1</commons-math3.version>
<commons-io.version>2.5</commons-io.version>
<commons-collections4.version>4.1</commons-collections4.version>
<collections-generic.version>4.01</collections-generic.version>
<!-- testing -->
<assertj.version>3.6.1</assertj.version>
<avaitility.version>1.7.0</avaitility.version>
</properties>
</project>
@@ -0,0 +1,34 @@
package com.baeldung.concurrent.Scheduledexecutorservice;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public void execute() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// Task
}, 1, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// Task
}, 1, 10, TimeUnit.SECONDS);
Future<String> future = executorService.schedule(() -> {
// Task
return "Hellow world";
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
}
@@ -0,0 +1,13 @@
package com.baeldung.concurrent.atomic;
public class SafeCounterWithLock {
private volatile int counter;
public int getValue() {
return counter;
}
public synchronized void increment() {
counter++;
}
}
@@ -0,0 +1,21 @@
package com.baeldung.concurrent.atomic;
import java.util.concurrent.atomic.AtomicInteger;
public class SafeCounterWithoutLock {
private final AtomicInteger counter = new AtomicInteger(0);
public int getValue() {
return counter.get();
}
public void increment() {
while(true) {
int existingValue = getValue();
int newValue = existingValue + 1;
if(counter.compareAndSet(existingValue, newValue)) {
return;
}
}
}
}
@@ -0,0 +1,13 @@
package com.baeldung.concurrent.atomic;
public class UnsafeCounter {
int counter;
public int getValue() {
return counter;
}
public void increment() {
counter++;
}
}
@@ -0,0 +1,24 @@
package com.baeldung.concurrent.blockingqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueUsage {
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
for (int i = 0; i < N_PRODUCERS; i++) {
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread(new NumbersConsumer(queue, poisonPill)).start();
}
}
}
@@ -0,0 +1,28 @@
package com.baeldung.concurrent.blockingqueue;
import java.util.concurrent.BlockingQueue;
public class NumbersConsumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int poisonPill;
NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
public void run() {
try {
while (true) {
Integer number = queue.take();
if (number.equals(poisonPill)) {
return;
}
String result = number.toString();
System.out.println(Thread.currentThread().getName() + " result: " + result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@@ -0,0 +1,34 @@
package com.baeldung.concurrent.blockingqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
public class NumbersProducer implements Runnable {
private final BlockingQueue<Integer> numbersQueue;
private final int poisonPill;
private final int poisonPillPerProducer;
NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
public void run() {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
}
}
}
@@ -0,0 +1,23 @@
package com.baeldung.concurrent.countdownlatch;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class BrokenWorker implements Runnable {
private final List<String> outputScraper;
private final CountDownLatch countDownLatch;
BrokenWorker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
if (true) {
throw new RuntimeException("Oh dear");
}
countDownLatch.countDown();
outputScraper.add("Counted down");
}
}
@@ -0,0 +1,34 @@
package com.baeldung.concurrent.countdownlatch;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class WaitingWorker implements Runnable {
private final List<String> outputScraper;
private final CountDownLatch readyThreadCounter;
private final CountDownLatch callingThreadBlocker;
private final CountDownLatch completedThreadCounter;
WaitingWorker(final List<String> outputScraper, final CountDownLatch readyThreadCounter, final CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) {
this.outputScraper = outputScraper;
this.readyThreadCounter = readyThreadCounter;
this.callingThreadBlocker = callingThreadBlocker;
this.completedThreadCounter = completedThreadCounter;
}
@Override
public void run() {
// Mark this thread as read / started
readyThreadCounter.countDown();
try {
callingThreadBlocker.await();
outputScraper.add("Counted down");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completedThreadCounter.countDown();
}
}
}
@@ -0,0 +1,22 @@
package com.baeldung.concurrent.countdownlatch;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Worker implements Runnable {
private final List<String> outputScraper;
private final CountDownLatch countDownLatch;
Worker(final List<String> outputScraper, final CountDownLatch countDownLatch) {
this.outputScraper = outputScraper;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
// Do some work
System.out.println("Doing some logic");
outputScraper.add("Counted down");
countDownLatch.countDown();
}
}
@@ -0,0 +1,77 @@
package com.baeldung.concurrent.cyclicbarrier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults = Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;
private void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each");
for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}
class NumberCruncherThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName + " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class AggregatorThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
System.out.println(thisThreadName + ": Computing final sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult + " ");
sum += partialResult;
}
System.out.println();
}
System.out.println(Thread.currentThread().getName() + ": Final result = " + sum);
}
}
public static void main(String[] args) {
CyclicBarrierDemo play = new CyclicBarrierDemo();
play.runSimulation(5, 3);
}
}
@@ -0,0 +1,24 @@
package com.baeldung.concurrent.cyclicbarrier;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// Task
System.out.println("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
}
@@ -0,0 +1,24 @@
package com.baeldung.concurrent.cyclicbarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println("Thread : " + Thread.currentThread().getName() + " is waiting");
barrier.await();
System.out.println("Thread : " + Thread.currentThread().getName() + " is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,35 @@
package com.baeldung.concurrent.delayqueue;
import com.google.common.primitives.Ints;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayObject implements Delayed {
private String data;
private long startTime;
DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(this.startTime - ((DelayObject) o).startTime);
}
@Override
public String toString() {
return "{" +
"data='" + data + '\'' +
", startTime=" + startTime +
'}';
}
}
@@ -0,0 +1,30 @@
package com.baeldung.concurrent.delayqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private final Integer numberOfElementsToTake;
final AtomicInteger numberOfConsumedElements = new AtomicInteger();
DelayQueueConsumer(BlockingQueue<DelayObject> queue, Integer numberOfElementsToTake) {
this.queue = queue;
this.numberOfElementsToTake = numberOfElementsToTake;
}
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@@ -0,0 +1,34 @@
package com.baeldung.concurrent.delayqueue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private final Integer numberOfElementsToProduce;
private final Integer delayOfEachProducedMessageMilliseconds;
DelayQueueProducer(BlockingQueue<DelayObject> queue,
Integer numberOfElementsToProduce,
Integer delayOfEachProducedMessageMilliseconds) {
this.queue = queue;
this.numberOfElementsToProduce = numberOfElementsToProduce;
this.delayOfEachProducedMessageMilliseconds = delayOfEachProducedMessageMilliseconds;
}
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object = " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
@@ -0,0 +1,29 @@
package com.baeldung.concurrent.diningphilosophers;
public class DiningPhilosophers {
public static void main(String[] args) throws Exception {
Philosopher[] philosophers = new Philosopher[5];
Object[] forks = new Object[philosophers.length];
for (int i = 0; i < forks.length; i++) {
forks[i] = new Object();
}
for (int i = 0; i < philosophers.length; i++) {
Object leftFork = forks[i];
Object rightFork = forks[(i + 1) % forks.length];
if (i == philosophers.length - 1) {
philosophers[i] = new Philosopher(rightFork, leftFork); // The last philosopher picks up the right fork first
} else {
philosophers[i] = new Philosopher(leftFork, rightFork);
}
Thread t = new Thread(philosophers[i], "Philosopher " + (i + 1));
t.start();
}
}
}
@@ -0,0 +1,36 @@
package com.baeldung.concurrent.diningphilosophers;
public class Philosopher implements Runnable {
private final Object leftFork;
private final Object rightFork;
Philosopher(Object left, Object right) {
this.leftFork = left;
this.rightFork = right;
}
private void doAction(String action) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " " + action);
Thread.sleep(((int) (Math.random() * 100)));
}
@Override
public void run() {
try {
while (true) {
doAction(System.nanoTime() + ": Thinking"); // thinking
synchronized (leftFork) {
doAction(System.nanoTime() + ": Picked up left fork");
synchronized (rightFork) {
doAction(System.nanoTime() + ": Picked up right fork - eating"); // eating
doAction(System.nanoTime() + ": Put down right fork");
}
doAction(System.nanoTime() + ": Put down left fork. Returning to thinking");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@@ -0,0 +1,14 @@
package com.baeldung.concurrent.executor;
import java.util.concurrent.Executor;
public class ExecutorDemo {
public void execute() {
Executor executor = new Invoker();
executor.execute(()->{
// task to be performed
});
}
}
@@ -0,0 +1,12 @@
package com.baeldung.concurrent.executor;
import java.util.concurrent.Executor;
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
@@ -0,0 +1,27 @@
package com.baeldung.concurrent.executorservice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
executor.submit(() -> {
new Task();
});
executor.shutdown();
executor.shutdownNow();
try {
executor.awaitTermination(20l, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,10 @@
package com.baeldung.concurrent.executorservice;
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
@@ -0,0 +1,26 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.RecursiveTask;
public class FactorialSquareCalculator extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final private Integer n;
FactorialSquareCalculator(Integer n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FactorialSquareCalculator calculator = new FactorialSquareCalculator(n - 1);
calculator.fork();
return n * n + calculator.join();
}
}
@@ -0,0 +1,44 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureDemo {
public String invoke() {
String str = null;
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// Task
Thread.sleep(10000l);
return "Hellow world";
});
future.cancel(false);
try {
future.get(20, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e1) {
e1.printStackTrace();
}
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return str;
}
}
@@ -0,0 +1,20 @@
package com.baeldung.concurrent.future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
class SquareCalculator {
private final ExecutorService executor;
SquareCalculator(ExecutorService executor) {
this.executor = executor;
}
Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
@@ -0,0 +1,83 @@
package com.baeldung.concurrent.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static java.lang.Thread.sleep;
public class ReentrantLockWithCondition {
private static Logger LOG = LoggerFactory.getLogger(ReentrantLockWithCondition.class);
private Stack<String> stack = new Stack<>();
private static final int CAPACITY = 5;
private ReentrantLock lock = new ReentrantLock();
private Condition stackEmptyCondition = lock.newCondition();
private Condition stackFullCondition = lock.newCondition();
private void pushToStack(String item) throws InterruptedException {
try {
lock.lock();
if (stack.size() == CAPACITY) {
LOG.info(Thread.currentThread().getName() + " wait on stack full");
stackFullCondition.await();
}
LOG.info("Pushing the item " + item);
stack.push(item);
stackEmptyCondition.signalAll();
} finally {
lock.unlock();
}
}
private String popFromStack() throws InterruptedException {
try {
lock.lock();
if (stack.size() == 0) {
LOG.info(Thread.currentThread().getName() + " wait on stack empty");
stackEmptyCondition.await();
}
return stack.pop();
} finally {
stackFullCondition.signalAll();
lock.unlock();
}
}
public static void main(String[] args) {
final int threadCount = 2;
ReentrantLockWithCondition object = new ReentrantLockWithCondition();
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
service.execute(() -> {
for (int i = 0; i < 10; i++) {
try {
object.pushToStack("Item " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
service.execute(() -> {
for (int i = 0; i < 10; i++) {
try {
LOG.info("Item popped " + object.popFromStack());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
service.shutdown();
}
}
@@ -0,0 +1,88 @@
package com.baeldung.concurrent.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static java.lang.Thread.sleep;
public class SharedObjectWithLock {
private static final Logger LOG = LoggerFactory.getLogger(SharedObjectWithLock.class);
private ReentrantLock lock = new ReentrantLock(true);
private int counter = 0;
void perform() {
lock.lock();
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
try {
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
counter++;
} catch (Exception exception) {
LOG.error(" Interrupted Exception ", exception);
} finally {
lock.unlock();
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
}
}
private void performTryLock() {
LOG.info("Thread - " + Thread.currentThread().getName() + " attempting to acquire the lock");
try {
boolean isLockAcquired = lock.tryLock(2, TimeUnit.SECONDS);
if (isLockAcquired) {
try {
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
sleep(1000);
} finally {
lock.unlock();
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
}
}
} catch (InterruptedException exception) {
LOG.error(" Interrupted Exception ", exception);
}
LOG.info("Thread - " + Thread.currentThread().getName() + " could not acquire the lock");
}
public ReentrantLock getLock() {
return lock;
}
boolean isLocked() {
return lock.isLocked();
}
boolean hasQueuedThreads() {
return lock.hasQueuedThreads();
}
int getCounter() {
return counter;
}
public static void main(String[] args) {
final int threadCount = 2;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
final SharedObjectWithLock object = new SharedObjectWithLock();
service.execute(object::perform);
service.execute(object::performTryLock);
service.shutdown();
}
}
@@ -0,0 +1,104 @@
package com.baeldung.concurrent.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.StampedLock;
import static java.lang.Thread.sleep;
public class StampedLockDemo {
private Map<String, String> map = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(StampedLockDemo.class);
private final StampedLock lock = new StampedLock();
public void put(String key, String value) throws InterruptedException {
long stamp = lock.writeLock();
try {
logger.info(Thread.currentThread().getName() + " acquired the write lock with stamp " + stamp);
map.put(key, value);
} finally {
lock.unlockWrite(stamp);
logger.info(Thread.currentThread().getName() + " unlocked the write lock with stamp " + stamp);
}
}
public String get(String key) throws InterruptedException {
long stamp = lock.readLock();
logger.info(Thread.currentThread().getName() + " acquired the read lock with stamp " + stamp);
try {
sleep(5000);
return map.get(key);
} finally {
lock.unlockRead(stamp);
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
}
}
private String readWithOptimisticLock(String key) throws InterruptedException {
long stamp = lock.tryOptimisticRead();
String value = map.get(key);
if (!lock.validate(stamp)) {
stamp = lock.readLock();
try {
sleep(5000);
return map.get(key);
} finally {
lock.unlock(stamp);
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
}
}
return value;
}
public static void main(String[] args) {
final int threadCount = 4;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
StampedLockDemo object = new StampedLockDemo();
Runnable writeTask = () -> {
try {
object.put("key1", "value1");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable readTask = () -> {
try {
object.get("key1");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable readOptimisticTask = () -> {
try {
object.readWithOptimisticLock("key1");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.submit(writeTask);
service.submit(writeTask);
service.submit(readTask);
service.submit(readOptimisticTask);
service.shutdown();
}
}
@@ -0,0 +1,120 @@
package com.baeldung.concurrent.locks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.lang.Thread.sleep;
public class SynchronizedHashMapWithRWLock {
private static Map<String, String> syncHashMap = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(SynchronizedHashMapWithRWLock.class);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public void put(String key, String value) throws InterruptedException {
try {
writeLock.lock();
logger.info(Thread.currentThread().getName() + " writing");
syncHashMap.put(key, value);
sleep(1000);
} finally {
writeLock.unlock();
}
}
public String get(String key) {
try {
readLock.lock();
logger.info(Thread.currentThread().getName() + " reading");
return syncHashMap.get(key);
} finally {
readLock.unlock();
}
}
public String remove(String key) {
try {
writeLock.lock();
return syncHashMap.remove(key);
} finally {
writeLock.unlock();
}
}
public boolean containsKey(String key) {
try {
readLock.lock();
return syncHashMap.containsKey(key);
} finally {
readLock.unlock();
}
}
boolean isReadLockAvailable() {
return readLock.tryLock();
}
public static void main(String[] args) throws InterruptedException {
final int threadCount = 3;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
service.execute(new Thread(new Writer(object), "Writer"));
service.execute(new Thread(new Reader(object), "Reader1"));
service.execute(new Thread(new Reader(object), "Reader2"));
service.shutdown();
}
private static class Reader implements Runnable {
SynchronizedHashMapWithRWLock object;
Reader(SynchronizedHashMapWithRWLock object) {
this.object = object;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
object.get("key" + i);
}
}
}
private static class Writer implements Runnable {
SynchronizedHashMapWithRWLock object;
public Writer(SynchronizedHashMapWithRWLock object) {
this.object = object;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
object.put("key" + i, "value" + i);
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
@@ -0,0 +1,27 @@
package com.baeldung.concurrent.phaser;
import java.util.concurrent.Phaser;
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;
LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
ph.register();
}
@Override
public void run() {
System.out.println("This is phase " + ph.getPhase());
System.out.println("Thread " + threadName + " before long running action");
ph.arriveAndAwaitAdvance();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
ph.arriveAndDeregister();
}
}
@@ -0,0 +1,22 @@
package com.baeldung.concurrent.semaphore;
import java.util.concurrent.Semaphore;
public class SemaPhoreDemo {
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
System.out.println("Available permit : " + semaphore.availablePermits());
System.out.println("Number of threads waiting to acquire: " + semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
semaphore.acquire();
// perform some critical operations
semaphore.release();
}
}
}
@@ -0,0 +1,31 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.Semaphore;
class CounterUsingMutex {
private final Semaphore mutex;
private int count;
CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}
void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();
}
int getCount() {
return this.count;
}
boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}
@@ -0,0 +1,23 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.TimedSemaphore;
class DelayQueueUsingTimedSemaphore {
private final TimedSemaphore semaphore;
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
boolean tryAdd() {
return semaphore.tryAcquire();
}
int availableSlots() {
return semaphore.getAvailablePermits();
}
}
@@ -0,0 +1,25 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.Semaphore;
class LoginQueueUsingSemaphore {
private final Semaphore semaphore;
LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
boolean tryLogin() {
return semaphore.tryAcquire();
}
void logout() {
semaphore.release();
}
int availableSlots() {
return semaphore.availablePermits();
}
}
@@ -0,0 +1,21 @@
package com.baeldung.concurrent.skiplist;
import java.time.ZonedDateTime;
class Event {
private final ZonedDateTime eventTime;
private final String content;
Event(ZonedDateTime eventTime, String content) {
this.eventTime = eventTime;
this.content = content;
}
ZonedDateTime getEventTime() {
return eventTime;
}
String getContent() {
return content;
}
}
@@ -0,0 +1,29 @@
package com.baeldung.concurrent.skiplist;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
class EventWindowSort {
private final ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(Comparator.comparingLong(value -> value.toInstant().toEpochMilli()));
void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}
ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime
.now()
.minusMinutes(1));
}
ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime
.now()
.minusMinutes(1));
}
}
@@ -0,0 +1,27 @@
package com.baeldung.concurrent.sleepwait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* Example of waking up a waiting thread
*/
public class ThreadA {
private static final Logger LOG = LoggerFactory.getLogger(ThreadA.class);
private static final ThreadB b = new ThreadB();
public static void main(String... args) throws InterruptedException {
b.start();
synchronized (b) {
while (b.sum == 0) {
LOG.debug("Waiting for ThreadB to complete...");
b.wait();
}
LOG.debug("ThreadB has completed. Sum from that thread is: " + b.sum);
}
}
}
@@ -0,0 +1,20 @@
package com.baeldung.concurrent.sleepwait;
/***
* Example of waking up a waiting thread
*/
class ThreadB extends Thread {
int sum;
@Override
public void run() {
synchronized (this) {
int i = 0;
while (i < 100000) {
sum += i;
i++;
}
notify();
}
}
}
@@ -0,0 +1,29 @@
package com.baeldung.concurrent.sleepwait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
* Example of wait() and sleep() methods
*/
public class WaitSleepExample {
private static final Logger LOG = LoggerFactory.getLogger(WaitSleepExample.class);
private static final Object LOCK = new Object();
public static void main(String... args) throws InterruptedException {
sleepWaitInSynchronizedBlocks();
}
private static void sleepWaitInSynchronizedBlocks() throws InterruptedException {
Thread.sleep(1000); // called on the thread
LOG.debug("Thread '" + Thread.currentThread().getName() + "' is woken after sleeping for 1 second");
synchronized (LOCK) {
LOCK.wait(1000); // called on the object, synchronization required
LOG.debug("Object '" + LOCK + "' is woken after waiting for 1 second");
}
}
}
@@ -0,0 +1,35 @@
package com.baeldung.concurrent.synchronize;
public class BaeldungSynchronizedBlocks {
private int count = 0;
private static int staticCount = 0;
void performSynchronisedTask() {
synchronized (this) {
setCount(getCount() + 1);
}
}
static void performStaticSyncTask() {
synchronized (BaeldungSynchronizedBlocks.class) {
setStaticCount(getStaticCount() + 1);
}
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
static int getStaticCount() {
return staticCount;
}
private static void setStaticCount(int staticCount) {
BaeldungSynchronizedBlocks.staticCount = staticCount;
}
}
@@ -0,0 +1,37 @@
package com.baeldung.concurrent.synchronize;
public class BaeldungSynchronizedMethods {
private int sum = 0;
private int syncSum = 0;
static int staticSum = 0;
void calculate() {
setSum(getSum() + 1);
}
synchronized void synchronisedCalculate() {
setSyncSum(getSyncSum() + 1);
}
static synchronized void syncStaticCalculate() {
staticSum = staticSum + 1;
}
public int getSum() {
return sum;
}
public void setSum(int sum) {
this.sum = sum;
}
int getSyncSum() {
return syncSum;
}
private void setSyncSum(int syncSum) {
this.syncSum = syncSum;
}
}
@@ -0,0 +1,23 @@
package com.baeldung.concurrent.threadfactory;
import java.util.concurrent.ThreadFactory;
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
System.out.println("created new thread with id : " + threadId + " and name : " + t.getName());
threadId++;
return t;
}
}
@@ -0,0 +1,13 @@
package com.baeldung.concurrent.threadfactory;
public class Demo {
public void execute() {
BaeldungThreadFactory factory = new BaeldungThreadFactory("BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
}
}
@@ -0,0 +1,10 @@
package com.baeldung.concurrent.threadfactory;
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
@@ -0,0 +1,13 @@
package com.baeldung.concurrent.volatilekeyword;
public class SharedObject {
private volatile int count=0;
void increamentCount(){
count++;
}
public int getCount(){
return count;
}
}
@@ -0,0 +1,17 @@
package com.baeldung.threadlocal;
public class Context {
private final String userName;
public Context(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "Context{" +
"userNameSecret='" + userName + '\'' +
'}';
}
}
@@ -0,0 +1,21 @@
package com.baeldung.threadlocal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SharedMapWithUserContext implements Runnable {
public final static Map<Integer, Context> userContextPerUserId = new ConcurrentHashMap<>();
private final Integer userId;
private UserRepository userRepository = new UserRepository();
public SharedMapWithUserContext(Integer userId) {
this.userId = userId;
}
@Override
public void run() {
String userName = userRepository.getUserNameForUserId(userId);
userContextPerUserId.put(userId, new Context(userName));
}
}
@@ -0,0 +1,24 @@
package com.baeldung.threadlocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadLocalWithUserContext implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ThreadLocalWithUserContext.class);
private static final ThreadLocal<Context> userContext = new ThreadLocal<>();
private final Integer userId;
private UserRepository userRepository = new UserRepository();
public ThreadLocalWithUserContext(Integer userId) {
this.userId = userId;
}
@Override
public void run() {
String userName = userRepository.getUserNameForUserId(userId);
userContext.set(new Context(userName));
LOG.debug("thread context for given userId: " + userId + " is: " + userContext.get());
}
}
@@ -0,0 +1,10 @@
package com.baeldung.threadlocal;
import java.util.UUID;
public class UserRepository {
public String getUserNameForUserId(Integer userId) {
return UUID.randomUUID().toString();
}
}
@@ -0,0 +1,20 @@
package com.baeldung.threadpool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
public class CountingTask extends RecursiveTask<Integer> {
private final TreeNode node;
public CountingTask(TreeNode node) {
this.node = node;
}
@Override
protected Integer compute() {
return node.value + node.children.stream().map(childNode -> new CountingTask(childNode).fork()).collect(Collectors.summingInt(ForkJoinTask::join));
}
}
@@ -0,0 +1,29 @@
package com.baeldung.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.MoreExecutors;
/**
* This class demonstrates the usage of Guava's exiting executor services that keep the VM from hanging.
* Without the exiting executor service, the task would hang indefinitely.
* This behaviour cannot be demonstrated in JUnit tests, as JUnit kills the VM after the tests.
*/
public class ExitingExecutorServiceExample {
public static void main(String... args) {
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
final ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS);
executorService.submit((Runnable) () -> {
while (true) {
}
});
}
}
@@ -0,0 +1,18 @@
package com.baeldung.threadpool;
import java.util.Set;
import com.google.common.collect.Sets;
public class TreeNode {
int value;
Set<TreeNode> children;
public TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}
@@ -0,0 +1,41 @@
package com.baeldung.transferqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Consumer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private final TransferQueue<String> transferQueue;
private final String name;
private final int numberOfMessagesToConsume;
public final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
public Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
LOG.debug("Consumer: " + name + " is waiting to take element...");
String element = transferQueue.take();
longProcessing(element);
LOG.debug("Consumer: " + name + " received element: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
}
@@ -0,0 +1,41 @@
package com.baeldung.transferqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
private final TransferQueue<String> transferQueue;
private final String name;
private final Integer numberOfMessagesToProduce;
public final AtomicInteger numberOfProducedMessages = new AtomicInteger();
public Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
LOG.debug("Producer: " + name + " is waiting to transfer...");
boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.debug("Producer: " + name + " transferred element: A" + i);
} else {
LOG.debug("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@@ -0,0 +1,9 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
@@ -0,0 +1,453 @@
#
# OWASP Enterprise Security API (ESAPI) Properties file -- PRODUCTION Version
#
# This file is part of the Open Web Application Security Project (OWASP)
# Enterprise Security API (ESAPI) project. For details, please see
# http://www.owasp.org/index.php/ESAPI.
#
# Copyright (c) 2008,2009 - The OWASP Foundation
#
# DISCUSS: This may cause a major backwards compatibility issue, etc. but
# from a name space perspective, we probably should have prefaced
# all the property names with ESAPI or at least OWASP. Otherwise
# there could be problems is someone loads this properties file into
# the System properties. We could also put this file into the
# esapi.jar file (perhaps as a ResourceBundle) and then allow an external
# ESAPI properties be defined that would overwrite these defaults.
# That keeps the application's properties relatively simple as usually
# they will only want to override a few properties. If looks like we
# already support multiple override levels of this in the
# DefaultSecurityConfiguration class, but I'm suggesting placing the
# defaults in the esapi.jar itself. That way, if the jar is signed,
# we could detect if those properties had been tampered with. (The
# code to check the jar signatures is pretty simple... maybe 70-90 LOC,
# but off course there is an execution penalty (similar to the way
# that the separate sunjce.jar used to be when a class from it was
# first loaded). Thoughts?
###############################################################################
#
# WARNING: Operating system protection should be used to lock down the .esapi
# resources directory and all the files inside and all the directories all the
# way up to the root directory of the file system. Note that if you are using
# file-based implementations, that some files may need to be read-write as they
# get updated dynamically.
#
# Before using, be sure to update the MasterKey and MasterSalt as described below.
# N.B.: If you had stored data that you have previously encrypted with ESAPI 1.4,
# you *must* FIRST decrypt it using ESAPI 1.4 and then (if so desired)
# re-encrypt it with ESAPI 2.0. If you fail to do this, you will NOT be
# able to decrypt your data with ESAPI 2.0.
#
# YOU HAVE BEEN WARNED!!! More details are in the ESAPI 2.0 Release Notes.
#
#===========================================================================
# ESAPI Configuration
#
# If true, then print all the ESAPI properties set here when they are loaded.
# If false, they are not printed. Useful to reduce output when running JUnit tests.
# If you need to troubleshoot a properties related problem, turning this on may help.
# This is 'false' in the src/test/resources/.esapi version. It is 'true' by
# default for reasons of backward compatibility with earlier ESAPI versions.
ESAPI.printProperties=true
# ESAPI is designed to be easily extensible. You can use the reference implementation
# or implement your own providers to take advantage of your enterprise's security
# infrastructure. The functions in ESAPI are referenced using the ESAPI locator, like:
#
# String ciphertext =
# ESAPI.encryptor().encrypt("Secret message"); // Deprecated in 2.0
# CipherText cipherText =
# ESAPI.encryptor().encrypt(new PlainText("Secret message")); // Preferred
#
# Below you can specify the classname for the provider that you wish to use in your
# application. The only requirement is that it implement the appropriate ESAPI interface.
# This allows you to switch security implementations in the future without rewriting the
# entire application.
#
# ExperimentalAccessController requires ESAPI-AccessControlPolicy.xml in .esapi directory
ESAPI.AccessControl=org.owasp.esapi.reference.DefaultAccessController
# FileBasedAuthenticator requires users.txt file in .esapi directory
ESAPI.Authenticator=org.owasp.esapi.reference.FileBasedAuthenticator
ESAPI.Encoder=org.owasp.esapi.reference.DefaultEncoder
ESAPI.Encryptor=org.owasp.esapi.reference.crypto.JavaEncryptor
ESAPI.Executor=org.owasp.esapi.reference.DefaultExecutor
ESAPI.HTTPUtilities=org.owasp.esapi.reference.DefaultHTTPUtilities
ESAPI.IntrusionDetector=org.owasp.esapi.reference.DefaultIntrusionDetector
# Log4JFactory Requires log4j.xml or log4j.properties in classpath - http://www.laliluna.de/log4j-tutorial.html
ESAPI.Logger=org.owasp.esapi.reference.Log4JLogFactory
#ESAPI.Logger=org.owasp.esapi.reference.JavaLogFactory
ESAPI.Randomizer=org.owasp.esapi.reference.DefaultRandomizer
ESAPI.Validator=org.owasp.esapi.reference.DefaultValidator
#===========================================================================
# ESAPI Authenticator
#
Authenticator.AllowedLoginAttempts=3
Authenticator.MaxOldPasswordHashes=13
Authenticator.UsernameParameterName=username
Authenticator.PasswordParameterName=password
# RememberTokenDuration (in days)
Authenticator.RememberTokenDuration=14
# Session Timeouts (in minutes)
Authenticator.IdleTimeoutDuration=20
Authenticator.AbsoluteTimeoutDuration=120
#===========================================================================
# ESAPI Encoder
#
# ESAPI canonicalizes input before validation to prevent bypassing filters with encoded attacks.
# Failure to canonicalize input is a very common mistake when implementing validation schemes.
# Canonicalization is automatic when using the ESAPI Validator, but you can also use the
# following code to canonicalize data.
#
# ESAPI.Encoder().canonicalize( "%22hello world&#x22;" );
#
# Multiple encoding is when a single encoding format is applied multiple times. Allowing
# multiple encoding is strongly discouraged.
Encoder.AllowMultipleEncoding=false
# Mixed encoding is when multiple different encoding formats are applied, or when
# multiple formats are nested. Allowing multiple encoding is strongly discouraged.
Encoder.AllowMixedEncoding=false
# The default list of codecs to apply when canonicalizing untrusted data. The list should include the codecs
# for all downstream interpreters or decoders. For example, if the data is likely to end up in a URL, HTML, or
# inside JavaScript, then the list of codecs below is appropriate. The order of the list is not terribly important.
Encoder.DefaultCodecList=HTMLEntityCodec,PercentCodec,JavaScriptCodec
#===========================================================================
# ESAPI Encryption
#
# The ESAPI Encryptor provides basic cryptographic functions with a simplified API.
# To get started, generate a new key using java -classpath esapi.jar org.owasp.esapi.reference.crypto.JavaEncryptor
# There is not currently any support for key rotation, so be careful when changing your key and salt as it
# will invalidate all signed, encrypted, and hashed data.
#
# WARNING: Not all combinations of algorithms and key lengths are supported.
# If you choose to use a key length greater than 128, you MUST download the
# unlimited strength policy files and install in the lib directory of your JRE/JDK.
# See http://java.sun.com/javase/downloads/index.jsp for more information.
#
# Backward compatibility with ESAPI Java 1.4 is supported by the two deprecated API
# methods, Encryptor.encrypt(String) and Encryptor.decrypt(String). However, whenever
# possible, these methods should be avoided as they use ECB cipher mode, which in almost
# all circumstances a poor choice because of it's weakness. CBC cipher mode is the default
# for the new Encryptor encrypt / decrypt methods for ESAPI Java 2.0. In general, you
# should only use this compatibility setting if you have persistent data encrypted with
# version 1.4 and even then, you should ONLY set this compatibility mode UNTIL
# you have decrypted all of your old encrypted data and then re-encrypted it with
# ESAPI 2.0 using CBC mode. If you have some reason to mix the deprecated 1.4 mode
# with the new 2.0 methods, make sure that you use the same cipher algorithm for both
# (256-bit AES was the default for 1.4; 128-bit is the default for 2.0; see below for
# more details.) Otherwise, you will have to use the new 2.0 encrypt / decrypt methods
# where you can specify a SecretKey. (Note that if you are using the 256-bit AES,
# that requires downloading the special jurisdiction policy files mentioned above.)
#
# ***** IMPORTANT: Do NOT forget to replace these with your own values! *****
# To calculate these values, you can run:
# java -classpath esapi.jar org.owasp.esapi.reference.crypto.JavaEncryptor
#
Encryptor.MasterKey=tzfztf56ftv
Encryptor.MasterSalt=123456ztrewq
# Provides the default JCE provider that ESAPI will "prefer" for its symmetric
# encryption and hashing. (That is it will look to this provider first, but it
# will defer to other providers if the requested algorithm is not implemented
# by this provider.) If left unset, ESAPI will just use your Java VM's current
# preferred JCE provider, which is generally set in the file
# "$JAVA_HOME/jre/lib/security/java.security".
#
# The main intent of this is to allow ESAPI symmetric encryption to be
# used with a FIPS 140-2 compliant crypto-module. For details, see the section
# "Using ESAPI Symmetric Encryption with FIPS 140-2 Cryptographic Modules" in
# the ESAPI 2.0 Symmetric Encryption User Guide, at:
# http://owasp-esapi-java.googlecode.com/svn/trunk/documentation/esapi4java-core-2.0-symmetric-crypto-user-guide.html
# However, this property also allows you to easily use an alternate JCE provider
# such as "Bouncy Castle" without having to make changes to "java.security".
# See Javadoc for SecurityProviderLoader for further details. If you wish to use
# a provider that is not known to SecurityProviderLoader, you may specify the
# fully-qualified class name of the JCE provider class that implements
# java.security.Provider. If the name contains a '.', this is interpreted as
# a fully-qualified class name that implements java.security.Provider.
#
# NOTE: Setting this property has the side-effect of changing it in your application
# as well, so if you are using JCE in your application directly rather than
# through ESAPI (you wouldn't do that, would you? ;-), it will change the
# preferred JCE provider there as well.
#
# Default: Keeps the JCE provider set to whatever JVM sets it to.
Encryptor.PreferredJCEProvider=
# AES is the most widely used and strongest encryption algorithm. This
# should agree with your Encryptor.CipherTransformation property.
# By default, ESAPI Java 1.4 uses "PBEWithMD5AndDES" and which is
# very weak. It is essentially a password-based encryption key, hashed
# with MD5 around 1K times and then encrypted with the weak DES algorithm
# (56-bits) using ECB mode and an unspecified padding (it is
# JCE provider specific, but most likely "NoPadding"). However, 2.0 uses
# "AES/CBC/PKCSPadding". If you want to change these, change them here.
# Warning: This property does not control the default reference implementation for
# ESAPI 2.0 using JavaEncryptor. Also, this property will be dropped
# in the future.
# @deprecated
Encryptor.EncryptionAlgorithm=AES
# For ESAPI Java 2.0 - New encrypt / decrypt methods use this.
Encryptor.CipherTransformation=AES/CBC/PKCS5Padding
# Applies to ESAPI 2.0 and later only!
# Comma-separated list of cipher modes that provide *BOTH*
# confidentiality *AND* message authenticity. (NIST refers to such cipher
# modes as "combined modes" so that's what we shall call them.) If any of these
# cipher modes are used then no MAC is calculated and stored
# in the CipherText upon encryption. Likewise, if one of these
# cipher modes is used with decryption, no attempt will be made
# to validate the MAC contained in the CipherText object regardless
# of whether it contains one or not. Since the expectation is that
# these cipher modes support support message authenticity already,
# injecting a MAC in the CipherText object would be at best redundant.
#
# Note that as of JDK 1.5, the SunJCE provider does not support *any*
# of these cipher modes. Of these listed, only GCM and CCM are currently
# NIST approved. YMMV for other JCE providers. E.g., Bouncy Castle supports
# GCM and CCM with "NoPadding" mode, but not with "PKCS5Padding" or other
# padding modes.
Encryptor.cipher_modes.combined_modes=GCM,CCM,IAPM,EAX,OCB,CWC
# Applies to ESAPI 2.0 and later only!
# Additional cipher modes allowed for ESAPI 2.0 encryption. These
# cipher modes are in _addition_ to those specified by the property
# 'Encryptor.cipher_modes.combined_modes'.
# Note: We will add support for streaming modes like CFB & OFB once
# we add support for 'specified' to the property 'Encryptor.ChooseIVMethod'
# (probably in ESAPI 2.1).
# DISCUSS: Better name?
Encryptor.cipher_modes.additional_allowed=CBC
# 128-bit is almost always sufficient and appears to be more resistant to
# related key attacks than is 256-bit AES. Use '_' to use default key size
# for cipher algorithms (where it makes sense because the algorithm supports
# a variable key size). Key length must agree to what's provided as the
# cipher transformation, otherwise this will be ignored after logging a
# warning.
#
# NOTE: This is what applies BOTH ESAPI 1.4 and 2.0. See warning above about mixing!
Encryptor.EncryptionKeyLength=128
# Because 2.0 uses CBC mode by default, it requires an initialization vector (IV).
# (All cipher modes except ECB require an IV.) There are two choices: we can either
# use a fixed IV known to both parties or allow ESAPI to choose a random IV. While
# the IV does not need to be hidden from adversaries, it is important that the
# adversary not be allowed to choose it. Also, random IVs are generally much more
# secure than fixed IVs. (In fact, it is essential that feed-back cipher modes
# such as CFB and OFB use a different IV for each encryption with a given key so
# in such cases, random IVs are much preferred. By default, ESAPI 2.0 uses random
# IVs. If you wish to use 'fixed' IVs, set 'Encryptor.ChooseIVMethod=fixed' and
# uncomment the Encryptor.fixedIV.
#
# Valid values: random|fixed|specified 'specified' not yet implemented; planned for 2.1
Encryptor.ChooseIVMethod=random
# If you choose to use a fixed IV, then you must place a fixed IV here that
# is known to all others who are sharing your secret key. The format should
# be a hex string that is the same length as the cipher block size for the
# cipher algorithm that you are using. The following is an *example* for AES
# from an AES test vector for AES-128/CBC as described in:
# NIST Special Publication 800-38A (2001 Edition)
# "Recommendation for Block Cipher Modes of Operation".
# (Note that the block size for AES is 16 bytes == 128 bits.)
#
Encryptor.fixedIV=0x000102030405060708090a0b0c0d0e0f
# Whether or not CipherText should use a message authentication code (MAC) with it.
# This prevents an adversary from altering the IV as well as allowing a more
# fool-proof way of determining the decryption failed because of an incorrect
# key being supplied. This refers to the "separate" MAC calculated and stored
# in CipherText, not part of any MAC that is calculated as a result of a
# "combined mode" cipher mode.
#
# If you are using ESAPI with a FIPS 140-2 cryptographic module, you *must* also
# set this property to false.
Encryptor.CipherText.useMAC=true
# Whether or not the PlainText object may be overwritten and then marked
# eligible for garbage collection. If not set, this is still treated as 'true'.
Encryptor.PlainText.overwrite=true
# Do not use DES except in a legacy situations. 56-bit is way too small key size.
#Encryptor.EncryptionKeyLength=56
#Encryptor.EncryptionAlgorithm=DES
# TripleDES is considered strong enough for most purposes.
# Note: There is also a 112-bit version of DESede. Using the 168-bit version
# requires downloading the special jurisdiction policy from Sun.
#Encryptor.EncryptionKeyLength=168
#Encryptor.EncryptionAlgorithm=DESede
Encryptor.HashAlgorithm=SHA-512
Encryptor.HashIterations=1024
Encryptor.DigitalSignatureAlgorithm=SHA1withDSA
Encryptor.DigitalSignatureKeyLength=1024
Encryptor.RandomAlgorithm=SHA1PRNG
Encryptor.CharacterEncoding=UTF-8
# This is the Pseudo Random Function (PRF) that ESAPI's Key Derivation Function
# (KDF) normally uses. Note this is *only* the PRF used for ESAPI's KDF and
# *not* what is used for ESAPI's MAC. (Currently, HmacSHA1 is always used for
# the MAC, mostly to keep the overall size at a minimum.)
#
# Currently supported choices for JDK 1.5 and 1.6 are:
# HmacSHA1 (160 bits), HmacSHA256 (256 bits), HmacSHA384 (384 bits), and
# HmacSHA512 (512 bits).
# Note that HmacMD5 is *not* supported for the PRF used by the KDF even though
# the JDKs support it. See the ESAPI 2.0 Symmetric Encryption User Guide
# further details.
Encryptor.KDF.PRF=HmacSHA256
#===========================================================================
# ESAPI HttpUtilties
#
# The HttpUtilities provide basic protections to HTTP requests and responses. Primarily these methods
# protect against malicious data from attackers, such as unprintable characters, escaped characters,
# and other simple attacks. The HttpUtilities also provides utility methods for dealing with cookies,
# headers, and CSRF tokens.
#
# Default file upload location (remember to escape backslashes with \\)
HttpUtilities.UploadDir=C:\\ESAPI\\testUpload
HttpUtilities.UploadTempDir=C:\\temp
# Force flags on cookies, if you use HttpUtilities to set cookies
HttpUtilities.ForceHttpOnlySession=false
HttpUtilities.ForceSecureSession=false
HttpUtilities.ForceHttpOnlyCookies=true
HttpUtilities.ForceSecureCookies=true
# Maximum size of HTTP headers
HttpUtilities.MaxHeaderSize=4096
# File upload configuration
HttpUtilities.ApprovedUploadExtensions=.zip,.pdf,.doc,.docx,.ppt,.pptx,.tar,.gz,.tgz,.rar,.war,.jar,.ear,.xls,.rtf,.properties,.java,.class,.txt,.xml,.jsp,.jsf,.exe,.dll
HttpUtilities.MaxUploadFileBytes=500000000
# Using UTF-8 throughout your stack is highly recommended. That includes your database driver,
# container, and any other technologies you may be using. Failure to do this may expose you
# to Unicode transcoding injection attacks. Use of UTF-8 does not hinder internationalization.
HttpUtilities.ResponseContentType=text/html; charset=UTF-8
# This is the name of the cookie used to represent the HTTP session
# Typically this will be the default "JSESSIONID"
HttpUtilities.HttpSessionIdName=JSESSIONID
#===========================================================================
# ESAPI Executor
# CHECKME - Not sure what this is used for, but surely it should be made OS independent.
Executor.WorkingDirectory=C:\\Windows\\Temp
Executor.ApprovedExecutables=C:\\Windows\\System32\\cmd.exe,C:\\Windows\\System32\\runas.exe
#===========================================================================
# ESAPI Logging
# Set the application name if these logs are combined with other applications
Logger.ApplicationName=ExampleApplication
# If you use an HTML log viewer that does not properly HTML escape log data, you can set LogEncodingRequired to true
Logger.LogEncodingRequired=false
# Determines whether ESAPI should log the application name. This might be clutter in some single-server/single-app environments.
Logger.LogApplicationName=true
# Determines whether ESAPI should log the server IP and port. This might be clutter in some single-server environments.
Logger.LogServerIP=true
# LogFileName, the name of the logging file. Provide a full directory path (e.g., C:\\ESAPI\\ESAPI_logging_file) if you
# want to place it in a specific directory.
Logger.LogFileName=ESAPI_logging_file
# MaxLogFileSize, the max size (in bytes) of a single log file before it cuts over to a new one (default is 10,000,000)
Logger.MaxLogFileSize=10000000
#===========================================================================
# ESAPI Intrusion Detection
#
# Each event has a base to which .count, .interval, and .action are added
# The IntrusionException will fire if we receive "count" events within "interval" seconds
# The IntrusionDetector is configurable to take the following actions: log, logout, and disable
# (multiple actions separated by commas are allowed e.g. event.test.actions=log,disable
#
# Custom Events
# Names must start with "event." as the base
# Use IntrusionDetector.addEvent( "test" ) in your code to trigger "event.test" here
# You can also disable intrusion detection completely by changing
# the following parameter to true
#
IntrusionDetector.Disable=false
#
IntrusionDetector.event.test.count=2
IntrusionDetector.event.test.interval=10
IntrusionDetector.event.test.actions=disable,log
# Exception Events
# All EnterpriseSecurityExceptions are registered automatically
# Call IntrusionDetector.getInstance().addException(e) for Exceptions that do not extend EnterpriseSecurityException
# Use the fully qualified classname of the exception as the base
# any intrusion is an attack
IntrusionDetector.org.owasp.esapi.errors.IntrusionException.count=1
IntrusionDetector.org.owasp.esapi.errors.IntrusionException.interval=1
IntrusionDetector.org.owasp.esapi.errors.IntrusionException.actions=log,disable,logout
# for test purposes
# CHECKME: Shouldn't there be something in the property name itself that designates
# that these are for testing???
IntrusionDetector.org.owasp.esapi.errors.IntegrityException.count=10
IntrusionDetector.org.owasp.esapi.errors.IntegrityException.interval=5
IntrusionDetector.org.owasp.esapi.errors.IntegrityException.actions=log,disable,logout
# rapid validation errors indicate scans or attacks in progress
# org.owasp.esapi.errors.ValidationException.count=10
# org.owasp.esapi.errors.ValidationException.interval=10
# org.owasp.esapi.errors.ValidationException.actions=log,logout
# sessions jumping between hosts indicates session hijacking
IntrusionDetector.org.owasp.esapi.errors.AuthenticationHostException.count=2
IntrusionDetector.org.owasp.esapi.errors.AuthenticationHostException.interval=10
IntrusionDetector.org.owasp.esapi.errors.AuthenticationHostException.actions=log,logout
#===========================================================================
# ESAPI Validation
#
# The ESAPI Validator works on regular expressions with defined names. You can define names
# either here, or you may define application specific patterns in a separate file defined below.
# This allows enterprises to specify both organizational standards as well as application specific
# validation rules.
#
Validator.ConfigurationFile=validation.properties
# Validators used by ESAPI
Validator.AccountName=^[a-zA-Z0-9]{3,20}$
Validator.SystemCommand=^[a-zA-Z\\-\\/]{1,64}$
Validator.RoleName=^[a-z]{1,20}$
#the word TEST below should be changed to your application
#name - only relative URL's are supported
Validator.Redirect=^\\/test.*$
# Global HTTP Validation Rules
# Values with Base64 encoded data (e.g. encrypted state) will need at least [a-zA-Z0-9\/+=]
Validator.HTTPScheme=^(http|https)$
Validator.HTTPServerName=^[a-zA-Z0-9_.\\-]*$
Validator.HTTPParameterName=^[a-zA-Z0-9_]{1,32}$
Validator.HTTPParameterValue=^[a-zA-Z0-9.\\-\\/+=@_ ]*$
Validator.HTTPCookieName=^[a-zA-Z0-9\\-_]{1,32}$
Validator.HTTPCookieValue=^[a-zA-Z0-9\\-\\/+=_ ]*$
Validator.HTTPHeaderName=^[a-zA-Z0-9\\-_]{1,32}$
Validator.HTTPHeaderValue=^[a-zA-Z0-9()\\-=\\*\\.\\?;,+\\/:&_ ]*$
Validator.HTTPContextPath=^\\/?[a-zA-Z0-9.\\-\\/_]*$
Validator.HTTPServletPath=^[a-zA-Z0-9.\\-\\/_]*$
Validator.HTTPPath=^[a-zA-Z0-9.\\-_]*$
Validator.HTTPQueryString=^[a-zA-Z0-9()\\-=\\*\\.\\?;,+\\/:&_ %]*$
Validator.HTTPURI=^[a-zA-Z0-9()\\-=\\*\\.\\?;,+\\/:&_ ]*$
Validator.HTTPURL=^.*$
Validator.HTTPJSESSIONID=^[A-Z0-9]{10,30}$
# Validation of file related input
Validator.FileName=^[a-zA-Z0-9!@#$%^&{}\\[\\]()_+\\-=,.~'` ]{1,255}$
Validator.DirectoryName=^[a-zA-Z0-9:/\\\\!@#$%^&{}\\[\\]()_+\\-=,.~'` ]{1,255}$
# Validation of dates. Controls whether or not 'lenient' dates are accepted.
# See DataFormat.setLenient(boolean flag) for further details.
Validator.AcceptLenientDates=false
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd" version="2.1">
<!-- JDO tutorial "unit" -->
<persistence-unit name="Tutorial">
<exclude-unlisted-classes/>
<properties>
<property name="javax.jdo.PersistenceManagerFactoryClass" value="org.datanucleus.api.jdo.JDOPersistenceManagerFactory"/>
<property name="javax.jdo.option.ConnectionURL" value="jdbc:h2:mem:mypersistence"/>
<property name="javax.jdo.option.ConnectionDriverName" value="org.h2.Driver"/>
<property name="javax.jdo.option.ConnectionUserName" value="sa"/>
<property name="javax.jdo.option.ConnectionPassword" value=""/>
<property name="datanucleus.schema.autoCreateAll" value="true"/>
</properties>
</persistence-unit>
</persistence>
@@ -0,0 +1,3 @@
1|IND|India
2|MY|Malaysia
3|AU|Australia
1 1 IND India
2 2 MY Malaysia
3 3 AU Australia
@@ -0,0 +1,6 @@
dataSourceClassName=//TBD
dataSource.user=//TBD
dataSource.password=//TBD
dataSource.databaseName=//TBD
dataSource.portNumber=//TBD
dataSource.serverName=//TBD
@@ -0,0 +1,15 @@
var first = {
name: "Whiskey",
age: 5
};
var second = {
volume: 100
};
Object.bindProperties(first, second);
print(first.volume);
second.volume = 1000;
print(first.volume);
@@ -0,0 +1 @@
print(__FILE__, __LINE__, __DIR__);
@@ -0,0 +1,19 @@
var math = {
increment: function (num) {
return ++num;
},
failFunc: function () {
try {
throw "BOOM";
} catch (e if typeof e === 'string') {
print("String thrown: " + e);
}
catch (e) {
print("this shouldn't happen!");
}
}
};
math;
@@ -0,0 +1,11 @@
var demo = {
__noSuchProperty__: function (propName) {
print("Accessed non-existing property: " + propName);
},
__noSuchMethod__: function (methodName) {
print("Invoked non-existing method: " + methodName);
}
};
demo;
@@ -0,0 +1 @@
function increment(num) ++num;
@@ -0,0 +1,2 @@
print(" hello world".trimLeft());
print("hello world ".trimRight());
@@ -0,0 +1,9 @@
function arrays(arr) {
var javaIntArray = Java.to(arr, "int[]");
print(javaIntArray[0]);
print(javaIntArray[1]);
print(javaIntArray[2]);
}
arrays([100, "1654", true]);
@@ -0,0 +1,6 @@
log4j.rootLogger=DEBUG, A1
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
@@ -0,0 +1,9 @@
# Root logger
log4j.rootLogger=INFO, file, stdout
# Write to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>web - %date [%thread] %-5level %logger{36} - %message%n
</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="WARN" />
<logger name="org.springframework.transaction" level="WARN" />
<!-- in order to debug some marshalling issues, this needs to be TRACE -->
<logger name="org.springframework.web.servlet.mvc" level="WARN" />
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,2 @@
line 1
a second line
@@ -0,0 +1,177 @@
package com.baeldung.completablefuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CompletableFutureLongRunningUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureLongRunningUnitTest.class);
@Test
public void whenRunningCompletableFutureAsynchronously_thenGetMethodWaitsForResult() throws InterruptedException, ExecutionException {
Future<String> completableFuture = calculateAsync();
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
@Test
public void whenRunningCompletableFutureWithResult_thenGetMethodReturnsImmediately() throws InterruptedException, ExecutionException {
Future<String> completableFuture = CompletableFuture.completedFuture("Hello");
String result = completableFuture.get();
assertEquals("Hello", result);
}
private Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
@Test(expected = CancellationException.class)
public void whenCancelingTheFuture_thenThrowsCancellationException() throws ExecutionException, InterruptedException {
Future<String> future = calculateAsyncWithCancellation();
future.get();
}
@Test
public void whenCreatingCompletableFutureWithSupplyAsync_thenFutureReturnsValue() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
assertEquals("Hello", future.get());
}
@Test
public void whenAddingThenAcceptToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAccept(s -> LOG.debug("Computation returned: " + s));
future.get();
}
@Test
public void whenAddingThenRunToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenRun(() -> LOG.debug("Computation finished."));
future.get();
}
@Test
public void whenAddingThenApplyToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
}
@Test
public void whenUsingThenCompose_thenFuturesExecuteSequentially() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenCombine_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenAcceptBoth_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello").thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
}
@Test
public void whenFutureCombinedWithAllOfCompletes_thenAllFuturesAreDone() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
}
@Test
public void whenFutureThrows_thenHandleMethodReceivesException() throws ExecutionException, InterruptedException {
String name = null;
// ...
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
}
@Test(expected = ExecutionException.class)
public void whenCompletingFutureExceptionally_thenGetMethodThrows() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
// ...
completableFuture.get();
}
@Test
public void whenAddingThenApplyAsyncToFuture_thenFunctionExecutesAfterComputationIsFinished() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
}
}
@@ -0,0 +1,38 @@
package com.baeldung.concurrent.accumulator;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.LongBinaryOperator;
import java.util.stream.IntStream;
import static junit.framework.TestCase.assertEquals;
public class LongAccumulatorUnitTest {
@Test
public void givenLongAccumulator_whenApplyActionOnItFromMultipleThrads_thenShouldProduceProperResult() throws InterruptedException {
// given
ExecutorService executorService = Executors.newFixedThreadPool(8);
LongBinaryOperator sum = Long::sum;
LongAccumulator accumulator = new LongAccumulator(sum, 0L);
int numberOfThreads = 4;
int numberOfIncrements = 100;
// when
Runnable accumulateAction = () -> IntStream.rangeClosed(0, numberOfIncrements).forEach(accumulator::accumulate);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(accumulateAction);
}
// then
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
executorService.shutdown();
assertEquals(accumulator.get(), 20200);
}
}
@@ -0,0 +1,67 @@
package com.baeldung.concurrent.adder;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;
import static com.jayway.awaitility.Awaitility.await;
import static junit.framework.TestCase.assertEquals;
public class LongAdderUnitTest {
@Test
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThem() throws InterruptedException {
//given
LongAdder counter = new LongAdder();
ExecutorService executorService = Executors.newFixedThreadPool(8);
int numberOfThreads = 4;
int numberOfIncrements = 100;
//when
Runnable incrementAction = () -> IntStream
.range(0, numberOfIncrements)
.forEach((i) -> counter.increment());
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(incrementAction);
}
//then
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
executorService.shutdown();
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);
}
@Test
public void givenMultipleThread_whenTheyWriteToSharedLongAdder_thenShouldCalculateSumForThemAndResetAdderAfterward() throws InterruptedException {
//given
LongAdder counter = new LongAdder();
ExecutorService executorService = Executors.newFixedThreadPool(8);
int numberOfThreads = 4;
int numberOfIncrements = 100;
//when
Runnable incrementAction = () -> IntStream
.range(0, numberOfIncrements)
.forEach((i) -> counter.increment());
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(incrementAction);
}
//then
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
executorService.shutdown();
assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads);
await().until(() -> assertEquals(counter.sum(), 0));
}
}
@@ -0,0 +1,38 @@
package com.baeldung.concurrent.atomic;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Test;
public class ThreadSafeCounterTest {
@Test
public void givenMultiThread_whenSafeCounterWithLockIncrement() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
SafeCounterWithLock safeCounter = new SafeCounterWithLock();
IntStream.range(0, 1000)
.forEach(count -> service.submit(safeCounter::increment));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, safeCounter.getValue());
}
@Test
public void givenMultiThread_whenSafeCounterWithoutLockIncrement() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
SafeCounterWithoutLock safeCounter = new SafeCounterWithoutLock();
IntStream.range(0, 1000)
.forEach(count -> service.submit(safeCounter::increment));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, safeCounter.getValue());
}
}
@@ -0,0 +1,33 @@
package com.baeldung.concurrent.atomic;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Test;
/**
* This test shows the behaviour of a thread-unsafe class in a multithreaded scenario. We are calling
* the increment methods 1000 times from a pool of 3 threads. In most of the cases, the counter will
* less than 1000, because of lost updates, however, occasionally it may reach 1000, when no threads
* called the method simultaneously. This may cause the build to fail occasionally. Hence excluding this
* test from build by adding this in manual test
*/
public class ThreadUnsafeCounterManualTest {
@Test
public void givenMultiThread_whenUnsafeCounterIncrement() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
UnsafeCounter unsafeCounter = new UnsafeCounter();
IntStream.range(0, 1000)
.forEach(count -> service.submit(unsafeCounter::increment));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, unsafeCounter.getValue());
}
}
@@ -0,0 +1,53 @@
package com.baeldung.concurrent.copyonwrite;
import org.junit.Test;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.assertj.core.api.Assertions.assertThat;
public class CopyOnWriteArrayListUnitTest {
@Test
public void givenCopyOnWriteList_whenIterateAndAddElementToUnderneathList_thenShouldNotChangeIterator() {
//given
final CopyOnWriteArrayList<Integer> numbers =
new CopyOnWriteArrayList<>(new Integer[]{1, 3, 5, 8});
//when
Iterator<Integer> iterator = numbers.iterator();
numbers.add(10);
//then
List<Integer> result = new LinkedList<>();
iterator.forEachRemaining(result::add);
assertThat(result).containsOnly(1, 3, 5, 8);
//and
Iterator<Integer> iterator2 = numbers.iterator();
List<Integer> result2 = new LinkedList<>();
iterator2.forEachRemaining(result2::add);
//then
assertThat(result2).containsOnly(1, 3, 5, 8, 10);
}
@Test(expected = UnsupportedOperationException.class)
public void givenCopyOnWriteList_whenIterateOverItAndTryToRemoveElement_thenShouldThrowException() {
//given
final CopyOnWriteArrayList<Integer> numbers =
new CopyOnWriteArrayList<>(new Integer[]{1, 3, 5, 8});
//when
Iterator<Integer> iterator = numbers.iterator();
while (iterator.hasNext()) {
iterator.remove();
}
}
}
@@ -0,0 +1,70 @@
package com.baeldung.concurrent.countdownlatch;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
public class CountdownLatchExampleIntegrationTest {
@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException {
// Given
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(outputScraper, countDownLatch))).limit(5).collect(toList());
// When
workers.forEach(Thread::start);
countDownLatch.await(); // Block until workers finish
outputScraper.add("Latch released");
// Then
outputScraper.forEach(Object::toString);
assertThat(outputScraper).containsExactly("Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released");
}
@Test
public void whenFailingToParallelProcess_thenMainThreadShouldTimeout() throws InterruptedException {
// Given
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))).limit(5).collect(toList());
// When
workers.forEach(Thread::start);
final boolean result = countDownLatch.await(3L, TimeUnit.SECONDS);
// Then
assertThat(result).isFalse();
}
@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException {
// Given
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch readyThreadCounter = new CountDownLatch(5);
CountDownLatch callingThreadBlocker = new CountDownLatch(1);
CountDownLatch completedThreadCounter = new CountDownLatch(5);
List<Thread> workers = Stream.generate(() -> new Thread(new WaitingWorker(outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))).limit(5).collect(toList());
// When
workers.forEach(Thread::start);
readyThreadCounter.await(); // Block until workers start
outputScraper.add("Workers ready");
callingThreadBlocker.countDown(); // Start workers
completedThreadCounter.await(); // Block until workers finish
outputScraper.add("Workers complete");
// Then
outputScraper.forEach(Object::toString);
assertThat(outputScraper).containsExactly("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete");
}
}
@@ -0,0 +1,82 @@
package com.baeldung.concurrent.delayqueue;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertEquals;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class DelayQueueIntegrationTest {
@Test
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
//given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
//when
executor.submit(producer);
executor.submit(consumer);
//then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
@Test
public void givenDelayQueue_whenProduceElementWithHugeDelay_thenConsumerWasNotAbleToConsumeMessageInGivenTime() throws InterruptedException {
//given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
//when
executor.submit(producer);
executor.submit(consumer);
//then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
}
@Test
public void givenDelayQueue_whenProduceElementWithNegativeDelay_thenConsumeMessageImmediately() throws InterruptedException {
//given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
//when
executor.submit(producer);
executor.submit(consumer);
//then
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
}
}
@@ -0,0 +1,22 @@
package com.baeldung.concurrent.future;
import org.junit.Test;
import java.util.concurrent.ForkJoinPool;
import static org.junit.Assert.assertEquals;
public class FactorialSquareCalculatorUnitTest {
@Test
public void whenCalculatesFactorialSquare_thenReturnCorrectValue() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);
forkJoinPool.execute(calculator);
assertEquals("The sum of the squares from 1 to 10 is 385", 385, calculator.join().intValue());
}
}
@@ -0,0 +1,99 @@
package com.baeldung.concurrent.future;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SquareCalculatorIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(SquareCalculatorIntegrationTest.class);
@Rule
public TestName name = new TestName();
private long start;
private SquareCalculator squareCalculator;
@Test
public void givenExecutorIsSingleThreaded_whenTwoExecutionsAreTriggered_thenRunInSequence() throws InterruptedException, ExecutionException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result1 = squareCalculator.calculate(4);
Future<Integer> result2 = squareCalculator.calculate(1000);
while (!result1.isDone() || !result2.isDone()) {
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
Thread.sleep(300);
}
assertEquals(16, result1.get().intValue());
assertEquals(1000000, result2.get().intValue());
}
@Test(expected = TimeoutException.class)
public void whenGetWithTimeoutLowerThanExecutionTime_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result = squareCalculator.calculate(4);
result.get(500, TimeUnit.MILLISECONDS);
}
@Test
public void givenExecutorIsMultiThreaded_whenTwoExecutionsAreTriggered_thenRunInParallel() throws InterruptedException, ExecutionException {
squareCalculator = new SquareCalculator(Executors.newFixedThreadPool(2));
Future<Integer> result1 = squareCalculator.calculate(4);
Future<Integer> result2 = squareCalculator.calculate(1000);
while (!result1.isDone() || !result2.isDone()) {
LOG.debug(String.format("Task 1 is %s and Task 2 is %s.", result1.isDone() ? "done" : "not done", result2.isDone() ? "done" : "not done"));
Thread.sleep(300);
}
assertEquals(16, result1.get().intValue());
assertEquals(1000000, result2.get().intValue());
}
@Test(expected = CancellationException.class)
public void whenCancelFutureAndCallGet_thenThrowException() throws InterruptedException, ExecutionException, TimeoutException {
squareCalculator = new SquareCalculator(Executors.newSingleThreadExecutor());
Future<Integer> result = squareCalculator.calculate(4);
boolean canceled = result.cancel(true);
assertTrue("Future was canceled", canceled);
assertTrue("Future was canceled", result.isCancelled());
result.get();
}
@Before
public void start() {
start = System.currentTimeMillis();
}
@After
public void end() {
LOG.debug(String.format("Test %s took %s ms \n", name.getMethodName(), System.currentTimeMillis() - start));
}
}
@@ -0,0 +1,73 @@
package com.baeldung.concurrent.locks;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static junit.framework.TestCase.assertEquals;
public class SharedObjectWithLockManualTest {
@Test
public void whenLockAcquired_ThenLockedIsTrue() {
final SharedObjectWithLock object = new SharedObjectWithLock();
final int threadCount = 2;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
executeThreads(object, threadCount, service);
assertEquals(true, object.isLocked());
service.shutdown();
}
@Test
public void whenLocked_ThenQueuedThread() {
final int threadCount = 4;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
final SharedObjectWithLock object = new SharedObjectWithLock();
executeThreads(object, threadCount, service);
assertEquals(object.hasQueuedThreads(), true);
service.shutdown();
}
public void whenTryLock_ThenQueuedThread() {
final SharedObjectWithLock object = new SharedObjectWithLock();
final int threadCount = 2;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
executeThreads(object, threadCount, service);
assertEquals(true, object.isLocked());
service.shutdown();
}
@Test
public void whenGetCount_ThenCorrectCount() throws InterruptedException {
final int threadCount = 4;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
final SharedObjectWithLock object = new SharedObjectWithLock();
executeThreads(object, threadCount, service);
Thread.sleep(1000);
assertEquals(object.getCounter(), 4);
service.shutdown();
}
private void executeThreads(SharedObjectWithLock object, int threadCount, ExecutorService service) {
for (int i = 0; i < threadCount; i++) {
service.execute(object::perform);
}
}
}
@@ -0,0 +1,57 @@
package com.baeldung.concurrent.locks;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static junit.framework.TestCase.assertEquals;
public class SynchronizedHashMapWithRWLockManualTest {
@Test
public void whenWriting_ThenNoReading() {
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
final int threadCount = 3;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
executeWriterThreads(object, threadCount, service);
assertEquals(object.isReadLockAvailable(), false);
service.shutdown();
}
@Test
public void whenReading_ThenMultipleReadingAllowed() {
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
final int threadCount = 5;
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
executeReaderThreads(object, threadCount, service);
assertEquals(object.isReadLockAvailable(), true);
service.shutdown();
}
private void executeWriterThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
for (int i = 0; i < threadCount; i++) {
service.execute(() -> {
try {
object.put("key" + threadCount, "value" + threadCount);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
private void executeReaderThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
for (int i = 0; i < threadCount; i++)
service.execute(() -> {
object.get("key" + threadCount);
});
}
}
@@ -0,0 +1,41 @@
package com.baeldung.concurrent.phaser;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import static junit.framework.TestCase.assertEquals;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PhaserUnitTest {
@Test
public void givenPhaser_whenCoordinateWorksBetweenThreads_thenShouldCoordinateBetweenMultiplePhases() {
//given
ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());
//when
executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));
//then
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());
//and
executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();
assertEquals(2, ph.getPhase());
ph.arriveAndDeregister();
}
}
@@ -0,0 +1,57 @@
package com.baeldung.concurrent.priorityblockingqueue;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.util.Lists.newArrayList;
public class PriorityBlockingQueueIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(PriorityBlockingQueueIntegrationTest.class);
@Test
public void givenUnorderedValues_whenPolling_thenShouldOrderQueue() throws InterruptedException {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);
queue.drainTo(polledElements);
assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);
}
@Test
public void whenPollingEmptyQueue_thenShouldBlockThread() throws InterruptedException {
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
final Thread thread = new Thread(() -> {
LOG.debug("Polling...");
while (true) {
try {
Integer poll = queue.take();
LOG.debug("Polled: " + poll);
} catch (InterruptedException ignored) {
}
}
});
thread.start();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
LOG.debug("Adding to queue");
queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
}
@@ -0,0 +1,114 @@
package com.baeldung.concurrent.semaphores;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SemaphoresManualTest {
// ========= login queue ======
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
final int slots = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
final int slots = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
// ========= delay queue =======
@Test
public void givenDelayQueue_whenReachLimit_thenBlocked() {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
// ========== mutex ========
@Test
public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException {
final int count = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(count);
final CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
}
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException {
final int count = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(count);
final CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
Thread.sleep(5000);
assertFalse(counter.hasQueuedThreads());
assertEquals(count, counter.getCount());
}
}
@@ -0,0 +1,120 @@
package com.baeldung.concurrent.skiplist;
import org.junit.Test;
import java.time.ZonedDateTime;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ConcurrentSkipListSetIntegrationTest {
@Test
public void givenThreadsProducingEvents_whenGetForEventsFromLastMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {
//given
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
//when
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime
.now()
.minusSeconds(index), UUID
.randomUUID()
.toString())));
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
Thread.sleep(500);
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute = eventWindowSort.getEventsFromLastMinute();
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e
.getKey()
.isBefore(ZonedDateTime
.now()
.minusMinutes(1)))
.count();
assertEquals(eventsOlderThanOneMinute, 0);
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e
.getKey()
.isAfter(ZonedDateTime
.now()
.minusMinutes(1)))
.count();
//then
assertTrue(eventYoungerThanOneMinute > 0);
executorService.awaitTermination(1, TimeUnit.SECONDS);
executorService.shutdown();
}
@Test
public void givenThreadsProducingEvents_whenGetForEventsOlderThanOneMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {
//given
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
//when
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime
.now()
.minusSeconds(index), UUID
.randomUUID()
.toString())));
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
Thread.sleep(500);
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute();
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e
.getKey()
.isBefore(ZonedDateTime
.now()
.minusMinutes(1)))
.count();
assertTrue(eventsOlderThanOneMinute > 0);
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e
.getKey()
.isAfter(ZonedDateTime
.now()
.minusMinutes(1)))
.count();
//then
assertEquals(eventYoungerThanOneMinute, 0);
executorService.awaitTermination(1, TimeUnit.SECONDS);
executorService.shutdown();
}
}
@@ -0,0 +1,37 @@
package com.baeldung.concurrent.synchronize;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
public class BaeldungSychronizedBlockTest {
@Test
public void givenMultiThread_whenBlockSync() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedBlocks synchronizedBlocks = new BaeldungSynchronizedBlocks();
IntStream.range(0, 1000)
.forEach(count -> service.submit(synchronizedBlocks::performSynchronisedTask));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, synchronizedBlocks.getCount());
}
@Test
public void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
IntStream.range(0, 1000)
.forEach(count -> service.submit(BaeldungSynchronizedBlocks::performStaticSyncTask));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, BaeldungSynchronizedBlocks.getStaticCount());
}
}
@@ -0,0 +1,51 @@
package com.baeldung.concurrent.synchronize;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
public class BaeldungSynchronizeMethodsTest {
@Test
@Ignore
public void givenMultiThread_whenNonSyncMethod() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
IntStream.range(0, 1000)
.forEach(count -> service.submit(method::calculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, method.getSum());
}
@Test
public void givenMultiThread_whenMethodSync() throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
BaeldungSynchronizedMethods method = new BaeldungSynchronizedMethods();
IntStream.range(0, 1000)
.forEach(count -> service.submit(method::synchronisedCalculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, method.getSyncSum());
}
@Test
public void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
IntStream.range(0, 1000)
.forEach(count -> service.submit(BaeldungSynchronizedMethods::syncStaticCalculate));
service.awaitTermination(100, TimeUnit.MILLISECONDS);
assertEquals(1000, BaeldungSynchronizedMethods.staticSum);
}
}
@@ -0,0 +1,71 @@
package com.baeldung.concurrent.volatilekeyword;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.Assert.assertEquals;
public class SharedObjectManualTest {
private SharedObject sharedObject;
private int valueReadByThread2;
private int valueReadByThread3;
@Before
public void setUp() {
sharedObject = new SharedObject();
}
@Test
public void whenOneThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
Thread writer = new Thread(() -> sharedObject.increamentCount());
writer.start();
Thread readerOne = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
valueReadByThread2 = sharedObject.getCount();
});
readerOne.start();
Thread readerTwo = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
valueReadByThread3 = sharedObject.getCount();
});
readerTwo.start();
assertEquals(1, valueReadByThread2);
assertEquals(1, valueReadByThread3);
}
@Test
public void whenTwoThreadWrites_thenVolatileReadsFromMainMemory() throws InterruptedException {
Thread writerOne = new Thread(() -> sharedObject.increamentCount());
writerOne.start();
Thread.sleep(100);
Thread writerTwo = new Thread(() -> sharedObject.increamentCount());
writerTwo.start();
Thread.sleep(100);
Thread readerOne = new Thread(() -> valueReadByThread2 = sharedObject.getCount());
readerOne.start();
Thread readerTwo = new Thread(() -> valueReadByThread3 = sharedObject.getCount());
readerTwo.start();
assertEquals(2, valueReadByThread2);
assertEquals(2, valueReadByThread3);
}
}
@@ -0,0 +1,76 @@
package com.baeldung.java.concurrentmap;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class ConcurrentMapAggregateStatusManualTest {
private ExecutorService executorService;
private Map<String, Integer> concurrentMap;
private List<Integer> mapSizes;
private int MAX_SIZE = 100000;
@Before
public void init() {
executorService = Executors.newFixedThreadPool(2);
concurrentMap = new ConcurrentHashMap<>();
mapSizes = new ArrayList<>(MAX_SIZE);
}
@Test
public void givenConcurrentMap_whenSizeWithoutConcurrentUpdates_thenCorrect() throws InterruptedException {
Runnable collectMapSizes = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
concurrentMap.put(String.valueOf(i), i);
mapSizes.add(concurrentMap.size());
}
};
Runnable retrieveMapData = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
concurrentMap.get(String.valueOf(i));
}
};
executorService.execute(retrieveMapData);
executorService.execute(collectMapSizes);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
for (int i = 1; i <= MAX_SIZE; i++) {
assertEquals("map size should be consistently reliable", i, mapSizes.get(i - 1).intValue());
}
assertEquals(MAX_SIZE, concurrentMap.size());
}
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() throws InterruptedException {
Runnable collectMapSizes = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
mapSizes.add(concurrentMap.size());
}
};
Runnable updateMapData = () -> {
for (int i = 0; i < MAX_SIZE; i++) {
concurrentMap.put(String.valueOf(i), i);
}
};
executorService.execute(updateMapData);
executorService.execute(collectMapSizes);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
assertNotEquals("map size collected with concurrent updates not reliable", MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
assertEquals(MAX_SIZE, concurrentMap.size());
}
}
@@ -0,0 +1,160 @@
package com.baeldung.java.concurrentmap;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.assertNull;
public class ConcurrentMapNullKeyValueManualTest {
ConcurrentMap<String, Object> concurrentMap;
@Before
public void setup() {
concurrentMap = new ConcurrentHashMap<>();
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenGetWithNullKey_thenThrowsNPE() {
concurrentMap.get(null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenGetOrDefaultWithNullKey_thenThrowsNPE() {
concurrentMap.getOrDefault(null, new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
concurrentMap.put(null, new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
concurrentMap.put("test", null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMapAndKeyAbsent_whenPutWithNullKey_thenThrowsNPE() {
concurrentMap.putIfAbsent(null, new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMapAndMapWithNullKey_whenPutNullKeyMap_thenThrowsNPE() {
Map<String, Object> nullKeyMap = new HashMap<>();
nullKeyMap.put(null, new Object());
concurrentMap.putAll(nullKeyMap);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMapAndMapWithNullValue_whenPutNullValueMap_thenThrowsNPE() {
Map<String, Object> nullValueMap = new HashMap<>();
nullValueMap.put("test", null);
concurrentMap.putAll(nullValueMap);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceNullKeyWithValues_thenThrowsNPE() {
concurrentMap.replace(null, new Object(), new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceWithNullNewValue_thenThrowsNPE() {
Object o = new Object();
concurrentMap.replace("test", o, null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceOldNullValue_thenThrowsNPE() {
Object o = new Object();
concurrentMap.replace("test", null, o);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceWithNullValue_thenThrowsNPE() {
concurrentMap.replace("test", null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceNullKey_thenThrowsNPE() {
concurrentMap.replace(null, "test");
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenReplaceAllMappingNull_thenThrowsNPE() {
concurrentMap.put("test", new Object());
concurrentMap.replaceAll((s, o) -> null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenRemoveNullKey_thenThrowsNPE() {
concurrentMap.remove(null);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenRemoveNullKeyWithValue_thenThrowsNPE() {
concurrentMap.remove(null, new Object());
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenMergeNullKeyWithValue_thenThrowsNPE() {
concurrentMap.merge(null, new Object(), (o, o2) -> o2);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenMergeKeyWithNullValue_thenThrowsNPE() {
concurrentMap.put("test", new Object());
concurrentMap.merge("test", null, (o, o2) -> o2);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMapAndAssumeKeyAbsent_whenComputeWithNullKey_thenThrowsNPE() {
concurrentMap.computeIfAbsent(null, s -> s);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMapAndAssumeKeyPresent_whenComputeWithNullKey_thenThrowsNPE() {
concurrentMap.computeIfPresent(null, (s, o) -> o);
}
@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenComputeWithNullKey_thenThrowsNPE() {
concurrentMap.compute(null, (s, o) -> o);
}
@Test
public void givenConcurrentHashMap_whenMergeKeyRemappingNull_thenRemovesMapping() {
Object oldValue = new Object();
concurrentMap.put("test", oldValue);
concurrentMap.merge("test", new Object(), (o, o2) -> null);
assertNull(concurrentMap.get("test"));
}
@Test
public void givenConcurrentHashMapAndKeyAbsent_whenComputeWithKeyRemappingNull_thenRemainsAbsent() {
concurrentMap.computeIfPresent("test", (s, o) -> null);
assertNull(concurrentMap.get("test"));
}
@Test
public void givenKeyPresent_whenComputeIfPresentRemappingNull_thenMappingRemoved() {
Object oldValue = new Object();
concurrentMap.put("test", oldValue);
concurrentMap.computeIfPresent("test", (s, o) -> null);
assertNull(concurrentMap.get("test"));
}
@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
Object oldValue = new Object();
concurrentMap.put("test", oldValue);
concurrentMap.compute("test", (s, o) -> null);
assertNull(concurrentMap.get("test"));
}
}
@@ -0,0 +1,95 @@
package com.baeldung.java.concurrentmap;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ConcurrentMapPerformanceManualTest {
@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() throws Exception {
Map<String, Object> hashtable = new Hashtable<>();
Map<String, Object> synchronizedHashMap = Collections.synchronizedMap(new HashMap<>());
Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
long syncHashMapAvgRuntime = timeElapseForGetPut(synchronizedHashMap);
long concurrentHashMapAvgRuntime = timeElapseForGetPut(concurrentHashMap);
System.out.println(String.format("Hashtable: %s, syncHashMap: %s, ConcurrentHashMap: %s", hashtableAvgRuntime, syncHashMapAvgRuntime, concurrentHashMapAvgRuntime));
assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}
private long timeElapseForGetPut(Map<String, Object> map) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
long startTime = System.nanoTime();
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
for (int j = 0; j < 500_000; j++) {
int value = ThreadLocalRandom.current().nextInt(10000);
String key = String.valueOf(value);
map.put(key, value);
map.get(key);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return (System.nanoTime() - startTime) / 500_000;
}
@Test
public void givenConcurrentMap_whenKeyWithSameHashCode_thenPerformanceDegrades() throws InterruptedException {
class SameHash {
@Override
public int hashCode() {
return 1;
}
}
int executeTimes = 5000;
Map<SameHash, Integer> mapOfSameHash = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
long sameHashStartTime = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
executorService.execute(() -> {
for (int j = 0; j < executeTimes; j++) {
mapOfSameHash.put(new SameHash(), 1);
}
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
long mapOfSameHashDuration = System.currentTimeMillis() - sameHashStartTime;
Map<Object, Integer> mapOfDefaultHash = new ConcurrentHashMap<>();
executorService = Executors.newFixedThreadPool(2);
long defaultHashStartTime = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
executorService.execute(() -> {
for (int j = 0; j < executeTimes; j++) {
mapOfDefaultHash.put(new Object(), 1);
}
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
long mapOfDefaultHashDuration = System.currentTimeMillis() - defaultHashStartTime;
assertEquals(executeTimes * 2, mapOfDefaultHash.size());
assertEquals(executeTimes * 2, mapOfSameHash.size());
System.out.println(String.format("same-hash: %s, default-hash: %s", mapOfSameHashDuration, mapOfDefaultHashDuration));
assertTrue("same hashCode() should greatly degrade performance", mapOfSameHashDuration > mapOfDefaultHashDuration * 10);
}
}
@@ -0,0 +1,79 @@
package com.baeldung.java.concurrentmap;
import org.junit.Test;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class ConcurrentNavigableMapManualTest {
@Test
public void givenSkipListMap_whenAccessInMultiThreads_thenOrderingStable() throws InterruptedException {
NavigableMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
updateMapConcurrently(skipListMap, 4);
Iterator<Integer> skipListIter = skipListMap.keySet().iterator();
int previous = skipListIter.next();
while (skipListIter.hasNext()) {
int current = skipListIter.next();
assertTrue(previous < current);
}
}
private void updateMapConcurrently(NavigableMap<Integer, String> navigableMap, int concurrencyLevel) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
for (int i = 0; i < concurrencyLevel; i++) {
executorService.execute(() -> {
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int j = 0; j < 10000; j++) {
navigableMap.put(random.nextInt(), "test");
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() throws InterruptedException {
NavigableMap<Integer, Integer> skipListMap = new ConcurrentSkipListMap<>();
int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
assertEquals(10000 * 4, count);
}
@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() throws InterruptedException {
NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
assertNotEquals(10000 * 4, count);
}
private int countMapElementByPollingFirstEntry(NavigableMap<Integer, Integer> navigableMap, int elementCount, int concurrencyLevel) throws InterruptedException {
for (int i = 0; i < elementCount * concurrencyLevel; i++) {
navigableMap.put(i, i);
}
AtomicInteger counter = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
for (int j = 0; j < concurrencyLevel; j++) {
executorService.execute(() -> {
for (int i = 0; i < elementCount; i++) {
if (navigableMap.pollFirstEntry() != null) {
counter.incrementAndGet();
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
return counter.get();
}
}
@@ -0,0 +1,60 @@
package com.baeldung.java.concurrentmap;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class ConcurretMapMemoryConsistencyManualTest {
@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() throws Exception {
Map<String, Integer> map = new ConcurrentHashMap<>();
List<Integer> sumList = parallelSum100(map, 1000);
assertEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertEquals(0, wrongResultCount);
}
@Test
public void givenHashtable_whenSumParallel_thenCorrect() throws Exception {
Map<String, Integer> map = new Hashtable<>();
List<Integer> sumList = parallelSum100(map, 1000);
assertEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertEquals(0, wrongResultCount);
}
@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
Map<String, Integer> map = new HashMap<>();
List<Integer> sumList = parallelSum100(map, 100);
assertNotEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertTrue(wrongResultCount > 0);
}
private List<Integer> parallelSum100(Map<String, Integer> map, int executionTimes) throws InterruptedException {
List<Integer> sumList = new ArrayList<>(1000);
for (int i = 0; i < executionTimes; i++) {
map.put("test", 0);
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int j = 0; j < 10; j++) {
executorService.execute(() -> {
for (int k = 0; k < 10; k++)
map.computeIfPresent("test", (key, value) -> value + 1);
});
}
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
sumList.add(map.get("test"));
}
return sumList;
}
}
@@ -0,0 +1,80 @@
package com.baeldung.java.concurrentmodification;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.util.Lists.newArrayList;
public class ConcurrentModificationUnitTest {
@Test(expected = ConcurrentModificationException.class)
public void givenIterating_whenRemoving_thenThrowException() throws InterruptedException {
List<Integer> integers = newArrayList(1, 2, 3);
for (Integer integer : integers) {
integers.remove(1);
}
}
@Test
public void givenIterating_whenUsingIteratorRemove_thenNoError() throws InterruptedException {
List<Integer> integers = newArrayList(1, 2, 3);
for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext();) {
Integer integer = iterator.next();
if(integer == 2) {
iterator.remove();
}
}
assertThat(integers).containsExactly(1, 3);
}
@Test
public void givenIterating_whenUsingRemovalList_thenNoError() throws InterruptedException {
List<Integer> integers = newArrayList(1, 2, 3);
List<Integer> toRemove = newArrayList();
for (Integer integer : integers) {
if(integer == 2) {
toRemove.add(integer);
}
}
integers.removeAll(toRemove);
assertThat(integers).containsExactly(1, 3);
}
@Test
public void whenUsingRemoveIf_thenRemoveElements() throws InterruptedException {
Collection<Integer> integers = newArrayList(1, 2, 3);
integers.removeIf(i -> i == 2);
assertThat(integers).containsExactly(1, 3);
}
@Test
public void whenUsingStream_thenRemoveElements() {
Collection<Integer> integers = newArrayList(1, 2, 3);
List<String> collected = integers
.stream()
.filter(i -> i != 2)
.map(Object::toString)
.collect(toList());
assertThat(collected).containsExactly("1", "3");
}
}

Some files were not shown because too many files have changed in this diff Show More