From 99a840fd03c3ea72bdb0abf34a98d65b85d27f04 Mon Sep 17 00:00:00 2001 From: DomWos Date: Sat, 13 Oct 2018 23:55:53 +0200 Subject: [PATCH 1/5] BAEL-1463: Apache Storm Introduction --- libraries-data/pom.xml | 6 ++ .../com/baeldung/storm/TopologyRunner.java | 34 ++++++++++ .../baeldung/storm/bolt/AggregatingBolt.java | 39 ++++++++++++ .../baeldung/storm/bolt/FileWritingBolt.java | 63 +++++++++++++++++++ .../baeldung/storm/bolt/FilteringBolt.java | 22 +++++++ .../com/baeldung/storm/bolt/PrintingBolt.java | 18 ++++++ .../storm/model/AggregatedWindow.java | 16 +++++ .../java/com/baeldung/storm/model/User.java | 40 ++++++++++++ .../storm/serialization/UserSerializer.java | 30 +++++++++ .../baeldung/storm/spout/RandomIntSpout.java | 35 +++++++++++ .../storm/spout/RandomNumberSpout.java | 40 ++++++++++++ 11 files changed, 343 insertions(+) create mode 100644 libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/model/User.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java create mode 100644 libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java diff --git a/libraries-data/pom.xml b/libraries-data/pom.xml index bbf0c7832f..54d24edbf6 100644 --- a/libraries-data/pom.xml +++ b/libraries-data/pom.xml @@ -259,6 +259,11 @@ slf4j-api ${slf4j.version} + + org.apache.storm + storm-core + ${storm.version} + ch.qos.logback @@ -432,6 +437,7 @@ + 1.2.2 4.0.1 1.4.196 16.5.1 diff --git a/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java new file mode 100644 index 0000000000..326f53c0b8 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java @@ -0,0 +1,34 @@ +package com.baeldung.storm; + +import com.baeldung.storm.bolt.AggregatingBolt; +import com.baeldung.storm.bolt.FileWritingBolt; +import com.baeldung.storm.bolt.FilteringBolt; +import com.baeldung.storm.bolt.PrintingBolt; +import com.baeldung.storm.spout.RandomNumberSpout; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; + +public class TopologyRunner { + public static void main(String[] args) { + runTopology(); + } + + public static void runTopology() { + String filePath = "./src/main/resources/operations.txt"; + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("randomNumberSpout", new RandomNumberSpout()); + builder.setBolt("filteringBolt", new FilteringBolt()).shuffleGrouping("randomNumberSpout"); + builder.setBolt("aggregatingBolt", new AggregatingBolt() + .withTimestampField("timestamp") + .withLag(BaseWindowedBolt.Duration.seconds(1)) + .withWindow(BaseWindowedBolt.Duration.seconds(5))).shuffleGrouping("filteringBolt"); + builder.setBolt("fileBolt", new FileWritingBolt(filePath)).shuffleGrouping("aggregatingBolt"); + + Config config = new Config(); + config.setDebug(false); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("Test", config, builder.createTopology()); + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java new file mode 100644 index 0000000000..555ba7e692 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java @@ -0,0 +1,39 @@ +package com.baeldung.storm.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class AggregatingBolt extends BaseWindowedBolt { + OutputCollector outputCollector; + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.outputCollector = collector; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); + } + + @Override + public void execute(TupleWindow tupleWindow) { + List tuples = tupleWindow.get(); + tuples.sort(Comparator.comparing(a -> a.getLongByField("timestamp"))); + //This is safe since the window is calculated basing on Tuple's timestamp, thus it can't really be empty + Long beginningTimestamp = tuples.get(0).getLongByField("timestamp"); + Long endTimestamp = tuples.get(tuples.size() - 1).getLongByField("timestamp"); + int sumOfOperations = tuples.stream().mapToInt(tuple -> tuple.getIntegerByField("operation")).sum(); + Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); + outputCollector.emit(values); + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java new file mode 100644 index 0000000000..a35ff3aaf5 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java @@ -0,0 +1,63 @@ +package com.baeldung.storm.bolt; + +import com.baeldung.storm.model.AggregatedWindow; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Map; + +public class FileWritingBolt extends BaseRichBolt { + public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); + BufferedWriter writer; + String filePath; + ObjectMapper objectMapper; + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { + objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + try { + writer = new BufferedWriter(new FileWriter(filePath)); + } catch (IOException e) { + logger.error("Failed to open a file for writing.", e); + } + } + + @Override + public void execute(Tuple tuple) { + int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); + long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); + long endTimestamp = tuple.getLongByField("endTimestamp"); + + if(sumOfOperations > 200) { + AggregatedWindow aggregatedWindow = new AggregatedWindow(sumOfOperations, beginningTimestamp, endTimestamp); + try { + writer.write(objectMapper.writeValueAsString(aggregatedWindow)); + writer.write("\n"); + writer.flush(); + } catch (IOException e) { + logger.error("Failed to write data to file.", e); + } + } + } + + public FileWritingBolt(String filePath) { + this.filePath = filePath; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java new file mode 100644 index 0000000000..a2e80deb33 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java @@ -0,0 +1,22 @@ +package com.baeldung.storm.bolt; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; + +public class FilteringBolt extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + int operation = tuple.getIntegerByField("operation"); + if(operation >= 0 ) { + basicOutputCollector.emit(tuple.getValues()); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java new file mode 100644 index 0000000000..efd2c9b1d9 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java @@ -0,0 +1,18 @@ +package com.baeldung.storm.bolt; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +public class PrintingBolt extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + System.out.println(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java new file mode 100644 index 0000000000..beaf54d34c --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java @@ -0,0 +1,16 @@ +package com.baeldung.storm.model; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@JsonSerialize +public class AggregatedWindow { + int sumOfOperations; + long beginningTimestamp; + long endTimestamp; + + public AggregatedWindow(int sumOfOperations, long beginningTimestamp, long endTimestamp) { + this.sumOfOperations = sumOfOperations; + this.beginningTimestamp = beginningTimestamp; + this.endTimestamp = endTimestamp; + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/User.java b/libraries-data/src/main/java/com/baeldung/storm/model/User.java new file mode 100644 index 0000000000..62b9ac639b --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/model/User.java @@ -0,0 +1,40 @@ +package com.baeldung.storm.model; + +public class User { + String username; + String password; + String email; + int age; + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java new file mode 100644 index 0000000000..6199a203da --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java @@ -0,0 +1,30 @@ +package com.baeldung.storm.serialization; + + +import com.baeldung.storm.model.User; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class UserSerializer extends Serializer{ + @Override + public void write(Kryo kryo, Output output, User user) { + output.writeString(user.getEmail()); + output.writeString(user.getUsername()); + output.write(user.getAge()); + } + + @Override + public User read(Kryo kryo, Input input, Class aClass) { + User user = new User(); + String email = input.readString(); + String name = input.readString(); + int age = input.read(); + user.setAge(age); + user.setEmail(email); + user.setUsername(name); + + return user; + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java new file mode 100644 index 0000000000..669eb4f897 --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java @@ -0,0 +1,35 @@ +package com.baeldung.storm.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.Map; +import java.util.Random; + +public class RandomIntSpout extends BaseRichSpout { + + Random random; + SpoutOutputCollector outputCollector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + random = new Random(); + outputCollector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + Utils.sleep(1000); + outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); + } +} diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java new file mode 100644 index 0000000000..5d7d3cc53e --- /dev/null +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java @@ -0,0 +1,40 @@ +package com.baeldung.storm.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.Map; +import java.util.Random; + +public class RandomNumberSpout extends BaseRichSpout { + Random random; + SpoutOutputCollector collector; + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + random = new Random(); + collector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + Utils.sleep(1000); + //This will select random int from the range (-1000, 1000) + int operation = random.nextInt(1000 + 1 + 1000) - 1000; + long timestamp = System.currentTimeMillis(); + + Values values = new Values(operation, timestamp); + collector.emit(values); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); + } +} From a0a393cdfc7e9e080e812f041bbf5a0f1fa127fb Mon Sep 17 00:00:00 2001 From: DomWos Date: Tue, 16 Oct 2018 10:39:18 +0200 Subject: [PATCH 2/5] BAEL-1463: Privatize Everything. --- .../java/com/baeldung/storm/bolt/AggregatingBolt.java | 2 +- .../java/com/baeldung/storm/bolt/FileWritingBolt.java | 7 +++---- .../src/main/java/com/baeldung/storm/model/User.java | 8 ++++---- .../java/com/baeldung/storm/spout/RandomIntSpout.java | 4 ++-- .../java/com/baeldung/storm/spout/RandomNumberSpout.java | 4 ++-- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java index 555ba7e692..c7263cd8d5 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.Map; public class AggregatingBolt extends BaseWindowedBolt { - OutputCollector outputCollector; + private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java index a35ff3aaf5..40ed72164d 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java @@ -4,7 +4,6 @@ import com.baeldung.storm.model.AggregatedWindow; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -20,9 +19,9 @@ import java.util.Map; public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); - BufferedWriter writer; - String filePath; - ObjectMapper objectMapper; + private BufferedWriter writer; + private String filePath; + private ObjectMapper objectMapper; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/User.java b/libraries-data/src/main/java/com/baeldung/storm/model/User.java index 62b9ac639b..eafbf0e1eb 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/model/User.java +++ b/libraries-data/src/main/java/com/baeldung/storm/model/User.java @@ -1,10 +1,10 @@ package com.baeldung.storm.model; public class User { - String username; - String password; - String email; - int age; + private String username; + private String password; + private String email; + private int age; public String getUsername() { return username; diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java index 669eb4f897..4a8ef76598 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java @@ -13,8 +13,8 @@ import java.util.Random; public class RandomIntSpout extends BaseRichSpout { - Random random; - SpoutOutputCollector outputCollector; + private Random random; + private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java index 5d7d3cc53e..371a61720a 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java @@ -13,8 +13,8 @@ import java.util.Map; import java.util.Random; public class RandomNumberSpout extends BaseRichSpout { - Random random; - SpoutOutputCollector collector; + private Random random; + private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { From 6fa2361283915ab562115dc7bc0b28b563206e41 Mon Sep 17 00:00:00 2001 From: DomWos Date: Mon, 22 Oct 2018 11:51:37 +0200 Subject: [PATCH 3/5] Fixes according to comments. --- .../java/com/baeldung/storm/bolt/FileWritingBolt.java | 10 ++++++++++ .../com/baeldung/storm/spout/RandomNumberSpout.java | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java index 40ed72164d..339e0dc055 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java @@ -22,6 +22,7 @@ public class FileWritingBolt extends BaseRichBolt { private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; + @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); @@ -55,6 +56,15 @@ public class FileWritingBolt extends BaseRichBolt { this.filePath = filePath; } + @Override + public void cleanup() { + try { + writer.close(); + } catch (IOException e) { + logger.error("Failed to close the writer!"); + } + } + @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java index 371a61720a..26fb1d82c0 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java @@ -26,7 +26,7 @@ public class RandomNumberSpout extends BaseRichSpout { public void nextTuple() { Utils.sleep(1000); //This will select random int from the range (-1000, 1000) - int operation = random.nextInt(1000 + 1 + 1000) - 1000; + int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); From aed1514c0b73bd0e00e11e7de6b4fa65842935e2 Mon Sep 17 00:00:00 2001 From: DomWos Date: Mon, 22 Oct 2018 11:54:39 +0200 Subject: [PATCH 4/5] Filtering fix. --- .../src/main/java/com/baeldung/storm/bolt/FilteringBolt.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java index a2e80deb33..564076a1df 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java +++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java @@ -10,7 +10,7 @@ public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); - if(operation >= 0 ) { + if(operation > 0 ) { basicOutputCollector.emit(tuple.getValues()); } } From 6f34ca1de2e1edb79c7751514446eda991500a0b Mon Sep 17 00:00:00 2001 From: DomWos Date: Tue, 23 Oct 2018 12:09:46 +0200 Subject: [PATCH 5/5] Changed the comment about random range to proper values. --- .../main/java/com/baeldung/storm/spout/RandomNumberSpout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java index 26fb1d82c0..c9291cdc9d 100644 --- a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java +++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java @@ -25,7 +25,7 @@ public class RandomNumberSpout extends BaseRichSpout { @Override public void nextTuple() { Utils.sleep(1000); - //This will select random int from the range (-1000, 1000) + //This will select random int from the range (0, 100) int operation = random.nextInt(101); long timestamp = System.currentTimeMillis();