Original poster: ReneeBK

Apache Beam Programming Guide

11
ReneeBK posted on 2017-3-11 04:33:01
The following example divides a PCollection into percentile groups.

```java
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Partition.PartitionFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
      @Override
      public int partitionFor(Student student, int numPartitions) {
        // Percentile must be 0..99 so the index stays in [0, numPartitions).
        return student.getPercentile() * numPartitions / 100;
      }
    }));

// You can extract each partition from the PCollectionList using the get method.
// Partition 4 holds students in percentiles 40..49:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
```
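To make the routing concrete, here is a plain-Java sketch with no Beam dependency. The class name `PercentilePartitionSketch` and its helpers are hypothetical, and a student is reduced to an int percentile. It applies the same `percentile * numPartitions / 100` arithmetic as the in-line PartitionFn above and collects elements into one list per partition index, roughly the way `Partition` produces a `PCollectionList`. Integer division is what maps 0..99 onto 0..9; a percentile of 100 would yield index 10, outside `[0, numPartitions)`, so the sketch throws in that case.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java sketch (no Beam dependency) of what Partition.of(10, fn)
 * does with the percentile PartitionFn. A student is reduced to its percentile.
 */
public class PercentilePartitionSketch {

  // Same arithmetic as the in-line PartitionFn: percentile 0..99 -> index 0..numPartitions-1.
  static int partitionFor(int percentile, int numPartitions) {
    return percentile * numPartitions / 100;
  }

  // Routes each element into one of numPartitions lists, one list per partition index.
  static List<List<Integer>> partition(List<Integer> percentiles, int numPartitions) {
    List<List<Integer>> parts = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      parts.add(new ArrayList<>());
    }
    for (int p : percentiles) {
      int idx = partitionFor(p, numPartitions);
      // A PartitionFn must return an index in [0, numPartitions).
      if (idx < 0 || idx >= numPartitions) {
        throw new IndexOutOfBoundsException("partition index out of range: " + idx);
      }
      parts.get(idx).add(p);
    }
    return parts;
  }

  public static void main(String[] args) {
    List<List<Integer>> parts = partition(List.of(3, 41, 49, 50, 99), 10);
    // Partition 4 corresponds to studentsByPercentile.get(4).
    System.out.println(parts.get(4));
  }
}
```

Running `main` prints `[41, 49]`: partition 4 holds percentiles 40 through 49, while 3, 50 and 99 land in partitions 0, 5 and 9.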

12
ReneeBK posted on 2017-3-11 04:33:47
Passing side inputs to ParDo:

```java
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Pass side inputs to your ParDo transform by invoking .withSideInputs.
// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.

// The input PCollection to ParDo.
PCollection<String> words = ...;

// A PCollection of word lengths that we'll combine into a single value.
PCollection<Integer> wordLengths = ...;

// Create a singleton PCollectionView from wordLengths using Combine.globally(...).asSingletonView().
final PCollectionView<Integer> maxWordLengthCutOffView =
    wordLengths.apply(Combine.globally(Max.ofIntegers()).asSingletonView());

// Apply a ParDo that takes maxWordLengthCutOffView as a side input.
PCollection<String> wordsBelowCutOff =
    words.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String word = c.element();
            // In our DoFn, access the side input.
            int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
            if (word.length() <= lengthCutOff) {
              c.output(word);
            }
          }
        })
        .withSideInputs(maxWordLengthCutOffView));
```
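The side-input pattern above can be sketched in plain Java without Beam, assuming small in-memory lists stand in for the two PCollections. The class `SideInputCutoffSketch` and its methods are hypothetical. The cutoff is computed once over all word lengths, like the singleton view, and the filter then reads that one value for every word, like `c.sideInput(...)` inside the DoFn. The sketch does not model distribution, windowing, or empty input (`Collections.max` throws on an empty list).

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical plain-Java sketch (no Beam dependency) of the side-input pattern:
 * one value is computed over the whole input, then every element is checked against it.
 */
public class SideInputCutoffSketch {

  // Plays the role of Combine.globally(Max...).asSingletonView(): one value for all of wordLengths.
  static int maxCutOff(List<Integer> wordLengths) {
    return Collections.max(wordLengths);
  }

  // Plays the role of the DoFn: each word is compared with the same cutoff value.
  static List<String> wordsBelowCutOff(List<String> words, int lengthCutOff) {
    return words.stream()
        .filter(word -> word.length() <= lengthCutOff)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    int cutOff = maxCutOff(List.of(2, 4, 3));
    System.out.println(wordsBelowCutOff(List.of("a", "beam", "pipeline", "pcol"), cutOff));
  }
}
```

With a cutoff of 4, `main` prints `[a, beam, pcol]`; "pipeline" (8 characters) is dropped.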

13
钱学森64 posted on 2017-3-11 10:33:02
Thanks for sharing.

14
钱学森64 posted on 2017-3-11 14:31:28
Thanks for sharing.

15
franky_sas posted on 2017-3-11 22:13:35
Thanks.
