Original poster: ReneeBK

Apache Storm Tutorial

#11 ReneeBK posted on 2017-3-25 09:19:32
Aggregation
An aggregation is an operation performed on an input batch, partition, or stream. Trident supports three types of aggregation:

aggregate − Aggregates each batch of Trident tuples in isolation. During the aggregate process, the tuples are first repartitioned using global grouping, which combines all partitions of the same batch into a single partition.

partitionAggregate − Aggregates each partition instead of the entire batch of Trident tuples. The output of the partition aggregation completely replaces the input tuples. In the example below, the output is a single-field tuple ("count").

persistentAggregate − Aggregates all Trident tuples across all batches and stores the result either in memory or in a database.

import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;

// spout and MyFunction are assumed to be defined elsewhere in the tutorial
TridentTopology topology = new TridentTopology();

// aggregate: one count per batch
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));

// partitionAggregate: one count per partition of each batch
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));

// persistentAggregate: a running count across all batches, kept in memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
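To make the difference between the three aggregation styles concrete, here is a minimal plain-Java sketch with no Storm dependency that mimics them using a count. It assumes a batch can be modeled as a list of partitions, each holding a list of tuples (plain strings here); the class AggregationSketch and its methods are hypothetical names for this illustration, not Trident APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of Trident's three aggregation styles, using a count.
// A batch is modeled as a list of partitions; each partition is a list of tuples.
class AggregationSketch {

    // aggregate: all partitions of a batch are combined first (global grouping),
    // so exactly one count is produced per batch.
    static long aggregateCount(List<List<String>> batch) {
        long count = 0;
        for (List<String> partition : batch) {
            count += partition.size();
        }
        return count;
    }

    // partitionAggregate: each partition is counted on its own, and the
    // results replace the input tuples, giving one count per partition.
    static List<Long> partitionAggregateCount(List<List<String>> batch) {
        List<Long> counts = new ArrayList<>();
        for (List<String> partition : batch) {
            counts.add((long) partition.size());
        }
        return counts;
    }

    // persistentAggregate: the total survives across batches; this field
    // stands in for the state kept by MemoryMapState or a database.
    private long persistedCount = 0;

    long persistentAggregateCount(List<List<String>> batch) {
        persistedCount += aggregateCount(batch);
        return persistedCount;
    }

    public static void main(String[] args) {
        List<List<String>> batch1 = Arrays.asList(
            Arrays.asList("t1", "t2"), Arrays.asList("t3"));
        List<List<String>> batch2 = Arrays.asList(
            Arrays.asList("t4"), Arrays.asList("t5", "t6", "t7"));

        System.out.println(aggregateCount(batch1));          // 3
        System.out.println(partitionAggregateCount(batch2)); // [1, 3]

        AggregationSketch state = new AggregationSketch();
        state.persistentAggregateCount(batch1);
        System.out.println(state.persistentAggregateCount(batch2)); // 7
    }
}
```

The key contrast is where the result lives: aggregate and partitionAggregate forget everything once a batch is done, while persistentAggregate keeps updating the same stored value.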

#12 ReneeBK posted on 2017-3-25 09:20:41
Grouping
Grouping is a built-in operation invoked through the groupBy method. groupBy repartitions the stream by doing a partitionBy on the specified fields and then, within each partition, groups together the tuples whose group fields are equal. groupBy is normally combined with persistentAggregate to obtain a grouped aggregation. The sample code is as follows:

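import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;

TridentTopology topology = new TridentTopology();

// groupBy + persistentAggregate: one count per distinct value of "d", kept in memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .groupBy(new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));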
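The two steps that groupBy performs (partitionBy on the group field, then grouping equal values within each partition) can be sketched in plain Java as follows. This is a hypothetical model, not Trident code: it assumes tuples are reduced to their "d" value and that partitions are chosen by String.hashCode modulo the partition count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of groupBy(new Fields("d")) followed by a Count aggregation.
class GroupByCountSketch {

    // Step 1 (partitionBy): route each value to a partition by hashing the group field.
    static List<List<String>> partitionBy(List<String> dValues, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String d : dValues) {
            int p = Math.floorMod(d.hashCode(), numPartitions);
            partitions.get(p).add(d);
        }
        return partitions;
    }

    // Step 2: within each partition, group equal values and count them.
    // Equal values always land in the same partition, so each key's count
    // comes from exactly one partition.
    static Map<String, Long> groupAndCount(List<List<String>> partitions) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> partition : partitions) {
            for (String d : partition) {
                counts.merge(d, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> d = Arrays.asList("x", "y", "x", "z", "x", "y");
        System.out.println(groupAndCount(partitionBy(d, 3))); // {x=3, y=2, z=1}
    }
}
```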

#13 ReneeBK posted on 2017-3-25 09:21:36
Merging and Joining
Merging and joining are done with the merge and join methods, respectively. Merging combines two or more streams into one. Joining is similar to merging, except that it uses Trident tuple fields from both sides to match and combine two streams. Moreover, joins only happen within a batch. The sample code is as follows:

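TridentTopology topology = new TridentTopology();
// stream1, stream2 and stream3 are Streams created earlier from this topology
topology.merge(stream1, stream2, stream3);
// Output fields: the join field first, then stream1's other fields, then stream2's
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));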
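The behavior of merge and a within-batch join can be modeled in plain Java as shown below. This is only a sketch under stated assumptions: tuples are modeled as List<String>, the key positions are passed explicitly, and the join is implemented as an inner hash join; MergeJoinSketch is a hypothetical class, not part of Trident. The output layout (join key, then the remaining left fields, then the remaining right fields) follows the field order used in the join example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of merge and join over a single batch.
class MergeJoinSketch {

    // merge: the output simply contains every tuple from every input stream.
    @SafeVarargs
    static List<List<String>> merge(List<List<String>>... streams) {
        List<List<String>> out = new ArrayList<>();
        for (List<List<String>> s : streams) {
            out.addAll(s);
        }
        return out;
    }

    // join: inner hash join of two batches on one key field per side.
    static List<List<String>> join(List<List<String>> left, int leftKey,
                                   List<List<String>> right, int rightKey) {
        // Index the right side by its key value.
        Map<String, List<List<String>>> index = new HashMap<>();
        for (List<String> r : right) {
            index.computeIfAbsent(r.get(rightKey), k -> new ArrayList<>()).add(r);
        }
        List<List<String>> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : index.getOrDefault(l.get(leftKey), new ArrayList<>())) {
                List<String> row = new ArrayList<>();
                row.add(l.get(leftKey));                 // join key first
                for (int i = 0; i < l.size(); i++) {     // remaining left fields
                    if (i != leftKey) row.add(l.get(i));
                }
                for (int i = 0; i < r.size(); i++) {     // remaining right fields
                    if (i != rightKey) row.add(r.get(i));
                }
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // stream1 fields: key, val1, val2    stream2 fields: x, val1
        List<List<String>> s1 = Arrays.asList(Arrays.asList("k1", "a1", "b1"),
                                              Arrays.asList("k2", "a2", "b2"));
        List<List<String>> s2 = Arrays.asList(Arrays.asList("k1", "c1"));
        System.out.println(merge(s1, s2).size()); // 3
        System.out.println(join(s1, 0, s2, 0));   // [[k1, a1, b1, c1]]
    }
}
```

Because the sketch only ever sees the two lists passed in, it reflects the batch-level restriction: tuples from different batches can never be matched.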

#14 ReneeBK posted on 2017-3-25 09:31:19
Formatting the call information
The FormatCall class formats the call information, which consists of the caller number and the receiver number. The complete program code is as follows:

Coding: FormatCall.java
import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      // Field 0 is the caller number, field 1 the receiver number
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      // Emit a single field of the form "<caller> - <receiver>"
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}
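The formatting logic itself does not depend on Storm, so it can be checked in isolation. The sketch below reproduces the "<caller> - <receiver>" format from FormatCall and adds a hypothetical downstream step that counts calls per formatted pair, similar in spirit to grouping on the formatted field and applying Count. The class name and the phone numbers are made-up sample values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-alone version of the FormatCall logic plus a hypothetical per-pair count.
class FormatCallSketch {

    // Same string format FormatCall emits: "<from> - <to>".
    static String formatCall(String fromMobileNumber, String toMobileNumber) {
        return fromMobileNumber + " - " + toMobileNumber;
    }

    // Count how often each formatted caller/receiver pair appears.
    static Map<String, Integer> countCalls(List<String[]> calls) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] call : calls) {
            counts.merge(formatCall(call[0], call[1]), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> calls = Arrays.asList(
            new String[] {"1234123401", "1234123402"},
            new String[] {"1234123401", "1234123402"},
            new String[] {"1234123401", "1234123403"});
        System.out.println(countCalls(calls));
        // {1234123401 - 1234123402=2, 1234123401 - 1234123403=1}
    }
}
```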

#15 ReneeBK posted on 2017-3-25 09:33:42
Hashtag Reader Bolt
The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits every hashtag it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which returns the tweet's hashtags as an array of HashtagEntity objects. The complete program code is as follows:

Coding: HashtagReaderBolt.java
import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      // The spout emits the twitter4j Status object in the "tweet" field
      Status tweet = (Status) tuple.getValueByField("tweet");
      // Emit one tuple per hashtag found in the tweet
      for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtag.getText());
         this.collector.emit(new Values(hashtag.getText()));
      }
      // IRichBolt does not ack automatically, so acknowledge the input tuple
      this.collector.ack(tuple);
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // Each emitted tuple has a single field named "hashtag"
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
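To experiment with the reader logic without a Twitter connection, the extraction step can be approximated locally. This sketch swaps in a regular expression for twitter4j's getHashtagEntities: twitter4j returns entities parsed by Twitter, while the pattern here is a rough assumption that only matches '#' followed by word characters. HashtagExtractSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rough local stand-in for Status.getHashtagEntities(), using a regex
// instead of the entities Twitter itself provides.
class HashtagExtractSketch {

    // '#' followed by letters, digits or underscores; Twitter's real rules are more involved.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // Returns the hashtag texts without the leading '#'.
    static List<String> extractHashtags(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // Each extracted tag corresponds to one tuple emitted by HashtagReaderBolt.
        for (String tag : extractHashtags("Learning #ApacheStorm and #bigdata today")) {
            System.out.println("Hashtag: " + tag);
        }
    }
}
```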

#16 ReneeBK posted on 2017-3-25 09:37:21
Hashtag Counter Bolt
HashtagCounterBolt keeps a count for every hashtag it receives and prints the totals in its cleanup method. The complete program code is as follows:

Coding: HashtagCounterBolt.java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      // Start a new hashtag at 1, otherwise increment its count
      if (!counterMap.containsKey(key)) {
         counterMap.put(key, 1);
      } else {
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   // Storm only guarantees that cleanup() is called when a topology is killed
   // in local mode, which is how TwitterHashtagStorm runs this bolt
   @Override
   public void cleanup() {
      for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // This bolt emits nothing downstream, so the declared field is never used
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
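The counting logic of HashtagCounterBolt can be tested without Storm. In this sketch, Map.merge (Java 8 and later) replaces the containsKey/put branches of the bolt; the class HashtagCounterSketch and its count/get/printResults methods are hypothetical stand-ins for execute() and cleanup().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Storm-free version of HashtagCounterBolt's counting logic.
class HashtagCounterSketch {
    private final Map<String, Integer> counterMap = new HashMap<>();

    // Equivalent of execute(): start at 1 or increment the existing count.
    void count(String hashtag) {
        counterMap.merge(hashtag, 1, Integer::sum);
    }

    // Current count for a hashtag, 0 if it was never seen.
    int get(String hashtag) {
        return counterMap.getOrDefault(hashtag, 0);
    }

    // Equivalent of cleanup(): print the final counts (HashMap order is unspecified).
    void printResults() {
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
        }
    }

    public static void main(String[] args) {
        HashtagCounterSketch counter = new HashtagCounterSketch();
        for (String tag : new String[] {"storm", "java", "storm"}) {
            counter.count(tag);
        }
        counter.printResults(); // storm : 2 and java : 1, in HashMap iteration order
    }
}
```

Note that this per-bolt map only gives correct totals if every occurrence of a hashtag reaches the same bolt instance, which is why the topology routes tuples to this bolt with a fields grouping on "hashtag".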

#17 ReneeBK posted on 2017-3-25 09:40:16
Submitting a Topology
The main application builds and submits the topology. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit it.

Coding: TwitterHashtagStorm.java
import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception {
      // Twitter API credentials, followed by optional keywords to filter on
      String consumerKey = args[0];
      String consumerSecret = args[1];

      String accessToken = args[2];
      String accessTokenSecret = args[3];

      String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

      Config config = new Config();
      config.setDebug(true);

      // TwitterSampleSpout is defined elsewhere in the tutorial
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      // Any reader task may process any tweet
      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      // Tuples with the same hashtag always reach the same counter task
      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      // Run in local mode for 10 seconds, then shut down
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
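The topology above uses two different groupings. The following sketch models the routing guarantee each one provides, under simplifying assumptions: shuffle grouping is modeled as round-robin (Storm's real shuffle grouping is randomized), and fields grouping is modeled as the hash of the grouping values modulo the number of tasks. GroupingSketch is a hypothetical class and does not reproduce Storm's exact hashing.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual model of shuffleGrouping vs fieldsGrouping routing decisions.
class GroupingSketch {
    private final int numTasks;
    private int next = 0;

    GroupingSketch(int numTasks) {
        this.numTasks = numTasks;
    }

    // shuffleGrouping: spread tuples evenly regardless of content
    // (round-robin in this model).
    int shuffleTarget() {
        int task = next;
        next = (next + 1) % numTasks;
        return task;
    }

    // fieldsGrouping: hash the grouping field values, so equal values
    // always go to the same task.
    int fieldsTarget(List<?> groupingValues) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        GroupingSketch grouping = new GroupingSketch(4);
        for (String tag : new String[] {"storm", "java", "storm"}) {
            System.out.println(tag + " -> shuffle task " + grouping.shuffleTarget()
                + ", fields task " + grouping.fieldsTarget(Arrays.asList(tag)));
        }
    }
}
```

In the output, the two "storm" tuples get different shuffle tasks but the same fields task, which is what lets each HashtagCounterBolt instance hold the complete count for its hashtags.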

#18 ReneeBK posted on 2017-3-25 09:41:55
Submitting a Topology
This is the main application, where YahooFinanceSpout and PriceCutOffBolt are connected to form a topology. The following program code shows how to submit it.

Coding: YahooFinanceStorm.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);

      // YahooFinanceSpout and PriceCutOffBolt are defined elsewhere in the tutorial
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      // All quotes for the same company go to the same PriceCutOffBolt task
      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));

      // Run in local mode for 10 seconds, then shut down
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
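Grouping by "company" matters whenever a bolt task keeps per-company state in memory. The sketch below is a hypothetical model of that effect: each task's state is just a quote count per company, content-blind routing is modeled as round-robin, and fields grouping as String.hashCode modulo the task count. The ticker symbols are sample values, and nothing here reflects the actual contents of PriceCutOffBolt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: how many bolt tasks end up holding state for one company.
class CompanyRoutingSketch {

    // Routes quotes to tasks and returns each task's per-company quote count.
    static List<Map<String, Integer>> route(String[] companies, int numTasks, boolean byField) {
        List<Map<String, Integer>> taskState = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            taskState.add(new HashMap<>());
        }
        for (int i = 0; i < companies.length; i++) {
            String company = companies[i];
            // byField: hash routing (fieldsGrouping); otherwise round-robin
            int task = byField ? Math.floorMod(company.hashCode(), numTasks) : i % numTasks;
            taskState.get(task).merge(company, 1, Integer::sum);
        }
        return taskState;
    }

    // Number of tasks holding some state for the given company.
    static int owners(List<Map<String, Integer>> taskState, String company) {
        int n = 0;
        for (Map<String, Integer> state : taskState) {
            if (state.containsKey(company)) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        String[] quotes = {"MSFT", "AAPL", "MSFT", "GOOG", "MSFT", "AAPL"};
        System.out.println("round-robin: MSFT state on "
            + owners(route(quotes, 3, false), "MSFT") + " task(s)"); // 3 task(s)
        System.out.println("fieldsGrouping: MSFT state on "
            + owners(route(quotes, 3, true), "MSFT") + " task(s)");  // 1 task(s)
    }
}
```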

#19 franky_sas posted on 2017-3-25 10:37:11
Thanks for sharing!
