Learning Storm (Packt, 2014)


Lisrelchen posted on 2014-10-3 20:21:59
About This Book
  • Integrate Storm with other Big Data technologies like Hadoop, HBase, and Apache Kafka
  • Explore log processing and machine learning using Storm
  • Step-by-step and easy-to-understand guide to effortlessly create applications with Storm
Who This Book Is For

If you are a Java developer who wants to enter into the world of real-time stream processing applications using Apache Storm, then this book is for you. No previous experience in Storm is required as this book starts from the basics. After finishing this book, you will be able to develop not-so-complex Storm applications.

In Detail

Starting with the very basics of Storm, you will learn how to set up Storm on a single machine and move on to deploying Storm on your cluster. You will understand how Kafka can be integrated with Storm using the Kafka spout.

You will then proceed to explore the Trident abstraction tool with Storm to perform stateful stream processing, guaranteeing single message processing in every topology. You will move ahead to learn how to integrate Hadoop with Storm. Next, you will learn how to integrate Storm with other well-known Big Data technologies such as HBase, Redis, and Kafka to realize the full potential of Storm.

Finally, you will perform in-depth case studies on Apache log processing and machine learning with a focus on Storm, and through these case studies, you will discover Storm's realm of possibilities.


About the Author
Ankit Jain holds a Bachelor's degree in Computer Science Engineering. He has 4 years of experience in designing and architecting solutions for the Big Data domain and has been involved with several complex engagements. His technical strengths include Hadoop, Storm, S4, HBase, Hive, Sqoop, Flume, ElasticSearch, Machine Learning, Kafka, Spring, Java, and J2EE. He is currently employed with Impetus Infotech Pvt. Ltd. He also shares his thoughts on his personal blog at http://ankitasblogger.blogspot.in/. You can follow him on Twitter at @mynameisanky. He spends most of his time reading books and playing with different technologies. When not at work, he spends time with his family and friends watching movies and playing games.




jacky陈2183 posted on 2015-1-24 23:19:57
A very good book.


Nicolle posted on 2016-4-17 02:41:53
Notice: the author has been banned or deleted; the content was automatically hidden.


Lisrelchen posted on 2016-4-17 02:43:39
package com.learningstorm.storm_example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class LearningStormSingleNodeTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 4)
                .shuffleGrouping("LearningStormSpout");

        Config conf = new Config();
        conf.setNumWorkers(3);

        try {
            // args[0] is the name of the submitted topology
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException alreadyAliveException) {
            System.out.println(alreadyAliveException);
        } catch (InvalidTopologyException invalidTopologyException) {
            System.out.println(invalidTopologyException);
        }
    }
}
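In the topology above, conf.setNumWorkers(3) asks for three worker JVMs, while the spout runs with 2 executors and the bolt with 4, i.e. 6 executors in total. Storm spreads executors roughly evenly across the workers, so each worker here ends up running about 2 of them. A Storm-free sketch of that even-spread arithmetic (the class and method names are illustrative, not from the book):

```java
public class ExecutorSpread {
    // Number of executors a given worker receives when `total` executors
    // are dealt round-robin across `workers` worker processes.
    public static int executorsOnWorker(int total, int workers, int workerIndex) {
        int base = total / workers;          // every worker gets at least this many
        int remainder = total % workers;     // the first `remainder` workers get one extra
        return base + (workerIndex < remainder ? 1 : 0);
    }

    public static void main(String[] args) {
        // 2 spout executors + 4 bolt executors = 6, over 3 workers
        int total = 2 + 4;
        for (int w = 0; w < 3; w++) {
            System.out.println("worker " + w + " -> "
                    + executorsOnWorker(total, 3, w) + " executors");
        }
    }
}
```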

Lisrelchen posted on 2016-4-17 02:44:51
package com.learningstorm.storm_example;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LearningStormSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;
    private static final Random rand = new Random();
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "google");
        map.put(1, "facebook");
        map.put(2, "twitter");
        map.put(3, "youtube");
        map.put(4, "linkedin");
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector spoutOutputCollector) {
        // Open the spout and store its output collector
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // The Storm framework calls this method repeatedly to emit a
        // continuous stream of tuples.
        // Generate a random number from 0 to 4 and emit the mapped site name.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(map.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Emit tuples with a single field named "site"
        declarer.declare(new Fields("site"));
    }
}
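The spout's nextTuple() boils down to uniform sampling from a fixed keyed table: rand.nextInt(5) always lands on one of the keys 0 to 4, so map.get(...) can never return null. That invariant can be checked without any Storm dependency; the sketch below (illustrative class name, not from the book) isolates just the selection logic:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SiteSampler {
    private static final Map<Integer, String> SITES = new HashMap<Integer, String>();
    static {
        SITES.put(0, "google");
        SITES.put(1, "facebook");
        SITES.put(2, "twitter");
        SITES.put(3, "youtube");
        SITES.put(4, "linkedin");
    }

    private final Random rand = new Random();

    // The same selection logic as LearningStormSpout.nextTuple(), minus the emit
    public String next() {
        return SITES.get(rand.nextInt(SITES.size()));
    }

    public static void main(String[] args) {
        SiteSampler sampler = new SiteSampler();
        for (int i = 0; i < 1000; i++) {
            if (sampler.next() == null) {
                throw new AssertionError("nextInt(size) escaped the key range");
            }
        }
        System.out.println("1000 draws, every one mapped to a site");
    }
}
```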

Lisrelchen posted on 2016-4-17 02:45:41
package com.learningstorm.storm_example;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class LearningStormTopology {
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout class
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        // set the bolt class
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 4)
                .shuffleGrouping("LearningStormSpout");

        Config conf = new Config();
        conf.setDebug(true);
        // create an instance of the LocalCluster class for
        // executing the topology in local mode
        LocalCluster cluster = new LocalCluster();

        // "LearningStormTopology" is the name of the submitted topology
        cluster.submitTopology("LearningStormTopology", conf,
                builder.createTopology());
        try {
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the LearningStormTopology
        cluster.killTopology("LearningStormTopology");
        // shut down the local test cluster
        cluster.shutdown();
    }
}

Lisrelchen posted on 2016-4-17 02:46:50
package com.learningstorm.grouping;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class CategoryGrouping implements CustomStreamGrouping, Serializable {
    private static final Map<String, Integer> categories = ImmutableMap.of(
            "Financial", 0,
            "Medical", 1,
            "FMCG", 2,
            "Electronics", 3
    );

    private int tasks = 0;

    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
            List<Integer> targetTasks) {
        tasks = targetTasks.size();
    }

    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Route each category to a fixed task: category index mod task count
        String category = (String) values.get(0);
        return ImmutableList.of(categories.get(category) % tasks);
    }
}
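CategoryGrouping sends every tuple of a given category to one fixed task: the category's index modulo the number of target tasks. Note two consequences of that rule: with 4 categories and more than 4 target tasks, the extra tasks never receive tuples, and an unknown category makes categories.get(...) return null and the modulo throw. A Storm-free sketch of the routing rule (illustrative names, with an explicit guard for the unknown-category case):

```java
import java.util.HashMap;
import java.util.Map;

public class CategoryRouter {
    private static final Map<String, Integer> CATEGORIES = new HashMap<String, Integer>();
    static {
        CATEGORIES.put("Financial", 0);
        CATEGORIES.put("Medical", 1);
        CATEGORIES.put("FMCG", 2);
        CATEGORIES.put("Electronics", 3);
    }

    // The same rule as CategoryGrouping.chooseTasks(): category index mod task count
    public static int taskFor(String category, int taskCount) {
        Integer index = CATEGORIES.get(category);
        if (index == null) {
            throw new IllegalArgumentException("Unknown category: " + category);
        }
        return index % taskCount;
    }

    public static void main(String[] args) {
        System.out.println("Medical with 2 tasks -> task " + taskFor("Medical", 2));
        System.out.println("Electronics with 2 tasks -> task " + taskFor("Electronics", 2));
    }
}
```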

Lisrelchen posted on 2016-4-17 02:48:38
package com.learningstorm.monitoring;

import java.util.Iterator;
import java.util.Map;

import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ExecutorSpecificStats;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.TopologyInfo;

public class BoltStatistics {

    private static final String DEFAULT = "default";
    private static final String ALL_TIME = ":all-time";

    public void printBoltStatistics(String topologyId) {
        try {
            // Get the Nimbus thrift server client
            ThriftClient thriftClient = new ThriftClient();
            Client client = thriftClient.getClient();

            // Get the information for the given topology
            TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
            Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo
                    .get_executors_iterator();
            while (executorSummaryIterator.hasNext()) {
                ExecutorSummary executorSummary = executorSummaryIterator.next();
                ExecutorStats executorStats = executorSummary.get_stats();
                if (executorStats != null) {
                    ExecutorSpecificStats executorSpecificStats = executorStats
                            .get_specific();
                    String componentId = executorSummary.get_component_id();
                    // Only executors running bolts carry BoltStats
                    if (executorSpecificStats.is_set_bolt()) {
                        BoltStats boltStats = executorSpecificStats.get_bolt();
                        System.out.println("*************************************");
                        System.out.println("Component ID of Bolt " + componentId);
                        System.out.println("Transferred: " + getAllTimeStat(
                                executorStats.get_transferred(), ALL_TIME));
                        System.out.println("Emitted: " + getAllTimeStat(
                                executorStats.get_emitted(), ALL_TIME));
                        System.out.println("Acked: " + getBoltStats(
                                boltStats.get_acked(), ALL_TIME));
                        System.out.println("Failed: " + getBoltStats(
                                boltStats.get_failed(), ALL_TIME));
                        System.out.println("Executed: " + getBoltStats(
                                boltStats.get_executed(), ALL_TIME));
                        System.out.println("*************************************");
                    }
                }
            }
        } catch (Exception exception) {
            throw new RuntimeException(
                    "Error occurred while fetching the bolt information: " + exception);
        }
    }

    private static Long getAllTimeStat(Map<String, Map<String, Long>> map,
            String statName) {
        if (map != null) {
            Map<String, Long> tempMap = map.get(statName);
            if (tempMap != null) {
                return tempMap.get(DEFAULT);
            }
        }
        return 0L;
    }

    public static Long getBoltStats(
            Map<String, Map<GlobalStreamId, Long>> map, String statName) {
        if (map != null) {
            Map<GlobalStreamId, Long> tempMap = map.get(statName);
            if (tempMap != null && !tempMap.isEmpty()) {
                // Return the value of the first (and only) per-stream entry
                Iterator<GlobalStreamId> iterator = tempMap.keySet().iterator();
                return tempMap.get(iterator.next());
            }
        }
        return 0L;
    }

    public static void main(String[] args) {
        new BoltStatistics().printBoltStatistics("LearningStormClusterTopology-1-1393847956");
    }
}
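The stat helpers above dig two levels into a nested map (stat name, then stream or window, then count), and a missing level would make the naive version throw a NullPointerException. The defensive lookup pattern can be exercised without any Storm dependency (illustrative class and method names):

```java
import java.util.HashMap;
import java.util.Map;

public class NestedStat {
    // Null-safe two-level lookup, defaulting to 0 when either level is absent
    public static long lookup(Map<String, Map<String, Long>> stats,
            String statName, String streamId) {
        if (stats == null) {
            return 0L;
        }
        Map<String, Long> byStream = stats.get(statName);
        if (byStream == null) {
            return 0L;
        }
        Long value = byStream.get(streamId);
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> stats = new HashMap<String, Map<String, Long>>();
        Map<String, Long> allTime = new HashMap<String, Long>();
        allTime.put("default", 42L);
        stats.put(":all-time", allTime);
        System.out.println("present = " + lookup(stats, ":all-time", "default"));
        System.out.println("missing = " + lookup(stats, ":10m", "default"));
    }
}
```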

Lisrelchen posted on 2016-4-17 02:49:12
package com.learningstorm.monitoring;

import backtype.storm.generated.Nimbus.Client;

public class KillTopology {

    public void kill(String topologyId) {
        try {
            // Get the Nimbus thrift client
            ThriftClient thriftClient = new ThriftClient();
            Client client = thriftClient.getClient();
            // Kill the given topology
            client.killTopology(topologyId);
        } catch (Exception exception) {
            throw new RuntimeException(
                    "Error occurred while killing the topology: " + exception);
        }
    }

    public static void main(String[] args) {
        new KillTopology().kill("topologyId");
    }
}

Lisrelchen posted on 2016-4-17 02:49:50
package com.learningstorm.monitoring;

import backtype.storm.generated.Nimbus.Client;

public class NimbusConfiguration {

    public void printNimbusStats() {
        try {
            ThriftClient thriftClient = new ThriftClient();
            Client client = thriftClient.getClient();
            String nimbusConfiguration = client.getNimbusConf();
            System.out.println("*************************************");
            System.out.println("Nimbus Configuration : " + nimbusConfiguration);
            System.out.println("*************************************");
        } catch (Exception exception) {
            throw new RuntimeException(
                    "Error occurred while fetching the Nimbus configuration: " + exception);
        }
    }

    public static void main(String[] args) {
        new NimbusConfiguration().printNimbusStats();
    }
}
