From 2648a4ec25aacd6a9d88648ec47e5e99a75a67ff Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Thu, 20 Apr 2017 23:05:44 +0200 Subject: [PATCH] Bael 845 transfer queue (#1697) * BAEL-845 tranferqueue article * BAEL-845 m to m example * BAEL-845 move code to test * BAEL-845 use tryTransfer * BAEL-845 proper if logic * BAEL-845 proper test * BAEL-845 robust test --- .../com/baeldung/transferqueue/Consumer.java | 5 ++++- .../com/baeldung/transferqueue/Producer.java | 3 +++ .../transferqueue/TransferQueueTest.java | 16 ++++++++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java b/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java index 81a24ea5a8..a498d08041 100644 --- a/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java +++ b/core-java/src/main/java/com/baeldung/transferqueue/Consumer.java @@ -1,11 +1,13 @@ package com.baeldung.transferqueue; import java.util.concurrent.TransferQueue; +import java.util.concurrent.atomic.AtomicInteger; public class Consumer implements Runnable { private final TransferQueue transferQueue; private final String name; private final int numberOfMessagesToConsume; + public final AtomicInteger numberOfConsumedMessages = new AtomicInteger(); public Consumer(TransferQueue transferQueue, String name, int numberOfMessagesToConsume) { this.transferQueue = transferQueue; @@ -28,6 +30,7 @@ public class Consumer implements Runnable { } private void longProcessing(String element) throws InterruptedException { - Thread.sleep(1_000); + numberOfConsumedMessages.incrementAndGet(); + Thread.sleep(500); } } \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/transferqueue/Producer.java b/core-java/src/main/java/com/baeldung/transferqueue/Producer.java index 0aa2e32866..c9edc69e33 100644 --- a/core-java/src/main/java/com/baeldung/transferqueue/Producer.java +++ b/core-java/src/main/java/com/baeldung/transferqueue/Producer.java @@ -2,11 +2,13 @@ package com.baeldung.transferqueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; +import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private final TransferQueue transferQueue; private final String name; private final Integer numberOfMessagesToProduce; + public final AtomicInteger numberOfProducedMessages = new AtomicInteger(); public Producer(TransferQueue transferQueue, String name, Integer numberOfMessagesToProduce) { this.transferQueue = transferQueue; @@ -23,6 +25,7 @@ public class Producer implements Runnable { if (!added) { System.out.println("can not add an element due to the timeout"); } else { + numberOfProducedMessages.incrementAndGet(); System.out.println("Producer: " + name + " transferred element: A" + i); } } catch (InterruptedException e) { diff --git a/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java b/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java index 056a9f9c33..4a1a663ce4 100644 --- a/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java +++ b/core-java/src/test/java/com/baeldung/transferqueue/TransferQueueTest.java @@ -1,9 +1,14 @@ package com.baeldung.transferqueue; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; import java.util.concurrent.*; +import static junit.framework.TestCase.assertEquals; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class TransferQueueTest { @Test @@ -25,6 +30,9 @@ public class TransferQueueTest { //then exService.awaitTermination(10_000, TimeUnit.MILLISECONDS); exService.shutdown(); + + assertEquals(producer1.numberOfProducedMessages.intValue(), 3); + assertEquals(producer2.numberOfProducedMessages.intValue(), 3); } @Test @@ -42,6 +50,9 @@ public class TransferQueueTest { //then exService.awaitTermination(5000, TimeUnit.MILLISECONDS); exService.shutdown(); + + assertEquals(producer.numberOfProducedMessages.intValue(), 3); + assertEquals(consumer.numberOfConsumedMessages.intValue(), 3); } @Test @@ -57,6 +68,7 @@ public class TransferQueueTest { //then exService.awaitTermination(5000, TimeUnit.MILLISECONDS); exService.shutdown(); - } -} \ No newline at end of file + assertEquals(producer.numberOfProducedMessages.intValue(), 0); + } +}