FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.
It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.
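In short, one or more streams of simple events are matched against relatively simple rules, and the output is the data the user actually wants: the complex events that satisfy those rules.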
The Pattern API
The pattern API allows you to define complex pattern sequences that you want to extract from your input stream.
Each pattern must have a unique name, which you use later to identify the matched events.
Pattern names CANNOT contain the character ":".
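In the figure above, the blue boxes are individual patterns. The light-yellow ellipses are attributes that can be attached to a pattern, such as how many times it may repeat, or whether it is greedy or optional. The orange ellipses are the relationships between patterns, which define how multiple patterns are chained together. These three steps form the core of a complete Flink CEP program: define the patterns, add their attributes, and chain them together.
Here is the example code: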
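```java
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// `pattern` is the preceding pattern; next(...) starts a new pattern named "start"
pattern.next("start").where(
    new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            // core matching logic; this condition is only illustrative (Event#getId is assumed)
            return event.getId() == 42;
        }
    }
);
```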
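A pattern definition consists of five parts:
pattern: the preceding pattern
next/followedBy/…: starts a new pattern
start: the pattern name
where: the pattern's condition
filter: the core matching logic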
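To make the roles of where and filter concrete, here is a plain-Java sketch that models a single named pattern as a name plus a predicate. It is a simplified model, not Flink code. The class NamedPattern and its method acceptedFrom are hypothetical. In real FlinkCEP, the predicate is the filter method of a SimpleCondition passed to where(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model, NOT the Flink API: a named pattern whose where(...) condition
// is a plain predicate, playing the role of SimpleCondition#filter.
class NamedPattern<T> {
    final String name;            // the pattern name, e.g. "start"
    final Predicate<T> condition; // the core matching logic ("filter")

    NamedPattern(String name, Predicate<T> condition) {
        this.name = name;
        this.condition = condition;
    }

    // Returns the events that satisfy this pattern's condition, in input order.
    List<T> acceptedFrom(List<T> events) {
        List<T> accepted = new ArrayList<>();
        for (T event : events) {
            if (condition.test(event)) {
                accepted.add(event);
            }
        }
        return accepted;
    }
}

class NamedPatternDemo {
    public static void main(String[] args) {
        NamedPattern<Integer> start = new NamedPattern<>("start", id -> id == 42);
        System.out.println(start.acceptedFrom(Arrays.asList(1, 42, 7, 42))); // [42, 42]
    }
}
```

In Flink the condition is evaluated inside the pattern-matching state machine rather than as a standalone filter. The sketch only isolates what "the pattern's condition" decides for each event.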
Individual Patterns
A Pattern can be either a singleton or a looping pattern. Singleton patterns accept a single event, while looping patterns can accept more than one.
Quantifiers
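```java
// expecting 4 occurrences
start.times(4);

// expecting 0 or 4 occurrences
start.times(4).optional();

// expecting 2, 3 or 4 occurrences
start.times(2, 4);

// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();

// expecting 1 or more occurrences
start.oneOrMore();

// expecting 0 or more occurrences
start.oneOrMore().optional();

// expecting 2 or more occurrences
start.timesOrMore(2);
```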
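The quantifier comments above boil down to a set of allowed occurrence counts. The following plain-Java sketch models just that. Quantifier and allows are hypothetical names, not the Flink API. greedy() is left out because it only makes a looping pattern repeat as many times as possible; it does not change which counts are allowed.

```java
// Hypothetical model, NOT the Flink API: the occurrence counts each quantifier allows.
final class Quantifier {
    private final int min;          // fewest occurrences allowed
    private final int max;          // most occurrences allowed (Integer.MAX_VALUE = unbounded)
    private final boolean optional; // optional() additionally allows zero occurrences

    private Quantifier(int min, int max, boolean optional) {
        this.min = min;
        this.max = max;
        this.optional = optional;
    }

    static Quantifier times(int n) { return new Quantifier(n, n, false); }
    static Quantifier times(int from, int to) { return new Quantifier(from, to, false); }
    static Quantifier oneOrMore() { return new Quantifier(1, Integer.MAX_VALUE, false); }
    static Quantifier timesOrMore(int n) { return new Quantifier(n, Integer.MAX_VALUE, false); }

    Quantifier optional() { return new Quantifier(min, max, true); }

    // True if a pattern with this quantifier may match exactly `count` events.
    boolean allows(int count) {
        if (count == 0 && optional) {
            return true;
        }
        return count >= min && count <= max;
    }

    public static void main(String[] args) {
        System.out.println(times(4).optional().allows(0)); // true
        System.out.println(times(2, 4).allows(5));         // false
    }
}
```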
Combining Patterns
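FlinkCEP supports three forms of contiguity between events:
strict contiguity: matching events must appear one right after another, with no non-matching events in between;
relaxed contiguity: non-matching events appearing in between are ignored;
non-deterministic relaxed contiguity: contiguity is relaxed further, allowing additional matches that ignore some matching events.
To apply them between consecutive patterns, you can use:
next(), for strict,
followedBy(), for relaxed, and
followedByAny(), for non-deterministic relaxed contiguity;
or
notNext(), if you do not want an event type to directly follow another;
notFollowedBy(), if you do not want an event type to be anywhere between two other event types.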
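The Flink documentation illustrates the three contiguity modes with the pattern "a b" applied to the input "a", "c", "b1", "b2". Strict contiguity yields no match, relaxed contiguity yields {a b1}, and non-deterministic relaxed contiguity yields {a b1} and {a b2}. The plain-Java sketch below reproduces those three outcomes for a two-step pattern. It is a simplified model, not Flink's NFA, and ContiguityModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model, NOT Flink's NFA: matches a two-step pattern "first then second"
// under the three contiguity modes selected by next(), followedBy() and followedByAny().
final class ContiguityModel {
    enum Mode { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    static List<List<String>> match(List<String> events, Predicate<String> first,
                                    Predicate<String> second, Mode mode) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!first.test(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isSecond = second.test(events.get(j));
                if (isSecond) {
                    matches.add(Arrays.asList(events.get(i), events.get(j)));
                }
                if (mode == Mode.STRICT) {
                    break; // next(): only the directly following event is considered
                }
                if (isSecond && mode == Mode.RELAXED) {
                    break; // followedBy(): skip non-matching events, stop at the first match
                }
                // followedByAny(): keep scanning; every later matching event yields a match
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "c", "b1", "b2");
        Predicate<String> isA = e -> e.equals("a");
        Predicate<String> isB = e -> e.startsWith("b");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + match(input, isA, isB, mode));
        }
    }
}
```

Running main prints `[]` for STRICT, `[[a, b1]]` for RELAXED, and `[[a, b1], [a, b2]]` for NON_DETERMINISTIC_RELAXED.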
Detecting Patterns
After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect potential matches. To run a stream of events against your pattern sequence, you have to create a PatternStream. Given an input stream input, a pattern pattern and an optional comparator comparator used to sort events with the same timestamp in case of EventTime or that arrived at the same moment, you create the PatternStream by calling:
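```java
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
```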
The input stream can be keyed or non-keyed depending on your use-case.
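Whether the input is keyed matters for what can match. The sketch below is a plain-Java model, not Flink code, of a strict "a then b" pattern. On a non-keyed stream, events are matched in their global order. On a keyed stream, as produced by keyBy, they are matched separately per key. The "key:type" string encoding and the class KeyedVsNonKeyed are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, NOT Flink code: a strict "a then b" pattern evaluated on a
// non-keyed stream (global order) vs. a keyed stream (separately per key).
// Events are encoded as "key:type" strings purely for this illustration.
final class KeyedVsNonKeyed {
    static String key(String event) { return event.substring(0, event.indexOf(':')); }
    static String type(String event) { return event.substring(event.indexOf(':') + 1); }

    // Strict contiguity: an "a" event directly followed by a "b" event.
    static List<List<String>> matchStrict(List<String> events) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            String current = events.get(i);
            String next = events.get(i + 1);
            if (type(current).equals("a") && type(next).equals("b")) {
                matches.add(Arrays.asList(current, next));
            }
        }
        return matches;
    }

    // Keyed: split into per-key sub-streams (keeping arrival order), then match each one.
    static List<List<String>> matchStrictPerKey(List<String> events) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String event : events) {
            byKey.computeIfAbsent(key(event), k -> new ArrayList<>()).add(event);
        }
        List<List<String>> matches = new ArrayList<>();
        for (List<String> subStream : byKey.values()) {
            matches.addAll(matchStrict(subStream));
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("u1:a", "u2:a", "u1:b");
        System.out.println("non-keyed: " + matchStrict(input));       // [[u2:a, u1:b]]
        System.out.println("keyed:     " + matchStrictPerKey(input)); // [[u1:a, u1:b]]
    }
}
```

The non-keyed run pairs an event from u2 with one from u1. The keyed run only matches events belonging to the same key.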
Selecting from Patterns
Once you have obtained a PatternStream, you can apply transformations to the detected event sequences. The suggested way of doing that is with a PatternProcessFunction.
A PatternProcessFunction has a processMatch method, which is called for each matching event sequence. It receives a match in the form of Map<String, List<IN>>, where the key is the name of each pattern in your pattern sequence and the value is a list of all accepted events for that pattern (IN is the type of your input elements). The events for a given pattern are ordered by timestamp. A list of accepted events is returned for each pattern because, when looping patterns are used (e.g. oneOrMore() and times()), more than one event may be accepted for a given pattern.
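```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

abstract class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {

    // Hypothetical helper: builds the output record from the matched events.
    protected abstract OUT createOutput(IN startEvent, IN endEvent);

    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
        // key = pattern name; value = events accepted for that pattern, ordered by timestamp
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(createOutput(startEvent, endEvent));
    }
}
```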
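Because each map value is a list, a looping pattern can contribute several events under one name. The following plain-Java sketch builds such a match map by hand to show its shape. It does not use Flink, and MatchMapDemo, summarize, and the event names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink): the Map<String, List<IN>> shape that
// processMatch receives, built by hand. Names and events are hypothetical.
final class MatchMapDemo {
    // Mirrors the processMatch body: take the first "start" and first "end" event.
    static String summarize(Map<String, List<String>> match) {
        List<String> startEvents = match.get("start"); // more than one for looping patterns
        String startEvent = startEvents.get(0);
        String endEvent = match.get("end").get(0);
        return startEvent + " -> " + endEvent + " (" + startEvents.size() + " start events)";
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList("s1", "s2")); // e.g. a looping "start" that accepted two events
        match.put("end", Arrays.asList("e1"));
        System.out.println(summarize(match)); // s1 -> e1 (2 start events)
    }
}
```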
Time in CEP library
Handling Lateness in Event Time
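When a pattern defines a detection window with the within keyword, some partial event sequences may be discarded because they exceed the window length. To handle these timed-out partial matches, the select and flatSelect API calls let you specify a timeout handler.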
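The effect of within on partial matches can be sketched in plain Java. The model below is a simplification, not Flink's NFA, and it rests on several assumptions:
- Events arrive sorted by timestamp.
- The pattern is "a followed by b within windowMillis".
- A partial match completes only if b arrives less than windowMillis after a (the exact boundary handling is an assumption here).
- The end of the input is treated as the window having passed.

Timed-out partial matches are collected separately, in the spirit of the timeout handler passed to select/flatSelect. WithinTimeoutModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, NOT Flink's NFA: "a" followed by "b" within windowMillis.
// Assumes events are sorted by timestamp and treats the end of input as the
// window having passed; boundary handling (strict <) is an assumption.
final class WithinTimeoutModel {
    static final class Event {
        final String type;
        final long timestamp;

        Event(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    final List<String> matched = new ArrayList<>();  // completed matches
    final List<String> timedOut = new ArrayList<>(); // partial matches dropped by the window

    WithinTimeoutModel(List<Event> events, long windowMillis) {
        for (int i = 0; i < events.size(); i++) {
            Event a = events.get(i);
            if (!a.type.equals("a")) {
                continue; // only "a" starts a partial match
            }
            boolean completed = false;
            for (int j = i + 1; j < events.size(); j++) {
                Event b = events.get(j);
                if (b.timestamp - a.timestamp >= windowMillis) {
                    break; // window exceeded: this partial match can no longer complete
                }
                if (b.type.equals("b")) {
                    matched.add("a@" + a.timestamp + " b@" + b.timestamp);
                    completed = true;
                    break;
                }
            }
            if (!completed) {
                timedOut.add("a@" + a.timestamp); // would go to the timeout handler
            }
        }
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("a", 0), new Event("b", 5),
                new Event("a", 10), new Event("c", 12), new Event("b", 30));
        WithinTimeoutModel result = new WithinTimeoutModel(input, 10);
        System.out.println("matched:   " + result.matched);  // [a@0 b@5]
        System.out.println("timed out: " + result.timedOut); // [a@10]
    }
}
```

In a real job, a partial match is only reported as timed out once event time has actually advanced past its window. This sketch approximates that by treating the end of the input as the window having passed.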
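Flink CEP development workflow:
Convert the data from the DataSource into a DataStream;
Define a Pattern, then combine the DataStream and the Pattern into a PatternStream;
Transform the PatternStream back into a DataStream with operators such as select and process;
Process the resulting DataStream further and sink it to the target store.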
```java
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
```
For a concrete example, see the relevant modules in this article: link
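Article link: http://woaixiaoyuyu.github.io/2021/07/26/FlinkCEP%20-%20Complex%20event%20processing%20for%20Flink/
Copyright notice: This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. Please credit the source when reposting!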