Original poster: ReneeBK

[Apache Flink] Flink DataStream API Programming Guide

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html

Example Program

The following program is a complete, working example of a streaming window word count application. It counts the words arriving from a socket in 5-second windows. You can copy and paste the code to run it locally.

Scala:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object WindowWordCount {
      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // read lines of text from a socket on localhost:9999
        val text = env.socketTextStream("localhost", 9999)

        // split each line into lowercase words, drop empty tokens, and emit (word, 1)
        val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
          .map { (_, 1) }
          .keyBy(0)                     // key by the word (tuple field 0)
          .timeWindow(Time.seconds(5))  // 5-second tumbling windows
          .sum(1)                       // sum the counts (tuple field 1)

        counts.print()

        env.execute("Window Stream WordCount")
      }
    }
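
To make the windowing step concrete without a running Flink instance, here is a minimal plain-Java sketch of what a 5-second tumbling window count computes. It is not Flink code: the class `TumblingWordCountSketch` and its methods are hypothetical names, and the timestamps are supplied by hand and assumed to be non-negative, whereas the Flink program takes time from the runtime.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Flink.
final class TumblingWordCountSketch {

    /** Start of the tumbling window containing the timestamp (assumes timestamp >= 0). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /**
     * Counts words per window. lines[i] is assumed to have arrived at timestampsMillis[i].
     * Returns window start -> (word -> count).
     */
    static Map<Long, Map<String, Integer>> count(
            String[] lines, long[] timestampsMillis, long windowSizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            long start = windowStart(timestampsMillis[i], windowSizeMillis);
            Map<String, Integer> counts = result.computeIfAbsent(start, k -> new TreeMap<>());
            // same tokenization as the Scala program: lowercase, split on non-word characters
            for (String word : lines[i].toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] lines = {"to be or not", "to be", "be quick"};
        long[] times = {1_000L, 4_999L, 5_000L};
        // prints {0={be=2, not=1, or=1, to=2}, 5000={be=1, quick=1}}
        System.out.println(count(lines, times, 5_000L));
    }
}
```

The first two lines fall into the window starting at 0 ms and are counted together; the third line arrives exactly at 5000 ms and therefore opens the next window.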


Reply #2
franky_sas posted on 2017-3-10 09:54:36
Thanks.

Reply #3
ReneeBK posted on 2017-3-10 09:54:55
Running an example

To run a Flink example, you need a running Flink instance. The "Quickstart" and "Setup" tabs in the Flink documentation's navigation describe various ways of starting Flink.

The easiest way is to run the ./bin/start-local.sh script, which starts a JobManager locally.

Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.

To run the WordCount example, issue the following command:

    ./bin/flink run ./examples/batch/WordCount.jar

The other examples can be started in a similar way.

Note that many examples run without any arguments by using built-in data. To run WordCount with real data, you have to pass the path to the data:

    ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a scheme prefix, such as hdfs://.


Reply #4
ReneeBK posted on 2017-3-10 09:55:24
Word Count

WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the text is split into individual words; second, the words are grouped and counted.

Java:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            // placeholder path; the shipped example takes it from --output
            String outputPath = "/path/to/result";

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.readTextFile("/path/to/file");

            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

            counts.writeAsCsv(outputPath, "\n", " ");

            // file sinks are lazy; nothing runs until execute() is called
            env.execute("WordCount");
        }

        // User-defined functions
        public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");

                // emit the pairs, skipping empty tokens
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }

The WordCount example implements the algorithm described above and takes the input parameters --input <path> --output <path>. Any text file will do as test data.
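
One detail of the Tokenizer that is easy to miss is the `token.length() > 0` check. The plain-Java sketch below (no Flink dependency; `TokenizerSketch` is a hypothetical name used only for illustration) shows why it is there: `String.split` keeps an empty leading token when a line starts with a non-word character.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; mirrors the splitting logic of the Tokenizer above.
final class TokenizerSketch {

    /** Lowercases the line, splits on runs of non-word characters, drops empty tokens. */
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        String line = "\"Hello,\" World!";
        // the raw split yields "", "hello", "world": the leading quote produces an empty token
        System.out.println(line.toLowerCase().split("\\W+").length); // prints 3
        System.out.println(tokenize(line));                          // prints [hello, world]
    }
}
```

Without the length check, the empty string would be emitted as a word and counted once for every line that begins with punctuation or whitespace.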


Reply #5
ReneeBK posted on 2017-3-10 09:56:09
Page Rank

The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.

In this simple example, PageRank is implemented with a bulk iteration and a fixed maximum number of iterations.

Java:

    // Snippet: pagesInputPath, outputPath, maxIterations, numPages, DAMPENING_FACTOR,
    // EPSILON, and getLinksDataSet(env) are assumed to be defined elsewhere.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the pages and initial ranks by parsing a CSV file
    DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
                                                       .types(Long.class, Double.class);

    // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
    DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

    // set iterative data set
    IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

    DataSet<Tuple2<Long, Double>> newRanks = iteration
            // join pages with outgoing edges and distribute rank
            // (a FlatJoinFunction is applied with with(), not flatMap())
            .join(pageLinkLists).where(0).equalTo(0).with(new JoinVertexWithEdgesMatch())
            // collect and sum ranks
            .groupBy(0).sum(1)
            // apply dampening factor
            .map(new Dampener(DAMPENING_FACTOR, numPages));

    DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
            newRanks,
            newRanks.join(iteration).where(0).equalTo(0)
            // termination condition: continue while any rank changed by more than EPSILON
            .filter(new EpsilonFilter()));

    finalPageRanks.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions

    public static final class JoinVertexWithEdgesMatch
                        implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                                Tuple2<Long, Double>> {

        @Override
        public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                            Collector<Tuple2<Long, Double>> out) {
            Long[] neighbors = adj.f1;
            double rank = page.f1;
            // each neighbor receives an equal share of the page's rank
            double rankToDistribute = rank / ((double) neighbors.length);

            for (int i = 0; i < neighbors.length; i++) {
                out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
            }
        }
    }

    public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
        private final double dampening, randomJump;

        public Dampener(double dampening, double numVertices) {
            this.dampening = dampening;
            this.randomJump = (1 - dampening) / numVertices;
        }

        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            value.f1 = (value.f1 * dampening) + randomJump;
            return value;
        }
    }

    public static final class EpsilonFilter
                    implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

        @Override
        public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
            return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
        }
    }
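
The same iteration can be followed on a tiny graph without Flink. The sketch below is a plain-Java approximation of the steps above (distribute rank, sum, dampen, stop when no rank moves by more than epsilon). The class name `PageRankSketch`, the array-based graph encoding, and the uniform initial ranks are assumptions made for illustration. It also assumes every page has at least one outgoing link, and, unlike the DataSet version, it keeps pages that receive no rank in the result.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of Flink.
final class PageRankSketch {

    /**
     * links[p] lists the pages that page p links to (page IDs are 0..n-1).
     * Assumption: every page has at least one outgoing link.
     */
    static double[] pageRank(int[][] links, double damping, double epsilon, int maxIterations) {
        int n = links.length;
        double[] ranks = new double[n];
        Arrays.fill(ranks, 1.0 / n);               // assumed uniform initial ranks
        double randomJump = (1 - damping) / n;     // same term as in Dampener

        for (int iter = 0; iter < maxIterations; iter++) {
            // distribute each page's rank evenly over its neighbors and sum what arrives
            double[] received = new double[n];
            for (int page = 0; page < n; page++) {
                double share = ranks[page] / links[page].length;
                for (int neighbor : links[page]) {
                    received[neighbor] += share;
                }
            }
            // apply the damping step and check the termination condition
            boolean changed = false;
            for (int page = 0; page < n; page++) {
                double newRank = received[page] * damping + randomJump;
                if (Math.abs(newRank - ranks[page]) > epsilon) {
                    changed = true;
                }
                ranks[page] = newRank;
            }
            if (!changed) {
                break; // no rank moved by more than epsilon
            }
        }
        return ranks;
    }

    public static void main(String[] args) {
        // links: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
        int[][] links = {{1, 2}, {2}, {0}};
        System.out.println(Arrays.toString(pageRank(links, 0.85, 1e-9, 100)));
    }
}
```

On this three-page graph the ranks keep summing to 1, and page 2, which is linked from both other pages, ends up with the highest rank.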


Reply #6
auirzxp posted on 2017-3-10 09:56:15
Thanks for sharing.

Reply #7
ReneeBK posted on 2017-3-10 10:00:41
Connected Components

The Connected Components algorithm identifies the connected parts of a larger graph by assigning the same component ID to all vertices in the same connected part. Like PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor if it is smaller than its own component ID.

This implementation uses a delta iteration: vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.

Java:

    // Snippet: env, outputPath, maxIterations, getVertexDataSet(env), and
    // getEdgeDataSet(env) are assumed to be defined elsewhere.

    // read vertex and edge data
    DataSet<Long> vertices = getVertexDataSet(env);
    DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

    // assign the initial component IDs (equal to the vertex ID)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

    // open a delta iteration (solution set keyed on tuple field 0)
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

    // apply the step logic:
    DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
            // join with the edges
            .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
            // select the minimum neighbor component ID
            .groupBy(0).aggregate(Aggregations.MIN, 1)
            // update if the component ID of the candidate is smaller
            .join(iteration.getSolutionSet()).where(0).equalTo(0)
            .flatMap(new ComponentIdFilter());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

    // emit result
    result.writeAsCsv(outputPath, "\n", " ");

    // User-defined functions

    // turns a vertex ID into the pair (vertex ID, initial component ID)
    public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

        @Override
        public Tuple2<T, T> map(T vertex) {
            return new Tuple2<T, T>(vertex, vertex);
        }
    }

    // emits every edge in both directions so the graph is treated as undirected
    public static final class UndirectEdge
                        implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

        @Override
        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
            invertedEdge.f0 = edge.f1;
            invertedEdge.f1 = edge.f0;
            out.collect(edge);
            out.collect(invertedEdge);
        }
    }

    // proposes the vertex's component ID to the neighbor at the other end of the edge
    public static final class NeighborWithComponentIDJoin
                    implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
        }
    }

    // keeps a candidate only if its component ID is smaller than the current one
    public static final class ComponentIdFilter
                        implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                                Tuple2<Long, Long>> {

        @Override
        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                            Collector<Tuple2<Long, Long>> out) {
            if (value.f0.f1 < value.f1.f1) {
                out.collect(value.f0);
            }
        }
    }
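
To see why the delta iteration does less work in later steps, the following plain-Java sketch replays the same step logic on an in-memory graph. It is an approximation for illustration only: `ConnectedComponentsSketch`, the array-based solution set, and the map-based workset are hypothetical stand-ins for Flink's solution set and workset.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Flink.
final class ConnectedComponentsSketch {

    /** edges[i] = {a, b} is an undirected edge; vertex IDs are 0..numVertices-1. */
    static long[] components(int numVertices, int[][] edges, int maxIterations) {
        // solution set: the initial component ID equals the vertex ID
        long[] component = new long[numVertices];
        // workset: vertices whose component ID changed in the previous step (initially all)
        Map<Integer, Long> workset = new HashMap<>();
        for (int v = 0; v < numVertices; v++) {
            component[v] = v;
            workset.put(v, component[v]);
        }

        for (int iter = 0; iter < maxIterations && !workset.isEmpty(); iter++) {
            // join the workset with the edges (both directions), keep the minimum per target
            Map<Integer, Long> candidates = new HashMap<>();
            for (int[] e : edges) {
                propose(workset, candidates, e[0], e[1]);
                propose(workset, candidates, e[1], e[0]);
            }
            // join with the solution set and keep only candidates that are an improvement
            Map<Integer, Long> changes = new HashMap<>();
            for (Map.Entry<Integer, Long> c : candidates.entrySet()) {
                if (c.getValue() < component[c.getKey()]) {
                    component[c.getKey()] = c.getValue();
                    changes.put(c.getKey(), c.getValue());
                }
            }
            workset = changes; // delta and next workset are identical
        }
        return component;
    }

    /** If 'from' is in the workset, offer its component ID to 'to'. */
    private static void propose(Map<Integer, Long> workset, Map<Integer, Long> candidates,
                                int from, int to) {
        Long id = workset.get(from);
        if (id != null) {
            candidates.merge(to, id, Long::min);
        }
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {3, 4}};
        // prints [0, 0, 0, 3, 3, 5]
        System.out.println(Arrays.toString(components(6, edges, 10)));
    }
}
```

In this run the workset shrinks from six vertices to three, then to one, and the loop stops once it is empty, which is the effect the delta iteration exploits.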


Reply #8
钱学森64 posted on 2017-3-10 13:53:56
Thanks for sharing.
