OP: ReneeBK

Learning Apache Flink


#1 (OP) ReneeBK posted on 2017-5-11 10:48:01
  • Learning Apache Flink
  • By: Tanmay Deshpande

  • Publisher: Packt Publishing

  • Pub. Date: February 20, 2017

  • Web ISBN-13: 978-1-78646-726-3

  • Print ISBN-13: 978-1-78646-622-8

  • Pages in Print Edition: 280

  • Subscriber Rating: [0 Ratings]



Keywords: Learning Apache Flink


#2 ReneeBK posted on 2017-5-11 10:49:12

Map

This is one of the simplest transformations: the input is one data stream and the output is also one data stream.

In Java:

inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});

In Scala:

inputStream.map { x => x * 5 }
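The semantics of map can be sketched in plain Python, treating the stream as a list. This is an analogy only, not the Flink API: one output record per input record.

```python
# Plain-Python analogy of Flink's map: apply a function to every element,
# producing exactly one output record per input record.
stream = [1, 2, 3, 4]
mapped = [5 * x for x in stream]
print(mapped)  # [5, 10, 15, 20]
```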

#3 ReneeBK posted on 2017-5-11 10:49:46

FlatMap

FlatMap takes one record and outputs zero, one, or more records.

In Java:

inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
            throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

In Scala:

inputStream.flatMap { str => str.split(" ") }
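The zero-one-or-many behavior can be sketched in plain Python (again an analogy, not the Flink API), where each input record yields an iterable of output records:

```python
# Plain-Python analogy of flatMap: each input record may yield 0..n output records.
def flat_map(stream, fn):
    out = []
    for record in stream:
        out.extend(fn(record))  # fn returns an iterable of output records
    return out

words = flat_map(["hello world", "apache flink"], lambda s: s.split(" "))
print(words)  # ['hello', 'world', 'apache', 'flink']
```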

#4 ReneeBK posted on 2017-5-11 10:50:07

Filter

Filter evaluates a condition for each record and emits the record only when the condition is true, so it can output zero records.

In Java:

inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});

In Scala:

inputStream.filter { _ != 1 }
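As a plain-Python sketch of the same predicate (not the Flink API), records failing the condition are simply dropped:

```python
# Plain-Python analogy of filter: keep only records for which the predicate is true.
stream = [1, 2, 1, 3]
kept = [v for v in stream if v != 1]
print(kept)  # [2, 3]
```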

#5 ReneeBK posted on 2017-5-11 10:50:30

KeyBy

KeyBy logically partitions the stream based on the key. Internally it uses hash partitioning, and it returns a KeyedDataStream.

In Java:

inputStream.keyBy("someKey");

In Scala:

inputStream.keyBy("someKey")
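The hash-partitioning idea can be sketched in plain Python. This is a simplified assumption of what keyBy does internally (Flink hashes the key and routes by that hash); the invariant that matters is that all records with the same key land in the same partition:

```python
# Sketch of hash-based key partitioning (simplified; Flink's actual hashing differs).
def key_by(records, key_fn, parallelism):
    partitions = [[] for _ in range(parallelism)]
    for rec in records:
        # Same key -> same hash -> same partition index.
        partitions[hash(key_fn(rec)) % parallelism].append(rec)
    return partitions

parts = key_by([("a", 1), ("b", 2), ("a", 3)], lambda t: t[0], 4)
# All ("a", ...) records are guaranteed to be in the same partition.
```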

#6 ReneeBK posted on 2017-5-11 10:50:54

Reduce

Reduce rolls up the KeyedDataStream by combining the last reduced value with the current value. The following code computes a running sum over a KeyedDataStream.

In Java:

keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
            throws Exception {
        return value1 + value2;
    }
});

In Scala:

keyedInputStream.reduce { _ + _ }
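A streaming reduce differs from a batch reduce: it emits the running aggregate for the key every time a new record arrives. A plain-Python sketch of that per-key rolling behavior (not the Flink API):

```python
# Sketch of a keyed streaming reduce: for each record, combine it with the
# previous reduced value for its key and emit the updated aggregate.
def keyed_reduce(records, key_fn, reduce_fn):
    state = {}    # key -> last reduced value
    emitted = []
    for rec in records:
        k = key_fn(rec)
        state[k] = rec if k not in state else reduce_fn(state[k], rec)
        emitted.append((k, state[k]))
    return emitted

print(keyed_reduce([1, 2, 3], lambda x: "k", lambda a, b: a + b))
# [('k', 1), ('k', 3), ('k', 6)]
```

Note that intermediate sums (1, 3, 6) are emitted, not just the final total.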

#7 ReneeBK posted on 2017-5-11 10:51:42

Fold

Fold rolls up the KeyedDataStream by combining the last folded value with the current record, starting from an initial value, and emits a data stream back.

In Java:

keyedInputStream.fold("Start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});

In Scala:

keyedInputStream.fold("Start")((str, i) => { str + "=" + i })

The preceding function, when applied to a stream of (1,2,3,4,5), would emit a stream like this: Start=1=2=3=4=5
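The fold semantics can be sketched in plain Python (not the Flink API): like reduce, but seeded with an initial value, and the accumulator type may differ from the record type (a String built from Integers here):

```python
# Sketch of fold: start from an initial accumulator and combine each record
# into it, emitting the updated accumulator at every step.
def keyed_fold(records, initial, fold_fn):
    acc = initial
    emitted = []
    for rec in records:
        acc = fold_fn(acc, rec)
        emitted.append(acc)
    return emitted

result = keyed_fold([1, 2, 3, 4, 5], "Start", lambda s, i: s + "=" + str(i))
print(result[-1])  # Start=1=2=3=4=5
```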

#8 Lisrelchen posted on 2017-5-11 10:59:18

package com.demo.chapter06

import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.RichExecutionEnvironment

object MySVMApp {
  def main(args: Array[String]) {
    val pathToTrainingFile: String = "iris-train.txt"
    val pathToTestingFile: String = "iris-test.txt" // original read "iris-train.txt", which would test on the training data

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the training dataset from a LibSVM formatted file
    val trainingDS: DataSet[LabeledVector] =
      env.readLibSVM(pathToTrainingFile)

    // Create the SVM learner
    val svm = SVM()
      .setBlocks(10)

    // Learn the SVM model
    svm.fit(trainingDS)

    // Read the testing dataset
    val testingDS: DataSet[Vector] =
      env.readLibSVM(pathToTestingFile).map(_.vector)

    // Calculate the predictions for the testing dataset
    val predictionDS: DataSet[(Vector, Double)] =
      svm.predict(testingDS)
    predictionDS.writeAsText("out")

    env.execute("Flink SVM App")
  }
}

#9 Lisrelchen posted on 2017-5-11 11:01:16

#!/usr/bin/env python

"""
Convert a CSV file to LibSVM format. Works only with numeric variables.
Pass -1 as the label index (argv[3]) if there are no labels in your file.
Expects no headers; if present, they can be skipped with argv[4] == 1.
"""

import csv
import sys

def construct_line(label, line):
    new_line = []
    if float(label) == 0.0:
        label = "0"
    new_line.append(label)

    for i, item in enumerate(line):
        if item == '' or float(item) == 0.0:
            continue  # LibSVM omits zero-valued features
        new_line.append("%s:%s" % (i + 1, item))
    return " ".join(new_line) + "\n"

# ---

input_file = sys.argv[1]
output_file = sys.argv[2]

try:
    label_index = int(sys.argv[3])
except IndexError:
    label_index = 0

try:
    # The original compared argv[4] as a string, so any value skipped headers.
    skip_headers = int(sys.argv[4])
except IndexError:
    skip_headers = 0

with open(input_file, newline='') as i, open(output_file, 'w') as o:
    reader = csv.reader(i)

    if skip_headers:
        next(reader)  # reader.next() is Python 2 only

    for line in reader:
        if label_index == -1:
            label = '1'
        else:
            label = line.pop(label_index)

        o.write(construct_line(label, line))
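The script emits the standard LibSVM sparse line format, `label index:value ...`, with 1-based feature indices and zero-valued features dropped. A minimal self-contained sketch of that formatting rule (the function name and sample values here are illustrative, not from the script):

```python
# Sketch of the LibSVM line format: label, then 1-based index:value pairs,
# skipping empty and zero-valued features.
def libsvm_line(label, fields):
    parts = ["0" if float(label) == 0.0 else str(label)]
    for i, item in enumerate(fields):
        if item == '' or float(item) == 0.0:
            continue
        parts.append("%d:%s" % (i + 1, item))
    return " ".join(parts)

print(libsvm_line("1", ["5.1", "0", "1.4", ""]))  # 1 1:5.1 3:1.4
```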
