Original poster: ReneeBK

[Case Study] Word Count using Java

Posted by ReneeBK on 2015-11-16 00:20:10

package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */

public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    // Helper from the Spark examples package that reduces log verbosity
    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    // Topics arrive as a comma-separated list; brokers are passed through to Kafka as-is
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print.
    // Each Kafka record is a (key, value) pair; the value holds the line of text.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    // Spark 1.x signature: FlatMapFunction.call returns an Iterable
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    // Pair each word with 1, then sum the ones per word within each batch
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    // Print the first ten counts of each batch
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
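
For readers who want to see what the split / mapToPair / reduceByKey chain in the listing above computes without starting Spark or Kafka, here is a minimal plain-Java sketch. It is an illustration under stated assumptions, not part of the Spark example: the class BatchWordCount and the method countWords are hypothetical names, and each in-memory list stands in for the lines received during one 2-second batch. Counts are computed per batch and are not carried over to the next one, which matches how reduceByKey on a DStream works.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper class for illustration; not part of Spark or of the original post.
final class BatchWordCount {
  // Same delimiter as the listing: a single space.
  private static final Pattern SPACE = Pattern.compile(" ");

  // Counts the words in one batch of lines.
  // Mirrors flatMap (split), mapToPair (word -> 1) and reduceByKey (sum per word).
  static Map<String, Integer> countWords(List<String> batch) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String line : batch) {
      for (String word : SPACE.split(line)) {
        counts.merge(word, 1, Integer::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Two assumed batches; each one is counted independently of the other.
    List<List<String>> batches = Arrays.asList(
        Arrays.asList("to be or", "not to be"),
        Arrays.asList("to be"));
    for (List<String> batch : batches) {
      System.out.println(countWords(batch));
    }
    // prints:
    // {to=2, be=2, or=1, not=1}
    // {to=1, be=1}
  }
}
```

Because the delimiter is a single space, two consecutive spaces in a line produce an empty-string token that is counted like any other word. The same holds for the Spark listing, so a pattern such as "\\s+" may be preferable if the input is not cleanly spaced.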