切勿浮沙筑高台
大数据技术的核心理念
其实“大数据”技术的核心理念是非常清晰的,基本上可以被三个核心技术理念概括。
- 第一个,是能够伸缩到一千台服务器以上的分布式数据处理集群的技术
- 第二个,是这个上千个节点的集群,是采用廉价的 PC 架构搭建起来的
- 最后一个,则是“把数据中心当作是一台计算机”(Datacenter as a Computer)
大数据技术的来龙与去脉
三驾马车和基础设施
Google 就在 2003、2004 以及 2006 年,分别抛出了三篇重磅论文。也就是我们常说的“大数据”的三驾马车:GFS、MapReduce 和 Bigtable。
到这里,GFS、MapReduce 和 Bigtable 这三驾马车的论文,就完成了“存储”“计算”“实时服务”这三个核心架构的设计。不过你还要知道,这三篇论文其实还依赖了两个基础设施。
第一个是为了保障数据一致性的分布式锁。对于这个问题,Google 在发表 Bigtable 的同一年,就发表了实现了 Paxos 算法的 Chubby 锁服务的论文(我会在基础知识篇“分布式锁 Chubby”这一讲中为你详细解读这篇论文)。
第二个是数据怎么序列化以及分布式系统之间怎么通信。Google 在前面的论文里都没有提到这一点,所以在基础知识篇的“通过 Thrift 序列化:我们要预知未来才能向后兼容吗?”我们会一起来看看 Facebook 在 2007 年发表的 Thrift 的相关论文。
OLAP 和 OLTP 数据库
实际上,如果说 MapReduce 对应的迭代进行,是在不断优化 OLAP 类型的数据处理性能,那么 Bigtable 对应的进化,则是在保障伸缩性的前提下,获得了更多的关系型数据库的能力。
实时数据处理的抽象进化
在 2015 年,Google 发表的 Dataflow 的模型,可以说是对于流式数据处理模型做出了最好的总结和抽象。一直到现在,Dataflow 就成为了真正的“流批一体”的大数据处理架构。而后来开源的 Flink 和 Apache Beam,则是完全按照 Dataflow 的模型实现的了。
这里,我把这些论文的前后之间的脉络联系专门做了一张图,放在了下面。当你对某一篇论文感到困惑的时候,就可以去翻看它前后对应的论文,找到对应问题的来龙去脉。
将所有服务器放在一起的资源调度
那么,为了解决一致性问题,我们就有了基于 Paxos 协议的分布式锁。但是 Paxos 协议的性能很差,于是有了进一步的 Multi-Paxos 协议。
而接下来的问题就是,Paxos 协议并不容易理解,于是就有了 Raft 这个更容易理解的算法的出现。Kubernetes 依赖的 etcd 就是用 Raft 协议实现的,我们在后面的资源调度篇里,会一起来看一下 Raft 协议到底是怎么实现的,以及现代分布式系统依赖的基础设施是什么样子的。
小结
学习方法:建立你的大数据知识网络
大数据领域的知识地图
The Google File System (一): Master的三个身份
整个 GFS 的架构,是通过非常重要的工程原则来设计的,也就是尽量简单、需要考虑实际的硬件情况,以及针对实际的应用场景进行设计。
Master 的第一个身份:一个目录服务
在整个 GFS 中,有两种服务器,一种是 master,也就是整个 GFS 中有且仅有一个的主控节点;第二种是 chunkserver,也就是实际存储数据的节点。
因此,在 GFS 里面,会把每一个文件按照 64MB 一块的大小,切分成一个个 chunk。每个 chunk 都会有一个在 GFS 上的唯一的 handle,这个 handle 其实就是一个编号,能够唯一标识出具体的 chunk。然后每一个 chunk,都会以一个文件的形式,放在 chunkserver 上。
而 chunkserver,其实就是一台普通的 Linux 服务器,上面跑了一个用户态的 GFS 的 chunkserver 程序。这个程序,会负责和 master 以及 GFS 的客户端进行 RPC 通信,完成实际的数据读写操作。
当然,为了确保数据不会因为某一个 chunkserver 坏了就丢失了,每个 chunk 都会存上整整三份副本(replica)。其中一份是主数据(primary),两份是副数据(secondary),当三份数据出现不一致的时候,就以主数据为准。有了三个副本,不仅可以防止因为各种原因丢数据,还可以在有很多并发读取的时候,分摊系统读取的压力。
master 里面会存放三种主要的元数据(metadata):
- 文件和 chunk 的命名空间信息,也就是类似前面 /data/geektime/bigdata/gfs01 这样的路径和文件名;
- 这些文件被拆分成了哪几个 chunk,也就是这个全路径文件名到多个 chunk handle 的映射关系;
- 这些 chunk 实际被存储在了哪些 chunkserver 上,也就是 chunk handle 到 chunkserver 的映射关系。
然后,当我们要通过一个客户端去读取 GFS 里面的数据的时候,需要怎么做呢?GFS 会有以下三个步骤:
- 客户端先去问 master,我们想要读取的数据在哪里。这里,客户端会发出两部分信息,一个是文件名,另一个则是要读取哪一段数据,也就是读取文件的 offset 及 length。因为所有文件都被切成 64MB 大小的一个 chunk 了,所以根据 offset 和 length,我们可以很容易地算出客户端要读取的数据在哪几个 chunk 里面。于是,客户端就会告诉 master,我要哪个文件的第几个 chunk。
- master 拿到了这个请求之后,就会把这个 chunk 对应的所有副本所在的 chunkserver,告诉客户端。
- 等客户端拿到 chunk 所在的 chunkserver 信息后,客户端就可以直接去找其中任意的一个 chunkserver 读取自己所要的数据。
master 的快速恢复性和可用性保障
master 节点的所有数据,都是保存在内存里的。这样,master 的性能才能跟得上几百个客户端的并发访问。
但是数据放在内存里带来的问题,就是一旦 master 挂掉,数据就会都丢了。所以,master 会通过记录操作日志和定期生成对应的 Checkpoints 进行持久化,也就是写到硬盘上。
可要是 master 节点的硬件彻底故障了呢?
这里使用backup master,实现高可用,可以理解为master的副本。
所有针对 master 的数据操作,都需要同样写到另外准备的这几台服务器上。只有当数据在 master 上操作成功,对应的操作记录刷新到硬盘上,并且这几个 Backup Master 的数据也写入成功,并把操作记录刷新到硬盘上,整个操作才会被视为操作成功。这种方式,叫做数据的“同步复制”,是分布式数据系统里的一种典型模式。
Shadow Master
从监控程序发现 master 节点故障、启动备份节点上的 master 进程、读取 Checkpoints 和操作日志,仍然是一个几秒级别乃至分钟级别的过程。在这个时间段里,我们可能仍然有几百个客户端程序“嗷嗷待哺”,希望能够在 GFS 上读写数据。虽然作为单个 master 的设计,这个时候的确是没有办法去写入数据的。Google 的工程师还是想了一个办法,让我们这个时候还能够从 GFS 上读取数据。
这个办法就是加入一系列只读的“影子 Master”,这些影子 Master 和前面的备胎不同,master 写入数据并不需要等到影子 Master 也写入完成才返回成功。而是影子 Master 不断同步 master 输入的写入,尽可能保持追上 master 的最新状态。
这种方式,叫做数据的“异步复制”,是分布式系统里另一种典型模式。异步复制下,影子 Master 并不是和 master 的数据完全同步的,而是可能会有一些小小的延时。
影子 Master 会不断同步 master 里的数据,不过当 master 出现问题的时候,客户端们就可以从这些影子 Master 里找到自己想要的信息。
The Google File System (二): 如何应对网络瓶颈?
我们接着来学习 GFS 论文中第二个重要的设计决策,也就是根据实际的硬件情况来进行系统设计。
在单台服务器下,我们的硬件瓶颈常常是硬盘。而到了一个分布式集群里,我们又有了一个新的瓶颈,那就是网络。
GFS 的数据写入
写入和读取不同的是,读取只需要读一个 chunkserver,最坏的情况无非是读不到重试。而写入,则是同时要写三份副本,如果一个写失败,两个写成功了,数据就已经不一致了。
GFS 写入数据的具体步骤:
- 第一步,客户端会去问 master 要写入的数据,应该在哪些 chunkserver 上。
- 第二步,和读数据一样,master 会告诉客户端所有的次副本(secondary replica)所在的 chunkserver。这还不够,master 还会告诉客户端哪个 replica 是“老大”,也就是主副本(primary replica),数据此时以它为准。
- 第三步,拿到数据应该写到哪些 chunkserver 里之后,客户端会把要写的数据发给所有的 replica。不过此时,chunkserver 拿到发过来的数据后还不会真的写下来,只会把数据放在一个 LRU 的缓冲区里。
- 第四步,等到所有次副本都接收完数据后,客户端就会发送一个写请求给到主副本。我在上节课一开始就说过,GFS 面对的是几百个并发的客户端,所以主副本可能会收到很多个客户端的写入请求。主副本自己会给这些请求排一个顺序,确保所有的数据写入是有一个固定顺序的。接下来,主副本就开始按照这个顺序,把刚才 LRU 的缓冲区里的数据写到实际的 chunk 里去。
- 第五步,主副本会把对应的写请求转发给所有的次副本,所有次副本会和主副本以同样的数据写入顺序,把数据写入到硬盘上。
- 第六步,次副本的数据写入完成之后,会回复主副本,我也把数据和你一样写完了。
- 第七步,主副本再去告诉客户端,这个数据写入成功了。而如果在任何一个副本写入数据的过程中出错了,这个出错都会告诉客户端,也就意味着这次写入其实失败了。
GFS 的数据写入使用了两个很有意思的模式,来解决这节课一开始我提到的网络带宽的瓶颈问题。
分离控制流和数据流
和之前从 GFS 上读数据一样,GFS 客户端只从 master 拿到了 chunk data 在哪个 chunkserver 的元数据,实际的数据读写都不再需要通过 master。另外,不仅具体的数据传输不经过 master,后续的数据在多个 chunkserver 上同时写入的协调工作,也不需要经过 master。
其次,是采用了流水线(pipeline)式的网络传输。数据不一定是先给到主副本,而是看网络上离哪个 chunkserver 近,就给哪个 chunkserver,数据会先在 chunkserver 的缓冲区里存起来,就是前面提到的第 3 步。但是写入操作的指令,也就是上面的第 4~7 步,则都是由客户端发送给主副本,再由主副本统一协调写入顺序、拿到操作结果,再给到客户端的。
流水线式的网络数据传输
之所以要这么做,还是因为 GFS 最大的瓶颈就在网络。如果用一个最直观的想法来进行数据传输,我们可以把所有数据直接都从客户端发给三个 chunkserver。
但是这种方法的问题在于,客户端的出口网络会立刻成为瓶颈。
比如,我们要发送 1GB 的数据给 GFS,客户端的出口网络带宽有 100MB/ 秒,那么我们只需要 10 秒就能把数据发送完。但是因为三个 chunkserver 的数据都要从客户端发出,所以要 30s 才能把所有的数据都发送完,而且这个时候,三个 chunkserver 的网络带宽都没有用满,各自只用了 1/3,网络并没有被有效地利用起来。
而在流水线式的传输方式下,客户端可以先把所有数据,传输给到网络里离自己最近的次副本 A,然后次副本 A 一边接收数据,一边把对应的数据传输给到离自己最近的另一个副本,也就是主副本。
同样的,主副本可以如法炮制,把数据也同时传输给次副本 B。在这样的流水线式的数据传输方式下,只要网络上没有拥堵的情况,只需要 10 秒多一点点,就可以把所有的数据从客户端,传输到三个副本所在的 chunkserver 上。
GFS 最大利用网络带宽,同时又减少网络瓶颈的选择就是这样的:
- 首先,客户端把数据传输给离自己“最近”的,也就是在同一个机架上的次副本 A 服务器;
- 然后,次副本 A 服务器再把数据传输给离自己“最近”的,在不同机架,但是处于同一个汇聚层交换机下的主副本服务器上;
- 最后,主副本服务器,再把数据传输给在另一个汇聚层交换机下的次副本 B 服务器。
独特的 Snapshot 操作
复制文件,相信这个是你用自己的电脑的时候,会常常做的事儿。在 GFS 上,如果我们用笨一点的办法,自然是通过客户端把文件从 chunkserver 读回来,再通过客户端把数据写回去。这样的话,读数据也经过一次网络传输,写回三个副本服务器,即使是流水线式的传输,也要三次传输,一共需要把数据在网络上搬运四次。
GFS 就专门为文件复制设计了一个 Snapshot 指令,当客户端通过这个指令进行文件复制的时候,这个指令会通过控制流,下达到主副本服务器,主副本服务器再把这个指令下达到次副本服务器。不过接下来,客户端并不需要去读取或者写入数据,而是各个 chunkserver 会直接在本地把对应的 chunk 复制一份。
The Google File System (三): 多写几次也没关系
GFS 的最后一个设计特点,是“放宽数据一致性的要求”。
GFS 希望在机械硬盘上尽量有比较高的写入性能,所以它只对顺序写入考虑了一致性,这就自然带来了宽松的一致性。
我们先来看看,一致性到底指的是什么东西。在 GFS 里面,主要定义了对一致性的两个层级的概念:
- 第一个,就叫做“一致的(Consistent)”。这个就是指,多个客户端无论是从主副本读取数据,还是从次副本读取数据,读到的数据都是一样的。
- 第二个,叫做“确定的(Defined)”。这个要求会高一些,指的是对于客户端写入到 GFS 的数据,能够完整地被读到。可能看到这个定义,你还是不清楚,没关系,我下面会给你详细讲解“确定的”到底是个什么问题。
追加写入的“至少一次”的保障
实际上,这是因为随机写入并不是 GFS 设计的主要的数据写入模式,GFS 设计了一个专门的操作,叫做记录追加(Record Appends)。这是 GFS 希望我们主要使用的数据写入的方式,而且它是原子性(Atomic)的,能够做到在并发写入时候是基本确定的。
GFS 的记录追加的写入过程,和上一讲的数据写入几乎一样。它们之间的差别主要在于,GFS 并不会指定在 chunk 的哪个位置上写入数据,而是告诉最后一个 chunk 所在的主副本服务器,“我”要进行记录追加。
这个时候,主副本所在的 chunkserver 会做这样几件事情:
- 检查当前的 chunk 是不是可以写得下现在要追加的记录。如果写得下,那么就把当前的追加记录写进去,同时,这个数据写入也会发送给其他次副本,在次副本上也写一遍。
- 如果当前 chunk 已经放不下了,那么它先会把当前 chunk 填满空数据,并且让次副本也一样填满空数据。然后,主副本会告诉客户端,让它在下一个 chunk 上重新试验。这时候,客户端就会去一个新的 chunk 所在的 chunkserver 进行记录追加。
- 因为主副本所在的 chunkserver 控制了数据写入的操作顺序,并且数据只会往后追加,所以即使在有并发写入的情况下,请求也都会到主副本所在的同一个 chunkserver 上排队,也就不会有数据写入到同一块区域,覆盖掉已经被追加写入的数据的情况了。
- 而为了保障 chunk 里能存的下需要追加的数据,GFS 限制了一次记录追加的数据量是 16MB,而 chunkserver 里的一个 chunk 的大小是 64MB。所以,在记录追加需要在 chunk 里填空数据的时候,最多也就是填入 16MB,也就是 chunkserver 的存储空间最多会浪费 1/4。
我们可以一起来看这样一个例子:有三个客户端 X、Y、Z 并发向同一个文件进行记录追加,写入数据 A、B、C,对应的三个副本的 chunkserver 分别是 Q、P、R。
主副本先收到数据 A 的记录追加,在主副本和次副本上进行数据写入。在 A 写入的同时,B,C 的记录追加请求也来了,这个时候写入会并行进行,追加在 A 的后面。
这个时候,A 的写入在某个次副本 R 上失败了,于是主副本告诉客户端去重试;同时,客户端再次发起记录追加的重试,这次的数据写入,不在 A 原来的位置,而会是在 C 后面。
如此一来,在 B 和 C 的写入,以及 A 的重试完成之后,我们可以看到:
- 在 Q 和 P 上,chunkserver 里面的数据顺序是 A-B-C-A;
- 但是在 R 上,chunkserver 里面的数据顺序是 N/A-B-C-A;
- 也就是 Q 和 P 上,A 的数据被写入了两次,而在 R 上,数据里面有一段是有不可用的脏数据。
MapReduce(一):源起Unix的设计思想
这篇论文,基本上可以看作是三个部分:
- MapReduce 的计算模型和应用场景;
- MapReduce 实际是如何实现的,使得开发者无需关心分布式的存在;
- 如何逐步迭代优化 MapReduce 的性能。
而需要的计算方式,抽象来说,无非是三种情况。
- 第一种,是对所有的数据,我们都只需要单条数据就能完成处理。比如,我们有很多网页的内容,我们要从里面提取出来每一个网页的标题。这样的计算可以完全并行化。
- 第二种,是需要汇总多条数据才能完成计算。比如,要统计日志里面某个 URL 被访问了多少次,只需要简单累加就可以了。或者我们需要更复杂一些的操作,比如统计某个 URL 下面的唯一用户数。而对于这里的第二种情况,我们就需要将所有相同 URL 的数据,搬运到同一个计算节点上进行处理。不过,在搬运之后,不同的 URL 还是可以放到不同的节点进行处理的。
- 第三种,自然是一、二两种情况的组合了。比如,我们先从网页数据里面,提取出网页的 URL 和标题,然后根据标题里面的关键字,统计特定关键字出现在多少个不同的 URL 里面,这就需要同时采用一二这两种情况的操作。
当然,我们可以有更复杂的数据操作,但是这些动作也都可以抽象成前面的两个动作的组合。因为无非,我们要处理的数据要么是完全独立的,要么需要多条数据之间的依赖。实际上,前面的第一种动作,就是 MapReduce 里面的 Map;第二种动作,就是 MapReduce 里面的 Reduce;而搬运的过程,就是 Shuffle(混洗,这个概念稍后我会给你介绍)。
MapReduce 的应用场景
实际上它能够实现的应用场景,论文里可列了不少,包括以下六个:
- 分布式 grep;
- 统计 URL 的访问频次;
- 反转网页 - 链接图;
- 分域名的词向量;
- 生成倒排索引;
- 分布式排序。
分布式 grep
实际上,“分布式 grep”就是一个分布式抽取数据的抽象,无论是像 grep 一样通过一个正则表达式来提取内容,还是用复杂的算法和策略从输入中提取内容,都可以看成是一种“分布式 grep”。而在 MapReduce 这个框架下,你只需要撰写一个 Map 函数,并不需要关心数据存储在具体哪台机器上,也不需要关心哪台机器的硬件或者网络出了问题。
统计 URL 的访问频次
还是挺容易的,直接根据url生成(key,value)对,然后聚合即可
MapReduce(二):不怕失败的计算框架
MapReduce 框架的三个挑战
要想让写 Map 和 Reduce 函数的人不需要关心“分布式”的存在,那么 MapReduce 框架本身就需要解决好三个很重要的问题:
- 第一个,自然是如何做好各个服务器节点之间的“协同”,以及解决出现各种软硬件问题后的“容错”这两部分的设计。
- 第二个,是上一讲我们没怎么关心的性能问题。和我们在 GFS 论文里面讲过的一样,MapReduce 框架一样非常容易遇到网络性能瓶颈。尽量充分利用 MapReduce 集群的计算能力,并让整个集群的性能可以随硬件的增加接近于线性增长,可以说是非常大的一个挑战。
- 后一个,还是要回到易用性。Map 函数和 Reduce 函数最终还是运行在多个不同的机器上的,并且在 Map 和 Reduce 函数中还会遇到各种千奇百怪的数据。当我们的程序在遭遇到奇怪的数据出错的时候,我们需要有办法来进行 debug。
MapReduce 的协同
- 第一步,你写好的 MapReduce 程序,已经指定了输入路径。所以 MapReduce 会先找到 GFS 上的对应路径,然后把对应路径下的所有数据进行分片(Split)。每个分片的大小通常是 64MB,这个尺寸也是 GFS 里面一个块(Block)的大小。接着,MapReduce 会在整个集群上,启动很多个 MapReduce 程序的复刻(fork)进程。
- 第二步,在这些进程中,有一个和其他不同的特殊进程,就是一个 master 进程,剩下的都是 worker 进程。然后,我们会有 M 个 map 的任务(Task)以及 R 个 reduce 的任务,分配给这些 worker 进程去进行处理。这里的 master 进程,是负责找到空闲的(idle)worker 进程,然后再把 map 任务或者 reduce 任务,分配给 worker 进程去处理。
这里你需要注意一点,并不是每一个 map 和 reduce 任务,都会单独建立一个新的 worker 进程来执行。而是 master 进程会把 map 和 reduce 任务分配给有限的 worker,因为一个 worker 通常可以顺序地执行多个 map 和 reduce 的任务。
- 第三步,被分配到 map 任务的 worker 会读取某一个分片,分片里的数据就像上一讲所说的,变成一个个 key-value 对喂给了 map 任务,然后等 Map 函数计算完后,会生成的新的 key-value 对缓冲在内存里。
- 第四步,这些缓冲了的 key-value 对,会定期地写到 map 任务所在机器的本地硬盘上。并且按照一个分区函数(partitioning function),把输出的数据分成 R 个不同的区域。而这些本地文件的位置,会被 worker 传回给到 master 节点,再由 master 节点将这些地址转发给 reduce 任务所在的 worker 那里。
- 第五步,运行 reduce 任务的 worker,在收到 master 的通知之后,会通过 RPC(远程过程调用)来从 map 任务所在机器的本地磁盘上,抓取数据。当 reduce 任务的 worker 获取到所有的中间文件之后,它就会将中间文件根据 Key 进行排序。这样,所有相同 Key 的 Value 的数据会被放到一起,也就是完成了我们上一讲所说的混洗(Shuffle)的过程。
- 第六步,reduce 会对排序后的数据执行实际的 Reduce 函数,并把 reduce 的结果输出到当前这个 reduce 分片的最终输出文件里。
- 第七步,当所有的 map 任务和 reduce 任务执行完成之后,master 会唤醒启动 MapReduce 任务的用户程序,然后回到用户程序里,往下执行 MapReduce 任务提交之后的代码逻辑。
MapReduce 的容错(Fault Tolerance)
MapReduce 的容错机制非常简单,可以简单地用两个关键词来描述,就是重新运行和写 Checkpoints。
MapReduce 的性能优化
把程序搬到数据那儿去
既然网络带宽是瓶颈,那么优化的办法自然就是尽可能减少需要通过网络传输的数据。
这就好像你想要研究金字塔,最好的办法不是把金字塔搬到你家来,而是你买张机票飞过去。这里的金字塔就是要处理的数据,而你,就是那个分配过去的 MapReduce 程序。
通过 Combiner 减少网络数据传输
提前在map端提前进行一些聚合,通过 Combiner 合并本地 Map 输出的数据,来尽可能减少数据在网络上的传输。
Bigtable(一):错失百亿的Friendster
在没有HBase这样强力的BigTable出现之前,一家互联网公司面对“伸缩性”这个问题,最好的选择是使用一个 MySQL 集群。
分库分表的扩容方式
常见的mysql集群扩展方式就是分库分表,但是首先代码逻辑上需要有对应的修改,同时移植数据的瓶颈容易发生,非常不灵活
同时随着业务需求的不断增加,需要我们有极其超前的数据划分能力,但这是不科学的。
Bigtable 的设计目标
看到这里,相信你对 Bigtable 的设计目标应该更清楚了。最基础的目标自然是应对业务需求的,能够支撑百万级别随机读写 IOPS,并且伸缩到上千台服务器的一个数据库。但是光能撑起 IOPS 还不够。在这个数据量下,整个系统的“可伸缩性”和“可运维性”就变得非常重要。
这里的伸缩性,包括两点:
- 第一个,是可以随时加减服务器,并且对添加减少服务器数量的限制要小,能够做到忙的时候加几台服务器,过几个小时峰值过去了,就可以把服务器降下来。
- 第二个,是数据的分片会自动根据负载调整。某一个分片写入的数据多了,能够自动拆成多个分片来平衡负载。而如果负载大了,添加了服务器之后,也能很快平衡数据,让各个节点均匀承担压力。
而可运维性,则除了上面的两点之外,小部分节点的故障,不应该影响整个集群的运行,我们的运维人员也不用急匆匆地立刻去恢复。集群自身也要有很强的容错能力,能够把对应的请求和服务,调度到其他节点去。
那么,当我们回头看这个设计目标之后,会发现 Bigtable 的设计思路和 GFS 以及 MapReduce 一脉相承。这三个系统的核心设计思路,就是把一个集群当成一台计算机。
当然,除了这些目标之外,Bigtable 也放弃了很多目标,其中有两个非常重要:
- 第一个是放弃了关系模型,也不支持 SQL 语言;
- 第二个,则是放弃了跨行事务,Bigtable 只支持单行的事务模型。
Bigtable(二):不认识“主人”的分布式架构
数据分区,可伸缩的第一步
把一个数据表,根据主键的不同,拆分到多个不同的服务器上,在分布式数据库里被称之为数据分区( Paritioning)。分区之后的每一片数据,在不同的分布式系统里有不同的名字,在 MySQL 里呢,我们一般叫做 Shard,Bigtable 里则叫做 Tablet。
所以,在 Bigtable 里,我们就采用了另外一种分区方式,也就是动态区间分区。我们不再是一开始就定义好需要多少个机器,应该怎么分区,而是采用了一种自动去“分裂”(split)的方式来动态地进行分区。
通过 Master + Chubby 进行分区管理
我们还需要有一套存储、管理分区信息的机制,这在哈希分片的 MySQL 集群里是没有的。在 Bigtable 里,我们是通过 Master 和 Chubby 这两个组件来完成这个任务的。这两个组件,加上每个分片提供服务的 Tablet Server,以及实际存储数据的 GFS,共同组成了整个 Bigtable 集群。
Master、Chubby 和 Tablet Server 的用途
Tablet Server 的角色最明确,就是用来实际提供数据读写服务的。一个 Tablet Server 上会分配到 10 到 1000 个 Tablets,Tablet Server 就去负责这些 Tablets 的读写请求,并且在单个 Tablet 太大的时候,对它们进行分裂。
而哪些 Tablets 分配给哪个 Tablet Server,自然是由 Master 负责的,而且 Master 可以根据每个 Tablet Server 的负载进行动态的调度,也就是 Master 还能起到负载均衡(load balance)的作用。而这一点,也是 MySQL 集群很难做到的。
这是因为,Bigtable 的 Tablet Server 只负责在线服务,不负责数据存储。实际的存储,是通过一种叫做 SSTable 的数据格式写入到 GFS 上的。也就是 Bigtable 里,数据存储和在线服务的职责是完全分离的。我们调度 Tablet 的时候,只是调度在线服务的负载,并不需要把数据也一并搬运走。
事实上,Master 一共会负责 5 项工作:
- 分配 Tablets 给 Tablet Server;
- 检测 Tablet Server 的新增和过期;
- 平衡 Tablet Server 的负载;
- 对于 GFS 上的数据进行垃圾回收(GC);
- 管理表(Table)和列族的 Schema 变更,比如表和列族的创建与删除。
Bigtable 需要 Chubby 来搞定这么几件事儿:
- 确保我们只有一个 Master;
- 存储 Bigtable 数据的引导位置(Bootstrap Location);
- 发现 Tablet Servers 以及在它们终止之后完成清理工作;
- 存储 Bigtable 的 Schema 信息;
- 存储 ACL,也就是 Bigtable 的访问权限。
如果没有 Chubby 的话,我能想到最直接的集群管理方案,就是让所有的 Tablet Server 直接和 Master 通信,把分区信息以及 Tablets 分配到哪些 Tablet Server,也直接放在 Master 的内存里面。这个办法,就和我们之前在 GFS 里的办法一样。但是这个方案,也就使得 Master 变成了一个单点故障点(SPOF-Single Point of Failure)。当然,我们可以通过 Backup Master 以及 Shadow Master 等方式,来尽可能提升可用性。
可是这样第一个问题就来了,我们在 GFS 的论文里面说过,我们可以通过一个外部服务去监控 Master 的存活,等它挂了之后,自动切换到 Backup Master。但是,我们怎么知道 Master 是真的挂了,还是只是“外部服务”和 Master 之间的网络出现故障了呢?
那么 Chubby,就是这里的这个外部服务,不过 Chubby 不是 1 台服务器,而是 5 台服务器组成的一个集群,它会通过 Paxos 这样的共识算法,来确保不会出现误判。而且因为它有 5 台服务器,所以也一并解决了高可用的问题,就算挂个 1~2 台,也并不会丢数据。
为什么数据读写不需要 Master?
Chubby 帮我们保障了只有一个 Master,那么我们再来看看分区和 Tablets 的分配信息,这些信息也没有放在 Master。Bigtable 在这里用了一个很巧妙的方法,就是直接把这个信息,存成了 Bigtable 的一张 METADATA 表,而这张表在哪里呢,它是直接存放在 Bigtable 集群里面的,其实 METADATA 表自己就是一张 Bigtable 的数据表。
- Bigtable 在 Chubby 里的一个指定的文件里,存放了一个叫做 Root Tablet 的分区所在的位置。
- 然后,这个 Root Tablet 的分区,是 METADATA 表的第一个分区,这个分区永远不会分裂。它里面存的,是 METADATA 里其他 Tablets 所在的位置。
- 而 METADATA 剩下的这些 Tablets,每一个 Tablet 中,都存放了用户创建的那些数据表,所包含的 Tablets 所在的位置,也就是所谓的 User Tablets 的位置。
这张图片还是比较清晰的展示了三层逻辑关系的结构,熟悉操作系统的同学看着应该恨亲切。
那么,我们的客户端具体是怎么查询的呢?
- 客户端先去发起请求,查询 Chubby,看我们的 Root Tablet 在哪里。
- Chubby 会告诉客户端,Root Tablet 在 5 号 Tablet Server,这里我们简写成 TS5。
- 客户端呢,会再向 TS5 发起请求,说我要查 Root Tablet,告诉我哪一个 METADATA Tablet 里,存放了 ECOMMERCE_ORDERS 业务表,行键为 A20210101RST 的记录的位置。
- TS5 会从 Root Tablet 里面查询,然后告诉客户端,说这个记录的位置啊,你可以从 TS8 上面的 METADATA 的 tablet 107,找到这个信息。
- 然后,客户端再发起请求到 TS8,说我要在 tablet 107 里面,找 ECOMMERCE_ORDERS 表,行键为 A20210101RST 具体在哪里。
- TS8 告诉客户端,这个数据在 TS20 的 tablet 253 里面。
- 客户端发起最后一次请求,去问 TS20 的 tablet 253,问 ECOMMERCE_ORDERS 表,行键为 A20210101RST 的具体数据。
- TS20 最终会把数据返回给客户端。
可以看到,在这个过程里,我们用了三次网络查询,找到了想要查询的数据的具体位置,然后再发起一次请求拿到最终的实际数据。一般我们会把前三次查询位置结果缓存起来,以减少往返的网络查询次数。而对于整个 METADATA 表来说,我们都会把它们保留在内存里,这样每个 Bigtable 请求都要读写的数据,就不需要通过访问 GFS 来读取到了。
Master 的调度者角色
的确,在单纯的数据读写的过程中不需要 Master。Master 只负责 Tablets 的调度而已,而且这个调度功能,也对 Chubby 有所依赖。我们来看一看这个过程是怎么样的:
- 所有的 Tablet Server,一旦上线,就会在 Chubby 下的一个指定目录,获得一个和自己名字相同的独占锁(exclusive lock)。你可以看作是,Tablet Server 把自己注册到集群上了。
- Master 会一直监听这个目录,当发现一个 Tablet Server 注册了,它就知道有一个新的 Tablet Server 可以用了,也就是可以分配 Tablets。
- 分配 Tablets 的情况很多,可能是因为其他的 Tablet Server 挂了,导致部分 Tablets 没有分配出去,或者因为别的 Tablet Server 的负载太大,这些情况都可以让 Master 去重新分配 Tablet。具体的分配策略论文里并没有说,你可以根据自己的需要实现对应的分配策略。
- Tablet Server 本身,是根据是否还独占着 Chubby 上对应的锁,以及锁文件是否还在,来确定自己是否还为自己分配到的 Tablets 服务。比如 Tablet Server 到 Chubby 的网络中断了,那么 Tablet Server 就会失去这个独占锁,也就不再为原先分配到的 Tablets 提供服务了。
- 而如果我们把 Tablet Server 从集群中挪走,那么 Tablet Server 会主动释放锁,当然它也不再服务那些 Tablets 了,这些 Tablets 都需要重新分配。
- 无论是前面的第 4、5 点这样异常或者正常的情况,都是由 Master 来检测 Tablet Server 是不是正常工作的。检测的方法也不复杂,其实就是通过心跳。Master 会定期问 Tablets,你是不是还占着独占锁呀?无论是 Tablet Server 说它不再占有锁了,还是 Master 连不上 Tablet Server 了,Master 都会做一个小小的测试,就是自己去获取这个锁。如果 Master 能够拿到这个锁,说明 Chubby 还活得好好的,那么一定是 Tablet Server 那边出了问题,Master 就会删除这个锁,确保 Tablet Server 不会再为 Tablets 提供服务。而原先 Tablet Server 上的 Tablets 就会变回一个未分配的状态,需要回到上面的第 3 点重新分配。
- 而 Master 自己,一旦和 Chubby 之间的网络连接出现问题,也就是它和 Chubby 之间的会话过期了,它就会选择“自杀”,这个是为了避免出现两个 Master 而不自知的情况。反正,Master 的存活与否,不影响已经存在的 Tablets 分配关系,也不会影响到整个 Bigtable 数据读写的过程。
小结
整个 Bigtable 是由 4 个组件组成的,分别是:
- 负责存储数据的 GFS;
- 负责作为分布式锁和目录服务的 Chubby;
- 负责实际提供在线服务的 Tablet Server;
- 负责调度 Tablet 和调整负载的 Master。
Bigtable(三):SSTable存储引擎详解
整个存储引擎的实现方式,我们会发现,我们看到的 Bigtable 的数据模型,其实是一系列的内存 + 数据文件 + 日志文件组合下封装出来的一个逻辑视图。
数据库的存储引擎并不是用了什么高深的算法、特别的硬件,而是在充分考虑了硬件特性、算法和数据结构,乃至数据访问的局部性,综合到一起设计出来的一个系统。每一个环节都是教科书上可以找到的基础知识,但是组合在一起就实现了一个分布式数据库。而这个数据库暴露给用户的,也是一个非常简单的、类似于 Map 的根据键 - 值读写的接口。
如何提供高性能的随机数据写入?
Bigtable 为了做到高性能的随机读写,采用了下面这一套组合拳,来解决这个问题:
- 首先是将硬盘随机写,转化成了顺序写,也就是把 Bigtable 里面的提交日志(Commit Log)以及将内存表(MemTable)输出到磁盘的 Minor Compaction 机制。
- 其次是利用“局部性原理”,最近写入的数据,会保留在内存表里。最近被读取到的数据,会存放到缓存(Cache)里,而不存在的行键,也会以一个在内存里的布隆过滤器(BloomFilter)进行快速过滤,尽一切可能减少真正需要随机访问硬盘的次数。
Bigtable 实际写入数据的过程是这样的:
- 当一个写请求过来的时候,Tablet Server 先会做基础的数据验证,包括数据格式是否合法,以及发起请求的客户端是否有权限进行对应的操作。这个权限设置,是 Tablet Server 从 Chubby 中获取到,并且缓存在本地的。
- 如果写入的请求是合法的,对应的数据写入请求会以追加写的形式,写入到 GFS 上的提交日志文件中,这个写入对于 GFS 上的硬盘来说是一个顺序写。这个时候,我们就认为整个数据写入就已经成功了。
- 在提交日志写入成功之后,Tablet Server 会再把数据写入到一张内存表中,也就是我们常说的 MemTable。
- 而当我们写入的数据越来越多,要超出我们设置的阈值的时候,Tablet Server 会把当前内存里的整个 MemTable 冻结,然后创建一个新的 MemTable。被冻结的这个 MemTable,一般被叫做 Immutable MemTable,它会被转化成一个叫做 SSTable 的文件,写入到 GFS 上,然后再从内存里面释放掉。这个写入过程,是完整写一个新文件,所以自然也是顺序写。
实际上,这是因为我们并不会在写入的时候,去修改之前写入的数据。我们在插入数据和更新数据的时候,其实只是在追加一个新版本的数据。我们在删除数据的时候,也只是写入一个墓碑标记,本质上也是写入一个特殊的新版本数据。
数据的“修改”和“删除”
而对于数据的“修改”和“删除”,其实是在两个地方发生的。
第一个地方,是一个叫做 Major Compaction 的机制。按照前面的数据写入机制,随着数据的写入,我们会有越来越多的 SSTable 文件。这样我们就需要通过一个后台进程,来不断地对这些 SSTable 文件进行合并,以缩小占用的 GFS 硬盘空间。而 Major Compaction 这个名字的由来,就是因为这个动作是把数据“压实”在一起。
第二个地方,是在我们读取数据的时候。在读取数据的时候,我们其实是读取 MemTable 加上多个 SSTable 文件合并在一起的一个视图。也就是说,我们从 MemTable 和所有的 SSTable 中,拿到了对应的行键的数据之后,会在内存中合并数据,并根据时间戳或者墓碑标记,来对数据进行“修改”和“删除”,并将数据返回给到客户端。
这也是为什么在 Bigtable 的数据模型里面,很自然地对于一个列下的值,根据时间戳可以有多个版本。
如何提供高性能的随机数据读取?
MemTable 的数据结构通常是通过一个 AVL 红黑树,或者是一个跳表(Skip List)来实现的。而 BigTable 的 Memtable 和 SSTable 的源码,一般被认为就是由 Google 开源的 LevelDB 来实现的。在实际的 LevelDB 源码中,MemTable 是选择使用跳表来作为自己的数据结构。之所以采用这个数据结构,原因也很简单,主要是因为 MemTable 只有三种操作:
- 第一种是根据行键的随机数据插入,这个在数据写入的时候需要用到;
- 第二种是根据行键的随机数据读取,这个在数据读取的时候需要用到;
- 最后一种是根据行键有序遍历,这个在我们把 MemTable 转化成 SSTable 的时候会被用到。
当 MemTable 的大小超出阈值之后,我们会遍历 MemTable,把它变成一个叫做 SSTable 的文件。SSTable 的文件格式其实很简单,本质上就是由两部分组成:
- 第一部分,就是实际要存储的行键、列、值以及时间戳,这些数据会按照行键排序分成一个个固定大小的块(block)来进行存储。这部分数据,在 SSTable 中一般被称之为数据块(data block)。
- 第二部分,则是一系列的元数据和索引信息,这其中包括用来快速过滤当前 SSTable 中不存在的行键盘的布隆过滤器,以及整个数据块的一些统计指标,这些数据我们称之为元数据块(meta block)。另外还有针对数据块和元数据块的索引(index),这些索引内容,则分别是元数据索引块(metaindex block)和数据索引块(index block)。
因为 SSTable 里面的数据块是顺序存储的,所以要做 Major Compaction 的算法也很简单,就是做一个有序链表的多路归并就好了。
那么在这个过程中,Bigtable 又利用了压缩和缓存机制做了更多的优化,下面我就来给你介绍下这些优化步骤。
首先,是通过压缩算法对每个块进行压缩。这个本质上是以时间换空间,通过消耗 CPU 的计算资源,来减少存储需要的空间,以及后续的缓存需要的空间。
其次,是把每个 SSTable 的布隆过滤器直接缓存在 Tablet Server 里。布隆过滤器本质是一个二进制向量,它可以通过一小块内存空间和几个哈希函数,快速检测一个元素是否在一个特定的集合里。在 SSTable 的这个场景下,就是可以帮助我们快速判断,用户想要随机读的行键是否在这个 SSTable 文件里。
最后,Bigtable 还提供了两级的缓存机制。
- 高层的缓存,是对查询结果进行缓存,我们称之为 Scan Cache。比如前面的示例代码中,我们要查询 com.cnn.www 这个行键的数据,那么第一次查询到了这个数据之后,我们会把对应的数据,放在 Tablet Server 的一个缓存空间里。这样,下一次我们查询同样的数据,就不需要再访问 GFS 上的硬盘了。
- 低层的缓存,是对查询所获取到的整个数据块进行缓存,我们称之为 Block Cache。还以 com.cnn.www 这个行键为例,我们会把它所在的整个块数据都缓存在 Tablet Server 里。因为一个块里存储的数据都是排好序的,所以当下一次用户想要查询 com.cnn.www1 这样的行键的时候,就可以直接从 Block Cache 中获取到,而不需要再次访问 GFS 上的 SSTable 文件。
需要注意的是,这两层缓存都是针对单个 SSTable 上的,而不是在单个 Tablet 上。
通过Thrift序列化:我们要预知未来才能向后兼容吗?
csv
1 | user_id,search_term,rank,landing_url,click_timestmap |
json
1 | [ |
常用的 CSV 和 JSON 格式
本身这两种数据格式侧重点事便于程序员理解,有一个清晰的标准,能够提高开发效率,在普通的业务上是快捷。但是在大数据的情况下,会有一些缺陷:
- csv文件不会保存数据本身的类型,只保留了column的名字,所以需要程序解析数据格式,但是存在一定后置性问题,比如第一行数据有一个数据单元存放的内容是10023,程序不能直接判断这一列数据都是整型,因为之后可能会出现数据单元是10023d_d,那就是字符串类型了,所以存在一定后置性,效率上会带来问题,理轮上需要把整个文件的数据全部扫描之后才能确定数据类型。
- json确实可以用 JSON Schema,定义好字段类型。但是对于第二个问题,使用 JSON 的话,问题就更糟糕了。因为对于每一条数据,我们不仅要存储数据,还要再存储一份字段名,占用的空间就更大了。
thrift
想要减少存储所占的空间,那最直接的想法,自然是我们自定义一个序列化方法,按照各个字段实际的格式把数据写进去。典型的办法就是 Java 的序列化,我们按照 String—>String—>Int—>String—>Int 的顺序,把数据写入到一个字节数组里面,等需要读数据的时候,我们就按照这个顺序读出来就好了。
spark也是用序列化的方法压缩了存储,只不过是在数据中埋了length和offset来定位。thrift这个相当于是提前准备好了schema,只要更新的数据可以兼容就行。
这个思路其实不难,就是提前准备好schema,然后解析数据的时候对照着schema解析。
但有两个难点需要解决,也就是向前兼容,和向后兼容。这个能够同时向前向后兼容的能力,就是我们对于 Thrift 的 TBinaryProtocol 协议的序列化提出的要求了。
向前兼容
首先,是我们历史上已经生成了很多版本的数据了,如果在格式切换之后,要去统计一段时间的数据,我们的新程序就需要同时能够解析 旧版本和新版本的数据。这个,也就是我们的程序需要有向前兼容能力。
向后兼容
其次,是除了要满足新需求之外,我们可能有各种各样的数据分析程序,仍然依赖原有的数据格式。如果我们要替换数据格式,意味着对所有这些程序都需要在当天切换的时候做完改造。这样既不现实,也不经济。最好,我们的老程序仍然可以读新的数据格式,也就是有向后兼容的能力。
Thrift IDL 文件
这个就是通俗的thrift的schema,实际工作中接触过,但没有想过怎么实现。
TBinaryProtocol是序列化使用的协议。
1 | struct SearchClick |
而 Thrift 里的 TBinaryProtocol 的实现方式也很简单,那就是顺序写入数据的过程中,不仅会写入数据的值(field-value),还会写入数据的编号(field-id)和类型(field-type);读取的时候也一样。并且,在每一条记录的结束都会写下一个标志位。
思考
这个是thrift执行时会存放的数据,我们思考一下,这样设计之后如何解决json和csv之前的涉及到的缺陷。同时如何解决向前兼容和向后兼容
- 针对csv的数据类型问题,存储了field-type已经解决了。
- 针对json存储数据过多,感觉还是要存放类型,不过不用存放数据字段的名字了,因为有编号。(其实使用了TCompactProtocol 就是一种“紧凑”的编码方式)
- 针对向前兼容,存储了没哥字段的编号,可以有一个空间存放需要解析的编号信息,这样之前的程序解析时,遇到没有存储的编号,直接跳过即可。
- 针对向后兼容,老程序要识别新版本的数据,可以直接使用新版本数据存储的编号,进行解析。
thrift实际解决这些问题方式
在读取数据的时候,老版本的 v1 代码,看到自己没有见过的编号就可以跳过。新版本的 v2 代码,对于老数据里没有的字段,也就是读不到值而已,并不会出现不兼容的情况。(想的差不多)
而且,写下编号还带来了一个好处,就是我们不再需要确保每个字段都填上值了,这个帮我们解决了很多问题。(bigtable 稀疏存储)
针对json的存储缺陷,使用了TCompactProtocol 就是一种“紧凑”的编码方式。
进一步优化的 TCompactProtocol
Delta Encoding
顾名思义,TCompactProtocol 就是一种“紧凑”的编码方式。Thrift 的 IDL 都是从 1 开始编号的,而且通常两个字段的编号是连续的。所以这个协议在存储编号的时候,存储的不是编号的值,而是存储编号和上一个编号的差。
比如,第一个编号是 1,第二个编号是 5,编号 2、3、4 没有值或者已经被我们废弃掉了,那么,第二个编号存储的直接就是 4。这种方式叫做 Delta Encoding,在倒排索引中也经常会用到,用来节约存储空间。我们用 4 个 bit 来存储这个差值。
ZigZag 编码 +VQL 可变长数值表示
这个主要是学一个编码的思路。
这个编码方式的每一个字节的高位,都会用第一个 bit 用来标记,也就是第一个 bit 用来标记整个整数是否还需要读入下一个字节,而后面的 7 个 bit 用来表示实际存放的数据。
用标志位来表示是否要读取下一个字节,来实现可扩展性。
跨语言、跨协议和可扩展性
Thrift 本身并不绑定任何编程语言,这也是论文标题中“Cross-Language Service Implementation”的含义,也就是跨语言的。
跨语言 + 序列化 +RPC,使得 Thrift 解决了一个在“大数据领域”中很重要的问题,就是习惯于使用不同编程语言团队之间的协作问题。
分布式锁Chubby(一) :交易之前先签合同
大数据需要考虑高可用性,也就是容灾能力,针对之前说的master,需要解决两种情况。
- 第一个,是我们怎么能够做到 Backup Master 和 Master 完全同步?特别是当硬件、网络可能出现故障的情况下,我们怎么能够做到两边的数据始终同步。如果数据不能做到始终同步,那么当真有需要我们切换节点到 Backup Master 的时候,我们就会遇到数据丢失的情况。
- 第二个,是监控程序本身也是一个单点,当我们的监控程序说 Master 挂了的时候,我们怎么知道 Master 是真的挂了,还只是监控程序到 Master 的网络中断了呢?如果是后者的话,会不会出现一个集群里有两个 Master 的情况?
从两阶段提交到 CAP 问题
因为同步复制要求下的数据写入操作,要跨越两个服务器。所以我们不能像前面 Bigtable 里的 SSTable 那样,只要预写日志(WAL)写入成功,就认为在 Master 上数据写入成功了。因为很有可能,同步在 Backup Master 里写入的数据,会由于硬件问题或者进程忽然被 kill 等原因失败了。这个时候,所谓的“同步复制”也就不复存在了。
为了解决这个分布式事务问题,我们需要有一个机制,使得 Master 和 Backup Master 两边的数据写入可以互相协同。那么第一个被想到的解决办法,就是两阶段提交(2PC,Two Phases Commit)。
两阶段提交的过程其实非常直观,就是把数据的写入,拆分成了提交请求和提交执行这两个不同的阶段,然后通过一个协调者(Coordinator)来协调我们的 Master 和 Backup Master。
这个过程是这样的:
第一个阶段是提交请求
协调者会把要提交的事务请求发给所有参与者,所有的参与者要判断自己是否可以执行这个请求,如果不行的话,它会直接返回给协调者,说自己不能执行这个事务。而如果它确定自己可以执行事务,那么,它会先把要进行的事务以预写日志的方式写入下来。
需要注意,这个写入和我们在 Bigtable 中所说的,写入日志就意味着数据写入成功有所不同。在提交请求阶段写入的 WAL 日志,还没有真正在参与者这里生效。并且,在写入的日志里,不仅有如何执行事务的日志(redo logs),也有如何放弃事务,进行回滚的日志(undo logs)。当参与者确定自己会执行事务,并且对应的 WAL 写入完成之后,它会返回响应给协调者说,“我答应你我会执行事务的”。
第二个阶段是提交执行
当协调者收到各个参与者的返回结果之后,如果所有人都说它们答应执行这个事务。那么,协调者就可以再次发起请求,告诉大家,可以正式执行刚才的那个事务了。等实际的事务执行完成之后,参与者就会反馈给协调者,而协调者收到所有参与者成功完成的消息之后,整个事务就成功结束。
如果参与者是一个 MySQL 数据库,那么如果协调者发起的数据写入请求,可能会违背 MySQL 里某个表的字段的唯一性约束。这样 MySQL 数据库就应该在提交请求阶段告诉协调者,而不是等到要实际执行的时候才说。
而协调者这个时候,就会在提交执行阶段,直接发送事务回滚的请求。这个时候,各个参与者写下的 undo logs 就会派上用场了,各个节点可以回滚刚才写入的数据,整个事务也就没有发生。
在两阶段提交的逻辑里,是通过一个位居中间的协调者来对外暴露接口,并对内确认所有的参与者之间的消息是同步的。不过,两阶段提交的问题也很明显,那就是两阶段提交虽然保障了一致性(C),但是牺牲了可用性(A)。无论是协调者,还是任何一个参与者出现硬件故障,整个服务器其实就阻塞住了,需要等待对应的节点恢复过来。
三阶段提交和脑裂问题
为了提升整个系统的可用性,有人就会想,要不,我们把提交请求阶段再拆成两步?
第一步,我们不用让各个参与者把执行的动作都准备好,也就是不用去写什么 undo logs 或者 redo logs,而是先判断一下这个事务是不是可以执行,然后再告诉协调者。这一步的请求叫做 CanCommit 请求。
第二步,当协调者发现大家都说可以执行的时候,再发送一个预提交请求,在这个请求的过程里,就和两阶段提交的过程中一样。所有的参与者,都会在这个时候去写 redo logs 和 undo logs。这一步的请求呢,叫做 PreCommit 请求。
原先无论任何一个参与者决定不能执行事务,所有的参与者都会白白先把整个事务的 redo logs 和 undo logs 等操作做完,并且在请求执行阶段还要再做一次回滚。
而在新的三阶段提交场景下,大部分不能执行的事务,都可以在 CanCommit 阶段就放弃掉。这意味着所有的参与者都不需要白白做无用功了,也不需要浪费很多开销去写 redo logs 和 undo logs 等等。
这个方式,可以提升整个系统的可用性,在出现一些网络延时、阻塞的情况下,整个事务仍然会推进执行,并最终完成。这个是因为,进入到提交执行阶段的时候,至少所有的参与者已经都在 PreCommit 阶段答应执行事务了。
但是,在一种特殊的情况下,三阶段提交带来的问题会比二阶段更糟糕。这种情况是这样的:
- 所有参与者在 CanCommit 阶段都答应了执行事务。
- 在 PreCommit 阶段,协调者发送 PreCommit 信息给所有的参与者之后,参与者 A 挂掉了,所以它没有实际执行事务。协调者收到了这个消息,想要告诉参与者 B。而这个时候,参与者 B 和协调者之间的网络中断了。在等待了一段时间之后,参与者 B 决定继续执行事务。
- 而在这个时候,就会发生一个很糟糕的状况,那就是参与者 B 的状态和其他的参与者都不一致了。也就是出现了所谓的“脑裂”,即系统里不同节点出现了两种不同的状态。
实际上,三阶段提交,就是为了可用性(A),牺牲了一致性(C)。
分布式锁Chubby(二) :众口铄金的真相
虽然它们可以帮助我们实现一个分布式的事务,但同时也有着很明显的缺陷:这两个都是一个“单点”特别明显的系统,一旦作为单点的“协调者”出现网络问题或者硬件故障,整个系统就没法运行了。
理解 ACID
我们通常是使用 ACID 这四个字母,来描述事务性需要满足的 4 个属性。
- 原子性(Atomicity),也就是一个事务不能分成更小的粒度。在事务里,也就是几个操作要么同时发生,如果任何一个失败,整个就应该回滚。前面的两阶段提交,其实就是为了保障事务的原子性。但是在三阶段提交中,我们可能就会失去事务的原子性。
- 一致性(Consistency),这个更多是一个应用层面的属性。我们需要确保应用里面的数据是一致的。
- 隔离性(Isolation),它需要解决的问题是,当有多个事务并发执行的时候,相互之间应该隔离,不能看到事务执行的中间状态。
- 持久性(Durability),就是一旦事务成功提交,对应的数据应该保存在硬盘中,并且在硬件故障或者系统崩溃的情况下也不会丢失。
在数据库的 ACID 属性里面,A、C 和 D 看起来是显而易见的。但是这个隔离性 I,就值得说道说道了。我们之前看到的事务的案例,是一个单独的事务。那么,如果有两个事务想要同时发生,并且它们想要读写的数据记录之间有重合,该怎么办?
隔离性需要解决的问题。一般数据库的隔离级别,会分成四种。
- 未提交读(Read Uncommitted):脏数据
- 提交读(Read Committed):幻读
- 可重复读(Repeatble Read):一房两卖(一个经典的例子)
- 可串行化(Serializable):类似于临界资源,只不过资源数是1
摆脱单点故障的 Paxos
对于 GFS 的 Master 的操作,其实并没有有关隔离性的需求,本身是一个分布式的问题,大部分操作,不需要保证先后顺序,实际环境并没有这个需求,只要保证一次处理一个请求,并且可以同步即可。
我们对 GFS 的 Master 操作,最抽象来看,就是写日志。而 Backup Master 起到的作用,就是同步复制日志,每一条日志,都是对文件系统的一次操作。我们可以把这样的一条条操作看成是一个个事件,每一次事件,都让整个文件系统的状态进行了一次改变。所以本质上,这就是一个状态机(State Machine)。
所有异步复制的主从系统,如果我们去读从库,都会遇到这样线性不一致的情况(客户端写入数据到 Master;Master 崩溃了;刚才的数据还没有来得及同步到 Shadow Master;客户端只能从 Shadow Master 读取数据,刚刚写入的数据并没有被读到)。
而为了保障线性一致性,或者说系统的可线性化,我们必须让主从节点之间是同步复制的。而要做到高可用的同步复制,我们就需要 Paxos 这样的共识算法。
并不可行的多协调者
最容易发生的问题,是操作顺序的错乱。我们以两个协调者 A 和 B,以及两个参与者 C 和 D 为例,如果 A 要在 C 和 D 上,删除目录 /data/bigdata,而 B 则是要在 C 和 D 上,把目录 /data/bigdata 改名成 /data/bigdatapaper。因为是分布式的网络环境,那么可能会出现在 C 这里,A 的请求先到,B 的请求后到,/data/bigdata 的目录已经被删除了,所以改名也失败了;而在 D 这里,两个顺序是反过来的,那么 D 这里的 /data/bigdata 目录已经改名成功,而删除则失败了。C 和 D 之间的数据也就不一致了。
Paxos 算法中的协调过程
zookeeper利用的思路原型就是这个。
我们希望啊,我们在写入数据的时候,能够向一组服务器发起请求,而不是一个服务器。这组服务器里面的任何一个挂掉了,我们都能向里面的另外一台服务器发送请求,并且这组服务器里面的每一台,最终写入并执行日志的顺序是一样的。
在 Paxos 算法里,我们把每一个要写入的操作,称之为提案(Proposal)。接受外部请求,要尝试写入数据的服务器节点,称之为提案者(Proposer),比如说,我们可以让一组服务器里面有 5 个提案者,可以接受外部的客户端请求。
在 Paxos 算法里,并不是提案者一旦接受到客户端的请求,就决定了接下来的操作和结果的,而是有一个异步协调的过程,在这个协调过程中,只有获得多数通过(accept)的请求才会被选择(chosen)。这也是为什么,我们通常会选择 3 个或者 5 个节点这样的奇数数字,因为如果是偶数的话,遇到 2:2 打平这样的事情,我们就没法做出判断了。
给提案编号
首先是每一个请求,我们都称之为一个“提案”。然后每个提案都有一个编号,这个编号由两部分组成。高位是整个提案过程中的轮数(Round),低位是我们刚才的服务器编号。每个服务器呢,都会记录到自己至今为止看到过,或者用到过的最大的轮数。
那么,当某一台服务器,想要发起一个新提案的时候,就要用它拿到的最大轮数加上 1,作为新提案的轮数,并且把自己的服务器编号拼接上去,作为提案号发放出去。并且这个提案号必须要存储在磁盘上,避免节点在挂掉之后,不知道最新的提案号是多少。
Prepare 阶段
那么,当提案者收到一条来自客户端的请求之后,它就会以提案者的身份发起提案。提案包括了前面的提案号,我们把这个提案号就叫做 M。这个提案会广播给所有的接受者,这个广播请求被称为 Prepare 请求。
而所有的 Acceptor 在收到提案的时候,会返回一个响应给提案者。这个响应包含的信息是这样的:
- 首先,所有的接受者一旦收到前面的 Prepare 请求之后,都会承诺它接下来,永远不会接受提案号比当前提案号 M 小的请求;
- 其次,如果接受者之前已经接受过其他提案的内容(假设是 X)了,那么它要存储下已经接受过的内容和对应的提案号。并且在此之后,把这个提案号和已经接受过的内容 X,一起返回给提案者。而如果没有接受过,就把内容填为 NULL。
Accept 阶段
当提案者收到超过半数的响应之后呢,整个提案就进入第二个阶段,也称之为 Accept 阶段。提案者会再次发起一个广播请求,里面包含这样的信息:
- 首先仍然是一个提案号,这个提案号就是刚才的 Prepare 请求里的提案号 M;
- 其次,是提案号里面的内容,一般我们也称之为提案的值。不过这个值,就有两种情况了。
第一种情况,是之前接受者已经接受过值了。那么这里的值,是所有接受者返回过来,接受的值当中,提案号最大的那个提案的值。也就是说,提案者说,既然之前已经做出决策了,那么我们就遵循刚才的决策就好了。
而第二种情况,如果所有的提案者返回的都是 NULL,那么这个请求里,提案者就放上自己的值,然后告诉大家,请大家接受我这个值。
那么接受到这个 Accept 请求的接受者,在此时就可以选择接受还是拒绝这个提案的值。通常来说:
- 如果接受者没有遇到其他并发的提案,自然会接受这个值。一旦提案者收到超过半数的接受者“接受”的请求。那么它就会确定,自己提交的值被选定了。
- 但也有可能,接受者刚才已经答应了某个新的提案者说,不能接受一个比提案号 N 早的请求。而 N>M,所以这个时候接受者会拒绝 M。
不管是接受还是拒绝,这个时候接受者都会把最新的提案编号 N,返回给提案者。
提案者还是会等待至少一半的接受者返回的响应。如果其中有人拒绝,那么提案者就需要放弃这一轮的提案,重新再来:生成新的提案号、发起 Prepare 请求、发起 Accept 请求。而当超过一半人表示接受请求的时候,提案者就认为提案通过了。当然,这个时候我们的提案虽然没有变,但是提案号已经变了。而当没有人拒绝,并且超过一半人表示接受请求的时候,提案者就认为提案通过了。
可线性化和共识算法
可以看到,在 Paxos 算法这个过程中,其实一直在确保一件事情,就是所有节点,需要对当前接受了哪一个提案达成多数共识。
如果有多个 Proposer 同时想要向这个一致性模块写入一条日志,那么最终只会有一条会被成功写入,其余的提案都会被放弃。多个并发在多个 Proposer 上发生的写入请求,互相之间需要去竞争一次成功提案的机会。我们能够确保所有服务器上写入日志的顺序是一样的,不过,相信你也发现了 Paxos 算法的一个问题,那就是开销太大了,任何一个共识的达成,都需要两轮 RPC 调(prepare+accept),当然,我们可以用各种手段在共识算法层面进行优化,比如一次性提交一组日志,而不是一条日志。这也是后续 Multi-Paxos 这些算法想到的解决方案。
分布式锁Chubby(三) :移形换影保障高可用
不过,无论是 GFS 也好,Bigtable 也好,我们能看到它们都是一个单 Master 系统,而不是有多个 Master,能够同时接受外部的请求来保持高可用性。所以,尽管在论文里面,Google 没有说 GFS 在 Master 和 Backup Master 之间数据的同步复制是怎么进行的,但是我的推测,采用一个两阶段提交的方式会更简单直接一点。
如果还是使用两阶段提交这样的方式,我们不还是会面临单点故障吗?而且,我们上一讲所说的 Paxos 算法也用不上啊?
通过 Chubby 转移可用性和“共识”问题
无论是 GFS 还是 Bigtable,其实都是一个单 Master 的系统。而为了让这样的系统保障高可用性,我们通常会采用两个策略。
第一个,是对它进行同步复制,数据会同步写入另外一个 Backup Master,这个方法最简单,我们可以用两阶段提交来解决。第二个,是对 Master 进行监控,一旦 Master 出现故障,我们就把它切换到 Backup Master 就好了。
但我们之前也说过,这里面有两个问题,首先是两阶段提交也有单点故障,其次是监控程序怎么去判断 Master 是真的挂掉了,还是只是监控程序和 Master 之间的网络中断了呢?
其实,解决这两个问题的答案,就是 Chubby,也就是 Paxos 算法。Chubby 具体的技术实现并不简单,但是思路却非常简单。那就是,我们的“共识”并不需要在每一个操作、每一条日志写入的时候发生,我们只需要有一个“共识”,确认哪一个是 Master 就好了。
那么,在 Chubby 这个系统里,它其实针对 Paxos 做了封装,把对外提供的接口变成一个锁。这样,Chubby 就变成了一个通用的分布式锁服务,而不是一个 Paxos 的一致性模块。在锁服务下达成的共识,就不是谁是 Master 了,而是哪一台服务器持有了 Master 的锁。对于应用系统来说,谁持有 Master 的锁,我们就认为这台服务器就是 Master。
而且,Chubby 这个锁服务,是一个粗粒度的锁服务。所谓粗粒度,指的是外部客户端占用锁的时间是比较长的。比如说,我们的 Master 只要不出现故障,就可以一直占用这把锁。但是,我们并不会用这个锁做很多细粒度的动作,不会通过这个分布式的锁,在 Bigtable 上去实现一个多行数据写入的数据库事务。
这是因为,像 Master 的切换这样的操作,发生的频率其实很低。这就意味着,Chubby 负载也很低。而像 Bigtable 里面的数据库事务操作,每秒可以有百万次,如果通过 Chubby 来实现,那 Chubby 的负载肯定是承受不了的。要知道,Chubby 的底层算法,也是 Paxos。我们上一讲刚刚一起来了解过这个算法,它的每一个共识的达成,都是需要通过至少两轮的 RPC 协商来完成的,性能肯定跟不上。
那么相信到这里,你对 Chubby 在整个分布式系统中的作用应该就弄明白了。Chubby 并不是提供一个底层的 Paxos 算法库,然后让所有的 GFS、Bigtable 等等,基于 Paxos 协议来实现数据库事务。而是把自己变成了一个分布式锁服务,主要解决 GFS、Bigtable 这些系统的元数据的一致性问题,以及容错场景下的灾难恢复问题。
Chubby 对外提供的接口
Chubby 里的每一个目录或者文件,都被称之为一个节点(node)。外部应用所使用的分布式“锁”,其实就是锁在这个节点上。哪个客户端获得了锁,就可以向对应的目录或者文件里面写入数据。比如谁是真正的 Master,就是看谁获得了某个特定的文件锁。
举个例子,我们可以定义 /gfs/master 这个命名空间,就用来存放 Master 的相关信息。这样,Master 服务器会通过 RPC 锁住这个文件,然后往里面写下自己的 IP 地址以及其他相关的元数据就好了。而其他客户端在这个时候,就无法获得这个锁,自然也就无法把 Master 改成自己。所有想要知道谁是 Master 的客户端,就只需要去查询 /gfs/master 这个文件就行。
Chubby 作为分布式锁的挑战
现在我们知道,每一个 Chubby 的目录或者文件,就是一把锁。那么是不是我们有了锁之后,分布式共识的问题就被解决了呢?如果你是这么想的,那么你肯定还没有在网络延时上吃到足够的亏。
首先,作为分布式锁,客户端去获取的锁都是有时效的,也就是它只能占用这个锁一段时间。这个和我们前面提到的 Chubby 的 Master 的“租约”原理类似,主要是为了避免某个客户端获取了锁之后,它因为网络或者硬件原因下线了。
- 我们有一个应用的客户端 A,获取了某个 Chubby 里面的锁,比方说 /chubby/geektime 这个文件。A 对这个节点的租期呢,是一段时间 T。而这个锁,是告诉我们这个客户端可以往外部的 Bigtable 数据库的一个 geektime 的行,写入数据。
- 在获取到了锁之后,过了一小段时间,A 仍然还持有这个锁,于是 A 就向 Bigtable 发起一个请求 X,想要往 geektime 这个行里面去写入数据。
- 但是这个时候,可能 A 和 Bigtable 之间的网络非常拥堵,这个请求花了比较长的时间才到达 Bigtable。
- 而当这个写入请求 X 还在路上的时候,客户端 A 的“租约”到期了。这个时候,另外一个客户端 B 获取到了对应的锁,然后它往这个 Bigtable 的 geektime 的行里,写入了数据 Y。
- 当 Y 被写入之后,请求 X 才到了 Bigtable。但是 Bigtable 并不知道谁拥有锁,它只会认为应用层面已经通过锁,实现了对于资源的保护。那么,之前客户端 A 的数据会覆盖掉客户端 B 写入的数据。但是这个情况肯定是我们不愿意接受的,因为对于客户端 B 来说,我明明已经持有了锁,为什么我写入的数据会被此时此刻没有锁的人覆盖掉呢?而且,客户端 A 的数据也是更早版本的数据
首先是锁延迟(lock-delay)
也就是当客户端 A 的“租约”不是正常到期由客户端主动释放的话,它会让客户端继续持有这个锁一段时间。这很好理解,如果是客户端主动释放的话,意味着它已经明确告诉 Chubby,我不会再往里面写入数据。而没有主动释放,很有可能是还有请求在网络上传输,我们就再稍微等一会儿。
其次是锁序列器(lock-sequencer)
它本质上是一个乐观锁,或者在很多地方也叫做 Fencing 令牌。这种方式是这样的:客户端在获取 Chubby 的锁的时候,就要拿到对应的锁的序号,比方说 23。在发送请求的时候,客户端会带上这个序号。而当 Chubby 把锁给了别的客户端之后,对应的锁的序号会变大,变成了 24。而我们对应的业务服务,比如 Bigtable 呢,也要记录每次请求的锁序列号,通过对比锁序列号来确定是否会有之前的锁,尝试去覆盖最新的数据。当遇到这种情况的时候,我们姗姗来迟的来自上一个锁的客户端请求,就会被业务服务拒绝掉。
本文链接: http://woaixiaoyuyu.github.io/2021/10/22/%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%BB%8F%E5%85%B8%E8%AE%BA%E6%96%87%E8%A7%A3%E8%AF%BB/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!