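Shared below is the JavaDirectKafkaWordCount program from Apache Spark's streaming examples (package org.apache.spark.examples.streaming). It consumes messages from one or more Kafka topics through the direct, receiver-less Kafka API of the Spark 1.x / Kafka 0.8 integration and does a word count on each batch: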
```java
package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more Kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */
public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more Kafka topics to consume from\n\n");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2-second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create a direct Kafka stream with the given brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
```
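For comparison, the anonymous inner classes above are the pre-Java 8 style. Spark's Function, FlatMapFunction, PairFunction, and Function2 are all single-method interfaces, so under Java 8 the same transformation pipeline can be written with lambdas against the same API. Here is a minimal sketch of just that section, reusing `messages`, `SPACE`, and the imports from the example above (an illustration, not part of the original post):

```java
// Java 8 lambda form of the same word-count pipeline (sketch; assumes the
// setup code from JavaDirectKafkaWordCount above, i.e. `messages` and `SPACE`).
JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());               // take the Kafka message value
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x))); // split each line on spaces
JavaPairDStream<String, Integer> wordCounts = words
    .mapToPair(s -> new Tuple2<>(s, 1))  // pair each word with a count of 1
    .reduceByKey((i1, i2) -> i1 + i2);   // sum the counts per word within the batch
wordCounts.print();
```

Note that this only holds for the Spark 1.x DStream API used here; from Spark 2.0 on, FlatMapFunction.call returns an Iterator rather than an Iterable, so the flatMap line would have to return `Arrays.asList(SPACE.split(x)).iterator()` instead.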
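A few notes on running it, beyond the bin/run-example line in the header (these describe a typical setup, not details from the post): KafkaUtils.createDirectStream with StringDecoder lives in the separate spark-streaming-kafka artifact (e.g. org.apache.spark:spark-streaming-kafka_2.10, or _2.11, with the version matching your Spark 1.x build), not in Spark core, so pull it in via spark-submit --packages or your build file. Also, StreamingExamples.setStreamingLogLevels() is a small helper from the same examples package that just quiets the default logging; if you copy this code out of the Spark source tree, drop that line or configure the log level yourself.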