Original poster: ReneeBK

[Philipp Wagner] Complex Event Processing with Apache Flink

Posted by ReneeBK on 2017-5-17 00:58:27


+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
Complex Event Processing with Apache Flink
By Philipp Wagner | July 10, 2016

In this article I want to show you how to work with the Complex Event Processing (CEP) engine of Apache Flink.

The Apache Flink documentation describes FlinkCEP as:

FlinkCEP is the complex event processing library for Flink. It allows you to easily detect complex event patterns in a stream of endless data. Complex events can then be constructed from matching sequences. This gives you the opportunity to quickly get hold of what's really important in your data.

What we are going to build

Imagine we are designing an application to generate warnings based on certain weather events.

The application should generate weather warnings from a stream of incoming measurements:

- Extreme Cold (below -46 °C for three days)
- Severe Heat (above 30 °C for two days)
- Excessive Heat (above 41 °C for two days)
- High Wind (wind speed between 39 mph and 110 mph)
- Extreme Wind (wind speed above 110 mph)
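The criteria above can be written down as data. The following is a minimal plain-Java sketch (no Flink involved) that encodes each warning as a per-measurement condition plus the number of consecutive days it must hold. The enum WarningCriterion and its method matching are hypothetical names. The sketch reads "between 39 mph and 110 mph" as inclusive on both ends and "above" as strictly greater, both of which are assumptions; detecting the "for N days" part is left to the CEP pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoublePredicate;

// Hypothetical encoding of the weather warning criteria (not part of the article's code).
enum WarningCriterion {
    // Temperature criteria in degrees Celsius; each must hold on consecutive days.
    EXTREME_COLD(Measure.TEMPERATURE_C, t -> t < -46.0, 3),
    SEVERE_HEAT(Measure.TEMPERATURE_C, t -> t > 30.0, 2),
    EXCESSIVE_HEAT(Measure.TEMPERATURE_C, t -> t > 41.0, 2),
    // Wind criteria in mph; assumed here to need only a single measurement.
    HIGH_WIND(Measure.WIND_MPH, w -> w >= 39.0 && w <= 110.0, 1),
    EXTREME_WIND(Measure.WIND_MPH, w -> w > 110.0, 1);

    enum Measure { TEMPERATURE_C, WIND_MPH }

    final Measure measure;
    final DoublePredicate condition;
    final int consecutiveDays;

    WarningCriterion(Measure measure, DoublePredicate condition, int consecutiveDays) {
        this.measure = measure;
        this.condition = condition;
        this.consecutiveDays = consecutiveDays;
    }

    // Returns every criterion whose per-measurement condition holds, in declaration order.
    static List<WarningCriterion> matching(double temperatureC, double windMph) {
        List<WarningCriterion> result = new ArrayList<>();
        for (WarningCriterion criterion : values()) {
            double value = criterion.measure == Measure.TEMPERATURE_C ? temperatureC : windMph;
            if (criterion.condition.test(value)) {
                result.add(criterion);
            }
        }
        return result;
    }
}
```

For example, matching(45.0, 10.0) returns [SEVERE_HEAT, EXCESSIVE_HEAT]: the two heat criteria overlap, so a day above 41 °C also counts towards Severe Heat.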
Source Code

You can find the full source code for the example in my Git repository at:

https://github.com/bytefish/FlinkExperiments

Warnings and Patterns

First of all, we design the model for warnings and their associated patterns.

All warnings in the application implement the marker interface IWarning.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package app.cep.model;

/**
 * Marker interface used for Warnings.
 */
public interface IWarning {

}

A warning is always generated by a certain pattern, so we create an interface IWarningPattern for it. The actual patterns for Apache Flink will be defined with the Pattern API.

Once a pattern has been matched, Apache Flink passes a Map<String, TEventType> to the PatternSelectFunction, which maps the name of each pattern state to the event that matched it. So implementations of IWarningPattern also define how to map the Apache Flink result to a certain warning.

Finally, the IWarningPattern also returns the Class of the warning. Java erases generic type parameters at runtime, so Flink cannot infer the type of the resulting warning stream on its own; the returned Class is used to build its type information when the stream processing pipeline is assembled.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package app.cep.model;

import org.apache.flink.cep.pattern.Pattern;

import java.io.Serializable;
import java.util.Map;

/**
 * A Warning Pattern describes the pattern of a Warning, which is triggered by an Event.
 *
 * @param <TEventType> Event Type
 * @param <TWarningType> Warning Type
 */
public interface IWarningPattern<TEventType, TWarningType extends IWarning> extends Serializable {

    /**
     * Implements the mapping between the pattern matching result and the warning.
     *
     * @param pattern Pattern, which has been matched by Apache Flink.
     * @return The warning created from the given match result.
     */
    TWarningType create(Map<String, TEventType> pattern);

    /**
     * Implements the Apache Flink CEP Event Pattern which triggers a warning.
     *
     * @return The Apache Flink CEP Pattern definition.
     */
    Pattern<TEventType, ?> getEventPattern();

    /**
     * Returns the Warning Class for simplifying reflection.
     *
     * @return Class Type of the Warning.
     */
    Class<TWarningType> getWarningTargetType();

}
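To make the contract of create concrete, here is a minimal plain-Java sketch that runs without Flink. The types Warning, Measurement, HeatWarning and HeatWarningMapper are hypothetical stand-ins for IWarning, LocalWeatherData, ExcessiveHeatWarning and the create half of IWarningPattern. One design choice worth considering is to keep the state names ("First Event", "Second Event") in constants shared by the pattern definition and create, so a typo cannot silently produce a null event. The sketch assumes the Map<String, TEventType> signature used in this article; newer Flink releases pass a Map<String, List<IN>> to the select function instead.

```java
import java.util.Map;

// Hypothetical stand-ins for IWarning and LocalWeatherData, so the sketch runs without Flink.
interface Warning {}

final class Measurement {
    final String wban;
    final double temperature;

    Measurement(String wban, double temperature) {
        this.wban = wban;
        this.temperature = temperature;
    }
}

final class HeatWarning implements Warning {
    final Measurement first;
    final Measurement second;

    HeatWarning(Measurement first, Measurement second) {
        this.first = first;
        this.second = second;
    }
}

// Mirrors the create(...) half of IWarningPattern.
final class HeatWarningMapper {
    // Would be shared with the pattern definition: begin(FIRST_EVENT) ... next(SECOND_EVENT).
    static final String FIRST_EVENT = "First Event";
    static final String SECOND_EVENT = "Second Event";

    HeatWarning create(Map<String, Measurement> match) {
        Measurement first = match.get(FIRST_EVENT);
        Measurement second = match.get(SECOND_EVENT);
        // Fail loudly if a state name does not match the pattern definition.
        if (first == null || second == null) {
            throw new IllegalArgumentException("Missing named event in match: " + match.keySet());
        }
        return new HeatWarning(first, second);
    }
}
```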
Excessive Heat Warning

Now we can implement the weather warnings and their patterns. The patterns are highly simplified in this article.

One of the warnings could be a warning for Excessive Heat, which is described on Wikipedia as:

Excessive Heat Warning – Extreme Heat Index (HI) values forecast to meet or exceed locally defined warning criteria for at least two days. Specific criteria varies among local Weather Forecast Offices, due to climate variability and the effect of excessive heat on the local population.

Typical HI values are maximum daytime temperatures above 105 to 110 °F (41 to 43 °C) and minimum nighttime temperatures above 75 °F (24 °C).
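The Celsius values in the quote are rounded. A quick check with the standard conversion C = (F - 32) * 5 / 9 shows that the 41 °C threshold used below corresponds to 105.8 °F, the low end of the quoted 105 to 110 °F band. The helper class HeatIndexUnits is hypothetical and only serves this check.

```java
// Hypothetical helper for checking the temperature thresholds quoted above.
final class HeatIndexUnits {
    private HeatIndexUnits() {}

    // Standard conversion: C = (F - 32) * 5 / 9.
    static double fahrenheitToCelsius(double fahrenheit) {
        return (fahrenheit - 32.0) * 5.0 / 9.0;
    }

    // Inverse conversion: F = C * 9 / 5 + 32.
    static double celsiusToFahrenheit(double celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }
}
```

fahrenheitToCelsius(105) is about 40.6, fahrenheitToCelsius(110) about 43.3 and fahrenheitToCelsius(75) about 23.9, which agrees with the rounded values in the quote.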

Warning Model

The warning should be issued if we expect daytime temperatures above 41 °C for at least two days. So the ExcessiveHeatWarning class holds the two LocalWeatherData measurements that triggered it and provides a short summary in its toString method.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package app.cep.model.warnings.temperature;

import app.cep.model.IWarning;
import model.LocalWeatherData;

public class ExcessiveHeatWarning implements IWarning {

    private final LocalWeatherData localWeatherData0;
    private final LocalWeatherData localWeatherData1;

    public ExcessiveHeatWarning(LocalWeatherData localWeatherData0, LocalWeatherData localWeatherData1) {
        this.localWeatherData0 = localWeatherData0;
        this.localWeatherData1 = localWeatherData1;
    }

    public LocalWeatherData getLocalWeatherData0() {
        return localWeatherData0;
    }

    public LocalWeatherData getLocalWeatherData1() {
        return localWeatherData1;
    }

    @Override
    public String toString() {
        return String.format("ExcessiveHeatWarning (WBAN = %s, First Measurement = (%s), Second Measurement = (%s))",
                localWeatherData0.getStation().getWban(),
                getEventSummary(localWeatherData0),
                getEventSummary(localWeatherData1));
    }

    private String getEventSummary(LocalWeatherData localWeatherData) {

        return String.format("Date = %s, Time = %s, Temperature = %f",
                localWeatherData.getDate(), localWeatherData.getTime(), localWeatherData.getTemperature());
    }
}
Warning Pattern

Now comes the interesting part: the pattern. The ExcessiveHeatWarningPattern implements the IWarningPattern interface and uses the Pattern API to define the matching pattern. We use strict contiguity for the events by using the next operator, so the second event has to directly follow the first one. The events are daily maximum temperatures and the warning covers two days, so we expect both events to occur within 2 days.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package app.cep.model.patterns.temperature;

import app.cep.model.IWarningPattern;
import app.cep.model.warnings.temperature.ExcessiveHeatWarning;
import model.LocalWeatherData;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Map;

public class ExcessiveHeatWarningPattern implements IWarningPattern<LocalWeatherData, ExcessiveHeatWarning> {

    public ExcessiveHeatWarningPattern() {}

    @Override
    public ExcessiveHeatWarning create(Map<String, LocalWeatherData> pattern) {
        // The keys are the state names used in getEventPattern():
        LocalWeatherData first = pattern.get("First Event");
        LocalWeatherData second = pattern.get("Second Event");

        return new ExcessiveHeatWarning(first, second);
    }

    @Override
    public Pattern<LocalWeatherData, ?> getEventPattern() {
        return Pattern
                .<LocalWeatherData>begin("First Event")
                .subtype(LocalWeatherData.class)
                .where(evt -> evt.getTemperature() >= 41.0f)
                .next("Second Event")
                .subtype(LocalWeatherData.class)
                .where(evt -> evt.getTemperature() >= 41.0f)
                .within(Time.days(2));
    }

    @Override
    public Class<ExcessiveHeatWarning> getWarningTargetType() {
        return ExcessiveHeatWarning.class;
    }
}
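The effect of next(...) combined with within(Time.days(2)) can be illustrated without Flink. The sketch below scans one station's daily maxima in time order and reports every pair of directly adjacent events that are both at or above the threshold and less than two days apart. The class StrictContiguity and its method matchPairs are hypothetical; treating the within bound as strict is an assumption about the boundary case, and this is not how Flink implements matching internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of strict contiguity (next) plus a time bound (within).
final class StrictContiguity {
    private StrictContiguity() {}

    // One daily maximum for a single station: day number and temperature in °C.
    static final class DailyMax {
        final long day;
        final double celsius;

        DailyMax(long day, double celsius) {
            this.day = day;
            this.celsius = celsius;
        }
    }

    // Returns index pairs {i, i + 1} where both events match the threshold and the
    // pair spans fewer than withinDays days. Events must be sorted by day.
    static List<int[]> matchPairs(List<DailyMax> events, double threshold, long withinDays) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            DailyMax first = events.get(i);
            DailyMax second = events.get(i + 1); // next(): only the directly following event
            boolean bothHot = first.celsius >= threshold && second.celsius >= threshold;
            boolean inTime = second.day - first.day < withinDays;
            if (bothHot && inTime) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }
}
```

With days 1, 2, 3, 4, 5 and 8 and maxima of 40, 42, 43, 39, 44 and 45 °C, only the pair of days 2 and 3 matches. Days 3 and 5 are both hot, but the 39 °C day between them breaks strict contiguity; days 5 and 8 are adjacent in the stream (for example because of missing data), but they are 3 days apart, so the time bound rejects them.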
Converting a Stream into a Stream of Warnings

Now it's time to apply these patterns to a DataStream<TEventType>. In this example we operate on the stream of historical weather measurements that has been used in previous articles. These historical values could easily be exchanged for forecasts, so it makes a nice example.

I have written a method toWarningStream, which takes a DataStream<LocalWeatherData> and generates a DataStream with the warnings. The main method applies the SevereHeatWarningPattern, which is not listed in this article but implements IWarningPattern in the same way as the ExcessiveHeatWarningPattern (see the repository linked above).

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package app.cep;

import app.cep.model.IWarning;
import app.cep.model.IWarningPattern;
import app.cep.model.patterns.temperature.SevereHeatWarningPattern;
import app.cep.model.warnings.temperature.SevereHeatWarning;
import model.LocalWeatherData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import stream.sources.csv.LocalWeatherDataSourceFunction;
import utils.DateUtilities;

import java.time.ZoneOffset;
import java.util.Date;
import java.util.Map;

public class WeatherDataComplexEventProcessingExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the Measurement Timestamp of the Event:
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // We are sequentially reading the historical data from a CSV file:
        env.setParallelism(1);

        // Path to read the CSV data from:
        final String csvStationDataFilePath = "C:\\Users\\philipp\\Downloads\\csv\\201503station.txt";
        final String csvLocalWeatherDataFilePath = "C:\\Users\\philipp\\Downloads\\csv\\201503hourly_sorted.txt";

        // Add the CSV Data Source and assign the Measurement Timestamp:
        DataStream<LocalWeatherData> localWeatherDataDataStream = env
                .addSource(new LocalWeatherDataSourceFunction(csvStationDataFilePath, csvLocalWeatherDataFilePath))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LocalWeatherData>() {
                    @Override
                    public long extractAscendingTimestamp(LocalWeatherData localWeatherData) {
                        Date measurementTime = DateUtilities.from(localWeatherData.getDate(), localWeatherData.getTime(), ZoneOffset.ofHours(0));

                        return measurementTime.getTime();
                    }
                });

        // First build a KeyedStream over the Data with LocalWeather:
        KeyedStream<LocalWeatherData, String> localWeatherDataByStation = localWeatherDataDataStream
                // Filter for non-null temperature values, because we might have missing data:
                .filter(new FilterFunction<LocalWeatherData>() {
                    @Override
                    public boolean filter(LocalWeatherData localWeatherData) throws Exception {
                        return localWeatherData.getTemperature() != null;
                    }
                })
                // Now create the keyed stream by the Station WBAN identifier:
                .keyBy(new KeySelector<LocalWeatherData, String>() {
                    @Override
                    public String getKey(LocalWeatherData localWeatherData) throws Exception {
                        return localWeatherData.getStation().getWban();
                    }
                });

        // Now take the Maximum Temperature per day from the KeyedStream:
        DataStream<LocalWeatherData> maxTemperaturePerDay =
                localWeatherDataByStation
                        // Use non-overlapping tumbling window with 1 day length:
                        .timeWindow(Time.days(1))
                        // And use the maximum temperature:
                        .maxBy("temperature");

        // Now apply the SevereHeatWarningPattern on the Stream:
        DataStream<SevereHeatWarning> warnings = toWarningStream(maxTemperaturePerDay, new SevereHeatWarningPattern());

        // Print the warning to the Console for now:
        warnings.print();

        // Finally execute the Stream:
        env.execute("CEP Weather Warning Example");
    }

    private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
        // Key the daily maxima by station again, so that the pattern is matched per station:
        PatternStream<LocalWeatherData> tempPatternStream = CEP.pattern(
                localWeatherDataDataStream.keyBy(new KeySelector<LocalWeatherData, String>() {
                    @Override
                    public String getKey(LocalWeatherData localWeatherData) throws Exception {
                        return localWeatherData.getStation().getWban();
                    }
                }),
                warningPattern.getEventPattern());

        // Map each match to a warning. IWarningPattern is Serializable, so this function can capture it.
        // The warning class is passed explicitly, because the generic type is erased at runtime:
        DataStream<TWarningType> warnings = tempPatternStream.select(new PatternSelectFunction<LocalWeatherData, TWarningType>() {
            @Override
            public TWarningType select(Map<String, LocalWeatherData> map) throws Exception {
                return warningPattern.create(map);
            }
        }, new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));

        return warnings;
    }

}
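The preprocessing in main (drop readings without a temperature, key by station WBAN, take the maximum per one-day tumbling event-time window) can be mimicked in plain Java, which can be handy for checking expected daily maxima against a small extract of the CSV data. The class DailyMaximum and its nested Reading type are hypothetical. The sketch assumes UTC epoch-millisecond timestamps, matching the ZoneOffset.ofHours(0) used by the timestamp extractor, and day windows aligned to midnight UTC, which is how tumbling windows without an offset are aligned to the epoch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java mirror of filter -> keyBy(WBAN) -> timeWindow(1 day) -> maxBy("temperature").
final class DailyMaximum {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    private DailyMaximum() {}

    // One hourly measurement; temperature is null when the value is missing.
    static final class Reading {
        final String wban;
        final long timestampMillis; // UTC epoch milliseconds (event time)
        final Double temperature;

        Reading(String wban, long timestampMillis, Double temperature) {
            this.wban = wban;
            this.timestampMillis = timestampMillis;
            this.temperature = temperature;
        }
    }

    // Returns station WBAN -> (UTC day number -> reading with the highest temperature that day).
    static Map<String, TreeMap<Long, Reading>> maxPerStationAndDay(List<Reading> readings) {
        Map<String, TreeMap<Long, Reading>> result = new HashMap<>();
        for (Reading reading : readings) {
            // Same as the filter step: skip readings without a temperature.
            if (reading.temperature == null) {
                continue;
            }
            // One-day tumbling window aligned to midnight UTC.
            long day = Math.floorDiv(reading.timestampMillis, MILLIS_PER_DAY);
            // Key by station, then keep the reading with the maximum temperature per window.
            TreeMap<Long, Reading> perDay = result.computeIfAbsent(reading.wban, key -> new TreeMap<>());
            Reading current = perDay.get(day);
            // Strict comparison keeps the first reading seen on ties.
            if (current == null || reading.temperature > current.temperature) {
                perDay.put(day, reading);
            }
        }
        return result;
    }
}
```

Like maxBy, the sketch keeps the whole reading with the highest temperature rather than only the value, so the station and measurement time remain available when a warning is built from it.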