The Storm topology is basically a Thrift structure. TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set spout (setSpout) and to set bolt (setBolt). Finally, TopologyBuilder has createTopology to create topology. Use the following code snippet to create a topology −
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
Python is a general-purpose interpreted, interactive, object-oriented, and high-level programming language. Storm supports Python to implement its topology. Python supports emitting, anchoring, acking, and logging operations.
As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout. First take a sample bolt WordCount that supports python binding.
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Here the class WordCount implements the IRichBolt interface and running with python implementation specified super method argument "splitword.py". Now create a python implementation named "splitword.py".
Trident spout is similar to Storm spout, with additional options to use the features of Trident. Actually, we can still use the IRichSpout, which we have used in Storm topology, but it will be non-transactional in nature and we won’t be able to use the advantages provided by Trident.
The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.
In addition to these generic spouts, Trident has many sample implementation of trident spout. One of them is FeederBatchSpout spout, which we can use to send named list of trident tuples easily without worrying about batch processing, parallelism, etc.
FeederBatchSpout creation and data feeding can be done as shown below −
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
Filter is an object used to perform the task of input validation. A Trident filter gets a subset of trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. Filter will basically inherit from the BaseFilter class and implement the isKeep method. Here is a sample implementation of filter operation −