diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml
index 290b63a14d..b05b97198d 100644
--- a/apache-spark/pom.xml
+++ b/apache-spark/pom.xml
@@ -15,16 +15,76 @@
-
org.apache.spark
- spark-core_2.10
+ spark-core_2.11
${org.apache.spark.spark-core.version}
+ provided
+
+ org.apache.spark
+ spark-sql_2.11
+ ${org.apache.spark.spark-sql.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming_2.11
+ ${org.apache.spark.spark-streaming.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming-kafka-0-10_2.11
+ ${org.apache.spark.spark-streaming-kafka.version}
+
+
+ com.datastax.spark
+ spark-cassandra-connector_2.11
+ ${com.datastax.spark.spark-cassandra-connector.version}
+
+
+ com.datastax.spark
+ spark-cassandra-connector-java_2.11
+ ${com.datastax.spark.spark-cassandra-connector-java.version}
+
-
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.2
+
+ 1.8
+ 1.8
+
+
+
+ maven-assembly-plugin
+
+
+ package
+
+ single
+
+
+
+
+
+ jar-with-dependencies
+
+
+
+
+
- 2.2.0
+ 2.3.0
+ 2.3.0
+ 2.3.0
+ 2.3.0
+ 2.3.0
+ 1.5.2
diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java
new file mode 100644
index 0000000000..b0caa468b1
--- /dev/null
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/Word.java
@@ -0,0 +1,25 @@
+package com.baeldung.data.pipeline;
+
+import java.io.Serializable;
+
+public class Word implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String word;
+ private int count;
+ Word(String word, int count) {
+ this.word = word;
+ this.count = count;
+ }
+ public String getWord() {
+ return word;
+ }
+ public void setWord(String word) {
+ this.word = word;
+ }
+ public int getCount() {
+ return count;
+ }
+ public void setCount(int count) {
+ this.count = count;
+ }
+}
\ No newline at end of file
diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
new file mode 100644
index 0000000000..08695b3631
--- /dev/null
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
@@ -0,0 +1,116 @@
+package com.baeldung.data.pipeline;
+
+import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
+import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka010.ConsumerStrategies;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+
+import scala.Tuple2;
+
+public class WordCountingApp {
+
+ @SuppressWarnings("serial")
+ public static void main(String[] args) throws InterruptedException {
+ Logger.getLogger("org")
+ .setLevel(Level.OFF);
+ Logger.getLogger("akka")
+ .setLevel(Level.OFF);
+
+ Map kafkaParams = new HashMap<>();
+ kafkaParams.put("bootstrap.servers", "localhost:9092");
+ kafkaParams.put("key.deserializer", StringDeserializer.class);
+ kafkaParams.put("value.deserializer", StringDeserializer.class);
+ kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
+ kafkaParams.put("auto.offset.reset", "latest");
+ kafkaParams.put("enable.auto.commit", false);
+
+ Collection topics = Arrays.asList("messages");
+
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.setMaster("local[2]");
+ sparkConf.setAppName("WordCountingApp");
+ sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
+
+ JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+ JavaInputDStream> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
+
+ JavaPairDStream results = messages.mapToPair(new PairFunction, String, String>() {
+ @Override
+ public Tuple2 call(ConsumerRecord record) {
+ return new Tuple2<>(record.key(), record.value());
+ }
+ });
+
+ JavaDStream lines = results.map(new Function, String>() {
+ @Override
+ public String call(Tuple2 tuple2) {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream words = lines.flatMap(new FlatMapFunction() {
+ @Override
+ public Iterator call(String x) {
+ return Arrays.asList(x.split("\\s+"))
+ .iterator();
+ }
+ });
+
+ JavaPairDStream wordCounts = words.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(String s) {
+ return new Tuple2<>(s, 1);
+ }
+ })
+ .reduceByKey(new Function2() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.foreachRDD(new VoidFunction>() {
+ @Override
+ public void call(JavaPairRDD javaRdd) throws Exception {
+ Map wordCountMap = javaRdd.collectAsMap();
+ for (String key : wordCountMap.keySet()) {
+ List words = Arrays.asList(new Word(key, wordCountMap.get(key)));
+ JavaRDD rdd = streamingContext.sparkContext()
+ .parallelize(words);
+ javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+ .saveToCassandra();
+ }
+ }
+ });
+
+ streamingContext.start();
+ streamingContext.awaitTermination();
+ }
+}
\ No newline at end of file
diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
new file mode 100644
index 0000000000..e20b910635
--- /dev/null
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
@@ -0,0 +1,140 @@
+package com.baeldung.data.pipeline;
+
+import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
+import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.Function3;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.State;
+import org.apache.spark.streaming.StateSpec;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka010.ConsumerStrategies;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.apache.spark.streaming.kafka010.LocationStrategies;
+
+import scala.Tuple2;
+
+public class WordCountingAppWithCheckpoint {
+
+ public static JavaSparkContext sparkContext;
+
+ @SuppressWarnings("serial")
+ public static void main(String[] args) throws InterruptedException {
+
+ Logger.getLogger("org")
+ .setLevel(Level.OFF);
+ Logger.getLogger("akka")
+ .setLevel(Level.OFF);
+
+ Map kafkaParams = new HashMap<>();
+ kafkaParams.put("bootstrap.servers", "localhost:9092");
+ kafkaParams.put("key.deserializer", StringDeserializer.class);
+ kafkaParams.put("value.deserializer", StringDeserializer.class);
+ kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
+ kafkaParams.put("auto.offset.reset", "latest");
+ kafkaParams.put("enable.auto.commit", false);
+
+ Collection topics = Arrays.asList("messages");
+
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.setMaster("local[2]");
+ sparkConf.setAppName("WordCountingAppWithCheckpoint");
+ sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
+
+ JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+ sparkContext = streamingContext.sparkContext();
+
+ streamingContext.checkpoint("./.checkpoint");
+
+ JavaInputDStream> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
+
+ JavaPairDStream results = messages.mapToPair(new PairFunction, String, String>() {
+ @Override
+ public Tuple2 call(ConsumerRecord record) {
+ return new Tuple2<>(record.key(), record.value());
+ }
+ });
+
+ JavaDStream lines = results.map(new Function, String>() {
+ @Override
+ public String call(Tuple2 tuple2) {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream words = lines.flatMap(new FlatMapFunction() {
+ @Override
+ public Iterator call(String x) {
+ return Arrays.asList(x.split("\\s+"))
+ .iterator();
+ }
+ });
+
+ JavaPairDStream wordCounts = words.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(String s) {
+ return new Tuple2<>(s, 1);
+ }
+ })
+ .reduceByKey(new Function2() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ Function3, State, Tuple2> mappingFunc = (word, one, state) -> {
+ int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+ Tuple2 output = new Tuple2<>(word, sum);
+ state.update(sum);
+ return output;
+ };
+
+ JavaPairRDD initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
+
+ JavaMapWithStateDStream> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc)
+ .initialState(initialRDD));
+
+ cumulativeWordCounts.foreachRDD(new VoidFunction>>() {
+ @Override
+ public void call(JavaRDD> javaRdd) throws Exception {
+ List> wordCountList = javaRdd.collect();
+ for (Tuple2 tuple : wordCountList) {
+ List words = Arrays.asList(new Word(tuple._1, tuple._2));
+ JavaRDD rdd = sparkContext.parallelize(words);
+ javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+ .saveToCassandra();
+ }
+ }
+ });
+
+ streamingContext.start();
+ streamingContext.awaitTermination();
+ }
+}
\ No newline at end of file