diff --git a/apache-beam/pom.xml b/apache-beam/pom.xml new file mode 100644 index 0000000000..7a714ac480 --- /dev/null +++ b/apache-beam/pom.xml @@ -0,0 +1,43 @@ + + 4.0.0 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + com.baeldung.apache + apache-beam + 0.0.1-SNAPSHOT + + + + org.apache.beam + beam-sdks-java-core + ${beam.version} + + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + runtime + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + 2.19.0 + 3.6.1 + + + diff --git a/apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java b/apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java new file mode 100644 index 0000000000..f2dfb47810 --- /dev/null +++ b/apache-beam/src/main/java/com/baeldung/apache/beam/intro/WordCount.java @@ -0,0 +1,71 @@ +package com.baeldung.apache.beam.intro; + +import java.util.Arrays; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.FlatMapElements; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class WordCount { + + public static boolean wordCount(String inputFilePath, String outputFilePath) { + // We use default options + PipelineOptions options = PipelineOptionsFactory.create(); + // to create the pipeline + Pipeline p = Pipeline.create(options); + // Here is our workflow graph + PCollection> wordCount = p + .apply("(1) Read all lines", TextIO.read().from(inputFilePath)) + .apply("(2) Flatmap to a list of words", FlatMapElements.into(TypeDescriptors.strings()) + .via(line -> Arrays.asList(line.split("\\s")))) + .apply("(3) Lowercase all", MapElements.into(TypeDescriptors.strings()) + .via(word -> word.toLowerCase())) + .apply("(4) Trim punctuations", MapElements.into(TypeDescriptors.strings()) + .via(word -> trim(word))) + .apply("(5) Filter stopwords", Filter.by(word -> !isStopWord(word))) + .apply("(6) Count words", Count.perElement()); + // We convert the PCollection to String so that we can write it to file + wordCount.apply(MapElements.into(TypeDescriptors.strings()) + .via(count -> count.getKey() + " --> " + count.getValue())) + .apply(TextIO.write().to(outputFilePath)); + // Finally we must run the pipeline, otherwise it's only a definition + p.run().waitUntilFinish(); + return true; + } + + public static boolean isStopWord(String word) { + String[] stopwords = {"am", "are", "is", "i", "you", "me", + "he", "she", "they", "them", "was", + "were", "from", "in", "of", "to", "be", + "him", "her", "us", "and", "or"}; + for (String stopword : stopwords) { + if (stopword.compareTo(word) == 0) { + return true; + } + } + return false; + } + + public static String trim(String word) { + return word.replace("(","") + .replace(")", "") + .replace(",", "") + .replace(".", "") + .replace("\"", "") + .replace("'", "") + .replace(":", "") + .replace(";", "") + .replace("-", "") + .replace("?", "") + .replace("!", ""); + } + +} diff --git a/apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java b/apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java new file mode 100644 index 0000000000..f2558635dc --- /dev/null +++ b/apache-beam/src/test/java/com/baeldung/apache/beam/intro/WordCountUnitTest.java @@ -0,0 +1,19 @@ +package com.baeldung.apache.beam.intro; + +import static org.junit.Assert.assertTrue; + +import org.junit.Ignore; +import org.junit.Test; + +import com.baeldung.apache.beam.intro.WordCount; + +public class WordCountUnitTest { + + @Test + // @Ignore + public void givenInputFile_whenWordCountRuns_thenJobFinishWithoutError() { + boolean jobDone = WordCount.wordCount("src/test/resources/wordcount.txt", "target/output"); + assertTrue(jobDone); + } + +} diff --git a/apache-beam/src/test/resources/wordcount.txt b/apache-beam/src/test/resources/wordcount.txt new file mode 100644 index 0000000000..542385379b --- /dev/null +++ b/apache-beam/src/test/resources/wordcount.txt @@ -0,0 +1,16 @@ +We've all heard the scare stories about North Korea: the homemade nuclear arsenal built while their people starve and then aimed imprecisely at the rest of the world, a +leader so deluded he makes L Ron Hubbard look like a man excessively overburdened with self-doubt and their deep-seated belief that foreign capitalists will invade at any +moment and steal all their bauxite. +The popular portrayal of this Marxist nation is something like one of the more harrowing episodes of M*A*S*H, only with the cast of wacky characters replaced by twitchy, +heavily armed Stalinist meth addicts +Cracked would like to take a moment to celebrate the good things about North Korea though, the things that the country's enemies prefer to suppress as part of their politically +motivated jealousy. Like how no different to you and me, there's nothing every North Korean likes more after an 18 hour shift at the phosphorus plant than a nice beer to go with +his dried fish ration. Ever attentive to its people's needs and in the twinkling of a decade, North Korea's leadership bought, disassembled, transported and rebuilt a British +brewery in order to discover and reproduce the secrets of beer and then brew the sweet nectar for its hardworking people, up to 18 bottles at a time. And with minimal fatalities. +When was the last time YOUR leader got a beer for YOU, American? (NB do not answer this question if you are Henry Louis Gates). +Or how about the fried chicken restaurant that downtown Pyongyang boasts? Yes real chicken, fried and then delivered to your sleeping cube, with optional beer if you like! You +don't even have to remove the feathers or pull out the gizzard yourself. Mostly. Americans must eat their fried chicken from a bucket, like swine, sold by a company so secretive +that even the very blend of seasoning used is intentionally kept from them. And they call North Korea paranoid? +And how many nations would entertain the syphilitic, bourgeois ramblings of Bill Clinton let alone permit him anywhere near their proud womenfolk? Only wise Kim Jong Il could see +past Bill's many, many imperfections and treat him with the pity and kindness he deserves, accepting his feeble pleas to pardon the American spies rightly convicted of photographing +the nation's sensitive beetroot fields. diff --git a/pom.xml b/pom.xml index 0bcfac7963..2cefcc284f 100644 --- a/pom.xml +++ b/pom.xml @@ -862,6 +862,7 @@ antlr apache-avro + apache-beam apache-bval apache-curator apache-cxf