From 3286018dd2535f70950c96fa3289e1418dd33fd2 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Tue, 11 Apr 2017 13:08:11 +0200 Subject: [PATCH] Bael 766 flink (#1632) * BAEL-756 code for flink article * reorder * simpler wordCount example * BAEL-766 changes according to PR * BAEL-766 change datasource to dataset * BAEL-766 add sorting example * BAEL-766 add simple streaming example * one missing change to dataSet * windowing example * add window example * add dependency explicitly * add plugin * add surefire plugin, change neme of the test to *IntegrationTest * fluent assertions --- libraries/pom.xml | 15 +++++++++++++ .../flink/WordCountIntegrationTest.java | 22 +++++-------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/libraries/pom.xml b/libraries/pom.xml index 80ec0edfea..1d908c453a 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -152,6 +152,21 @@ flink-test-utils_2.10 ${flink.version} + + org.apache.flink + flink-core + ${flink.version} + + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-test-utils_2.10 + ${flink.version} + diff --git a/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java index 91a75c78ba..3190debef8 100644 --- a/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java +++ b/libraries/src/test/java/com/baeldung/flink/WordCountIntegrationTest.java @@ -1,6 +1,5 @@ package com.baeldung.flink; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -35,14 +34,10 @@ public class WordCountIntegrationTest { //then List> collect = result.collect(); - assertThat(collect.size()).isEqualTo(9); - assertThat(collect.contains(new Tuple2<>("a", 3))).isTrue(); - assertThat(collect.contains(new Tuple2<>("sentence", 2))).isTrue(); - assertThat(collect.contains(new Tuple2<>("word", 1))).isTrue(); - assertThat(collect.contains(new Tuple2<>("is", 2))).isTrue(); - assertThat(collect.contains(new Tuple2<>("this", 2))).isTrue(); - assertThat(collect.contains(new Tuple2<>("second", 1))).isTrue(); - assertThat(collect.contains(new Tuple2<>("first", 1))).isTrue(); + assertThat(collect).containsExactlyInAnyOrder( + new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1), + new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1), + new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1)); } @Test @@ -54,7 +49,7 @@ public class WordCountIntegrationTest { //when List collect = amounts .filter(a -> a > threshold) - .reduce((ReduceFunction) (integer, t1) -> integer + t1) + .reduce((integer, t1) -> integer + t1) .collect(); //then @@ -92,12 +87,7 @@ public class WordCountIntegrationTest { .collect(); //then - assertThat(sorted.size()).isEqualTo(4); - assertThat(sorted.get(0)).isEqualTo(firstPerson); - assertThat(sorted.get(1)).isEqualTo(secondPerson); - assertThat(sorted.get(2)).isEqualTo(thirdPerson); - assertThat(sorted.get(3)).isEqualTo(fourthPerson); - + assertThat(sorted).containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson); }