Merge remote-tracking branch 'eugenp/master'

This commit is contained in:
DOHA
2017-08-25 17:45:23 +02:00
234 changed files with 1887 additions and 1691 deletions
@@ -19,13 +19,15 @@ public class NumbersProducer implements Runnable {
try {
generateNumbers();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Thread.currentThread()
.interrupt();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
numbersQueue.put(ThreadLocalRandom.current()
.nextInt(100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put(poisonPill);
@@ -27,9 +27,6 @@ public class DelayObject implements Delayed {
@Override
public String toString() {
return "{" +
"data='" + data + '\'' +
", startTime=" + startTime +
'}';
return "{" + "data='" + data + '\'' + ", startTime=" + startTime + '}';
}
}
@@ -15,7 +15,6 @@ public class CompletableFutureLongRunningUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(CompletableFutureLongRunningUnitTest.class);
@Test
public void whenRunningCompletableFutureAsynchronously_thenGetMethodWaitsForResult() throws InterruptedException, ExecutionException {
Future<String> completableFuture = calculateAsync();
@@ -27,11 +26,12 @@ public class CompletableFutureLongRunningUnitTest {
private Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
@@ -47,11 +47,12 @@ public class CompletableFutureLongRunningUnitTest {
private Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
Executors.newCachedThreadPool()
.submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
@@ -98,21 +99,24 @@ public class CompletableFutureLongRunningUnitTest {
@Test
public void whenUsingThenCompose_thenFuturesExecuteSequentially() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenCombine_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
assertEquals("Hello World", completableFuture.get());
}
@Test
public void whenUsingThenAcceptBoth_thenWaitForExecutionOfBothFutures() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> "Hello").thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> LOG.debug(s1 + s2));
}
@Test
@@ -131,7 +135,9 @@ public class CompletableFutureLongRunningUnitTest {
assertTrue(future2.isDone());
assertTrue(future3.isDone());
String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
}
@@ -147,7 +153,8 @@ public class CompletableFutureLongRunningUnitTest {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
})
.handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
}
@@ -47,7 +47,8 @@ public class ConcurrentMapAggregateStatusManualTest {
executorService.awaitTermination(1, TimeUnit.MINUTES);
for (int i = 1; i <= MAX_SIZE; i++) {
assertEquals("map size should be consistently reliable", i, mapSizes.get(i - 1).intValue());
assertEquals("map size should be consistently reliable", i, mapSizes.get(i - 1)
.intValue());
}
assertEquals(MAX_SIZE, concurrentMap.size());
}
@@ -69,7 +70,8 @@ public class ConcurrentMapAggregateStatusManualTest {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
assertNotEquals("map size collected with concurrent updates not reliable", MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
assertNotEquals("map size collected with concurrent updates not reliable", MAX_SIZE, mapSizes.get(MAX_SIZE - 1)
.intValue());
assertEquals(MAX_SIZE, concurrentMap.size());
}
@@ -16,8 +16,12 @@ public class ConcurretMapMemoryConsistencyManualTest {
public void givenConcurrentMap_whenSumParallel_thenCorrect() throws Exception {
Map<String, Integer> map = new ConcurrentHashMap<>();
List<Integer> sumList = parallelSum100(map, 1000);
assertEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertEquals(1, sumList.stream()
.distinct()
.count());
long wrongResultCount = sumList.stream()
.filter(num -> num != 100)
.count();
assertEquals(0, wrongResultCount);
}
@@ -25,8 +29,12 @@ public class ConcurretMapMemoryConsistencyManualTest {
public void givenHashtable_whenSumParallel_thenCorrect() throws Exception {
Map<String, Integer> map = new Hashtable<>();
List<Integer> sumList = parallelSum100(map, 1000);
assertEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertEquals(1, sumList.stream()
.distinct()
.count());
long wrongResultCount = sumList.stream()
.filter(num -> num != 100)
.count();
assertEquals(0, wrongResultCount);
}
@@ -34,8 +42,12 @@ public class ConcurretMapMemoryConsistencyManualTest {
public void givenHashMap_whenSumParallel_thenError() throws Exception {
Map<String, Integer> map = new HashMap<>();
List<Integer> sumList = parallelSum100(map, 100);
assertNotEquals(1, sumList.stream().distinct().count());
long wrongResultCount = sumList.stream().filter(num -> num != 100).count();
assertNotEquals(1, sumList.stream()
.distinct()
.count());
long wrongResultCount = sumList.stream()
.filter(num -> num != 100)
.count();
assertTrue(wrongResultCount > 0);
}