Flink 入门案例,对官方文档进行了梳理和部分翻译。
Flink DataStream API 编程指南
官方文档永远是最好的老师
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/overview/
In order to create your own Flink DataStream program, we encourage you to start with anatomy of a Flink Program and gradually add your own stream transformations. The remaining sections act as references for additional operations and advanced features.
DataStream API主要分为3块:DataSource、Transformation、Sink
- DataSource是程序的数据源输入,可以通过StreamExecutionEnvironment.addSource(sourceFuntion)为程序添加一个数据源
- Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap和Filter等操作
- Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。
案例一:Anatomy of a Flink Program
Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:
- Obtain an execution environment, 获取执行环境
- Load/create the initial data, 加载/创建初始数据
- Specify transformations on this data, 指定对该数据的转换
- Specify where to put the results of your computations, 指定计算结果存放的位置
- Trigger the program execution, 触发程序的执行
We will now give an overview of each of those steps, please refer to the respective sections for more details. Note that all core classes of the Java DataStream API can be found in org.apache.flink.streaming.api.
The StreamExecutionEnvironment
is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment
:
1 | getExecutionEnvironment() |
Typically, you only need to use getExecutionEnvironment()
, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line, the Flink cluster manager will execute your main method and getExecutionEnvironment()
will return an execution environment for executing your program on a cluster.
平时我们直接使用getExecutionEnvironment()即可
For specifying data sources the execution environment has several methods to read from files using various methods: you can just read them line by line, as CSV files, or using any of the other provided sources. To just read a text file as a sequence of lines, you can use:
这里展示的是text文件如何读取
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
This will give you a DataStream on which you can then apply transformations to create new derived DataStreams.
You apply transformations by calling methods on DataStream with a transformation functions. For example, a map transformation looks like this:
1 | DataStream<String> input = ...; |
This will create a new DataStream by converting every String in the original collection to an Integer.
Once you have a DataStream containing your final results, you can write it to an outside system by creating a sink. These are just some example methods for creating a sink:
1 | writeAsText(String path) |
Once you specified the complete program you need to trigger the program execution by calling execute()
on the StreamExecutionEnvironment
. Depending on the type of the ExecutionEnvironment
the execution will be triggered on your local machine or submit your program for execution on a cluster.
The execute()
method will wait for the job to finish and then return a JobExecutionResult
, this contains execution times and accumulator results.
If you don’t want to wait for the job to finish, you can trigger asynchronous job execution by calling executeAysnc()
on the StreamExecutionEnvironment
. It will return a JobClient
with which you can communicate with the job you just submitted. For instance, here is how to implement the semantics of execute()
by using executeAsync()
.
如果你不想等待作业完成,你可以通过调用StreamExecutionEnvironment的executeAysnc()来触发异步作业的执行。它将返回一个JobClient,你可以用它与你刚刚提交的作业进行通信。例如,这里是如何通过使用executeAsync()来实现execute()的语义。
1 | final JobClient jobClient = env.executeAsync(); |
That last part about program execution is crucial to understanding when and how Flink operations are executed. All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute()
call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.
最后关于程序执行的部分对于理解Flink操作何时以及如何执行至关重要。所有的Flink程序都是懒散地执行的。当程序的主方法被执行时,数据加载和转换不会直接发生。相反,每个操作都被创建并添加到数据流图中。当执行环境上的execute()调用明确地触发执行时,这些操作才真正被执行。程序是在本地还是在集群上执行,取决于执行环境的类型。
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
改进后的完整程序如下
1 | public class Sample { |
案例二:统计单词个数的案例
这个案例很经典,几乎所有大数据的都会使用。
The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows. You can copy & paste the code to run it locally.
1 | public class WordCount { |
To run the example program, start the input stream with netcat first from a terminal:
1 | nc -lk 9999 |
可以从结果看出,准确的统计出五秒内的单词数量,且只统计五秒内的单词数量,之前时间窗口的单词不会统计。
这里的TumblingProcessingTimeWindows先不深究具体是什么,当作一个时间窗口即可,目前猜测是一种滚动广口,然后sum(1)中的参数1代表的就是sum第二个字段,keyBy(中的f0)就代表按照第一个字段聚合。
Data Sources
1)基于文件
readTextFile(path)
- Reads text files, i.e. files that respect theTextInputFormat
specification, line-by-line and returns them as Strings. 逐行读取文本文件,即遵守TextInputFormat规范的文件,并将其作为字符串返回。readFile(fileInputFormat, path)
- Reads (once) files as dictated by the specified file input format. 读取(一次)由指定的文件输入格式所决定的文件。readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- This is the method called internally by the two previous ones. It reads files in thepath
based on the givenfileInputFormat
. Depending on the providedwatchType
, this source may periodically monitor (everyinterval
ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY
), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE
). Using thepathFilter
, the user can further exclude files from being processed. 暂时用不到,前两个够了。
2)基于Socket
socketTextStream
从Socket中读取数据,元素可以通过一个分隔符分开,之前的例子已经使用过了。
3)基于集合
fromCollection(Collection)
通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的
如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用):
- 该类是共有且独立的(没有非静态内部类)
- 该类有共有的无参构造方法
- 类(及父类)中所有的不被static、transient修饰的属性要么有公有的(且不被fifinal修饰),要么是包含共有的getter和setter方法,这些方法遵循java bean命名规范。
4)自定义输入
可以使用StreamExecutionEnvironment.addSource(sourceFunction)将一个流式数据源加到程序中。
Flink提供了许多预先实现的源函数,但是也可以编写自己的自定义源,方法是为非并行源implements SourceFunction,或者为并行源 implements ParallelSourceFunction接口,或者extends RichParallelSourceFunction。
Flink也提供了一批内置的Connector(连接器),如下表列了几个主要的
连接器 | 是否提供Source支持 | 是否提供Sink支持 |
---|---|---|
Apache Kafka | yes | yes |
ElasticSearch | no | yes |
HDFS | no | yes |
Twitter Streaming PI | yes | no |
DataStream Transformations
数据流转换 #
把官方文档所有的算子都列了一遍,注意一些细节就好。
Map #
DataStream → DataStream #
Takes one element and produces one element. A map function that doubles the values of the input stream:
1 | DataStream<Integer> dataStream = //... |
FlatMap #
DataStream → DataStream #
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
1 | dataStream.flatMap(new FlatMapFunction<String, String>() { |
区别就在于返回的值不是一一对应的。
Filter #
DataStream → DataStream #
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
1 | dataStream.filter(new FilterFunction<Integer>() { |
保留判断条件返回true的值。
KeyBy #
DataStream → KeyedStream #
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
1 | dataStream.keyBy(value -> value.getSomeKey()); |
A type cannot be a key if:
- it is a POJO type but does not override the
hashCode()
method and relies on theObject.hashCode()
implementation.- it is an array of any type.
需要注意哪些元素是可以keyBy的。
Reduce #
KeyedStream → DataStream #
A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
1 | keyedStream.reduce(new ReduceFunction<Integer>() { |
Window #
KeyedStream → WindowedStream #
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
1 | dataStream |
WindowAll #
DataStreamStream → AllWindowedStream #
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
1 | dataStream |
在keyby后数据分流,window是把不同的key分开聚合成窗口,而windowall则把所有的key都聚合起来所以windowall的并行度只能为1,而window可以有多个并行度。
Window Apply #
WindowedStream → DataStream #
AllWindowedStream → DataStream #
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
If you are using a windowAll transformation, you need to use an
AllWindowFunction
instead.
1 | windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { |
WindowReduce #
WindowedStream → DataStream #
Applies a functional reduce function to the window and returns the reduced value.
1 | windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { |
Union #
DataStream → DataStream #
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
1 | dataStream.union(otherStream1, otherStream2, ...); |
本质上就是取并集且不去重,但是如果数据和自己本身作交集,每个元素就会出现两次。
Window Join #
DataStream,DataStream → DataStream #
Join two data streams on a given key and a common window.
1 | dataStream.join(otherStream) |
取交集。
按照相同的Key合并两个数据集input1.join(input2).where(0).equalTo(1)。
同时也可以选择进行合并的时候的策略, 是分区还是广播, 是基于排序的算法还是基于哈希的算法input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(1)。
Interval Join #
KeyedStream,KeyedStream → DataStream #
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
.
1 | // this will join the two streams so that |
Window CoGroup #
DataStream,DataStream → DataStream #
Cogroups two data streams on a given key and a common window.
1 | dataStream.coGroup(otherStream) |
在一个给定的键和一个共同的窗口上对两个数据流进行分组。
Connect #
DataStream,DataStream → ConnectedStream #
“Connects” two data streams retaining their types. Connect allowing for shared state between the two streams.
1 | DataStream<Integer> someStream = //... |
“连接 “两个数据流,保留其类型。连接允许两个数据流之间共享状态。
CoMap, CoFlatMap #
ConnectedStream → DataStream #
Similar to map and flatMap on a connected data stream
1 | connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { |
针对ConnectedStream的map和flatMap操作。
Iterate #
DataStream → IterativeStream → ConnectedStream #
Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.
基本概念:在流中创建“反馈(feedback)”循环,通过将一个算子的输出重定向到某个先前的算子。这对于定义不断更新模型的算法特别有用。
迭代的数据流向:DataStream → IterativeStream → DataStream
以下代码以流开始并连续应用迭代体。大于0的元素将被发送回反馈(feedback)通道,继续迭代,其余元素将向下游转发,离开迭代。
1 | IterativeStream<Long> iteration = initialStream.iterate(); |
物理分区 #
Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。
Custom Partitioning #
DataStream → DataStream #
Uses a user-defined Partitioner to select the target task for each element.
1 | dataStream.partitionCustom(partitioner, "someKey"); |
Random Partitioning #
DataStream → DataStream #
Partitions elements randomly according to a uniform distribution.
根据均匀分布随机地划分元素。
1 | dataStream.shuffle(); |
Rescaling #
DataStream → DataStream #
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don’t want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.
The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.
In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.
Please see this figure for a visualization of the connection pattern in the above example:
将元素以轮回方式分配给下游操作的子集。如果你想拥有这样的管道,例如,你从一个源的每个并行实例扇出到几个映射器的子集,以分配负载,但不想要rebalance()会产生的完全再平衡,那么这很有用。这将只需要本地数据传输,而不是通过网络传输数据,这取决于其他配置值,如任务管理器的槽数。
上游操作发送元素的下游操作的子集取决于上游和下游操作的并行程度。例如,如果上游操作的平行度为2,而下游操作的平行度为6,那么一个上游操作将把元素分配给三个下游操作,而另一个上游操作将分配给其他三个下游操作。另一方面,如果下游操作有平行度2,而上游操作有平行度6,那么三个上游操作将分配给一个下游操作,而其他三个上游操作将分配给另一个下游操作。
在不同的平行度不是彼此的倍数的情况下,一个或几个下游操作将有来自上游操作的不同数量的输入。
1 | dataStream.rescale(); |
Broadcasting #
DataStream → DataStream #
Broadcasts elements to every partition.
1 | dataStream.broadcast(); |
算子链和资源组 #
将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()
。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream
转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(...).startNewChain()
这样调用,而不能 someStream.startNewChain()这样。
一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看slots 槽。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
Start New Chain #
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
1 | someStream.filter(...).map(...).startNewChain().map(...); |
Disable Chaining #
Do not chain the map operator.
1 | someStream.map(...).disableChaining(); |
Set Slot Sharing Group #
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don’t have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is “default”, operations can explicitly be put into this group by calling slotSharingGroup(“default”).
设置一个操作的槽位共享组。Flink会把具有相同槽位共享组的操作放入同一个槽位,而把不具有槽位共享组的操作放在其他槽位。这可以用来隔离槽。如果所有的输入操作都在同一个槽共享组中,那么槽共享组就会从输入操作中继承下来。默认的槽共享组的名称是 “default”,操作可以通过调用slotSharingGroup(“default”)明确地放入这个组。
1 | someStream.filter(...).slotSharingGroup("name"); |
Data Sinks
Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示
writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
print()/pringToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
writeToSocket
- Writes elements to a socket according to aSerializationSchema
addSink
- Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions. 调用一个自定义的sink函数。Flink捆绑了与其他系统(如Apache Kafka)的连接器,这些连接器被实现为sink函数。Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持
Iterations
之前提到过相关的算子,这是一个比较重要的概念,举个例子来进一步了解。
For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
例如,这里有一个程序,从一系列的整数中连续减去1,直到达到0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class IterationTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> longDataStreamSource = env.generateSequence(0, 100);
// define an IterativeStream
IterativeStream<Long> iterate = longDataStreamSource.iterate();
// minus 1
SingleOutputStreamOperator<Long> map = iterate.map(new MapFunction<Long, Long>() {
public Long map(Long aLong) throws Exception {
return aLong - 1;
}
});
// filter
SingleOutputStreamOperator<Long> filter = map.filter(new FilterFunction<Long>() {
public boolean filter(Long aLong) throws Exception {
return aLong > 0;
}
});
// To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream.
// The DataStream given to the closeWith function will be fed back to the iteration head.
// A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward.
iterate.closeWith(filter);
SingleOutputStreamOperator<Long> filter1 = map.filter(new FilterFunction<Long>() {
public boolean filter(Long aLong) throws Exception {
return aLong <= 0;
}
});
filter1.print();
env.execute();
}
}可以看到我们对数据流map进行了分流,大于0的不断重复减一,否则则输出。
案例三:Filtering a Stream (Ride Cleansing)
什么能被转化成流?
Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有
- 基本类型,即 String、Long、Integer、Boolean、Array
- 复合类型:Tuples、POJOs 和 Scala case classes
而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。
对于 Java,Flink 自带有 Tuple0
到 Tuple25
类型。
1 | Tuple2<String, Integer> person = Tuple2.of("Fred", 35); |
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):
- 该类是公有且独立的(没有非静态内部类)
- 该类有公有的无参构造函数
- 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
示例:
1 | public class Person { |
这里其实对于我们类的要求比较高,不能满足序列化的要求的自定义类,必然会引起错误,需要注意。
Stream 执行环境
每个 Flink 应用都需要有执行环境,在该示例中为 env
。流式应用需要用到 StreamExecutionEnvironment
。
DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment
。当调用 env.execute()
时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。
一个例子:过滤后只包含成年人
这里例子主要是为了体现pojo
1 | public class filter_people { |
Ride Cleansing
The task of the “Taxi Ride Cleansing” exercise is to cleanse a stream of TaxiRide events by removing events that start or end outside of New York City.
The GeoUtils
utility class provides a static method isInNYC(float lon, float lat)
to check if a location is within the NYC area.
https://www.jianshu.com/p/50561a8322f7
其实关键就是一个isNYC,具体数据已经下载不下来了,但是结合别人之前的博文其实可以了解到只是个简单的fliter。
1 | public class RideCleansingExercise extends ExerciseBase { |
流式分析
Event Time and Watermarks
概要
Flink 明确支持以下三种时间语义:
- 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间
- 摄取时间(ingestion time): Flink 读取事件时记录的时间
- 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间
为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。
在流处理中,主要有两个时间概念
- 事件时间,即事件实际发生的时间。更准确地说,每一个事件都有一个与它相关的时间戳,并且时间戳是数据记录的一部分(比如手机或者服务器的记录)。事件时间其实就是时间戳。
- 处理时间,即事件被处理的时间。处理时间其实就是处理事件的机器所测量的时间。
- 通常还有第 3 个时间概念,即摄取时间,也叫作进入时间。它指的是事件进入流处理框架的时间。缺乏真实事件时间的数据会被流处理器附上时间戳,即流处理器第一次看到它的时间(这个操作由 source 函数完成,它是程序的第一个处理节点)。
Windows
在本节中,我们将学习:
- 如何使用窗口来计算无界流上的聚合,
- Flink 支持哪种类型的窗口,以及
- 如何使用窗口聚合来实现 DataStream 程序
概要
我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:
- 每分钟的浏览量
- 每位用户每周的会话数
- 每个传感器每分钟的最高温度
用 Flink 计算窗口分析取决于两个主要的抽象操作:Window Assigners,将事件分配给窗口(根据需要创建新的窗口对象),以及 Window Functions,处理窗口内的数据。
Flink 的窗口 API 还具有 Triggers 和 Evictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。
举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):
1 | stream. |
您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。
1 | stream. |
窗口分配器
Flink 有一些内置的窗口分配器,如下所示:
通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:
- 滚动时间窗口
- 每分钟页面浏览量
TumblingEventTimeWindows.of(Time.minutes(1))
- 滑动时间窗口
- 每10秒钟计算前1分钟的页面浏览量
SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
- 会话窗口
- 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
EventTimeSessionWindows.withGap(Time.minutes(30))
以下都是一些可以使用的间隔时间 Time.milliseconds(n)
, Time.seconds(n)
, Time.minutes(n)
, Time.hours(n)
, 和 Time.days(n)
。
基于时间的窗口分配器(包括会话时间)既可以处理 事件时间
,也可以处理 处理时间
。这两种基于时间的处理没有哪一个更好,我们必须折衷。使用 处理时间
,我们必须接受以下限制:
- 无法正确处理历史数据,
- 无法正确处理超过最大无序边界的数据,
- 结果将是不确定的,
但是有自己的优势,较低的延迟。
使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。
我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction
,具体介绍在这里。
窗口应用函数
我们有三种最基本的操作窗口内的事件的选项:
- 像批量处理,
ProcessWindowFunction
会缓存Iterable
和窗口内容,供接下来全量计算; - 或者像流处理,每一次有事件被分配到窗口时,都会调用
ReduceFunction
或者AggregateFunction
来增量计算; - 或者结合两者,通过
ReduceFunction
或者AggregateFunction
预聚合的增量计算结果在触发窗口时, 提供给ProcessWindowFunction
做全量计算。
接下来展示一段 1 和 3 的示例,每一个实现都是计算传感器的最大值。在每一个一分钟大小的事件时间窗口内, 生成一个包含 (key,end-of-window-timestamp, max_value)
的一组结果。
ProcessWindowFunction 示例
1 | DataStream<SensorReading> input = ... |
在当前实现中有一些值得关注的地方:
- Flink 会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。
- Flink 会传递给
ProcessWindowFunction
一个Context
对象,这个对象内包含了一些窗口信息。Context
接口 展示大致如下:
1 | public abstract class Context implements java.io.Serializable { |
windowState
和 globalState
可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
增量聚合示例
1 | DataStream<SensorReading> input = ... |
请注意 Iterable<SensorReading>
将只包含一个读数 – MyReducingMax
计算出的预先汇总的最大值。
晚到的事件
默认场景下,超过最大无序边界的事件会被删除,但是 Flink 给了我们两个选择去控制这些事件。
您可以使用一种称为旁路输出 的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:
1 | OutputTag<Event> lateTag = new OutputTag<Event>("late"){}; |
我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。
默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。
举例说明:
1 | stream. |
当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。
数据管道 & ETL
有状态的转换
Flink 为什么要参与状态管理?
在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:
- 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
- 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
- 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
- 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
- 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。
在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。
Rich Functions
至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunction
, MapFunction
,和 FlatMapFunction
。这些都是单一抽象方法模式。
对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction
,其中增加了以下方法,包括:
open(Configuration c)
close()
getRuntimeContext()
open()
仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
getRuntimeContext()
为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
一个使用 Keyed State 的例子
在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 Deduplicator
的 RichFlatMapFunction
:
1 | private static class Event { |
为了实现这个功能,Deduplicator
需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。
当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。
Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState
。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean
类型的对象。
我们的 Deduplicator
类有两个方法:open()
和 flatMap()
。open()
方法通过定义 ValueStateDescriptor<Boolean>
建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN
)。
1 | public static class Deduplicator extends RichFlatMapFunction<Event, Event> { |
当 flatMap 方法调用 keyHasBeenSeen.value()
时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null
时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeen
为 true
。
这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator
的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunction
的 open
方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap
方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。
部署在分布式集群时,将会有很多 Deduplicator
的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState
,比如
1 | ValueState<Boolean> keyHasBeenSeen; |
要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。
清理状态
上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean
类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear()
来实现,如下:
1 | keyHasBeenSeen.clear() |
对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction
的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。
也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。
Connected Streams
相比于下面这种预先定义的转换:
有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。
connected stream 也可以被用来实现流的关联。
示例
在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords
里过滤掉的。 一个称为 ControlFunction
的 RichCoFlatMapFunction
作用于连接的流来实现这个功能。
1 | public static void main(String[] args) throws Exception { |
这里注意两个流只有键一致的时候才能连接。 keyBy
的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。
在这个例子中,两个流都是 DataStream<String>
类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction
在状态中存了一个布尔类型的变量,这个变量被两个流共享。
1 | public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> { |
RichCoFlatMapFunction
是一种可以被用于一对连接流的 FlatMapFunction
,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。
布尔变量 blocked
被用于记录在数据流 control
中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords
过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。
在 Flink 运行时中,flatMap1
和 flatMap2
在连接流有新元素到来时被调用 —— 在我们的例子中,control
流中的元素会进入 flatMap1
,streamOfWords
中的元素会进入 flatMap2
。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)
。
事件驱动应用
处理函数(Process Functions)
简介
ProcessFunction
将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction
十分相似, 但是增加了 Timer。
示例
如果你已经体验了 流式分析训练 的动手实践, 你应该记得,它是采用 TumblingEventTimeWindow
来计算每个小时内每个司机的小费总和, 像下面的示例这样:
1 | // 计算每个司机每小时的小费总和 |
使用 KeyedProcessFunction
去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:
1 | // 计算每个司机每小时的小费总和 |
在这个代码片段中,一个名为 PseudoWindow
的 KeyedProcessFunction
被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>>
(与使用 Flink 内置时间窗口的实现生成的流相同)。
PseudoWindow
的总体轮廓示意如下:
1 | // 在时长跨度为一小时的窗口中计算每个司机的小费总和。 |
注意事项:
- 有几种类型的 ProcessFunctions – 不仅包括
KeyedProcessFunction
,还包括CoProcessFunctions
、BroadcastProcessFunctions
等. KeyedProcessFunction
是一种RichFunction
。作为RichFunction
,它可以访问使用 Managed Keyed State 所需的open
和getRuntimeContext
方法。- 有两个回调方法须要实现:
processElement
和onTimer
。每个输入事件都会调用processElement
方法; 当计时器触发时调用onTimer
。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElement
和onTimer
都提供了一个上下文对象,该对象可用于与TimerService
交互。 这两个回调还传递了一个可用于发出结果的Collector
。
open()
方法
1 | // 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。 |
由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState
来支持处理这一点,该 MapState
将每个窗口的结束时间戳映射到该窗口的小费总和。
processElement()
方法
1 | public void processElement( |
需要考虑的事项:
- 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
- 本例使用一个
MapState
,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。
onTimer()
方法
1 | public void onTimer( |
注意:
- 传递给
onTimer
的OnTimerContext context
可用于确定当前 key。 - 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用
onTimer
。 这个onTimer
方法从sumOfTips
中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。
本文链接: http://woaixiaoyuyu.github.io/2021/07/14/Flink%20%E5%85%A5%E9%97%A8%E6%A1%88%E4%BE%8B/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!