楼主: Lisrelchen
3658 11

Learning Storm Packt 2014 [推广有奖]

11
Lisrelchen 发表于 2016-4-17 02:50:21 |只看作者 |坛友微信交流群
  1. package com.learningstorm.monitoring;

  2. import java.util.Iterator;
  3. import java.util.Map;

  4. import backtype.storm.generated.ExecutorSpecificStats;
  5. import backtype.storm.generated.ExecutorStats;
  6. import backtype.storm.generated.ExecutorSummary;
  7. import backtype.storm.generated.Nimbus.Client;
  8. import backtype.storm.generated.SpoutStats;
  9. import backtype.storm.generated.TopologyInfo;

  10. public class SpoutStatistics {

  11.         private static final String DEFAULT = "default";
  12.         private static final String ALL_TIME = ":all-time";

  13.         public void printSpoutStatistics(String topologyId) {
  14.                 try {
  15.                 ThriftClient thriftClient = new ThriftClient();
  16.                 // Get the nimbus thrift client
  17.                 Client client = thriftClient.getClient();
  18.                 // Get the information of given topology
  19.                 TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);               
  20.                 Iterator<ExecutorSummary> executorSummaryIterator = topologyInfo
  21.                                 .get_executors_iterator();
  22.                 while (executorSummaryIterator.hasNext()) {
  23.                         ExecutorSummary executorSummary = executorSummaryIterator.next();
  24.                         ExecutorStats executorStats = executorSummary.get_stats();
  25.                         if(executorStats !=null) {
  26.                         ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
  27.                         String componentId = executorSummary.get_component_id();
  28.                         //
  29.                         if (executorSpecificStats.is_set_spout()) {
  30.                                 SpoutStats spoutStats = executorSpecificStats.get_spout();
  31.                                 System.out.println("*************************************");
  32.                                 System.out.println("Component ID of Spout:- " + componentId);
  33.                                 System.out.println("Transferred:- "
  34.                                                 + getAllTimeStat(executorStats.get_transferred(),ALL_TIME));
  35.                                 System.out.println("Total tuples emitted:- "
  36.                                                 + getAllTimeStat(executorStats.get_emitted(), ALL_TIME));
  37.                                 System.out.println("Acked: "
  38.                                                 + getAllTimeStat(spoutStats.get_acked(),
  39.                                                                 ALL_TIME));
  40.                                 System.out.println("Failed: "
  41.                                                 + getAllTimeStat(spoutStats.get_failed(),
  42.                                                                 ALL_TIME));
  43.                                 System.out.println("*************************************");
  44.                         }
  45.                         }
  46.                 }
  47.                 }catch (Exception exception) {
  48.                         throw new RuntimeException("Error occurred while fetching the spout information : "+exception);
  49.                 }
  50.         }

  51.         private static Long getAllTimeStat(Map<String, Map<String, Long>> map,
  52.                         String statName) {
  53.                 if (map != null) {
  54.                         Long statValue = null;
  55.                         Map<String, Long> tempMap = map.get(statName);
  56.                         statValue = tempMap.get(DEFAULT);
  57.                         return statValue;
  58.                 }
  59.                 return 0L;
  60.         }
  61.        
  62.         public static void main(String[] args) {
  63.                 new SpoutStatistics().printSpoutStatistics("LearningStormClusterTopology-1-1393847956");
  64.         }
  65. }
复制代码

使用道具

12
Lisrelchen 发表于 2016-4-17 02:51:43 |只看作者 |坛友微信交流群
  1. package com.learningstorm.monitoring;

  2. import java.util.Iterator;

  3. import backtype.storm.generated.ClusterSummary;
  4. import backtype.storm.generated.Nimbus.Client;
  5. import backtype.storm.generated.SupervisorSummary;

  6. public class SupervisorStatistics {
  7.        
  8.         public void printSupervisorStatistics()  {
  9.                 try {
  10.                 ThriftClient thriftClient = new ThriftClient();
  11.                 Client client = thriftClient.getClient();
  12.                 // Get the cluster information.
  13.                 ClusterSummary clusterSummary = client.getClusterInfo();
  14.                 // Get the SupervisorSummary interator
  15.                 Iterator<SupervisorSummary> supervisorsIterator = clusterSummary.get_supervisors_iterator();
  16.                
  17.                 while (supervisorsIterator.hasNext()) {
  18.                         // Print the information of supervisor node
  19.                         SupervisorSummary supervisorSummary = (SupervisorSummary) supervisorsIterator.next();
  20.                         System.out.println("*************************************");
  21.                         System.out.println("Supervisor Host IP : "+supervisorSummary.get_host());
  22.                         System.out.println("Number of used workers : "+supervisorSummary.get_num_used_workers());
  23.                         System.out.println("Number of workers : "+supervisorSummary.get_num_workers());
  24.                         System.out.println("Supervisor ID : "+supervisorSummary.get_supervisor_id());
  25.                         System.out.println("Supervisor uptime in seconds : "+supervisorSummary.get_uptime_secs());
  26.                         System.out.println("*************************************");
  27.                 }
  28.                
  29.                 }catch (Exception e) {
  30.                         throw new RuntimeException("Error occurred while getting cluster info : ");
  31.                 }
  32.         }
  33.        
  34.         public static void main(String[] args) {
  35.                 new SupervisorStatistics().printSupervisorStatistics();
  36.         }
  37.        
  38. }
复制代码

使用道具

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-4-25 12:52