楼主: ReneeBK
1878 14

Apache Beam Programming Guide [推广有奖]

  • 1关注
  • 62粉丝

VIP

已卖:4897份资源

学术权威

14%

还不是VIP/贵宾

-

TA的文库  其他...

R资源总汇

Panel Data Analysis

Experimental Design

威望
1
论坛币
49635 个
通用积分
55.6937
学术水平
370 点
热心指数
273 点
信用等级
335 点
经验
57805 点
帖子
4005
精华
21
在线时间
582 小时
注册时间
2005-5-8
最后登录
2023-11-26

楼主
ReneeBK 发表于 2017-3-11 04:24:24 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
Apache Beam Programming Guide

The Beam Programming Guide is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines.

Adapt for:
  • Java SDK
  • Python SDK
Contents
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Programming Program apache Guide gram

本帖被以下文库推荐

沙发
ReneeBK 发表于 2017-3-11 04:25:12
  1. Creating the pipeline
  2. The Pipeline abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a Pipeline object, and then using that object as the basis for creating the pipeline’s data sets as PCollections and its operations as Transforms.

  3. To use Beam, your driver program must first create an instance of the Beam SDK class Pipeline (typically in the main() function). When you create your Pipeline, you’ll also need to set some configuration options. You can set your pipeline’s configuration options programatically, but it’s often easier to set the options ahead of time (or read them from the command line) and pass them to the Pipeline object when you create the object.

  4. The pipeline configuration options determine, among other things, the PipelineRunner that determines where the pipeline gets executed: locally, or using a distributed back-end of your choice. Depending on where your pipeline gets executed and what your specifed Runner requires, the options can also help you specify other aspects of execution.

  5. To set your pipeline’s configuration options and create the pipeline, create an object of type PipelineOptions and pass it to Pipeline.Create(). The most common way to do this is by parsing arguments from the command-line:

  6. Java
  7. Python
  8. public static void main(String[] args) {
  9.    // Will parse the arguments passed into the application and construct a PipelineOptions
  10.    // Note that --help will print registered options, and --help=PipelineOptionsClassName
  11.    // will print out usage for the specific class.
  12.    PipelineOptions options =
  13.        PipelineOptionsFactory.fromArgs(args).create();

  14.    Pipeline p = Pipeline.create(options);
  15. The Beam SDKs contain various subclasses of PipelineOptions that correspond to different Runners. For example, DirectPipelineOptions contains options for the Direct (local) pipeline runner, while DataflowPipelineOptions contains options for using the runner for Google Cloud Dataflow. You can also define your own custom PipelineOptions by creating an interface that extends the Beam SDKs’ PipelineOptions class.
复制代码

藤椅
ReneeBK 发表于 2017-3-11 04:25:34
  1. Reading from an external source

  2. To read from an external source, you use one of the Beam-provided I/O adapters. The adapters vary in their exact usage, but all of them from some external data source and return a PCollection whose elements represent the data records in that source.

  3. Each data source adapter has a Read transform; to read, you must apply that transform to the Pipeline object itself. TextIO.Read, for example, reads from an external text file and returns a PCollection whose elements are of type String, each String represents one line from the text file. Here’s how you would apply TextIO.Read to your Pipeline to create a PCollection:

  4. Java
  5. Python
  6. public static void main(String[] args) {
  7.     // Create the pipeline.
  8.     PipelineOptions options =
  9.         PipelineOptionsFactory.fromArgs(args).create();
  10.     Pipeline p = Pipeline.create(options);

  11.     PCollection<String> lines = p.apply(
  12.       "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
  13. }
复制代码

板凳
ReneeBK 发表于 2017-3-11 04:26:11
  1. Creating a PCollection from in-memory data

  2. To create a PCollection from an in-memory Java Collection, you use the Beam-provided Create transform. Much like a data adapter’s Read, you apply Create directly to your Pipeline object itself.

  3. As parameters, Create accepts the Java Collection and a Coder object. The Coder specifies how the elements in the Collection should be encoded.

  4. The following example code shows how to create a PCollection from an in-memory List:

  5. public static void main(String[] args) {
  6.     // Create a Java Collection, in this case a List of Strings.
  7.     static final List<String> LINES = Arrays.asList(
  8.       "To be, or not to be: that is the question: ",
  9.       "Whether 'tis nobler in the mind to suffer ",
  10.       "The slings and arrows of outrageous fortune, ",
  11.       "Or to take arms against a sea of troubles, ");

  12.     // Create the pipeline.
  13.     PipelineOptions options =
  14.         PipelineOptionsFactory.fromArgs(args).create();
  15.     Pipeline p = Pipeline.create(options);

  16.     // Apply Create, passing the list and the coder, to create the PCollection.
  17.     p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
  18. }
复制代码

报纸
ReneeBK 发表于 2017-3-11 04:27:36
  1. Applying ParDo

  2. Like all Beam transforms, you apply ParDo by calling the apply method on the input PCollection and passing ParDo as an argument, as shown in the following example code:

  3. // The input PCollection of Strings.
  4. PCollection<String> words = ...;

  5. // The DoFn to perform on each element in the input PCollection.
  6. static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

  7. // Apply a ParDo to the PCollection "words" to compute lengths for each word.
  8. PCollection<Integer> wordLengths = words.apply(
  9.     ParDo
  10.     .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
  11.                                             // we define above.
复制代码

地板
ReneeBK 发表于 2017-3-11 04:29:25
  1. Creating a DoFn

  2. The DoFn object that you pass to ParDo contains the processing logic that gets applied to the elements in the input collection. When you use Beam, often the most important pieces of code you’ll write are these DoFns–they’re what define your pipeline’s exact data processing tasks.

  3. Note: When you create your DoFn, be mindful of the General Requirements for Writing User Code for Beam Transforms and ensure that your code follows them.
  4. A DoFn processes one element at a time from the input PCollection. When you create a subclass of DoFn, you’ll need to provide type paraemters that match the types of the input and output elements. If your DoFn processes incoming String elements and produces Integer elements for the output collection (like our previous example, ComputeWordLengthFn), your class declaration would look like this:

  5. static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
复制代码

7
ReneeBK 发表于 2017-3-11 04:30:38
  1. Using Combine

  2. Combine is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.

  3. When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.

  4. Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.

  5. Simple combinations using simple functions

  6. The following example code shows a simple combine function.

  7. // Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
  8. public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  9.   @Override
  10.   public Integer apply(Iterable<Integer> input) {
  11.     int sum = 0;
  12.     for (int item : input) {
  13.       sum += item;
  14.     }
  15.     return sum;
  16.   }
  17. }
复制代码

8
ReneeBK 发表于 2017-3-11 04:31:08
  1. The following example code shows how to define a CombineFn that computes a mean average:

  2. public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  3.   public static class Accum {
  4.     int sum = 0;
  5.     int count = 0;
  6.   }

  7.   @Override
  8.   public Accum createAccumulator() { return new Accum(); }

  9.   @Override
  10.   public Accum addInput(Accum accum, Integer input) {
  11.       accum.sum += input;
  12.       accum.count++;
  13.       return accum;
  14.   }

  15.   @Override
  16.   public Accum mergeAccumulators(Iterable<Accum> accums) {
  17.     Accum merged = createAccumulator();
  18.     for (Accum accum : accums) {
  19.       merged.sum += accum.sum;
  20.       merged.count += accum.count;
  21.     }
  22.     return merged;
  23.   }

  24.   @Override
  25.   public Double extractOutput(Accum accum) {
  26.     return ((double) accum.sum) / accum.count;
  27.   }
  28. }
复制代码

9
ReneeBK 发表于 2017-3-11 04:31:36
  1. Combining a PCollection into a single value

  2. Use the global combine to transform all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a PCollection of integers.

  3. // Sum.SumIntegerFn() combines the elements in the input PCollection.
  4. // The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
  5. PCollection<Integer> pc = ...;
  6. PCollection<Integer> sum = pc.apply(
  7.    Combine.globally(new Sum.SumIntegerFn()));
复制代码

10
ReneeBK 发表于 2017-3-11 04:32:36
  1. The following example shows how to apply a Flatten transform to merge multiple PCollection objects.

  2. // Flatten takes a PCollectionList of PCollection objects of a given type.
  3. // Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
  4. PCollection<String> pc1 = ...;
  5. PCollection<String> pc2 = ...;
  6. PCollection<String> pc3 = ...;
  7. PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  8. PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
复制代码

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注jltj
拉您入交流群
GMT+8, 2025-12-26 17:36