From b1352b58e08542038ab4973a16617a205e1b3226 Mon Sep 17 00:00:00 2001 From: Kumar Chandrakant Date: Sat, 5 Jan 2019 17:52:23 +0530 Subject: [PATCH] Adding files for the tutorial BAEL-2301 (#6066) --- apache-spark/pom.xml | 68 ++++++++- .../java/com/baeldung/data/pipeline/Word.java | 25 ++++ .../data/pipeline/WordCountingApp.java | 116 +++++++++++++++ .../WordCountingAppWithCheckpoint.java | 140 ++++++++++++++++++ 4 files changed, 345 insertions(+), 4 deletions(-) create mode 100644 apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java create mode 100644 apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java create mode 100644 apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 290b63a14d..b05b97198d 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -15,16 +15,76 @@ - org.apache.spark - spark-core_2.10 + spark-core_2.11 ${org.apache.spark.spark-core.version} + provided + + org.apache.spark + spark-sql_2.11 + ${org.apache.spark.spark-sql.version} + provided + + + org.apache.spark + spark-streaming_2.11 + ${org.apache.spark.spark-streaming.version} + provided + + + org.apache.spark + spark-streaming-kafka-0-10_2.11 + ${org.apache.spark.spark-streaming-kafka.version} + + + com.datastax.spark + spark-cassandra-connector_2.11 + ${com.datastax.spark.spark-cassandra-connector.version} + + + com.datastax.spark + spark-cassandra-connector-java_2.11 + ${com.datastax.spark.spark-cassandra-connector-java.version} + - + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.8 + 1.8 + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + + - 2.2.0 + 2.3.0 + 2.3.0 + 2.3.0 + 2.3.0 + 2.3.0 + 1.5.2 diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java new file mode 100644 index 0000000000..b0caa468b1 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java @@ -0,0 +1,25 @@ +package com.baeldung.data.pipeline; + +import java.io.Serializable; + +public class Word implements Serializable { + private static final long serialVersionUID = 1L; + private String word; + private int count; + Word(String word, int count) { + this.word = word; + this.count = count; + } + public String getWord() { + return word; + } + public void setWord(String word) { + this.word = word; + } + public int getCount() { + return count; + } + public void setCount(int count) { + this.count = count; + } +} \ No newline at end of file diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java new file mode 100644 index 0000000000..08695b3631 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java @@ -0,0 +1,116 @@ +package com.baeldung.data.pipeline; + +import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; +import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka010.ConsumerStrategies; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; + +import scala.Tuple2; + +public class WordCountingApp { + + @SuppressWarnings("serial") + public static void main(String[] args) throws InterruptedException { + Logger.getLogger("org") + .setLevel(Level.OFF); + Logger.getLogger("akka") + .setLevel(Level.OFF); + + Map kafkaParams = new HashMap<>(); + kafkaParams.put("bootstrap.servers", "localhost:9092"); + kafkaParams.put("key.deserializer", StringDeserializer.class); + kafkaParams.put("value.deserializer", StringDeserializer.class); + kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); + kafkaParams.put("auto.offset.reset", "latest"); + kafkaParams.put("enable.auto.commit", false); + + Collection topics = Arrays.asList("messages"); + + SparkConf sparkConf = new SparkConf(); + sparkConf.setMaster("local[2]"); + sparkConf.setAppName("WordCountingApp"); + sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); + + JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + + JavaInputDStream> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams)); + + JavaPairDStream results = messages.mapToPair(new PairFunction, String, String>() { + @Override + public Tuple2 call(ConsumerRecord record) { + return new Tuple2<>(record.key(), record.value()); + } + }); + + JavaDStream lines = results.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) { + return tuple2._2(); + } + }); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterator call(String x) { + return Arrays.asList(x.split("\\s+")) + .iterator(); + } + }); + + JavaPairDStream wordCounts = words.mapToPair(new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2<>(s, 1); + } + }) + .reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + wordCounts.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaPairRDD javaRdd) throws Exception { + Map wordCountMap = javaRdd.collectAsMap(); + for (String key : wordCountMap.keySet()) { + List words = Arrays.asList(new Word(key, wordCountMap.get(key))); + JavaRDD rdd = streamingContext.sparkContext() + .parallelize(words); + javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class)) + .saveToCassandra(); + } + } + }); + + streamingContext.start(); + streamingContext.awaitTermination(); + } +} \ No newline at end of file diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java new file mode 100644 index 0000000000..e20b910635 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java @@ -0,0 +1,140 @@ +package com.baeldung.data.pipeline; + +import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; +import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.Function3; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.State; +import org.apache.spark.streaming.StateSpec; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka010.ConsumerStrategies; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; + +import scala.Tuple2; + +public class WordCountingAppWithCheckpoint { + + public static JavaSparkContext sparkContext; + + @SuppressWarnings("serial") + public static void main(String[] args) throws InterruptedException { + + Logger.getLogger("org") + .setLevel(Level.OFF); + Logger.getLogger("akka") + .setLevel(Level.OFF); + + Map kafkaParams = new HashMap<>(); + kafkaParams.put("bootstrap.servers", "localhost:9092"); + kafkaParams.put("key.deserializer", StringDeserializer.class); + kafkaParams.put("value.deserializer", StringDeserializer.class); + kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); + kafkaParams.put("auto.offset.reset", "latest"); + kafkaParams.put("enable.auto.commit", false); + + Collection topics = Arrays.asList("messages"); + + SparkConf sparkConf = new SparkConf(); + sparkConf.setMaster("local[2]"); + sparkConf.setAppName("WordCountingAppWithCheckpoint"); + sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); + + JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + + sparkContext = streamingContext.sparkContext(); + + streamingContext.checkpoint("./.checkpoint"); + + JavaInputDStream> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams)); + + JavaPairDStream results = messages.mapToPair(new PairFunction, String, String>() { + @Override + public Tuple2 call(ConsumerRecord record) { + return new Tuple2<>(record.key(), record.value()); + } + }); + + JavaDStream lines = results.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) { + return tuple2._2(); + } + }); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterator call(String x) { + return Arrays.asList(x.split("\\s+")) + .iterator(); + } + }); + + JavaPairDStream wordCounts = words.mapToPair(new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2<>(s, 1); + } + }) + .reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + Function3, State, Tuple2> mappingFunc = (word, one, state) -> { + int sum = one.orElse(0) + (state.exists() ? state.get() : 0); + Tuple2 output = new Tuple2<>(word, sum); + state.update(sum); + return output; + }; + + JavaPairRDD initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD()); + + JavaMapWithStateDStream> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc) + .initialState(initialRDD)); + + cumulativeWordCounts.foreachRDD(new VoidFunction>>() { + @Override + public void call(JavaRDD> javaRdd) throws Exception { + List> wordCountList = javaRdd.collect(); + for (Tuple2 tuple : wordCountList) { + List words = Arrays.asList(new Word(tuple._1, tuple._2)); + JavaRDD rdd = sparkContext.parallelize(words); + javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class)) + .saveToCassandra(); + } + } + }); + + streamingContext.start(); + streamingContext.awaitTermination(); + } +} \ No newline at end of file