In this article I want to show you how to work with the Complex Event Processing (CEP) engine of Apache Flink.
The Apache Flink documentation describes FlinkCEP as:
FlinkCEP is the complex event processing library for Flink. It allows you to easily detect complex event patterns in a stream of endless data. Complex events can then be constructed from matching sequences. This gives you the opportunity to quickly get hold of what's really important in your data.
What we are going to build
Imagine we are designing an application to generate warnings based on certain weather events.
The application should generate the following weather warnings from a stream of incoming measurements:
Extreme Cold (less than -46 °C for three days)
Severe Heat (above 30 °C for two days)
Excessive Heat (above 41 °C for two days)
High Wind (wind speed between 39 mph and 110 mph)
Extreme Wind (wind speed above 110 mph)
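Stripped of streaming concerns, the criteria above are threshold checks, some of which must hold on consecutive days. The following plain-Java sketch (no Flink involved) encodes them so the rules are unambiguous before we express them as CEP patterns. The class `WeatherRules` and its members are hypothetical names for illustration. It assumes that "between 39 mph and 110 mph" includes both endpoints and that "above" and "less than" are strict comparisons.

```java
import java.util.Optional;
import java.util.function.DoublePredicate;

// Hypothetical helper encoding the warning criteria listed above (not part of the example repository).
final class WeatherRules {

    private WeatherRules() {}

    // Per-measurement temperature conditions in °C; "above" / "less than" are assumed to be strict.
    static final DoublePredicate EXTREME_COLD = t -> t < -46.0;   // must hold for three days
    static final DoublePredicate SEVERE_HEAT = t -> t > 30.0;     // must hold for two days
    static final DoublePredicate EXCESSIVE_HEAT = t -> t > 41.0;  // must hold for two days

    enum WindWarning { HIGH_WIND, EXTREME_WIND }

    /** Classifies a wind speed in mph; assumes 39 <= speed <= 110 is High Wind and speed > 110 is Extreme Wind. */
    static Optional<WindWarning> classifyWind(double windSpeedMph) {
        if (windSpeedMph > 110.0) {
            return Optional.of(WindWarning.EXTREME_WIND);
        }
        if (windSpeedMph >= 39.0) {
            return Optional.of(WindWarning.HIGH_WIND);
        }
        return Optional.empty();
    }

    /**
     * Returns true if the last {@code days} values all satisfy the condition.
     * Assumes one value per day, ordered from oldest to newest.
     */
    static boolean lastDaysSatisfy(double[] dailyValues, int days, DoublePredicate condition) {
        if (days <= 0 || dailyValues.length < days) {
            return false;
        }
        for (int i = dailyValues.length - days; i < dailyValues.length; i++) {
            if (!condition.test(dailyValues[i])) {
                return false;
            }
        }
        return true;
    }
}
```

For example, `WeatherRules.lastDaysSatisfy(new double[] {-50, -47, -48}, 3, WeatherRules.EXTREME_COLD)` is `true`, while a single milder day in that window makes it `false`. The CEP patterns below express the same kind of rule declaratively instead of with explicit loops.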
Source Code
You can find the full source code for the example in my git repository at:
https://github.com/bytefish/FlinkExperiments
Warnings and Patterns
First of all, we design the model for warnings and their associated patterns.
All warnings in the application implement the marker interface IWarning.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package app.cep.model;
/**
* Marker interface used for Warnings.
*/
public interface IWarning {
}
A warning is always generated by a specific pattern, so we create an interface IWarningPattern for it. The actual Apache Flink patterns are defined with the Pattern API.
Once a pattern has been matched, Apache Flink passes a Map<String, TEventType> to the pattern select function. This map associates the name of each pattern step with the event that matched it. Implementations of IWarningPattern therefore also define how this match result is mapped to a warning.
Finally, to simplify reflection when building the Apache Flink stream processing pipeline, IWarningPattern also returns the class of the warning.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package app.cep.model;
import org.apache.flink.cep.pattern.Pattern;
import java.io.Serializable;
import java.util.Map;
/**
* A Warning Pattern describes the pattern of a Warning, which is triggered by an Event.
*
* @param <TEventType> Event Type
* @param <TWarningType> Warning Type
*/
public interface IWarningPattern<TEventType, TWarningType extends IWarning> extends Serializable {

    /**
     * Implements the mapping between the pattern matching result and the warning.
     *
     * @param pattern Pattern, which has been matched by Apache Flink.
     * @return The warning created from the given match result.
     */
    TWarningType create(Map<String, TEventType> pattern);

    /**
     * Implements the Apache Flink CEP Event Pattern which triggers a warning.
     *
     * @return The Apache Flink CEP Pattern definition.
     */
    Pattern<TEventType, ?> getEventPattern();

    /**
     * Returns the Warning Class for simplifying reflection.
     *
     * @return Class Type of the Warning.
     */
    Class<TWarningType> getWarningTargetType();
}
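The mapping half of this contract can be exercised without Flink at all. The sketch below uses hypothetical stand-in types (`Measurement`, `HeatWarning`, `HeatWarningMapper`, none of which are in the example repository) to show what the match map looks like: its keys are exactly the names given to the pattern steps. It sticks to the Map<String, TEventType> shape used in this article; the exact map type passed by Flink has changed in later releases.

```java
import java.util.Map;

// Hypothetical stand-in for a weather measurement (illustration only).
final class Measurement {
    final String day;
    final float temperature;

    Measurement(String day, float temperature) {
        this.day = day;
        this.temperature = temperature;
    }
}

// Hypothetical stand-in for a warning built from two measurements.
final class HeatWarning {
    final Measurement first;
    final Measurement second;

    HeatWarning(Measurement first, Measurement second) {
        this.first = first;
        this.second = second;
    }
}

// Mirrors the role of IWarningPattern#create: turn a "step name -> event" map into a warning.
final class HeatWarningMapper {
    // The keys must match the names used when defining the pattern steps.
    static final String FIRST = "First Event";
    static final String SECOND = "Second Event";

    private HeatWarningMapper() {}

    static HeatWarning create(Map<String, Measurement> match) {
        Measurement first = match.get(FIRST);
        Measurement second = match.get(SECOND);
        if (first == null || second == null) {
            // A typo in a step name would otherwise surface later as a NullPointerException.
            throw new IllegalArgumentException("Match is missing a named event: " + match.keySet());
        }
        return new HeatWarning(first, second);
    }
}
```

Failing fast on a missing key is a design choice of this sketch: it makes a mismatch between the step names in the pattern and the keys used in create visible immediately.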
Excessive Heat Warning
Now we can implement the weather warnings and their patterns. The patterns are highly simplified in this article.
One of the warnings could be a warning for Excessive Heat, which is described on Wikipedia as:
Excessive Heat Warning – Extreme Heat Index (HI) values forecast to meet or exceed locally defined warning criteria for at least two days. Specific criteria varies among local Weather Forecast Offices, due to climate variability and the effect of excessive heat on the local population.
Typical HI values are maximum daytime temperatures above 105 to 110 °F (41 to 43 °C) and minimum nighttime temperatures above 75 °F (24 °C).
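The quoted criteria are given in Fahrenheit, while this article works in Celsius. A small conversion helper (a hypothetical `TemperatureUnits` class, not part of the example repository) makes the arithmetic explicit: C = (F - 32) × 5/9, so 105 °F ≈ 40.6 °C, 110 °F ≈ 43.3 °C and 75 °F ≈ 23.9 °C. Going the other way, the 41 °C threshold used in this article corresponds to 105.8 °F.

```java
// Hypothetical helper for converting between Fahrenheit and Celsius.
final class TemperatureUnits {

    private TemperatureUnits() {}

    /** Converts degrees Fahrenheit to degrees Celsius: C = (F - 32) * 5 / 9. */
    static double fahrenheitToCelsius(double fahrenheit) {
        return (fahrenheit - 32.0) * 5.0 / 9.0;
    }

    /** Converts degrees Celsius to degrees Fahrenheit: F = C * 9 / 5 + 32. */
    static double celsiusToFahrenheit(double celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }
}
```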
Warning Model
The warning should be issued if we expect daytime temperatures above 41 °C for at least two days, so the ExcessiveHeatWarning class holds two LocalWeatherData measurements. It also provides a short summary in its toString method.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package app.cep.model.warnings.temperature;
import app.cep.model.IWarning;
import model.LocalWeatherData;
public class ExcessiveHeatWarning implements IWarning {

    private final LocalWeatherData localWeatherData0;
    private final LocalWeatherData localWeatherData1;

    public ExcessiveHeatWarning(LocalWeatherData localWeatherData0, LocalWeatherData localWeatherData1) {
        this.localWeatherData0 = localWeatherData0;
        this.localWeatherData1 = localWeatherData1;
    }

    public LocalWeatherData getLocalWeatherData0() {
        return localWeatherData0;
    }

    public LocalWeatherData getLocalWeatherData1() {
        return localWeatherData1;
    }

    @Override
    public String toString() {
        return String.format("ExcessiveHeatWarning (WBAN = %s, First Measurement = (%s), Second Measurement = (%s))",
Now comes the interesting part: the pattern. The ExcessiveHeatWarningPattern implements the IWarningPattern interface and uses the Pattern API to define the matching pattern. It uses strict contiguity for the events, which is expressed with the next operator: the second event has to follow the first one directly, without any non-matching event in between. The two events should capture the high temperatures of two days, so we expect both of them to occur within two days.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
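import app.cep.model.IWarningPattern;
import app.cep.model.warnings.temperature.ExcessiveHeatWarning;
import model.LocalWeatherData;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Map;

public class ExcessiveHeatWarningPattern implements IWarningPattern<LocalWeatherData, ExcessiveHeatWarning> {

    public ExcessiveHeatWarningPattern() {}

    @Override
    public ExcessiveHeatWarning create(Map<String, LocalWeatherData> pattern) {
        // The keys are the names given to the pattern steps in getEventPattern().
        LocalWeatherData first = pattern.get("First Event");
        LocalWeatherData second = pattern.get("Second Event");

        return new ExcessiveHeatWarning(first, second);
    }

    @Override
    public Pattern<LocalWeatherData, ?> getEventPattern() {
        return Pattern
                .<LocalWeatherData>begin("First Event")
                .subtype(LocalWeatherData.class)
                .where(evt -> evt.getTemperature() >= 41.0f)
                // next(): strict contiguity, the second event must directly follow the first one
                .next("Second Event")
                .subtype(LocalWeatherData.class)
                .where(evt -> evt.getTemperature() >= 41.0f)
                // The complete match must occur within two days
                .within(Time.days(2));
    }

    @Override
    public Class<ExcessiveHeatWarning> getWarningTargetType() {
        return ExcessiveHeatWarning.class;
    }
}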
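To make the semantics of next() and within() concrete, here is a plain-Java sketch that emulates this two-step pattern over an in-memory list. It is not how Flink evaluates patterns internally (FlinkCEP compiles patterns into a non-deterministic finite automaton), and `Reading` and `StrictPairMatcher` are hypothetical names. It assumes the list holds the readings of a single station in time order, and it treats "within two days" as strictly less than 48 hours, which may not match Flink's exact boundary handling.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type used only for this sketch.
final class Reading {
    final LocalDateTime timestamp;
    final float temperature;

    Reading(LocalDateTime timestamp, float temperature) {
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
}

final class StrictPairMatcher {

    private StrictPairMatcher() {}

    /**
     * Emulates begin(..).where(hot).next(..).where(hot).within(window) on a time-ordered list:
     * only directly adjacent readings can form a match (strict contiguity), and the second
     * reading must arrive less than {@code window} after the first one.
     */
    static List<Reading[]> findHotPairs(List<Reading> readings, float threshold, Duration window) {
        List<Reading[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < readings.size(); i++) {
            Reading first = readings.get(i);
            Reading second = readings.get(i + 1);
            boolean bothHot = first.temperature >= threshold && second.temperature >= threshold;
            boolean inWindow = Duration.between(first.timestamp, second.timestamp).compareTo(window) < 0;
            if (bothHot && inWindow) {
                matches.add(new Reading[] { first, second });
            }
        }
        return matches;
    }
}
```

With readings of 42 °C, 43 °C, 30 °C, 41.5 °C on consecutive days and a final 44 °C three days later, only the first pair matches: the 30 °C reading breaks strict contiguity, and the last pair falls outside the two-day window. Replacing next with followedBy in the Flink pattern would switch to relaxed contiguity, allowing non-matching readings between the two hot ones.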
Converting a Stream into a Stream of Warnings
Now it's time to apply these patterns to a DataStream<TEventType>. In this example we operate on the stream of historical weather measurements used in previous articles. These historical values could easily be replaced by forecasts, which makes it a nice example.
I have written a method toWarningStream, which takes a DataStream<LocalWeatherData> and generates a DataStream with the warnings.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.