切勿浮沙筑高台
从S4到Storm(一):当分布式遇上实时计算
实时计算到底有多“实时”?
一般来说,我们的 MapReduce 都是定时执行的,比如每天运行一次,生成一个报表,或者频繁一点,每小时运行一次,计算上一个小时的点击率数据。但是,这个获得反馈数据的频率还是太慢了。每小时运行一次 MapReduce 程序,意味着我们的统计数据,平均要晚上半个小时。而半个小时里,低质量的广告或者搜索结果已经曝光了很多次了。举个例子,现在无论是什么样的社会热点新闻,很容易在微博热搜上出现。往往在新闻发生后的一两分钟,就已经有大量的搜索出现在微博热搜里了。如果我们要等待半个小时,才能统计到这些搜索,那么热搜功能就可以说形同虚设了。
采用频繁运行 MapReduce 程序的办法,我们至少会遇到两个问题。
一个是大量的“额外开销”。我们之前讲过,MapReduce 的额外开销不小,再小的任务也需要个十几秒到一分钟的运行时间。如果我们高频率每分钟运行 MapReduce 任务,那么“额外开销”占的时间比重和硬件资源会非常高,也很浪费。
二个是我们不得不让输入文件变得极其“碎片化”。无论是 GFS 还是 HDFS,都是把文件变成一个个 64MB 大小的 Block,然后 MapReduce 通过分布式并行读取来进行快速分析。但是如果我们需要每分钟都处理数据,那么对于输入的数据,就要按照分钟进行分割。每分钟我们都需要有很多个文件,分布到 GFS/HDFS 上不同的数据节点。这样,我们的文件都会变得很小,也就丧失了顺序读取大文件的性能优势。
其实,高频率地执行 MapReduce 还会有很多问题。而归根到底,是这两点:
- 第一,MapReduce 是为“高吞吐量”而设计的一个系统。在整个系统设计理念里,它没有考虑“低延时”这个需求。
- 第二,MapReduce 的数据,是一份“边界明确(bounded)”的数据。在进行数据处理之前,要处理的数据已经存放在存储系统上了。而我们想要进行的实时数据统计,想要处理的是一份“无边界(unbounded)”的数据,会不断地有新数据流入进来,永无停歇。
流式计算的逻辑模型
我们先来看一看 S4,是怎么抽象我们的流式计算的。S4 把所有的计算过程,都变成了一个个处理元素(Processing Element)对象,简称为 PE 对象。我这里特地加上了对象,就是因为在实现上,PE 就是一个面向对象编程里面一个实际的对象。
每一个 PE 对象,都有四部分要素组成,分别是:
- PE 本身的功能(functionality),这个体现为 PE 类里实现的业务逻辑函数,以及为了这个类配置的各种参数;
- PE 能够处理的事件类型(types of events);
- PE 能够处理的事件的键(keyed attribute);
- PE 处理的事件的键对应的值(value)。
对于流式的数据处理,就是由一个个 PE 组成的有向无环图(DAG)。有向无环图的起点,是一些特殊的被称为无键 PE(Keyless PE)的对象。这些对象的作用,其实就是接收外部发送来的事件流,这些外部发送过来的事件流,其实就是一条条的消息。
这些无键 PE 会解析对应的消息,变成一个个事件。然后给每个事件打上三个信息,分别是:
- 事件类型(Event Type);
- 事件的 Key;
- 事件的 Value。
然后可以把事件给发送出去。接着下游的其他 PE 对象,会根据自己定义的事件类型,和能处理的键来接收对应的消息,并且处理这个消息。如果当前系统里,没有对应的键的 PE,那么系统会创建一个新的 PE 对象。
处理数据的 PE 对象,可以选择处理完之后立刻发送一个新的事件出去;也可以选择在对象内部来维护一个状态,然后当处理了一定数量的消息之后,或者过了一个固定的事件间隔之后把消息发送出去。
最后,在整个有向无环图的终点,会有一系列的 PE 对象。这些对象,会把最终的计算结果发布(Publish)。这个发布的频率,也和其他 PE 发送消息的逻辑类似,可以在每收到一个事件就发送,也可以要求接收到一定数量的事件,或者每隔一个特定的时间间隔发送。
师从 MapReduce 的设计理念
其实 S4 的系统架构,和我们之前看过的 MapReduce 这样的框架一脉相承。PE 其实和 Map/Reduce 函数一样,只是一个抽象的概念。不过 S4 的系统设计,要更加激进一点,那就是 S4 选择了一个无中心的,完全对称的架构。
S4 和我们之前看过的所有系统都不一样,没有所谓的 Master 节点。如果一定要说有一个中心化的地方的话,S4 依赖于 Zookeeper,也就是一个类似于 Chubby 这样的分布式锁系统。S4 的所有服务器,都会作为一个处理节点(ProcessingNode),简称 PN 注册在 Zookeeper 上。具体如何分配负载,是由各个节点协商决定的,而不是由一个中心化的 Master 统一分配。
稍显“过时”的伸缩和容错能力
首先就是这里的海量对象的问题。由于每一个处理数据的 Key 都要是一个对象,系统里就会有海量的对象。而一个 Key 如果只出现一次,之后再也不出现了,也要占用内存。S4 对此的解决办法,是给 Key 设定 TTL,定期清理掉不需要的 Key。
其次,是 S4 里,没有时间窗口的概念。在我们进行实时数据处理的时候,我们需要统计的,常常是“过去一分钟的热搜”,或者“过去一小时的热搜”,这样有一个时间范围的数据。
第三,是 S4 的容错处理非常简单。S4 能够做到的容错,其实就是某一个计算节点挂掉了,我们重新再起一个计算节点承担它的工作。但是,原先节点里,所有 PE 维护的状态信息就都丢失了。我们既不知道目前的统计信息是什么,也不知道目前处理到哪些事件了。
最后一个问题,则是 S4 虽然是一个分布式系统,但是并不支持真正的动态扩容。在一开始论文的假设部分,就假设了运行中的集群不会增加或者减少节点。
从S4到Storm(二):位运算是个好东西
典型的 Master+Worker 架构
和 S4 不同,Storm 是一个典型的 Master+Worker 的分布式系统架构,并且将传送的消息和对应消息的处理逻辑做了分离。那么,接下来,我们就一起先来看一下 Storm 的整体架构。
基于 Topology 的逻辑模型
和 S4 类似,Storm 系统的抽象模型,也是一个有向无环图。在 Storm 里,这样一个有向无环图,叫做 Topology,也就是拓扑图。整个图里有这样几个元素:
- 首先是 Spouts,也就是数据源。Storm 并没有像 S4 一样,把一切东西都定义成 PE。Spout 负责从外部去读取数据或者接收数据。就和它的名字一样,Spout 好像一个出水管,一旦打开,就会源源不断地有外部的数据灌进来。在 S4 里,对应的就是无键 PE(Keyless PE)。
- 其次是 Tuple,也就是元组。它也是我们在 Topology 中,传输的所有的最小粒度的数据单元。一个 Tuple 是一个带命名的值的列表,你可以看成是一个个 KV 对,不过这个 Key 只是在定义 Tuple 的时候出现。但是在数据传输的时候,我们只需要传输对应的值。这个方式,其实有点像我们之前讲解过的 Thrift,字段的名称是定义在外部的,实际传输的时候只需要序号、类型和值。在 S4 里,Tuple 对应的就是事件(Event)。
- 然后是 Streams,也就是数据流,一个流包含了无限多个 Tuple 的序列,这些 Tuple 会被系统分布式地并行去处理。
- 最后就是 Bolts,也就是我们进行计算逻辑处理的地方。Bolt 可以处理任意数量的输入流,然后产生任意数量的输出流。对应地,我们要把计算结果写入到外部数据库,也是通过 Bolts 来进行处理。乍一看,Bolts 似乎很像 S4 里的 PE,用于处理对应的分布式计算逻辑,不过实际上,Bolts 和 PE 完全不一样。
Storm 的抽象模型里,和 S4 的最大不同就在 Bolts 上。S4 的 PE,不仅是一个功能逻辑的单元,也是一个 KV 对的数据。同样类型的事件下,所有相同的 Key 的数据,都会聚合到同一个 PE 下。这就使得整个系统里有大量的 PE 对象,也导致 S4 的整个系统有几个显著的设计问题。
- 首先就是内存占用和 GC 开销,大量的 PE 会占用大量的内存。
- 其次就是我们的业务逻辑代码里,混入了控制分布式数据分发的逻辑。比如sort n的n需要人为修改
而 Storm 的设计并不相同,Storm 里的 Bolt 更像是 MapReduce 里的 Map 或者 Reduce 函数。我们可以在 Topology 里面,去设置不同 Bolt 的并行度,以及设置数据流是如何分组的。但是,每个 Bolt 输出的 Tuple 本身,却不需要通过生成一个类似于(SortID, N)这样一个特殊的 Key,来定义下一层的 Bolt 的并行度。在 Storm 里面,对应的数据流可以进行很多种分组(Grouping)。
如果你对照着这里的示意图,可以看到在 Storm 下进行 Top K 的单词排序,两边的数据流向是不一样的,S4 里的一个 WordCountPE 的输出,只会给一个 SortPE;而 Storm 里的 WordCountBolt 的输出,会发送给多个不同的 SortCountBolt,因为同一个 WordCountBolt 下,会包含很多个不同的单词。
Master+Worker 的系统架构
我们说 Storm 的 Bolt 很像 MapReduce 的 Map 和 Reduce 函数,其实 Storm 本身的架构也和 MapReduce 非常相似。和我们上节课看过的无中心的 S4 不同,Storm 选择了一个典型的 Master+Worker 的架构设计。整个 Storm 集群里,也是由 Nimbus+Supervisor+Worker 这样三种类型的进程组成的。
首先是 Nimbus 进程,其实也就是 Storm 集群的 Master 节点。它的作用,类似于 Hadoop 里的 JobTracker,或者说 MapReduce 里的 Scheduler+Master,也就是负责资源的分配和任务的调度。
开发人员会直接提交一个 Topology 给 Master。这个 Topology,之前只是一个抽象的有向无环图。而在实际应用里,它就好像一个 MapReduce 的任务一样,是一个编译好的程序和对应的配置。只不过,MapReduce 的任务执行完了就结束了。而作为流式计算,Topology 这个任务如果我们不去终止它,它就会永不停歇地运行下去。
然后是 Supervisor 进程,这个类似于 Hadoop 里的 TaskTracker,也就是 MapReduce 里的 Worker。Supervisor 在每一个服务器上都会有一个,它本身不负责执行任务,但是会负责接收 Nimbus 分配的任务,然后管理本地的 Worker 进程,让 Worker 进程来实际执行任务。
最后是Worker 进程,一台服务器上会有多个 Worker 进程。Storm 是使用 Clojure 写的,跑在 JVM 上,所以每一个 Worker 进程就是一个独立的 JVM,Worker 里面还会通过 JVM 的 Executor 来维护一个线程池。然后实际的线程池里,会有很多个 Spout/Bolt 的任务。因为 Java 的 Executor 的实现里会复用线程,所以 Spout 和 Bolt 实际上会使用同一个线程。这个,也会大大减少整个系统的开销。
而把整个系统拆分成 Nimbus、Supervisor 和 Worker 三种进程,就使得 Storm 的容错能力也大大增强了。
Nimbus 和 Supervisor 之间,并不是直接通信的。因为如果这样的话,显然 Nimbus 会成为一个故障的“单点”。所以 Nimbus 是把对应的任务分配写到 Zookeeper 里,也就是一个类似于 Chubby 这样的分布式锁系统。所以我们的任务分配是持久化的,而且会由 Paxos 协议来保障容错能力。而 Supervisor 也是从 Zookeeper 里面,去读取对应的任务分配。
Nimbus 和 Supervisor 的职责都非常简单,Nimbus 只需要进行 Topology 的解析和任务调度,而 Supervisor 只需要接收任务,并且监控 Worker 进程是否存活。它们本身不处理数据,而且也不在内存里面保存数据。
Storm 的容错机制
相比于通过一个 RPC,消息队列有一个很大的优点,那就是高性能。上游节点不需要等待下游节点返回接收成功,就能发送下一条信息。不过,这也带来了一个问题,就是如果在消息发送之后,下游是否成功接收并处理了这条消息,上游是不知道的。可能因为网络超时、也可能因为下游节点的软硬件故障,在分布式系统里,“错误”是在所难免的。
而且,在流式数据处理里,我们可能不只有一层链路。就拿论文里的统计 Tweet 里的单词数量为例,我们先要从一个 TweetSpout 里,读取数据流里的 Tweet,随机发送给到一个 ParseTweetBolt,这个 Bolt 会解析 Tweet 成一个个单词,再发送给下游的多个 WordCountBolt。
Storm 选择的解决方案,是把从 Spout 发起的第一个 Tuple 作为一棵树的根。下游所有衍生出来发送的 Tuple,都是这棵树的一部分。任何一个 Tuple 处理失败或者超时了,那么就从 Spout 重新发送消息。
而要做到这一点,Storm 需要在系统里引入一个特殊的 Bolt,叫做 AckerBolt。Spout 发送出去的消息,同时会通知给到 AckerBolt。而 Bolt 一旦处理完根 Tuple 相关的消息,也会通知给到 Acker。
Bolt 会告诉 AckerBolt 两个信息,一个是我已经处理完了某一个 Tuple,另一个是这个 Tuple 衍生往下游的哪些 Tuple 我也已经发送出去了。这样,Acker 就有了一开始 Spout 发出的 Tuple 的整棵树的完整信息。等到最后一层的 Bolt 处理完对应的 Tuple,然后发送了对应的通知给到 AckerBolt,并且告诉它后面没有新的 Tuple 了,那么 AckerBolt 就知道,整棵 Tuple 树已经处理完成了。
Storm 采用了一个很巧妙的办法,那就是利用位运算里的异或(XOR)。Storm 给每一个发送出去的 Tuple 都会分配一个 64 位的 message id。当消息从 Spout 被发送出去的时候,Storm 会给 AckerBolt 发送这个 message-id,告诉它,你要开始追踪这个 Tuple 树了。Acker 里呢,则会维护一个 message-id 到校验码(checksum)的映射关系。这个校验码,一开始就是拿 0 和 message-id 去异或(XOR)一下。
而下游的每一个 Bolt,会处理完这个 Tuple 相关的消息,并且向外发送新的 Tuple。每个新发送的 Tuple 里,都需要带上根 Tuple 的 message-id。在新 Tuple 发送出去之后,Bolt 会通知 AckerBolt,通知的内容也很简单,也是一个根 message-id 到校验码的映射关系。
这里的校验码,就是把当前对外发送的所有消息的 message-id,和已经处理完的消息的 message-id 做一下异或。然后 AckerBolt 收到这个消息,会把收到的校验码,和本地的校验码也做一下异或,更新成最新的校验码。
所以,只要有 Tuple 还没有被 acking,我们的校验码就不会是 0,但是一旦所有的 Tuple 树上的 Tuple 都被 acking 了,那么这个校验码必然就是 0。
不过,需要注意的是,这个机制只能保障,Spout 发出来的 Tuple 至少被处理一次,也就是 At Least Once,但是它避免不了 Tuple 可能被重复处理。
Kafka(一):消息队列的新标准
Kafka 的系统架构
首先,我们仍然可以把 Kafka 看成是一个类似于 Scribe 这样的日志收集器。上游的应用服务器仍然会把日志发送给 Kafka 集群,但是在 Kafka 的下游,它不仅能把对应的数据,作为文件上传到 HDFS 上。同时,像 Storm 这样的流式数据处理系统,它的 Spout 会直接从 Kafka 里获取数据,而不是从 HDFS 上去读文件。这个时候,Kafka 其实变成了一个分布式的消息队列。
Kafka 的整个系统架构和概念并不复杂,和你日常见过的消息队列一样,它是一个典型的生产者(Producer)- 消费者(Consumer)模型。在 Kafka 里,有这样几个角色。
首先是 Producer,也就是日志的生产者,通常它就是我们前面的应用服务器。应用服务器会生成日志,作为生产者,把日志发送给到 Kafka 系统中去。
然后是 Broker,也就是我们实际 Kafka 的服务进程。因为为了容错和高可用,Kafka 是一个分布式的集群,所以会有很多台物理服务器,每台服务器上都会有对应的 Broker 的进程。Kafka 会对所有的消息,进行两种类型的分组。
- 第一种,是根据业务情况进行分组,在 Kafka 里,对应的就是 Topic(主题)这个概念。比如我们可能既有广告日志,又有搜索日志,两种日志的格式和用途都不一样,那么我们就会通过 Topic 区分开来。
- 第二种,是进行数据分区,这个和我们见过的其他分布式系统进行分区的原因是一样的。一方面,分区可以帮助我们水平扩展系统的处理能力;同一个 Topic 的日志,可以平均分配到多台物理服务器上,确保系统可以并行处理。另一方面,这也是一个有效的“容错”机制,一旦有某一个 Broker 所在的物理服务器出现了硬件故障,那么上游的 Producer,可以把日志发到其他的 Broker 上,来确保系统仍然可以正常运作。
最后是 Consumer,也就是实际去处理日志的消费者。我们去读取 Kafka 数据,把它放到 HDFS 上的程序,就是一个消费者。而我们去获取实时日志,进行分析的程序,也同样是一个消费者,比如一个已经提交运行的 Storm Topology。Kafka 对于它所处理的消息,是支持多个 Consumer 的
为了区分这两种“多个消费者”代表的不同含义,Kafka 把每一个用途的 Consumer 程序,称之为一个 Consumer Group。也就是说,Kafka 里,会有很多个不同的 Consumer Group,它们会根据自己的用途去消费相同的消息。而一个 Consumer Group 里,会有很多个 Consumer,不同 Consumer 之间分摊压力,会去消费不同的消息。
拉数据而不是推数据
但是,这个主动推送数据到下游的方案,其实有一个很严重的缺陷,那就是消息队列本身,需要维护下游是否已经成功处理消息这个状态。
事实上,不只是 Scribe 这样的日志收集器会遇到这个问题,传统的消息队列也会有类似的情况。传统的消息队列,通常会通过一个 message-id 来唯一标识一条消息,只有当下游的所有订阅了这个消息的消费者,处理完成之后,消息队列就认为这条消息被处理完成了,可以从当前的消息队列里面删除掉了。但是,这个机制也就意味着,这个消息队列在下游数据分析完成之前,需要一直存储着这些消息,等待下游的响应,会消耗大量的资源。
而 Kafka 则采用了一个完全不同的方式来设计整个系统,简单来说,就是两点:
- 第一点,是让所有的 Consumer 来“拉取”数据,而不是主动“推送”数据给到 Consumer。并且,Consumer 到底消费完了哪些数据,是由 Consumer 自己维护的,而不是由 Kafka 这个消息队列来进行维护。
- 第二点,是采用了一个非常简单的追加文件写的方式来直接作为我们的消息队列。在 Kafka 里,每一条消息并没有通过一个唯一的 message-id,来标识或者维护。整个消息队列也没有维护什么复杂的内存里的数据结构。下游的消费者,只需要维护一个此时它处理到的日志,在这个日志文件中的偏移量(offset)就好了。
然后,基于这两个设计思路,Kafka 做了一些简单的限制,那就是一个 consumer 总是顺序地去消费,来自一个特定分区(Partition)的消息。而一个 Partition 则是 Kafka 里面可以并行处理的最小单位,这就是说,一个 Partition 的数据,只会被一个 consumer 处理。
这样一来,整个 Kafka 的系统设计也一下子变得特别简单。所有的 Producer 生成消息,和 Consumer 消费消息,都变成了简单的顺序的文件读和文件写。而我们知道,硬盘的顺序读写的性能要远高于随机读写。
Kafka 的单个 Partition 的读写实现
在实际的实现上,Kafka 是这么做的。每一个 Topic 会有很多个 Partition,分布到不同的物理机器上。一个物理机上,可能会分配到多个 Partition。实际存储的时候,我们的一个 Partition 是一个逻辑上的日志文件。在物理上,这个日志文件会给实现成一组大小基本相同的 Segment 文件,比如每个 Segment 是 1GB 大小。每当有新消息从 Producer 发过来的时候,Broker 就会把消息追加写入到最后那个 Segment 文件里。
而为了性能考虑,Kafka 支持我们自己设置,是每次写入到把数据刷新到硬盘里,还是在写入了一定数量的日志或者经过一个固定的时间的时候,才把文件刷新到硬盘里。
Broker 会在内存里维护一个简单的索引,这个索引其实就是每个通过一个虚拟的偏移量,指向一个具体的 Segment 文件。那么在 Consumer 要消费数据的时候,就是根据 Consumer 本地维护的已经处理完的偏移量,在索引里找到实际的 Segment 文件,然后去读取数据就好了。
Kafka(二):从Lambda到Kappa,流批一体计算的起源
Kafka 的分布式系统的实现
首先,Kafka 系统并没有一个 Master 节点。不过,这一点倒是不让人意外,主要是 Kafka 的整体架构实在太简单了。我们在上一讲就看到了,单个的 Broker 节点,底层就是一堆顺序读写的文件。而要能够分布式地分摊压力,只需要用好 ZooKeeper 就好了。
每一个 Kafka 的 Broker 启动的时候,就会把自己注册到 ZooKeeper 上,注册信息自然是 Broker 的主机名和端口。在 ZooKeeper 上,Kafka 还会记录,这个 Broker 里包含了哪些主题(Topic)和哪些分区(Partition)。
而 ZooKeeper 本身提供的接口,则和我们之前讲解过的 Chubby 类似,是一个分布式锁。每一个 Kafka 的 Broker 都会把自己的信息像一个文件一样,写在一个 ZooKeeper 的目录下。另外 ZooKeeper 本身,也提供了一个监听 - 通知的机制。
高可用机制
在现实中,Kafka 是这么做的:
- 首先,为了让 Kafka 能够高可用,我们需要对于每一个分区都有多个副本,和 GFS 一样,Kafka 的默认参数选择了 3 个副本。
- 其次,这些副本中,有一个副本是 Leader,其余的副本是 Follower。我们的 Producer 写入数据的时候,只需要往 Leader 写入就好了。Leader 自然也就是将对应的数据,写入到本地的日志文件里。
- 然后,每一个 Follower 都会从 Leader 去拉取最新的数据,一旦 Follower 拉到数据之后,会向 Leader 发送一个 Ack 的消息。
- 我们可以设定,有多少个 Follower 成功拉取数据之后,就能认为 Producer 写入完成了。这个可以通过在发送的消息里,设定一个 acks 的字段来决定。如果 acks=0,那就是 Producer 的消息发送到 Broker 之后,不管数据是否刷新到本地硬盘,我们都认为写入已经完成了;而如果设定 acks=2,意味着除了 Leader 之外,至少还有一个 Follower 也把数据写入完成,并且返回 Leader 一个 Ack 消息之后,消息才写入完成。我们可以通过调整 acks 这个参数,来在数据的可用性和性能之间取得一个平衡。
负载均衡机制
Kafka 的 Consumer 一样会把自己“注册”到 ZooKeeper 上。在同一个 Consumer Group 下,一个 Partition 只会被一个 Consumer 消费,这个 Partition 和 Consumer 的映射关系,也会被记录在 ZooKeeper 里。这部分信息,被称之为“所有权注册表”。
而 Consumer 会不断处理 Partition 的数据,一旦某一段的数据被处理完了,对应这个 Partition 被处理到了哪个 Offset 的位置,也会被记录到 ZooKeeper 上。这样,即使我们的 Consumer 挂掉,由别的 Consumer 来接手后续的消息处理,它也可以知道从哪里做起。
那么在这个机制下,一旦我们针对 Broker 或者 Consumer 进行增减,Kafka 就会做一次数据“再平衡(Rebalance)”。所谓再平衡,就是把分区重新按照 Consumer 的数量进行分配,确保下游的负载是平均的。Kafka 的算法也非常简单,就是每当有 Broker 或者 Consumer 的数量发生变化的时候,会再平均分配一次。
而和 Storm 一样,本质上,Kafka 对于消息的处理也是“至少一次”的。如果消息成功处理完了,那么我们会通过更新 ZooKeeper 上记录的 Offset,来确认这一点。而如果在消息处理的过程中,Consumer 出现了任何故障,我们都需要从上一个 Offset 重新开始处理。这样,我们自然也就避免不了重复处理消息。
顺序保障机制
首先,是 Kafka 很难提供针对单条消息的事务机制。因为我们在 ZooKeeper 上保存的,是最新处理完的消息的一个 Offset,而不是哪些消息被处理完了、哪些消息没有被处理完这样的 message-id => status 的映射关系。所以,Consumer 没法说,我有一条新消息已经处理完了,但是还有一条旧消息还在处理中。而是只能按照消息在 Partition 中的偏移量,来顺序处理。
不过,对于快速统计实时的搜索点击率这样的统计分析类的需求来说,这些问题都不是问题。而 Kafka 的应用场景也主要在这里,而不是用来作为传统的消息队列,完成业务系统之间的异步通信。
数据处理的 Lambda 架构
首先,是我们的流式数据处理只能保障“至少一次(At Least Once)”的数据处理模式,而在批处理下,我们做到的是“正好一次(Exactly Once)”。也就意味着,批处理计算出来的数据是准确的,而流式处理计算的结果是有误差的。
其次,是当数据处理程序需要做修改的时候,批处理程序很容易修改,而流式处理程序则没有那么容易。
流式数据处理的性能压力
最常会发生的变更,既不来自于硬件故障导致的数据重复处理,也不是来自于业务需求变更导致我们需要修改程序。最常会发生的变更,来自于解决分析程序里的各种 Bug。在这种场景下,我们的输入数据不会发生变化,输出的表结构也不会发生变化。但是,我们可能需要反复修改数据处理程序,并且反复在同一份日志数据集上运行这个程序。
这样的程序运行场景,对于大数据的批处理来说,压力并不大,但是对于流式数据处理,一样会有大量重放日志的工作量。
Lambda 架构的基本思想
有鉴于此,Storm 的作者南森·马茨(Nathan Marz)提出了 Lamda 架构,把大数据的批处理和实时数据结合在一起,变成一个统一的架构。
Nathan 的思路是这样的,我们先不去看具体数据是通过什么计算框架来处理的,而是把整个数据处理流程抽象成 View = Query(Data) 这样一个函数。我们所有的报表、看板、数据分析结果,都是来自对于原始日志的分析。
所以,原始日志就是我们的主数据(Master Data),不管是 MapReduce 这样的大数据批处理,还是 Storm 这样的大数据流式处理,都是对于原始数据的一次查询(Query)。而这些计算结果,其实就是一个基于特定查询的视图(View)。
而对于实际数据分析系统的用户来说,其实他关心的既不是 Query 也不是 Master Data,而是一个个 View。那么,我们在系统的整体架构上,就只需要对这些用户暴露出 View,而不需要告诉他们,具体下面的 Query 和 Master Data 的细节就好了。这样,我们可以按照 Hadoop 和 Storm 本身合适的场景进行选择。
一方面,我们可以通过 Storm 进行实时的数据处理,能够尽快获得想要的报表和数据分析结果。另一方面,我们同样会定时运行 MapReduce 程序,获得更准确的数据结果。在 MapReduce 程序运行完之前,我们的分析决策基于 Storm 的实时计算结果;但是当 MapReduce 更准确的计算结果出来了,我们就可以拿这个结果替换掉之前的实时计算结果。
而对于外部用户来说,他们看到的始终是同一个视图,只是这个视图,会随着时间的变化不断修正数据结果罢了。
所以,Nathan Marz 总结的 Lambda 结构,是由这样几部分组成的:
- 第一部分是输入数据,也就是 Master Data,这部分也就是我们的原始日志。
- 然后是一个批处理层(Batch Layer)和一个实时处理层(Speed Layer),它们分别是一系列 MapReduce 的任务,和一组 Storm 的 Topology,获取相同的输入数据,然后各自计算出自己的计算结果。
- 最后是一个服务层(Serving Layer),通常体现为一个数据库。批处理层的计算结果和实时处理层的结果,会写入到这个数据库里。并且,后生成的批处理层的结果,会不断替代掉实时处理层的计算结果,也就是对于最终计算的数据进行修正。
可以看到,Lambda 架构很好地结合了 MapReduce 和 Storm 的优点。而这个 Lambda 结构,最终也变成了 Twitter 的一个开源项目 SummingBird。但是,这个 Lambda 架构也有一个显著的缺点,也就是什么事情都需要做两遍。
数据处理的 Kappa 架构
在有了 Kafka 之后,重放日志一下子变得简单了。因为我们所有的日志,都会在 Kafka 集群的本地硬盘上。而通过重放日志来重新进行数据计算,也只是设定一下新的分析程序在 ZooKeeper 上的 Offset 就好了。
Kappa 架构相比于 Lambda 架构,Kappa 架构去掉了 Lambda 架构的批处理层,而是在实时处理层,支持了多个视图版本。
我们之所以要有 View = Query(Data) 这么一个抽象,是因为我们的原始日志,也就是 Data 是不会变化的,而我们想要的 View 也不会变化。但是具体的 Query,可能会因为程序有 Bug 而比较频繁地被修改。
在 Kappa 架构下,如果要对 Query 进行修改,我们原来的实时处理层的代码可以先不用动,而是可以先部署一个新版本的代码,比如一个新的 Topology 上去。然后,我们会对这个 Topology 进行对应日志的重放,在服务层生成一份新的数据结果表,也就是视图的一个新的版本。
在日志重放完成之前,外部的用户仍然会查询旧的实时处理层产生的结果。而一旦日志重放完成,新的 Topology 能够赶上进度,处理到最新产生的日志,那么我们就可以让查询,切换到新的视图版本上来,旧的实时处理层的代码也就可以停掉了。
而随着 Kappa 架构的提出,大数据处理又开始迈入了一个新的阶段,也就是“流批一体”逐步进入主流的阶段。
Dataflow(一):正确性、容错和时间窗口
一个简单的流式数据处理系统
首先,前端的应用服务器,会把产生的广告日志发送给一个负载均衡。然后通过负载均衡,均匀而随机地发送给 Kafka 不同的 Broker 服务器。下游有一个 Storm 集群,里面有一个 Topology,同时完成了广告计费,以及广告的点击率统计的工作。
这个 Topology,就只有简单的两层。
- 第一层是一个 KafkaSpout,它会从 Kafka 拉取日志,然后解析并获取需要的字段,并向下游的 Bolt 进行数据分发。
- KafkaSpout 的每一条日志,都会发送两条消息给下游两种不同的 Bolt。一条发给 AdsCtrBolt,用来统计不同广告的点击率;另一条发给 ClientSpentBolt,用来计算每个广告客户的花费。
“正好一次”的正确性
因为为了性能考虑,我们从 Kafka 拉取数据,不会是拉一条、处理一条,然后更新一次 ZooKeeper 上的偏移量。特别是 ZooKeeper 会受不了这么大的负载,它和 Chubby 一样,是用于实现一个粗粒度的分布式锁,而不是一个高性能的 KV 存储。所以,KafkaSpout 会从 Kafka 拉一小批数据,然后发送出去,等到这一小批数据发送完了,并且下游都处理完了,才会变更一次 ZooKeeper 上的偏移量。
但是,只要其中有一条消息在下游还没有处理完的时候,KafkaSpout 所在的服务器挂掉了,对应的偏移量没有更新。那么在容错机制下,重新启动在另一台服务器上的 KafkaSpout,会重新再发送一遍这一批数据。而这个时候,我们就不只是重新对一条日志重复计费,而是需要对一大批日志重复计费。
要解决这个问题,一个很直观的思路,自然是对重复发送的日志或者消息进行去重。最简单的方式,就是在每一个 Bolt 里,我们维护一个这个 Bolt 已经处理完成的,所有的 message-id 的集合。那么,任何一条新的消息发送过来的时候,我们都去这个集合里看一看,这条消息是否已经处理过了,就能解决这个问题了。
不过,让每个 Bolt 都保留所有处理过的 message-id 的集合,显然会占用太多的内存了。因为在流式系统里,随着时间的推移,系统处理过的日志量在不断地增加,message-id 的集合只会越来越大。所以,在工程实践上,我们可以做两个优化:
- 第一个,是使用 BloomFilter 进行去重,来代替原始的数据集合。我们把所有已经处理过的 message-id 放到一个 BloomFilter 里去,这样可以大大压缩我们需要的内存空间。不过,使用 BloomFilter 会带来的副作用是,我们可能会有很小的概率误算,使得不是重复的消息也会被认为是重复的。
- 第二个,是把数据按照时间窗口,切分成多个 BloomFilter。比如,我们可以设定有 30 个 BloomFilter,每个 BloomFilter 都只存放某一分钟的 message-id。而每过一分钟,我们都把 30 分钟前的那个 BloomFilter 清空。这样,我们可以通过一个固定大小的内存空间,确保只要是 30 分钟内的重复数据,就不会被多次处理。因为一般来说,简单的重发,不太可能超过 30 分钟。我们可以根据系统的实际情况,来设定这个对应的时间窗口。
真正做到“正好一次”的数据处理,是现代流式数据处理的第一个目标。
计算节点迁移的容错问题
BloomFilter 的引入,使得我们用于计算的 Bolt 节点,其实有了“状态”。也就是说,它自身已经不是一个纯粹的函数了。事实上,不仅是为了做到“正好一次”的消息处理需要状态,我们本身的数据处理需求就需要状态。
Bolt 会被拆分和迁移,并且在迁移的过程中,我们需要能够保留状态信息,这意味着我们的状态需要能够持久化下来。我们需要能够把这些状态,也更新到一个稳定的外部存储中去。当我们的节点挂掉,在其他服务器上恢复计算能力的时候,需要把这些状态信息重新读取回来。
通过把各个计算节点的中间状态持久化,使得系统在容错情况下,仍然能够做到“正好一次”的数据处理,并且能够在线上动态扩容、调度计算,是现代流式数据处理的第二个目标。
处理消息的时间窗口
处理逻辑有一个问题,就是我们用消息传输到 AdsCtrBolt 的时间,替代了对应的广告曝光和点击发生的时间。也就是我们用处理时间(Processing Time)替代了事件时间(Event Time)。这样,我们计算出来的点击率,乃至计费信息,会和实际情况有差异。
一个合理的解决方案,就是我们需要使用实际的事件发生的时间(即 Event Time),来进行相应的数据统计。但是这样一来,我们就面临两个新的问题。
第一个问题,是我们不能简单地维护 广告ID=>(展示次数,点击次数,广告花费) 这样一个映射关系了,而是需要一个 时间窗口=>[广告ID1=>(展示次数,点击次数,广告花费),广告ID2=>(展示次数,点击次数,广告花费) , ……] 这样一个三维多层的映射关系了。
第二个问题,是我们很难决策,什么时候应该将我们的统计结果,写入到外部的数据库里。因为在上节课里我们就看到过,上游发送过来的日志,并不是严格按照时间排序的。一个可行的方案,就要考虑很多因素,比如我们要加上几个判断条件和因素。
我们希望能够把和时间窗口相关的,以及触发数据更新到外部数据库相关的处理机制,在流式处理框架中内建。而撰写流式数据处理逻辑的开发人员,不需要关心这些机制和容错问题,这个也就是现代流式数据处理的第三个目标。
Dataflow(二):MillWheel,一个早期实现
MillWheel:S4 和 Storm 的组合模型
计算(Computation)和流(Stream)
首先是 Computation,用来作为有向无环图里面的计算节点。它里面包含了三个部分:
- 它“订阅”了哪些流,也就是消息输入的流向是什么;
- 它会输出哪些流,也就是消息输出的流向是什么;
- 它本身的计算逻辑,也就是进行数据统计,或者是数据过滤的逻辑代码。
键(Key)
然后是 Key,在 MillWheel 的系统里,每一条消息都可以被解析成(Key, Value, TimeStamp)这样一个三元组的组合。而我们在前面也看到了,一个 Computation 可以针对输入的消息流,定义自己的 key_extractor。这一点,比起 Storm 和 S4 其实是有所不同的。
1 | computation SpikeDetector { |
在 Storm 和 S4 里,同样的消息,如果我们要根据不同的字段进行维度划分,分发给不同的 PE 或者 Bolt。那么,在抽象层面,我们其实是发送了两个不同的消息流。而在 MillWheel 里,则是一个相同的消息流,被不同的两个 Computation 订阅了,只是两个 Computation 可以有不同的 key_extractor 而已。这样,我们在系统的逻辑层面就可以复用同一个流,而不需要有两个几乎是完全相同,只是使用的 Key 的字段不同的流了。
从这个角度,MillWheel 的系统逻辑其实更像是 Storm,而 Computation + 一段 Key 的组合,其实就是一个 Bolt,需要处理某一段 Key。
低水位(Low Watermark)和定时器(Timer)
无论是论文里给出的进行异常搜索量的检测,还是上节课我们举过的广告点击率的统计例子,MillWheel 这样的流式系统,都要面对实际的事件发生的时间,和我们接收到数据的时间有差异的问题。MillWheel 需要有一个机制,能够让每个 Computation 进程知道,某个时间点之前的日志应该都已经处理完了。所以,它引入了低水位这个概念,以及 Injector 这个模块。
前面我们讲解 Computation 的时候已经看到了,我们的每一条消息,都会被解析成(Key,Value,TimeStamp)这样一个三元组。这里面的 TimeStamp,其实就是我们需要的事件发生的时间,我们就是根据这个时间戳,来解决这个时间差异的问题的。
MillWheel 引入的低水位是这样一个概念,在某一个 Computation A 里,我们可以拿到所有还没有处理完的消息里面,最早的那个时间戳。这个没有处理完的消息,包括了还在消息管道里面待传输的消息,也包括已经在 Computation 里存储下来的消息,以及处理完了,但是还没有向下游发送的消息。这个最早的时间戳,就是一个“低水位”。这个“低水位”,其实就是告诉了我们,当前这个 Computation 的计算节点里,哪个最早的时间点还有消息没有处理完。
而这个 Computation A,可能还订阅了很多上游的其他 Computation。此时此刻,那些 Computation,也会有一个同样的时间戳。那么,本质上,A 的低水位,就是它和它上游的低水位中,时间戳最早的那一个。
那么,MillWheel 是这么做的:每一个 Computation 进程,都会统计自己处理的所有日志的“低水位”信息,然后上报给一个 Injector 模块,而这个 Injector 模块,会收集所有类型的 Computation 进程的所有低水位信息。接着,它会通过 Injector,把相应的水位信息下发给各个 Computation 进程,由各个 Computation 进程自己,来计算自己最终的低水位是什么。
每一种类型的 Computation,都会有一个自己的水位信息。同一个 Computation 下,不同进程的水位信息也是不同的,因为它自己处理的消息的进度可能不一样。
Strong Production 和状态持久化
无论是在 Timer 还没有触发时,我们统计到的中间阶段的数据结果,还是我们已经确定要向下游发送的计算结果,都需要持久化下来。这个是为了整个 MillWheel 系统的容错能力,以及我们可以“迁移”某段 Computation + Key 到另外一个服务器上。
所以,MillWheel 也封装掉了整个的数据持久化层,你不需要自己有一个外部数据库的连接,而是直接通过 MillWheel 提供的 API,进行数据的读写。
Dataflow(三):一个统一的编程模型
Dataflow 的基础模型
Dataflow 的核心计算模型非常简单,它只有两个概念,一个叫做 ParDo,顾名思义,也就是并行处理的意思。另一个叫做 GroupByKey,也就是按照 Key 进行分组数据处理的问题。
ParDo,地位相当于 MapReduce 里的 Map 阶段。所有的输入数据,都会被一个 DoFn,也就是处理函数处理。但是这些数据,不是在一台服务器上处理的,而是和 MapReduce 一样,会在很多台机器上被并行处理。只不过 MapReduce 里的数据处理,只有一个 Map 阶段和一个 Reduce 阶段。而在 Dataflow 里,Pardo 会和下面的 GroupByKey 组合起来,可以有很多层,就好像是很多个 MapReduce 串在一起一样。
而 GroupByKey,地位则是 MapReduce 里的 Shuffle 操作。在 Dataflow 里,所有的数据都被抽象成了 key-value 对。前面的 ParDo 的输入和 Map 函数一样,是一个 key-value 对,输出也是一系列的 key-value 对。而 GroupByKey,则是把相同的 Key 汇总到一起,然后再通过一个 ParDo 下的 DoFn 进行处理。
比如,我们有一个不断输入的日志流,想要统计所有广告展示次数超过 100 万次的广告。那么,我们可以先通过一个 Pardo 解析日志,然后输出(广告 ID,1)这样的 key-value 对,通过 GroupByKey,把相同的广告 ID 的数据分组到一起。然后再通过一个 ParDo,并行统计每一个广告 ID 下的展示次数。最后再通过一个 ParDo,过滤掉所有展示次数少于 100 万次的广告就好了。
理解流批一体
在 Dataflow 里,我们还有一个很重要的维度没有加入进来,这个维度就是时间。
Dataflow 里的 GroupByKey,会把相同 Key 的数据 Shuffle 到一起供后续处理,但是它并没有定义在什么时间,这些数据会被 Shuffle 到一起。
在 MapReduce 的计算模型下,会有哪些输入数据,是在 MapReduce 的任务开始之前就确定的。这意味着数据从 Map 端被 Shuffle 到 Reduce 端,只依赖于我们的 CPU、网络这些硬件处理能力。而在 Dataflow 里,输入的数据集是无边界的,随着时间的推移,不断会有新的输入数据加入进来。
当我们把“批(Batch)”的记录数限制到了每批一条,那么它就是所谓的流了。进一步地,MapReduce 的“有边界(Bounded)”的数据集,也只是 Dataflow 的“无边界(Unbounded)”的数据集的一种特殊情况。所以,Jay Kreps 才会在 2014 年提出流批一体的 Kappa 架构,而到了 2015 年的 Dataflow,我们就看到了批处理本来就是流处理的一种特殊情况。
时间窗口的分配与合并
我们先来看一看时间窗口的概念,在流式数据处理里,我们需要的往往不是“统计所有的广告展示数量”,而往往是“每 5 分钟统计一次广告展示数量”,或者“统计过去 5 分钟的广告展示数量”。我们常用的时间窗口,也会分成好几种:
- 首先是固定窗口(Fixed Window)。比如,我们统计“每小时的广告展示数量”,那么我们的数据,就会被划分成 0 点到 1 点、1 点到 2 点,这样一个个固定区间的窗口。
- 然后是滑动窗口(Sliding Window),也就是窗口随着时间的变动在“滑动”。比如,我们要统计“过去 2 分钟的广告展示”,那么我们的窗口并不是划分成 12:00
12:02,12:0212:04 这样一段段。而是 12:0012:02,然后一分钟之后变成 12:0112:03,在这个例子里,2 分钟被称之为窗口大小,而窗口每 1 分钟“滑动”一次,这个 1 分钟被称之为滑动周期。 - 最后是会话窗口(Session Window)。这个常常用在统计用户的会话上,对于会话的划分,往往是通过我们设置的两次事件之间的一个“超时时间”来定义的。比如,我们有一个客服聊天系统,如果用户和客服之间超过 30 分钟没有互动,我们就认为上一次会话结束了。在这之后无论是用户主动发言,还是客服主动回复,我们都会认为是进入了一个新的会话。
既然引入了时间窗口这个概念,相信你很容易理解,我们在 Dataflow 模型里,需要的不只是 GroupByKey,实际在统计数据的时候,往往需要的是 GroupByKeyAndWindow。统计一个不考虑任何时间窗口的数据,往往是没有意义的,1 分钟内广告展示了 100 万次,和 1 个月内展示了 100 万次代表着完全不同的广告投放力度。我们需要根据特定的时间窗口,来进行数据统计。
而在实际的逻辑实现层面,Dataflow 最重要的两个函数,也就是 AssignWindows 函数和 MergeWindows 函数。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而 AssignWindows 要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。
而在有了 Window 的信息之后,如果我们想要按照固定窗口或者滑动窗口统计数据,我们可以很容易地根据 Key+Window 进行聚合,完成相应的计算。
窗口的分配和合并功能,就使得 Dataflow 可以处理乱序数据。相同的数据以不同的顺序到达我们的计算节点,计算的结果仍然是相同的。并且在这个过程里,我们可以把上一次计算完的结果作为状态持久化下来,然后每一个新进入的事件,都按照 AssignWindows 和 MergeWindows 的方式不断对数据进行化简。
触发器和增量数据处理
这样一来,有了对应的窗口函数逻辑,如果我们的输入数据是确定的,能够一次性都给出来,我们就很容易统计会话数这样的数据了,即使数据是乱序的也没有关系。但是,在实际情况里,我们的输入数据是以流的形式传输到每个计算节点的。并且,我们会遇到延时、容错等情况,所以我们还需要有一个机制告诉我们,在什么时候数据都已经到了,我们可以把计算结果向下游输出了。
在 MillWheel 的论文里,我们是通过计算一个低水位(Low Watermark)来解决这个问题的。
但是,这个基于水位的方法在实践中,必然会遇到这样两个问题:
- 第一个,在实际的水位标记之后,仍然有新的日志到达。
- 第二个,我们的水位标记,因为需要考虑所有节点。只要有一条日志来晚了,我们的水位就会特别“低”,导致我们迟迟无法输出计算结果。
Dataflow 采用了Lamdba 架构,就是我们可以尽快给出一个计算结果,但是在后续根据获得的新的数据,不断去修正这个计算结果。而这个思路,在 Dataflow 里,就体现为触发器(Trigger)机制。
本文链接: http://woaixiaoyuyu.github.io/2021/12/27/%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%BB%8F%E5%85%B8%E8%AE%BA%E6%96%87%E8%A7%A3%E8%AF%BB%E7%9A%843/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!