切勿浮沙筑高台
Raft(一):不会背叛的信使
在 2021 年的今天,最常被使用的分布式共识算法,已经从 Paxos 变成了 Raft。这要归功于来自斯坦福大学,在 2013 年发表的一篇论文《In Search of an Understandable Consensus Algorithm》。
分布式共识问题和复制状态机
无论是在 Chubby 还是 Spanner 里,我们通过 Paxos 算法想要解决的问题,其实都是一个“状态机复制(State Machine Replication)”问题,简称 SMR 问题。
我们可以把一个数据库的写入操作,看成是一系列的顺序操作日志,也就是把对于数据库的写入,变成了一个状态机。而对于这个状态机的更新,我们需要保障高可用性和容错能力。这个需要我们对于某一个服务器上的日志追加写入,能够做到同步复制到多个服务器上。
这样,在当前我们写入的节点挂掉的时候,我们的数据并不会丢失。为了避免单点故障,我们希望可以从多台服务器都能写入数据,但是又不能要求所有服务器都同步复制成功。前者,是为了确保某个服务器挂掉的时候,我们可以快速切换到另一台备用服务器;后者,是避免我们写入的时候,因为某一台服务器挂掉了就无法写入成功。
Raft 算法拆解
Raft 算法则把问题拆分成了一个个独立的子问题,比如 Leader 选举(Leader Election)、日志复制(Log Replication)、安全(Safety)和成员变化(Membership Changes)等等。这些子问题都解决了,状态机复制的问题自然就解决了。
Raft 算法的思路非常简单明确,它做出了这样几个选择:
- 首先是让系统里始终有一个 Leader,所有的数据写入,都是发送到 Leader。一方面,Leader 会在本地写入,另一方面,Leader 需要把对应的数据写入复制到其他的服务器上。这样,问题就变简单了,我们只需要确保两点,第一个是系统里始终有 Leader 可用;第二个,是基于 Leader 向其他节点复制数据,始终能确保一致性就好了。
- 因为 Leader 所在的服务器可能会挂掉,那么挂掉之后,我们需要尽快确认一个新 Leader,所以我们就需要解决第一个子问题,就是 Leader 选举问题。
- 我们需要保障分布式共识,所以 Leader 需要把日志复制到其他节点上,并且确保所有节点的日志始终是一致的。这个,就带来了第二个问题,也就是日志复制问题。
- 同时,在 Leader 挂掉,切换新 Leader 之后,我们会遇到一个挑战,新的 Leader 可能没有同步到最新的日志写入。而这可能会导致,新的 Leader 会尝试覆盖之前 Leader 已经写入的数据。这个问题就是我们需要解决的第三个问题,也就是“安全性”问题。
基本概念和算法框架
这里,我们先来看看,Raft 系统的基本数据写入流程是怎么样的。首先,Raft 的系统里也会和 Chubby/ZooKeeper 一样,有多台服务器。这些服务器,会分成这样三个角色:
- Leader,它会接收外部客户端数据写入,并且向 Follower 请求同步日志的服务器。同一时间,在整个 Raft 集群里,只会有一个 Leader。
- Follower,它会接收 Leader 同步的日志,然后把日志在本地复制一份。
- Candidate,这是一个临时角色,在 Leader 选举过程中才会出现。
外部的客户端写入数据的时候,都是发送给 Leader。Leader,本质上是通过一个两阶段提交,来做到同步复制的。一方面,它会先把日志写在本地,同时也发送给 Follower,这个时候,日志还没有提交,也就没有实际生效。Follower 会返回 Leader,是否可以提交日志。当 Leader 接收到超过一半的 Follower 可以提交日志的响应之后,它会再次发送一个提交的请求给到 Follower,完成实际的日志提交,并把写入结果返回给客户端。
Leader 选举
那么,既然是一个两阶段提交,我们就会遇到 Leader 节点挂掉的时候。Raft 不能让整个系统有单点故障,所以节点挂掉的时候,它需要能够协商出来一个新的 Leader,这个协商机制就是 Leader 选举。
Raft 的 Leader 选举是这样的:
- 在 Raft 里,Leader 会定期向 Follower 发送心跳。Follower 也会设置一个超时时间,如果超过超时时间没有接收到心跳,那么它就会认为 Leader 可能挂掉了,就会发起一轮新的 Leader 选举。
- Follower 发起选举,会做两个动作。第一个,是先给自己投一票;第二个,是向所有其他的 Follower 发起一个 RequestVote 请求,也就是要求那些 Follower 为自己投票。这个时候,Follower 的角色,就转变成了 Candidate。
- 在每一个 RequestVote 的请求里,除了有发起投票的服务器信息之外,还有一个任期(Term)字段。这个字段,本质上是一个 Leader 的“版本信息”或者说是“逻辑时钟”。
- 每个 Follower,在本地都会保留当前 Leader 是哪一个任期的。当它要发起投票的时候,会把任期自增 1,和请求一起发出去。
- 其他 Follower 在接收到 RequestVote 请求的时候,会去对比请求里的任期和本地的任期。如果请求的任期更大,那么它会投票给这个 Candidate。不然,这个请求会被拒绝掉。
- 除了对比任期之外,Candidate 还需要满足后面我们讨论的一些“安全性”要求,那就是选举出来的 Leader 上,一定要有最新的已经提交的日志,这个我们在这里先不聊,后面在讲解安全性的时候会深入讲解。
- 在一个任期里,一台服务器最多给一个 Candidate 投票,所以投票过程是先到先得,两个服务器都发起了 RequestVote,我们的 Follower 也只能投给一台。
而我们的 Candidate,会遇到三种情况。
- 第一种,自然是超过半数服务器给它投票。那么,它就会赢得选举,变成 Leader,并且进入一个新的任期。
- 第二种,是有另外一个 Candidate 赢得了选举,变成了下一任的 Leader。这个时候,我们还不知道自己已经输了。过了一会儿,新的 Leader 接收到了外部客户端的写入请求,就会向我们这个 Candidate 发起日志同步的请求。或者,新的 Leader 也会向我们发送一个心跳请求。无论是哪种请求,里面都会包含一个任期的信息,这个任期的信息比我们当前知道的最新的 Leader 大,那么我们就知道自己在投票里面输了,会把自己变成一个 Follower,并更新最新的任期和 Leader 信息。
- 第三种,是过了一段时间,无人获胜。我们会为这种情况设置一个超时时间,如果到了超时时间,我们既没有赢也没有输,那么我们会对任期自增 1,再发起一轮新的投票。
那就是当 Leader 挂掉的时候,很多个 Follower 都会成为 Candidate。他们都会先给自己投票,然后向其他人发起 RequestVote。这样,就会导致票被很多人“瓜分”,没有人能拿到超过半数的票。然后我们就会陷入前面的第三种情况,无限循环出不来了。Raft 对于这个问题的解决方案,是让选举的超时时间在一个区间之内随机化。这样,不同的服务器会在不同的时间点超时,最先超时的那个服务器,大概率会在其他服务器发现超时之前,就赢得投票。
日志复制
Leader 选出来了,那我们自然就可以向 Leader 发送写入数据的请求。既然是一个“状态机复制”的方案,写入请求其实就是一条操作日志的追加写。在 Raft 里,就是通过一个 AppendEntries 的 RPC 调用实现的。
整个数据写入,就是一个前面我们说的两阶段提交的过程,只不过这个两阶段提交只需要“半数”通过,就可以发起第二阶段的提交操作,而不需要等待所有服务器都确认可以提交。
我们追加写入的每一条日志,都包含三部分信息:
- 第一个,是日志的索引,这是一个随着数据写入不断自增的字段。
- 第二个,是日志的具体内容,也就是具体的状态机更新的操作是什么。
- 第三个,就是这一次数据写入,来自 Leader 的哪一个任期。
我们在追加写入日志的时候,不只是单单追加最新的一条日志,还需要做一个校验,确保对应的 Follower 的数据和 Leader 是一致的:
- 首先,在发起追加写入日志的复制请求的时候,Leader 的 AppendEntries 的 RPC 里,不仅会有最新的一条日志,还会有上一条日志里的日志索引和任期信息。
- Follower 会先对比日志索引和任期信息,在自己的日志里寻找相同索引和任期的日志所在的位置。
- 如果找到了,Follower 会把这个位置之后的日志都删除掉。然后把新日志追加上去。
- 如果找不到,Follower 会拒绝新追加的日志。然后,Leader 就知道,这个 Follower 没有同步到最新的日志,那么 Leader 会在自己的日志里,找到前一条日志,再重新发送给 Follower。
- Leader 会不断重复这个过程,确保找到和 Follower 的同步点,找到了之后,就会把这个位置之后的日志都删除掉,然后把同步点之后的日志一条条复制过去。
本质上,Raft 的复制操作,是让 Leader 为每一个 Follower 都从 Leader 的尾部往头部循环,找到 Follower 最新同步到哪里的日志。然后从这个位置开始,往后复制 Leader 的日志,直到最新一条的日志。通过这个过程,我们把每一次 Leader 的日志复制,都变成了一次强制所有 Follower 和当前 Leader 日志同步的过程。
我们需要确保,我们的 Leader 始终是包含了最新提交的日志。也就是无论我们因为故障也好,其他原因也好,随便怎么切换 Leader,Leader 的日志都是最新并准确的。
而这个,则需要我们下面所说的安全性机制来保障。
安全性
首先要注意,Raft 里,每一个服务器写入的日志,会有两种状态,一种是未提交的,一种是已提交的。我们这里所说的最新,指的是已提交的日志。我们想要确保 Leader 的日志是最新的,只需要在 Leader 选举的时候,让只有最新日志的 Leader 才能被选上就好了。
而要做到这一点,也并不困难,Raft 的做法是,直接在选举的 RPC 里,顺便完成 Leader 是否包含所有最新的日志就好了。Raft 是这么做的:
- 在 RequestVote 的请求里,除了预期的下一个任期之外,还要带上 Candidate 已提交的日志的最新的索引和任期信息。
- 每一个 Follower,也会比较本地已提交的日志的最新的索引和任期信息。
- 如果 Follower 本地有更新的数据,那么它会拒绝投票。
在这种情况下,一旦投票通过,就意味着 Candidate 的已提交的日志,至少和一半的 Follower 一样新或者更新。而 Raft 本身写入数据,就需要至少一半成功,才会提交成功。所以,在前面愿意给 Candidate 投票的里面,至少有一个服务器,包含了最新一次的数据提交,而 Candidate 至少和它一样新,自然也就包含了最新一次的数据提交。
Raft(二):服务器增减的“自举”实现
成员变更(Membership Change)
这个“过渡共识”的策略,和我们平时进行应用开发,进行大的数据库表的重构是类似的,那就是采用一个“双写”的迁移策略。
通过 Raft 算法,通过一个日志追加写入更新到整个 Raft 集群里的。那么,整个配置变更的过程就会变成这样:
- 外部的客户端,先向我们的 Raft 集群,写入一个操作,就是把我们的集群配置,从 Cold 变更成 Cold,Cnew。这个写入操作的提交,需要获得半数的 Cold 里的服务器通过。
- 这个写入操作成功之后,集群就进入了“过渡共识”阶段。此时此刻,所有的数据写入,都需要至少获得半数的 Cold 里的服务器和半数的 Cnew 里的服务器通过。
- 即使这个时候 Leader 挂掉了,我们去选举一个新的 Leader,一样也需要获得 Cold 里和 Cnew 里半数服务器的通过。
- 而且,根据我们上节课里要求的安全性,这个过程里新选举出来的 Leader,也一定有最新提交的日志。也就是新的 Leader 的配置,也一定是基于 Cold,Cnew 这个过渡共识的配置的。
- 所以,在整个过渡阶段,我们可以确保所有的日志写入,无论是在老的配置下,还是新的配置下都是可以达成共识的。那么,接着我们就可以再从这个过渡阶段的配置,切换到新配置下。
- 这个切换,同样是通过一个日志追加写入来完成的。我们外部的客户端,会向当前的 Leader,写入一个把配置从 Cold,Cnew 变成 Cnew 的操作。
- 这个操作,只需要 Cnew 里的服务器的半数通过,而不再需要 Cold 里的半数通过了。因为此时,我们已经不再需要 Cold 服务器里的日志信息了。
- 于是,我们的集群就整个地切换到了 Cnew 的配置之下了。
日志压实(Log Compaction)
无论是 Raft 还是 Paxos,分布式共识算法都是基于不断追加写入的日志的。不过,我们之前都回避了这样一个问题,随着时间的推移,写入的日志越来越多,我们的硬盘空间可能会不够,那我们该怎么办呢?
其实,回到我们问题的本源,我们并不需要保留这些写入的日志。我们需要保留的,只是日志应用之后的“状态机”就好了,因为 Raft 要解决的不是日志同步问题,而是状态机复制问题。同步并应用日志,只是这个问题的解决方案。
所以,和一般的数据库系统一样。我们可以定期给状态机创建快照(Snapshot),保留下它当前的最新状态,然后把之前的日志都丢弃掉就好了。这个快照里面需要包含两部分信息:
- 首先自然是状态机本身的最新状态,也就是数据库里面的具体数据;
- 其次,是对应的日志相关的元数据,也就是当前快照里面,对应最后一条日志的索引和任期信息。
这样,通过这个快照,以及后续我们追加写入的日志,我们同样可以组合获得最新的状态机状态。这里我放了一张图,你可以对照着理解一下这个快照是怎么样的。
这个创建快照、清理日志的方式,一般被称之为日志压实(Log Compaction)。有些地方,会把这个 Compaction 翻译成压缩,我觉得是不合理的。因为,这个动作实际上并不是对数据进行压缩(Compression),而是把对于数据库里同一个 Key 的多次更新的日志,压实之后只保留最后一条。
Borg(一):当电力成为成本瓶颈
CGroups-Linux 下隔离资源的解决方案
把不同类型的程序,部署在同一台服务器上,我们面临的第一个问题,就是这两个程序会竞争资源。比如,我们在同一台服务器上,既部署了 Kafka 的 Broker 进程,也部署了 Bigtable 的 Tabet Server 进程。那么,当我们在流量高峰的时候,Kafka 传输的流量占满了整个网络带宽之后,我们的 Tablet Server 就无法对外提供服务了,所有的请求都会因为超时而失败。
要解决这个问题,本质上,我们需要把服务器的资源拆开来,然后把对应的一组应用程序隔离出来,只允许它们去使用服务器的一部分资源。而这个解决方案,就是 Linux 下的 CGroup 功能。CGroups的全称叫做 Linux Control Group,它可以限制一组 Linux 进程使用的资源。具体来说,它主要能做到这样几点:
- 资源限制,比如限制一组进程总共可以使用的内存。
- 优先级,比如限制这组进程能够拿到的 CPU 和 I/O 的吞吐量。
- 结算,也就是统计这组进程实际使用了多少资源。
- 控制,可以冻结一组进程的运行,或者反过来恢复它们的运行。
Borg:典型的 Master-Slave 系统
我们的各种系统的进程都被封装进了一个个的 LXC 容器,这样我们就可以通过一个集群管理系统,把不同的 LXC 容器分配到不同的服务器上运行,这个集群管理系统,就是我们的 Borg。
从用户视角看 Borg
一个 Borg 的集群被称之为一个 Cell,通常被部署在一个数据中心里。而在 Google,一个中等规模的 Cell 通常有 1 万台机器。
在 Borg 里,部署的并不是一个裸的 CGroup,而是一个个的 Tasks。用户会向 Borg 提交一个个的 Job。这些 Job 里,既有像长期驻留的服务,也有一次性运行批处理任务。一个 Job 其实就是一个二进制的程序,这个 Job 程序会被部署到一台或者多台服务器上。每个服务器上实际运行的进程就叫做 Tasks。
Borg 采用了配额和优先级的机制。和我们操作系统里的进程类似,Borg 里的 Job 都有一个优先级,从高到低分别是监控(Monitoring),生产(Production),批处理(Batch)和尽最大努力(Best Effort)这四种,同一种类的优先级下,还能根据一个整数的 priority 参数,来区分不同任务的优先级大小。Job 提交之后部署到具体的服务器上,就会变成一个个 Task,这些个 Task 也就继承了 Job 上的优先级的属性。
Borg 有一种叫做 alloc 的机制。alloc 是一组可以预留的资源,不管这个服务器资源是否被用到了,这个资源我们始终会分配给 alloc。这样,对于 MapReduce 类型的任务,我们可以始终通过 alloc,在我们的 Borg 集群里预留一部分资源,这些资源是其他生产类型的 Job 抢不走的。
Borg 的系统架构
Borg 是一个典型的 Master-Slave 类型的系统。一个 Borg Cell 通常由这样几部分组成。
首先是用户界面,你可以通过配置文件、命令行以及浏览器内的 Web 界面,向 Borg 发起请求,所有这些请求都会统一发送给 Borg 的 Master。
然后就是一个 Master 集群,为了保障高可用性,Master 的所有数据当然是通过 Paxos 协议来维护多个同步复制的副本的。它一样,也是通过 Checkpoint 来建立快照,通过日志记录所有的操作,整个 Master 的集群里,也有一个选举出来的 master 中的 master。而这个 master 中的 master,则是通过获取一个 Chubby 里的锁来确保唯一性。
实际 Task 分配给哪个服务器执行,却不是由 Master 来决定的。Borg 把这部分职责单独从 Master 里剥离出来,给了一个叫做 Scheduler 的服务器。当一个 Job 提交给 Master 之后,Master 会把它变成一个个有待调度的 Tasks,然后加入到一个队列里。而我们的 Scheduler 则会异步遍历这些 Tasks,当它判断我们的集群有足够的资源,可以满足整个 Job 的需求的时候,它就会把 Task 分配到 Slave 服务器里。
Slave 服务器,就是我们实际负责去运行 Task 的服务器。每个 Slave 服务器上,都会有一个叫做 Borglet 的进程。Master 并不会直接和每个 Slave 服务器上的 Task 进程通信,而只是和 Borglet 进程通信。
Borg 会把所有的 Slave 机器分片,然后让每一个 master 的副本,都负责一部分 Borglet 的通信。然后,这个副本会把 Borglet 上报的最新信息,和 Master 已经知道信息的差(Diff),交给 Master 里的 master,以减少这一台特定机器的负载。
Borg(二):互不“信任”的调度系统
虽然 Borg 的 Master 集群,仍然是一个会选举出 master 的 Paxos 实现。但是除了 master 之外的其他副本,也需要去承担和 Borglet 通信的职责,而不仅仅是一个同步数据的副本。
“贪心”的开发者和 Borg
每一个使用 Borg 的开发者,在向 Borg 提交任务的时候,都需要申明任务需要使用的资源。那么,作为开发者你会怎么做呢?要是我,那肯定是尽量多申请点资源给自己留一点余量。
所以,面对贪心的都会多给自己申请一点资源的开发者,Borg 是通过这样两个方式,来提升机器的使用率。
第一个办法,是对资源进行“超卖”。
也就是我明明只有 64GB 的内存,但是我允许同时有申明了 80GB 的任务在 Borg 里运行。当然,为了保障所有生产类型的任务一定能够正常运行,Borg 并不会对它们进行超卖。但是,对于非生产类型的任务,比如离线的数据批处理任务,超卖是没有问题的。大不了,其中有些任务在资源不足的时候,会被挂起,或者调度到其他机器上重新运行,任务完成的时间需要久一点而已。
第二个办法,则是对资源进行动态的“回收”。
Borg 会在 Task 开始的时候,先为它分配它所申请的所有资源。然后,在 Task 成功启动 5 分钟之后,它会慢慢减少给 Task 分配的资源,直到最后变成 Task 当前实际使用的资源,以及 Borg 预留的一些 Buffer。当然,Task 使用的资源可能是动态变化的。比如一个服务是用来处理图片的,平时都是处理的小图片,内存使用很小,忽然来了一张大图片,那么它使用的内存一下子需要大增。这个时候,Borg 会迅速把 Task 分配的资源,增加到它所申请的资源数量。
我们把开发者申请的资源被称之为限制资源(Resource Limit),而实际 Borg 动态分配给它的则是保留资源(Resource Reservation),这两者的差值就是我们的回收资源(Resource Reclamation)。这部分回收资源,就是我们可以再利用起来的了。
平均分配还是多留点闲人
首先,Borg 里的调度器和 Master 是分离的。调度器是异步从 Master 写入的队列里,去扫描有哪些 Task,然后再进行分配的。这个扫描过程中,调度器会先调度高优先级的,再调度低优先级的。而为了保障公平,在同一个优先级里,Borg 会采用轮询的方式。
然后,调度的过程分成两步。第一步,我们当然要先保障这个 Task 能正常运行,所以只能寻找哪些服务器能够满足被调度的 Task 的资源需求。这个,被称之为可行性检查(feasible checking)。然后是第二步,Borg 会对这个 Task 分配到每一台机器上,去打个分,根据打分高低来选择一台服务器。而当资源不足的时候,高优先级的 Task 会抢占掉已经在运行的,低优先级的 Task。
从Omega到Kubernetes:哺育云原生的开源项目
压力山大的调度系统
第一个疑惑是,为什么在线服务和批处理任务,在 Borg 看来都是一个 Job,并且是放在一起调度的。Borg 既没有把在线服务和批处理任务拆分成 Service 和 Job,分别有不同的处理方式,也没有在 Borg 里实现两种调度器(Scheduler),分别去调度这两种不同类型的程序。
第二个疑惑是,Borg 是一个单 Master 系统,它真的能够做到去管理 1 万台服务器吗?如果我们有 Kafka、Hadoop、Flink 以及各种 Web Application 这样多个不同类型的在线服务,我们为什么不给里面的每一个集群,都设计一个 Master 去分配资源,然后再在 Borg 层面去管理这些 Master 就好了呢?
单一的 Master 的确成为了 Borg 的一个瓶颈。
从解决这个问题的视角来看,一个可行的方式就是我们需要多个调度器。我们的一次性运行的 Job,不需要考虑太多的因素,因为毕竟它只运行几秒钟。最差情况下,无非对应的 Task 被抢占了,我们再重新运行一次就好了。
而一旦有了多个调度器,那么我们就要回答一个新的问题:在多个调度器之间,应该如何协调。因为它们毕竟调度的是同一组服务器。于是,在调度器层面,我们也面临在单机多线程程序类似的问题,就是不能让两个 Job 被调度到同一份资源上。
一种最笨的办法,当然是对集群进行静态分区。
那么,针对静态分区的优化,自然就是动态分区。也就是,虽然我们对集群进行了功能层面的划分,但是每个集群分配的资源会动态调整。这样,当我们的在线服务的资源有空余的时候,就可以匀出一些给批处理任务。开源的 Mesos 系统就是采用这样的方案。
把 Master 调度器变成数据库
我们先让多个不同的调度器都能看到全局的资源,然后大家都可以去竞争同一份资源。两个调度器对自己 Job 的调度去抢占服务器资源,就好像并发的两个事务请求到了数据库要更新同一条记录一样。如果第一个成功了,那么第二个“事务”就失败了,需要重试。
不过,这个时候你可能会问:那我们的在线服务,每次调度都需要花很多时间计算,岂不是很容易在事务竞争中失败呢?这个其实没有关系。不要忘了,在 Borg 里,我们的在线服务,是可以去抢占批处理任务的资源的。即使批处理任务先拿到了对应的资源,我们仍然可以重试一次,把这个资源强行占为己有。而所有的相同优先级的在线服务,又都是使用同一个调度器,互相之间不会产生并发和竞争。
本文链接: http://woaixiaoyuyu.github.io/2022/01/18/%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%BB%8F%E5%85%B8%E8%AE%BA%E6%96%87%E8%A7%A3%E8%AF%BB%E7%9A%844/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!