Original poster: ReneeBK

Apache Storm Tutorial


#1 ReneeBK, posted 2017-3-25 09:04:15

  1. Apache Storm - Introduction
  2. Apache Storm - Core Concepts
  3. Apache Storm - Cluster Architecture
  4. Apache Storm - Workflow
  5. Storm - Distributed Messaging System
  6. Apache Storm - Installation
  7. Apache Storm - Working Example
  8. Apache Storm - Trident
  9. Apache Storm in Twitter
  10. Apache Storm in Yahoo! Finance
  11. Apache Storm - Applications
  12. Apache Storm - Quick Guide
  13. Apache Storm - Useful Resources
  14. Apache Storm - Discussion



#2 ReneeBK, posted 2017-3-25 09:05:54
Coding: FakeCallLogReaderSpout.java

import java.util.*;

//import Storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
//to access the spout functionality
public class FakeCallLogReaderSpout implements IRichSpout {
   //SpoutOutputCollector passes the emitted tuples on to the bolts.
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //TopologyContext contains information about the topology.
   private TopologyContext context;

   //Random generator used to pick phone numbers and call durations.
   private Random randomGenerator = new Random();
   //Number of call records emitted so far.
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      //Emit up to 100 records per call, stopping after 1000 records in total.
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            //A number must not call itself; compare string contents with equals().
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            //Random call duration between 0 and 59.
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   //Declare the names of the three fields in each emitted tuple.
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //The remaining interface methods are not needed here and are left empty.
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
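The batching in nextTuple is easier to follow outside a cluster. The plain-Java sketch below uses no Storm classes; CallLogGenerator and nextBatch are hypothetical names, and each nextBatch() call returns what one nextTuple() call would emit, under the same limits (at most 100 records per call, 1,000 in total) and the same rule that a number never calls itself. The fixed random seed is an assumption added only to make runs repeatable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java stand-in for FakeCallLogReaderSpout.nextTuple();
// no Storm classes are used. Each record is {from, to, duration}.
final class CallLogGenerator {
    private static final List<String> NUMBERS = Arrays.asList(
            "1234123401", "1234123402", "1234123403", "1234123404");
    private final Random random;
    private int idx = 0; // total records produced so far, capped at 1000

    CallLogGenerator(long seed) {
        this.random = new Random(seed);
    }

    // Returns the records one nextTuple() call would emit (empty once 1000 are done).
    List<Object[]> nextBatch() {
        List<Object[]> batch = new ArrayList<>();
        int localIdx = 0;
        while (localIdx++ < 100 && idx < 1000) {
            idx++;
            String from = NUMBERS.get(random.nextInt(NUMBERS.size()));
            String to = NUMBERS.get(random.nextInt(NUMBERS.size()));
            while (from.equals(to)) {          // redraw until caller != callee
                to = NUMBERS.get(random.nextInt(NUMBERS.size()));
            }
            int duration = random.nextInt(60); // 0..59, as in the spout
            batch.add(new Object[] {from, to, duration});
        }
        return batch;
    }

    public static void main(String[] args) {
        CallLogGenerator gen = new CallLogGenerator(42L);
        int total = 0;
        for (int call = 0; call < 15; call++) {
            total += gen.nextBatch().size();
        }
        System.out.println("records produced: " + total); // 1000
    }
}
```

Ten calls exhaust the 1,000-record budget; later calls return nothing, which mirrors the spout going quiet once idx passes 1000.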

#3 ReneeBK, posted 2017-3-25 09:07:05
Coding: CallLogCreatorBolt.java

//import util packages
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //OutputCollector emits the tuples this bolt produces
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      //Combine caller and callee into a single "from - to" key
      collector.emit(new Values(from + " - " + to, duration));
      //Acknowledge the input tuple once it has been processed
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

#4 ReneeBK, posted 2017-3-25 09:08:04
Coding: CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   //Number of calls seen for each "from - to" pair
   private Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      //The duration field is available but not used by this counter
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   //Print the final count for each call pair when the bolt shuts down
   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
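Taken together, the two bolts turn each (from, to, duration) record into a "from - to" key and then count how often each key occurs. The sketch below replays that data flow in plain Java so the result can be checked without a cluster; CallLogPipelineSketch and its methods are hypothetical names, not Storm API, and, like CallLogCounterBolt, it ignores the duration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java walk-through of the two bolts; no Storm classes are used.
final class CallLogPipelineSketch {
    // Mirrors CallLogCreatorBolt.execute(): (from, to) -> "from - to"
    static String toCallKey(String from, String to) {
        return from + " - " + to;
    }

    // Mirrors CallLogCounterBolt.execute(): one increment per call key
    static Map<String, Integer> countCalls(List<String[]> records) {
        Map<String, Integer> counter = new LinkedHashMap<>();
        for (String[] r : records) {
            counter.merge(toCallKey(r[0], r[1]), 1, Integer::sum);
        }
        return counter;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"1234123401", "1234123402"},
                new String[] {"1234123401", "1234123402"},
                new String[] {"1234123403", "1234123401"});
        // Same "key : count" format that CallLogCounterBolt.cleanup() prints
        countCalls(records).forEach((k, v) -> System.out.println(k + " : " + v));
    }
}
```

Note that "a - b" and "b - a" are different keys, so the counts are per direction of the call.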

#5 ReneeBK, posted 2017-3-25 09:09:22
Creating Topology
A Storm topology is essentially a Thrift structure. The TopologyBuilder class provides simple methods for creating complex topologies: setSpout adds a spout, setBolt adds a bolt, and createTopology builds the final topology. Use the following code snippet to create a topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

//Shuffle grouping: spread the spout's tuples randomly across the creator bolt's tasks
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

//Fields grouping: tuples with the same "call" value always reach the same counter task
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
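The choice of fieldsGrouping for the counter matters because each CallLogCounterBolt task keeps its own HashMap: if tuples for the same "call" key landed on different tasks, each task would hold only part of that key's count. The plain-Java sketch below models the routing idea with a simple hash-mod rule. FieldsGroupingSketch, chooseTask and the task count of 3 are illustrative assumptions, and Storm's internal hashing may differ from Objects.hash.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration of fields-grouping routing; not Storm's implementation.
final class FieldsGroupingSketch {
    // Pick a task index from the grouping field value(s):
    // equal values always produce the same index in [0, numTasks).
    static int chooseTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(Objects.hash(groupingValues.toArray()), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // assumed parallelism of call-log-counter-bolt
        for (String call : Arrays.asList(
                "1234123401 - 1234123402",
                "1234123403 - 1234123404",
                "1234123401 - 1234123402")) {
            // The first and third lines print the same task index
            System.out.println(call + " -> task " + chooseTask(Arrays.asList(call), numTasks));
        }
    }
}
```

Math.floorMod is used instead of % so that negative hash codes still map to a valid task index.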

#6 ReneeBK, posted 2017-3-25 09:10:29
Coding: LogAnalyserStorm.java

import backtype.storm.tuple.Fields;

//import Storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create the main class LogAnalyserStorm, which submits the topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create a Config instance for the cluster configuration
      Config config = new Config();
      config.setDebug(true);

      //Wire the components: reader spout -> creator bolt -> counter bolt
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      //Run the topology in an in-process local cluster for 10 seconds
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology by shutting down the local cluster
      cluster.shutdown();
   }
}

#7 ReneeBK, posted 2017-3-25 09:13:28
Python Binding
Python is a general-purpose, interpreted, high-level programming language, and Storm supports it for implementing topology components. The Python binding supports emitting, anchoring, acking, and logging operations.

Bolts can be written in any language. Bolts written in a language other than Java run as subprocesses, and Storm communicates with those subprocesses using JSON messages over stdin/stdout. As a first example, take a sample bolt WordCount that uses the Python binding.

//Imports needed in the enclosing file: java.util.Map, backtype.storm.task.ShellBolt,
//backtype.storm.topology.IRichBolt, backtype.storm.topology.OutputFieldsDeclarer,
//backtype.storm.tuple.Fields
public static class WordCount extends ShellBolt implements IRichBolt {
   //Run "python splitword.py" as the subprocess that processes this bolt's tuples
   public WordCount() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Here the class WordCount extends ShellBolt and implements the IRichBolt interface; the arguments passed to super ("python", "splitword.py") specify the command that runs the bolt's Python implementation. Now create that Python implementation, named "splitword.py":

import storm

class WordCountBolt(storm.BasicBolt):
    def process(self, tup):
        # Split the incoming sentence and emit one tuple per word
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

WordCountBolt().run()
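To make the stdin/stdout exchange more concrete, the sketch below builds the kind of JSON "emit" message a multilang bolt writes for each word. It assumes the framing described in Storm's multilang protocol documentation, where each JSON message is followed by a line containing only "end". MultilangEmitSketch, quote and emitMessage are hypothetical helpers, the escaping handles only backslashes and double quotes, and a real storm.py BasicBolt may also fill "anchors" with the input tuple's id rather than leaving it empty.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical framing of a multilang "emit" command.
// Assumption: tuple values are strings; only \ and " are escaped.
final class MultilangEmitSketch {
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String emitMessage(List<String> tuple) {
        StringBuilder json = new StringBuilder("{\"command\": \"emit\", \"anchors\": [], \"tuple\": [");
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) {
                json.append(", ");
            }
            json.append(quote(tuple.get(i)));
        }
        json.append("]}");
        // Each multilang message is terminated by a line containing only "end"
        return json + "\nend\n";
    }

    public static void main(String[] args) {
        // Roughly what splitword.py would write for the sentence "hello storm"
        for (String word : "hello storm".split(" ")) {
            System.out.print(emitMessage(Arrays.asList(word)));
        }
    }
}
```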

#8 ReneeBK, posted 2017-3-25 09:16:52
Trident Spout
A Trident spout is similar to a Storm spout, with additional options for using Trident's features. We can still use the IRichSpout from the Storm topology above, but it will be non-transactional, and we won't be able to take advantage of what Trident provides.

The basic spout with all the functionality needed for Trident's features is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

In addition to these generic spouts, Trident ships several sample spout implementations. One of them is FeederBatchSpout, which we can use to send a named list of Trident tuples easily, without worrying about batch processing, parallelism, etc.

FeederBatchSpout creation and data feeding can be done as shown below:

//Uses storm.trident.TridentTopology, storm.trident.testing.FeederBatchSpout,
//backtype.storm.tuple.Values and Guava's com.google.common.collect.ImmutableList
TridentTopology topology = new TridentTopology();
//Name the three fields carried by each tuple
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
//Feed one batch containing a single call record
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
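Transactional semantics rest on one idea: Trident processes tuples in batches, each tagged with a transaction id (txid), and a transactional spout always replays exactly the same tuples for a given txid. Assuming state updates are applied in txid order, state can store the txid of the last batch it applied and skip a replay of that batch. The sketch below shows that bookkeeping in plain Java; TransactionalCountSketch and applyBatch are hypothetical names rather than Trident's State API, and opaque transactional state (which must also keep the previous value) is not modeled.

```java
// Hypothetical sketch of transactional-state bookkeeping; not Trident's State API.
final class TransactionalCountSketch {
    private long count = 0;       // aggregated value kept in state
    private Long lastTxid = null; // txid of the last batch folded into count

    // Folds a batch's partial count into state at most once per txid.
    // Returns false when the batch is a replay that was already applied.
    boolean applyBatch(long txid, long batchCount) {
        if (lastTxid != null && lastTxid == txid) {
            return false; // same txid => same tuples, so applying again would double count
        }
        count += batchCount;
        lastTxid = txid;
        return true;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        TransactionalCountSketch state = new TransactionalCountSketch();
        state.applyBatch(1, 5); // batch 1 seen for the first time: applied
        state.applyBatch(1, 5); // batch 1 replayed after a failure: skipped
        state.applyBatch(2, 3); // batch 2: applied
        System.out.println(state.count()); // 8
    }
}
```

This shortcut is only safe because a transactional spout guarantees identical contents on replay; with an opaque spout a replayed batch may differ, which is why opaque state needs more information.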

#9 ReneeBK, posted 2017-3-25 09:17:44
Filter
A filter is an object used to perform input validation. A Trident filter takes a subset of a Trident tuple's fields as input and returns either true or false, depending on whether certain conditions are satisfied. If it returns true, the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter typically extends the BaseFilter class and implements the isKeep method. Here is a sample implementation of a filter operation:

public class MyFilter extends BaseFilter {
   //Keep the tuple only when its second field is even
   @Override
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

Input:

[1, 2]
[1, 3]
[1, 4]

Output:

[1, 2]
[1, 4]
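The input/output example above can be reproduced without Trident. The sketch below models the filter step in plain Java: tuples are lists of integers, and the predicate applies the same condition as MyFilter.isKeep. FilterSketch, IS_KEEP and applyFilter are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of a Trident filter; no Trident classes are used.
final class FilterSketch {
    // Same condition as MyFilter.isKeep(): keep when the second field is even
    static final Predicate<List<Integer>> IS_KEEP = t -> t.get(1) % 2 == 0;

    // Tuples for which isKeep returns true stay in the stream; the rest are dropped.
    static List<List<Integer>> applyFilter(List<List<Integer>> input, Predicate<List<Integer>> isKeep) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : input) {
            if (isKeep.test(tuple)) {
                out.add(tuple);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(1, 3), Arrays.asList(1, 4));
        System.out.println(applyFilter(input, IS_KEEP)); // [[1, 2], [1, 4]]
    }
}
```

Unlike a function, a filter never changes a tuple's fields; it only decides whether the tuple survives.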

#10 ReneeBK, posted 2017-3-25 09:18:38
Function
A function typically extends the BaseFunction class and implements the execute method. The values it emits are appended to the input tuple as new fields, which is why each output tuple below contains the original two fields followed by their sum. A sample implementation is given below:

public class MyFunction extends BaseFunction {
   //Emit the sum of the first two fields as one new field
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

Input:

[1, 2]
[1, 3]
[1, 4]

Output:

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Just like a filter operation, a function operation can be called in a topology using the each method. The sample code is as follows:

TridentTopology topology = new TridentTopology();
//spout is any Trident spout whose tuples carry the fields "a" and "b"
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"));
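The appending behavior shown in the input/output example can be checked with a small plain-Java model. FunctionSketch and applyFunction are hypothetical names; the sketch copies each input tuple [a, b] and appends a + b as the new field d, which is what each(new Fields("a", "b"), new MyFunction(), new Fields("d")) produces for these inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of MyFunction inside each(); no Trident classes are used.
final class FunctionSketch {
    // For every input tuple [a, b], append one new field d = a + b.
    static List<List<Integer>> applyFunction(List<List<Integer>> input) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : input) {
            List<Integer> extended = new ArrayList<>(tuple); // keep original fields a, b
            extended.add(tuple.get(0) + tuple.get(1));       // append new field d
            out.add(extended);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(1, 3), Arrays.asList(1, 4));
        System.out.println(applyFunction(input)); // [[1, 2, 3], [1, 3, 4], [1, 4, 5]]
    }
}
```

Because the output fields are appended rather than replacing the input, downstream operations can still refer to "a" and "b" as well as "d".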
