[BAEL-8401] - Added new module core-java-concurrency-collections and moved code and github references from core-java-concurrency module
This commit is contained in:
+27
@@ -0,0 +1,27 @@
|
||||
package com.baeldung.concurrent.blockingqueue;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
public class BlockingQueueUsage {
|
||||
public static void main(String[] args) {
|
||||
int BOUND = 10;
|
||||
int N_PRODUCERS = 4;
|
||||
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
|
||||
int poisonPill = Integer.MAX_VALUE;
|
||||
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
|
||||
int mod = N_CONSUMERS % N_PRODUCERS;
|
||||
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
|
||||
|
||||
for (int i = 1; i < N_PRODUCERS; i++) {
|
||||
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
|
||||
}
|
||||
|
||||
for (int j = 0; j < N_CONSUMERS; j++) {
|
||||
new Thread(new NumbersConsumer(queue, poisonPill)).start();
|
||||
}
|
||||
|
||||
new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer+mod)).start();
|
||||
|
||||
}
|
||||
}
|
||||
+28
@@ -0,0 +1,28 @@
|
||||
package com.baeldung.concurrent.blockingqueue;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
public class NumbersConsumer implements Runnable {
|
||||
private final BlockingQueue<Integer> queue;
|
||||
private final int poisonPill;
|
||||
|
||||
NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
|
||||
this.queue = queue;
|
||||
this.poisonPill = poisonPill;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
Integer number = queue.take();
|
||||
if (number.equals(poisonPill)) {
|
||||
return;
|
||||
}
|
||||
String result = number.toString();
|
||||
System.out.println(Thread.currentThread().getName() + " result: " + result);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
package com.baeldung.concurrent.blockingqueue;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class NumbersProducer implements Runnable {
|
||||
|
||||
private final BlockingQueue<Integer> numbersQueue;
|
||||
private final int poisonPill;
|
||||
private final int poisonPillPerProducer;
|
||||
|
||||
NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
|
||||
this.numbersQueue = numbersQueue;
|
||||
this.poisonPill = poisonPill;
|
||||
this.poisonPillPerProducer = poisonPillPerProducer;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
generateNumbers();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread()
|
||||
.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private void generateNumbers() throws InterruptedException {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
numbersQueue.put(ThreadLocalRandom.current()
|
||||
.nextInt(100));
|
||||
}
|
||||
for (int j = 0; j < poisonPillPerProducer; j++) {
|
||||
numbersQueue.put(poisonPill);
|
||||
}
|
||||
}
|
||||
}
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
package com.baeldung.concurrent.delayqueue;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DelayObject implements Delayed {
|
||||
private String data;
|
||||
private long startTime;
|
||||
|
||||
DelayObject(String data, long delayInMilliseconds) {
|
||||
this.data = data;
|
||||
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
long diff = startTime - System.currentTimeMillis();
|
||||
return unit.convert(diff, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
return Ints.saturatedCast(this.startTime - ((DelayObject) o).startTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{" + "data='" + data + '\'' + ", startTime=" + startTime + '}';
|
||||
}
|
||||
}
|
||||
+30
@@ -0,0 +1,30 @@
|
||||
package com.baeldung.concurrent.delayqueue;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
public class DelayQueueConsumer implements Runnable {
|
||||
private BlockingQueue<DelayObject> queue;
|
||||
private final Integer numberOfElementsToTake;
|
||||
final AtomicInteger numberOfConsumedElements = new AtomicInteger();
|
||||
|
||||
DelayQueueConsumer(BlockingQueue<DelayObject> queue, Integer numberOfElementsToTake) {
|
||||
this.queue = queue;
|
||||
this.numberOfElementsToTake = numberOfElementsToTake;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfElementsToTake; i++) {
|
||||
try {
|
||||
DelayObject object = queue.take();
|
||||
numberOfConsumedElements.incrementAndGet();
|
||||
System.out.println("Consumer take: " + object);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
+34
@@ -0,0 +1,34 @@
|
||||
package com.baeldung.concurrent.delayqueue;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
|
||||
public class DelayQueueProducer implements Runnable {
|
||||
private BlockingQueue<DelayObject> queue;
|
||||
private final Integer numberOfElementsToProduce;
|
||||
private final Integer delayOfEachProducedMessageMilliseconds;
|
||||
|
||||
DelayQueueProducer(BlockingQueue<DelayObject> queue,
|
||||
Integer numberOfElementsToProduce,
|
||||
Integer delayOfEachProducedMessageMilliseconds) {
|
||||
this.queue = queue;
|
||||
this.numberOfElementsToProduce = numberOfElementsToProduce;
|
||||
this.delayOfEachProducedMessageMilliseconds = delayOfEachProducedMessageMilliseconds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfElementsToProduce; i++) {
|
||||
DelayObject object
|
||||
= new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
|
||||
System.out.println("Put object = " + object);
|
||||
try {
|
||||
queue.put(object);
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ie) {
|
||||
ie.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
+83
@@ -0,0 +1,83 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class ReentrantLockWithCondition {
|
||||
|
||||
private static Logger LOG = LoggerFactory.getLogger(ReentrantLockWithCondition.class);
|
||||
|
||||
private Stack<String> stack = new Stack<>();
|
||||
private static final int CAPACITY = 5;
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
private Condition stackEmptyCondition = lock.newCondition();
|
||||
private Condition stackFullCondition = lock.newCondition();
|
||||
|
||||
private void pushToStack(String item) throws InterruptedException {
|
||||
try {
|
||||
lock.lock();
|
||||
if (stack.size() == CAPACITY) {
|
||||
LOG.info(Thread.currentThread().getName() + " wait on stack full");
|
||||
stackFullCondition.await();
|
||||
}
|
||||
LOG.info("Pushing the item " + item);
|
||||
stack.push(item);
|
||||
stackEmptyCondition.signalAll();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String popFromStack() throws InterruptedException {
|
||||
try {
|
||||
lock.lock();
|
||||
if (stack.size() == 0) {
|
||||
LOG.info(Thread.currentThread().getName() + " wait on stack empty");
|
||||
stackEmptyCondition.await();
|
||||
}
|
||||
return stack.pop();
|
||||
} finally {
|
||||
stackFullCondition.signalAll();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int threadCount = 2;
|
||||
ReentrantLockWithCondition object = new ReentrantLockWithCondition();
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
service.execute(() -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
object.pushToStack("Item " + i);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
service.execute(() -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
LOG.info("Item popped " + object.popFromStack());
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
}
|
||||
+88
@@ -0,0 +1,88 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class SharedObjectWithLock {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SharedObjectWithLock.class);
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock(true);
|
||||
|
||||
private int counter = 0;
|
||||
|
||||
void perform() {
|
||||
|
||||
lock.lock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
|
||||
try {
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
|
||||
counter++;
|
||||
} catch (Exception exception) {
|
||||
LOG.error(" Interrupted Exception ", exception);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
|
||||
}
|
||||
}
|
||||
|
||||
private void performTryLock() {
|
||||
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " attempting to acquire the lock");
|
||||
try {
|
||||
boolean isLockAcquired = lock.tryLock(2, TimeUnit.SECONDS);
|
||||
if (isLockAcquired) {
|
||||
try {
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " acquired the lock");
|
||||
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " processing");
|
||||
sleep(1000);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " released the lock");
|
||||
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException exception) {
|
||||
LOG.error(" Interrupted Exception ", exception);
|
||||
}
|
||||
LOG.info("Thread - " + Thread.currentThread().getName() + " could not acquire the lock");
|
||||
}
|
||||
|
||||
public ReentrantLock getLock() {
|
||||
return lock;
|
||||
}
|
||||
|
||||
boolean isLocked() {
|
||||
return lock.isLocked();
|
||||
}
|
||||
|
||||
boolean hasQueuedThreads() {
|
||||
return lock.hasQueuedThreads();
|
||||
}
|
||||
|
||||
int getCounter() {
|
||||
return counter;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
final int threadCount = 2;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
final SharedObjectWithLock object = new SharedObjectWithLock();
|
||||
|
||||
service.execute(object::perform);
|
||||
service.execute(object::performTryLock);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+104
@@ -0,0 +1,104 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class StampedLockDemo {
|
||||
private Map<String, String> map = new HashMap<>();
|
||||
private Logger logger = LoggerFactory.getLogger(StampedLockDemo.class);
|
||||
|
||||
private final StampedLock lock = new StampedLock();
|
||||
|
||||
public void put(String key, String value) throws InterruptedException {
|
||||
long stamp = lock.writeLock();
|
||||
|
||||
try {
|
||||
logger.info(Thread.currentThread().getName() + " acquired the write lock with stamp " + stamp);
|
||||
map.put(key, value);
|
||||
} finally {
|
||||
lock.unlockWrite(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the write lock with stamp " + stamp);
|
||||
}
|
||||
}
|
||||
|
||||
public String get(String key) throws InterruptedException {
|
||||
long stamp = lock.readLock();
|
||||
logger.info(Thread.currentThread().getName() + " acquired the read lock with stamp " + stamp);
|
||||
try {
|
||||
sleep(5000);
|
||||
return map.get(key);
|
||||
|
||||
} finally {
|
||||
lock.unlockRead(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String readWithOptimisticLock(String key) throws InterruptedException {
|
||||
long stamp = lock.tryOptimisticRead();
|
||||
String value = map.get(key);
|
||||
|
||||
if (!lock.validate(stamp)) {
|
||||
stamp = lock.readLock();
|
||||
try {
|
||||
sleep(5000);
|
||||
return map.get(key);
|
||||
|
||||
} finally {
|
||||
lock.unlock(stamp);
|
||||
logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp);
|
||||
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
final int threadCount = 4;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
StampedLockDemo object = new StampedLockDemo();
|
||||
|
||||
Runnable writeTask = () -> {
|
||||
|
||||
try {
|
||||
object.put("key1", "value1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
Runnable readTask = () -> {
|
||||
|
||||
try {
|
||||
object.get("key1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
Runnable readOptimisticTask = () -> {
|
||||
|
||||
try {
|
||||
object.readWithOptimisticLock("key1");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
service.submit(writeTask);
|
||||
service.submit(writeTask);
|
||||
service.submit(readTask);
|
||||
service.submit(readOptimisticTask);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
+120
@@ -0,0 +1,120 @@
|
||||
package com.baeldung.concurrent.locks;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
||||
public class SynchronizedHashMapWithRWLock {
|
||||
|
||||
private static Map<String, String> syncHashMap = new HashMap<>();
|
||||
private Logger logger = LoggerFactory.getLogger(SynchronizedHashMapWithRWLock.class);
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final Lock readLock = lock.readLock();
|
||||
private final Lock writeLock = lock.writeLock();
|
||||
|
||||
public void put(String key, String value) throws InterruptedException {
|
||||
|
||||
try {
|
||||
writeLock.lock();
|
||||
logger.info(Thread.currentThread().getName() + " writing");
|
||||
syncHashMap.put(key, value);
|
||||
sleep(1000);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public String get(String key) {
|
||||
try {
|
||||
readLock.lock();
|
||||
logger.info(Thread.currentThread().getName() + " reading");
|
||||
return syncHashMap.get(key);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public String remove(String key) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
return syncHashMap.remove(key);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containsKey(String key) {
|
||||
try {
|
||||
readLock.lock();
|
||||
return syncHashMap.containsKey(key);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean isReadLockAvailable() {
|
||||
return readLock.tryLock();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
final int threadCount = 3;
|
||||
final ExecutorService service = Executors.newFixedThreadPool(threadCount);
|
||||
SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock();
|
||||
|
||||
service.execute(new Thread(new Writer(object), "Writer"));
|
||||
service.execute(new Thread(new Reader(object), "Reader1"));
|
||||
service.execute(new Thread(new Reader(object), "Reader2"));
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
private static class Reader implements Runnable {
|
||||
|
||||
SynchronizedHashMapWithRWLock object;
|
||||
|
||||
Reader(SynchronizedHashMapWithRWLock object) {
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
object.get("key" + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class Writer implements Runnable {
|
||||
|
||||
SynchronizedHashMapWithRWLock object;
|
||||
|
||||
public Writer(SynchronizedHashMapWithRWLock object) {
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
object.put("key" + i, "value" + i);
|
||||
sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.concurrent.skiplist;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
class Event {
|
||||
private final ZonedDateTime eventTime;
|
||||
private final String content;
|
||||
|
||||
Event(ZonedDateTime eventTime, String content) {
|
||||
this.eventTime = eventTime;
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
ZonedDateTime getEventTime() {
|
||||
return eventTime;
|
||||
}
|
||||
|
||||
String getContent() {
|
||||
return content;
|
||||
}
|
||||
}
|
||||
+29
@@ -0,0 +1,29 @@
|
||||
package com.baeldung.concurrent.skiplist;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
class EventWindowSort {
|
||||
private final ConcurrentSkipListMap<ZonedDateTime, String> events
|
||||
= new ConcurrentSkipListMap<>(Comparator.comparingLong(value -> value.toInstant().toEpochMilli()));
|
||||
|
||||
void acceptEvent(Event event) {
|
||||
events.put(event.getEventTime(), event.getContent());
|
||||
}
|
||||
|
||||
ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
|
||||
return events.tailMap(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1));
|
||||
}
|
||||
|
||||
ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
|
||||
return events.headMap(ZonedDateTime
|
||||
.now()
|
||||
.minusMinutes(1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+41
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TransferQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Consumer implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
|
||||
|
||||
private final TransferQueue<String> transferQueue;
|
||||
private final String name;
|
||||
final int numberOfMessagesToConsume;
|
||||
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
|
||||
|
||||
Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
|
||||
this.transferQueue = transferQueue;
|
||||
this.name = name;
|
||||
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfMessagesToConsume; i++) {
|
||||
try {
|
||||
LOG.debug("Consumer: " + name + " is waiting to take element...");
|
||||
String element = transferQueue.take();
|
||||
longProcessing(element);
|
||||
LOG.debug("Consumer: " + name + " received element: " + element);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void longProcessing(String element) throws InterruptedException {
|
||||
numberOfConsumedMessages.incrementAndGet();
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
+41
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.transferqueue;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TransferQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Producer implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
|
||||
|
||||
private final TransferQueue<String> transferQueue;
|
||||
private final String name;
|
||||
final Integer numberOfMessagesToProduce;
|
||||
final AtomicInteger numberOfProducedMessages = new AtomicInteger();
|
||||
|
||||
Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
|
||||
this.transferQueue = transferQueue;
|
||||
this.name = name;
|
||||
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < numberOfMessagesToProduce; i++) {
|
||||
try {
|
||||
LOG.debug("Producer: " + name + " is waiting to transfer...");
|
||||
boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
|
||||
if (added) {
|
||||
numberOfProducedMessages.incrementAndGet();
|
||||
LOG.debug("Producer: " + name + " transferred element: A" + i);
|
||||
} else {
|
||||
LOG.debug("can not add an element due to the timeout");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>web - %date [%thread] %-5level %logger{36} - %message%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="org.springframework" level="WARN" />
|
||||
<logger name="org.springframework.transaction" level="WARN" />
|
||||
|
||||
<!-- in order to debug some marshalling issues, this needs to be TRACE -->
|
||||
<logger name="org.springframework.web.servlet.mvc" level="WARN" />
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user