JAVA-2109: Move Guide to the Java TransferQueue to the core-java-concurrency-collections-2

This commit is contained in:
Krzysztof Woyke
2020-07-14 13:30:03 +02:00
parent 9481177e62
commit 81b412c6e7
5 changed files with 8 additions and 2 deletions
@@ -1,41 +0,0 @@
package com.baeldung.transferqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Consumer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private final TransferQueue<String> transferQueue;
private final String name;
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
LOG.debug("Consumer: " + name + " is waiting to take element...");
String element = transferQueue.take();
longProcessing(element);
LOG.debug("Consumer: " + name + " received element: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
}
@@ -1,41 +0,0 @@
package com.baeldung.transferqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
private final TransferQueue<String> transferQueue;
private final String name;
final Integer numberOfMessagesToProduce;
final AtomicInteger numberOfProducedMessages = new AtomicInteger();
Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
LOG.debug("Producer: " + name + " is waiting to transfer...");
boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.debug("Producer: " + name + " transferred element: A" + i);
} else {
LOG.debug("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}