切勿浮沙筑高台
Hive:来来去去的DSL,永生不死的SQL
Hive 的设计目标
- 对于 Facebook 当时的数据体量来说,如果使用商业的关系型数据库,面临的瓶颈是计算时间,可能一个每日生成的数据报表一天都跑不完,或者一个临时性的分析任务(ad-hoc)也需要等待很长时间。从这个角度来看,我们等不起机器的时间。
- MapReduce 提供的编程模型和接口仍然太“底层”(low-level)了。即使做一个简单的 URL 访问频次的统计,你也需要通过写一段代码来完成。而不少分析师并不擅长写程序,稍微复杂一些的统计逻辑,可能需要好几个 MapReduce 任务,花上个几天时间。从这个角度来看,我们等不起人的时间。
所以针对这两个瓶颈问题,Hive 的解法和思路也很明确,那就是通过一个系统,我们可以写 SQL 来执行 MapReduce 任务。
Hive 的数据模型
但是,从原始的 MapReduce 任务,到 SQL 语言之间,其实有很多鸿沟。
首先就是序列化和类型信息,基于 SQL 的数据库,有明确的表结构,每个字段的类型也都是明确的。而原先的 MapReduce 里,是没有明确的字段以及字段类型的定义的。所以,填补这个鸿沟的第一步,就是先要在数据的输入输出部分加上类型系统。
Hive 的数据存储
Hive 的表的底层数据,其实就是以文件的形式存放在 HDFS 上的。而且存储的方式也非常直观,就是一张 Hive 的表,就占用一个 HDFS 里的目录,里面会存放很多个文件,也就是实际的数据文件。而通过 Hive 运行 HQL,其实也是通过 MapReduce 任务扫描这些文件,获得计算的结果。
Hive 采用了一个数据库里面常用的解决办法,也就是分区(Partition)。Hive 里的分区非常简单,其实就是把不同分区的文件,放到表的目录所在的不同子目录下。
而在分区之外,Hive 还进一步提供了一个分桶(Bucket)的数据划分方式。
在分区之后的子目录里,Hive 还能够让我们针对数据的某一列的 Hash 值,取模之后分成多个文件。这个分桶,虽然不能让我们在分析查询数据的时候,快速过滤掉数据不进行检索,但是却提供了一个采样分析的功能。这样的数据采样,可以帮助数据分析师快速定性地判断问题,等到有了一些初步结论之后,再在完整的数据集上运行,获得更精确的结果。
Hive 的架构与性能
- 一系列的对外接口
- 驱动器
- 首先是一个编译器
- 然后是一个优化器
- 最后是一个执行引擎和一个有向无环图
- Metastore
从Dremel到Parquet(一):深入剖析列式存储
从行存储到列存储
无论是上节课我们看到的最初版本的 Hive,还是更之前看过的 Thrift,它们存储数据的方式都是“行存储”。所谓的行存储,就是一行(Row)或者说一条数据,是连续存储在一起的。这对于我们写程序去解析数据来说非常方便,我们只需要顺序读取数据,然后反序列化出来一个个对象,遍历去顺序处理就好了。
对于硬盘来说,顺序读远远要优于随机读。
我们能不能不要一行一行存储数据,而是一列一列存储数据?最极端的情况下,我们可以把每一列的数据都单独存储成一个文件。
对于追加写入的数据,我们可以先写 WAL 日志,然后再把数据更新到内存中,接着再从内存里面,定期导出按列存储的文件到硬盘上。一个 MapReduce 程序,把原来的按行存储的数据做一个格式转换就好了,在这个 MapReduce 的过程中,数据的读写都是顺序的,我们的分析程序也只需要读取这个数据转换的结果就好了。
解决嵌套结构问题
而 Dremel 的论文,就对重复嵌套字段和可选字段这两个问题提供了一个解决方案,那就是除了在列里存储每一个值之外,它还额外存储了两个字段。有了这两个字段,我们就能反向组装出原来的一行行数据了。
- 第一个字段叫做 Repetition Level,用来告诉我们,当前这个值相对于上一次出现,是由第几层嵌套结构引起的。
- 第二个字段叫做 Definition Level,用来告诉我们,当一个字段是 Optional,也就是可选的时候,它现在没有填充值,是因为多层嵌套的哪一层的字段为空。
对于很多取值为 NULL 的字段来说,我们并不知道它为空,是因为自己作为一个 Optional 字段,所以没有值,还是由于上一层的 Optional 字段的整个对象为空。而这个 Definition Level,是要告诉我们,对于取值为 NULL 的对象,具体到哪一层 Optional 的对象的值变成了 NULL。知道了这个信息,我们通过列数据反向组装成行对象的时候,就能够 100% 还原了,不会出现不知道哪一层结构应该设置为空的情况。
这里图片文字有个小问题:country为空,是因为第1层的language没有值,所以d=1
从Dremel到Parquet(二):他山之石的MPP数据库
Dremel 也是从很多过去的系统中汲取了养分:
- 第一,它从传统的 MPP 数据库,学到了数据分区和行列混合存储,并且把计算节点和存储节点放在同一台服务器上。
- 第二,它从搜索引擎的分布式索引,学会了如何通过一个树形架构,进行快速检索然后层层归并返回最终结果。
- 第三,它从 MapReduce 中借鉴了推测执行(Speculative Execution),来解决了少部分节点大大拖慢了整个系统的整体运行时间的问题。
Dremel 系统架构
第一点是让计算节点和存储节点放在同一台服务器上。MPP 数据库和搜索引擎的分布式索引的架构也是这样的。
第二点是进程常驻,做好缓存,确保不需要大量的时间去做冷启动。这一点,也跟 MPP 数据库和分布式索引采用的架构和优化手段类似。
第三点是树状架构,多层聚合,这样可以让单个节点的响应时间和计算量都比较小,能够快速拿到返回结果。这个架构,和搜索引擎的分布式索引架构是完全相同的。
1)首先是根服务器(root server),用来接收所有外部的查询请求,并且读取 Dremel 里各个表的 METADATA,然后把对应的查询请求,路由到下一级的服务树(serving tree)中。
2)然后是一系列的中间服务器(intermediate servers),中间服务器可以有很多层。比如第一层有 5 个服务器,那么每个服务器可以往下再分发下一层的 5 个服务器,它是一个树状结构,这也是服务树的这个名字的由来。我们所有查询 Dremel 系统的原始 SQL,每往下分发一层,就会重写(rewrite)一下,然后把结果在当前节点做聚合,再返回给上一层。
3)最下面是一层叶子服务器(leaf servers),叶子服务器是最终实际完成数据查询的节点,也算是我们实际存储数据的节点。
最后一点则仍然来自于 GFS/MapReduce,一方面是即使不使用 GFS,数据也会复制三份存放到不同的节点。然后在计算过程中,Dremel 会监测各个叶子服务器的执行进度,对于“落后”的计算节点,会调度到其他计算节点,这个方式和 MapReduce 的思路是一样的。更进一步的,Dremel 还会只扫描 98% 乃至 99% 的数据,就返回一个近似结果。对于 Top K,求唯一数,Dremel 也会采用一些近似算法来加快执行速度。这个方法,也是我们在 MapReduce 中经常用到的。
1 | -- 原始sql |
行列混合存储的 MPP 架构
Dremel 的列存储本质上是行列混合存储的。所以每一个节点所存储的数据,是一个特定的分区(Partition),但是里面包含了这个分区所有行的数据。把数据存储和计算放在同一个节点,以及将用户 SQL 查询重写,并行分发到多个节点并且汇总所有节点的查询结果,是 MPP 数据库的常见方案。
树形分发的搜索引擎架构
这个架构中最核心的价值,在于可以通过中间服务器来进行“垂直”扩张。并且通过“垂直”扩张,可以在计算量基本不变的情况下,通过服务器的并行,来缩短整个 SQL 所花费的时间。也就是通过增加更多的服务器,让系统的吞吐量(Throughoutput)不变,延时(Latency)变小。这个“垂直”扩张,并不是所谓的对硬件升级进行 Scale-Up,而是增加中间层服务器,增加归并聚合计算的并行度。因为实际扫描数据,是在最终的叶子节点进行的,所以这一层花费的时间和性能是固定的。如果我们没有中间服务器,而是所有的叶子节点数据都直接归并到根服务器,那么性能瓶颈就会在根服务器上。
中间层,帮助我们把数据归并的工作并行化了。我们归并工作需要的 CPU 时间越多,这个并行化就更容易缩短整个查询的响应时间。我们的叶子节点越多,叶子节点返回的数据记录越多,增加中间层就越划算。
Spark:别忘了内存比磁盘快多少
通过引入 RDD 这样一个函数式对象的数据集的概念,Spark 在多轮的数据迭代里,不需要像 MapReduce 一样反反复复地读写硬盘,大大提升了处理数据的性能。
使用硬盘来“容错”的 MapReduce
Map 函数的输出结果会输出到所在节点的本地硬盘上。Reduce 函数会从 Map 函数所在的节点里拉取它所需要的数据,然后再写入本地。接着通过一个外部排序的过程,把数据进行分组。最后将排序分完组的数据,通过 Reduce 函数进行处理。在这个过程里,任何一个中间环节,我们都需要去读写硬盘。因为 Reduce 对于前面的 Map 函数有依赖关系,所以任何一个 Map 节点故障,意味着 Reduce 只收到了部分数据,最终等于是整个任务重来一遍。
“函数式”的 RDD
我们可以看到这个是对于数据的一个抽象。一旦被缓存到内存里,这个 RDD 就能够再次被下游的其他数据转换反复使用。一方面,这个数据不需要写入到硬盘,所以我们减少了一次数据写。另一方面,下游的其他转化也不需要再从硬盘读数据,于是,我们就节省了大量的硬盘 I/O 的开销。
从 RDD 的这个逻辑上,其实我们可以看到计算机工程上的其他系统中的影子。
- 第一个是惰性求值(Lazy-Evaluation),我们的一层层数据转化,只要没有调用 persistent,都可以先不做计算,而只是记录这个计算过程中的函数。而当 persistent 一旦被调用,那么我们就需要把实际的数据结果计算出来,并存储到内存里,再供后面的数据转换程序调用。
- 第二个是数据库里的视图功能。为了查询方便,对于复杂的多表关联,很多时候我们会预先建好一张数据库的逻辑视图。那么我们在查询逻辑视图的时候,其实还是通过一个多表关联 SQL 去查询原始表的,这个就好像我们并没有调用 persistent,把数据实际持久化下来。
- 当然,我们也可以把对应视图的查询结果,直接写入一张中间表,这样实际上就相当于把计算的结果持久化下来了,后续查询的 SQL 就会查询这个中间表。
宽依赖关系和检查点
检查点这里就不介绍了,主要就是自己设置一个checkpoint。
如果一个 RDD 的一个分区,只会影响到下游的一个节点,那么我们就称这样的上下游依赖关系为窄依赖。而如果一个 RDD 的一个分区,会影响到下游的多个节点,那么我们就称这样的上下游关系为宽依赖。
换句话说,在宽依赖下,一个上游节点的失效,会以几倍的影响在下游得到放大。所以,在宽依赖的场景下,上游会像 MapReduce 里的 Map 一样,把输出结果序列化到硬盘上,以减少故障后的恢复成本。
Megastore(一):全国各地都能写入的数据库
互联网时代的数据库
在我们前面介绍过的那些论文里,会发现工程师们对于“可用性”问题的考虑,往往是局限在一个数据中心里。GFS 里我们会对数据做三份备份,但是这三份数据还是在同一个数据中心的三台服务器里;针对 Chubby 这样的服务,我们用了五个节点,放到不同的交换机下,但这仍然是在一个数据中心里。可是,如果我们的分布式数据库只能在一个数据中心里,那无论是在“可用性”上,还是在“性能”上,在互联网时代都有点不够看。
那么,解决这两个问题最好的办法,就是我们有多个数据中心。每个用户的请求都可以访问到就近的本地数据中心,对应的数据也直接就近写入本地数据中心里的数据库,也都从本地数据中心的数据库里读。而各个数据中心的数据库之间,会进行数据复制,确保你在旧金山写入的数据,我在上海一样可以读到。
复制、分区和数据本地化
和 Hive 没有重写计算引擎而是直接用了 MapReduce 一样,Megastore 也没有重写数据存储层,而是直接使用了 Bigtable。那么,Megastore 想要解决的第一个问题,就是如何在多个数据中心的 Bigtable 之间复制数据。
我们常见的数据复制的方案,其实无非就是三种:
- 第一种是异步主从复制(Asynchronous Master/Slave),这个我们之前在讲解Chubby的时候就讨论过。异步主从复制有两个核心问题,第一个是如果 Master 节点挂了,Slave 如果还没有及时同步数据的话,我们可能会丢数据。第二个是如果写数据在数据中心 A,读数据在数据中心 B,那么刚写入的数据我们会读不到,也就是无法满足整个系统是“可线性化”的。
- 第二种是同步主从复制(Synchronous Master/Slave),这也是大部分系统的解决方案。
- 第三种叫做乐观复制(Optimistic Replication),这种方案是 AP(Availability + Partition Tolerance)系统中常常用到的方案。也就是数据可以在任何一个副本中写入,然后这个改动会异步地同步到其他副本中。因为可以在任何一个副本就近写入,所以系统的可用性很好,延时也很低。但是,什么时候会同步完成我们并不知道,所以系统只能是最终一致(eventually-consistent)的。而且,这个系统基本无法实现事务,因为两个并发写入究竟谁先谁后很难判定,所以隔离性就无从谈起了,而“可线性化”自然也就没法做到了。
Megastore 在这件事情上的选择非常简单明确,那就是直接使用 Paxos 算法来进行多个数据中心内的数据库的同步。要注意,Megastore 并不是像我们之前讲解 Bigtable+Chubby 那样,只是采用 Paxos 来确保只有一个 Master。Megastore 是直接在多个数据中心里,采用 Paxos 同步写入数据,是一个同步复制所有的数据库日志,但是没有主从区分的系统。
从业务需求到架构设计
几乎在我下单之前,另一位在海南的朋友也下单了。那么,他下的这张订单是否在我下单之前可以读取到,我却并不在意。一方面,自然我也没有权限看到他的订单,另一方面,在业务上我们也并不需要分辨,几乎在同一时间下单的人谁先谁后。
根据这个例子我们可以看到,从业务上来说,我们不一定需要全局的“可线性化”,而只要一些业务上有关联的数据之间,能够保障“可线性化”就好了。不仅从“可线性化”的角度是这样的,其实数据库事务隔离性的“可串行化”也是这样的。
而这个现实业务中,对于数据库事务的“可串行化”,以及分布式系统的“可线性化”不需要是全局的,就给 Megastore 带来了解决问题的思路。Megastore 是这么做的。
首先,它引入了一个叫做“实体组”(Entity Group)的概念。Megastore 里的数据分区,也是按照实体组进行数据分区的。然后,一个分区的实体组,会在多个数据中心之间通过 Paxos 进行数据同步读写。本质上,Megastore 其实是把一个大数据表,拆分成了很多个独立的小数据表。每一个小数据表,在多个数据中心之间是通过 Paxos 算法进行同步复制的,你可以在任何一个数据中心写入数据。但是各个小数据表之间,并没有“可线性化”和“可串行化”的保障。
可以理解为同一个实体组内部是需要实现串行,可线性化的。但是,两个实体组之间只需要实现数据库的最终一致性即可。
Megastore 在每一个实体组内,支持一阶段的数据库事务。但是,如果你有跨实体组的操作需求,你该怎么办呢?你有两个选择,第一个,是使用两阶段事务,当然它的代价非常高昂,是一个阻塞的、有单点的解决方案。而第二个,则是抛弃事务性,转而采用 Megastore 提供的异步消息机制。因为一旦跨实体组,我们就不能保障数据操作是在同一个服务器上进行的了,就需要跨服务器的操作需求。
两阶段事务相信你已经非常熟悉了,我们来看看这个消息传递机制是怎么样的。当我们需要同时操作两个实体组 A 和 B 的时候,我们可以对第一个实体组,通过一阶段事务完成写入。然后,通过 Megastore 提供的一个队列(queue),向实体组 B 发起一个消息。实体组 B 接收到这个消息之后,可以原子地执行这个消息所做的改动。
在实际应用层面,我们对于“可串行化”以及“可线性化”的需求并不是全局的,而是可以分区的。
Megastore(二):把Bigtable玩出花来
与其说 Megastore 是一个独立的分布式数据库方案,不如说它更像一个 Bigtable 上的应用层的封装。
实体组的数据布局
可以看到,我们是直接拿 User 表的主键 user_id,作为了 Bigtable 里的行键。而对于 Photo 表,我们是拿 user_id 和 photo_id 组合作为行键,存放在同一张表里。因为 Bigtable 里面的数据,是按照行键连续排列的。所以,同一个 User 下的 Photo 的数据记录,会连续存储在对应的 User 记录后面。
这样天然的把不同表相关联的内容存放在了一起。这其实是利用了BigTable的特性。
为了避免热点问题,所有的行键都会带上 Root 实体记录的 Key 的哈希值,这样,虽然同一个实体组里的数据还是连续排列的,但是同一张表的两个连续实体组的 Root 记录的 Key,就不一定存储在一个服务器上了。
Megastore 的索引
Megastore 的索引,分成了两种类型,一个是本地索引,另一种叫做全局索引。本地索引的数据,是直接存储在实体组“内部”的,它是我们已经确定是哪一个实体组的情况下,去寻找具体的记录位置。
而另一种全局索引,就不需要预先知道是哪一个实体组了,但是它的更新就不是实时的了。最新的数据更新,不一定会在全局索引里反映出来,这也是为什么论文里说,全局索引是弱一致的。我们也不难猜到,全局索引应该是异步更新的。
索引优化
- 第一点,是 Megastore 支持在索引中存储数据
- 传统的数据库索引里,往往只存储指向具体数据记录的主键。这就意味着,当我们查询数据的时候,需要两次请求:第一次请求是查询索引,拿到对应数据记录的主键;第二次请求是再通过主键,去查询对应的整条数据,然后拿到我们需要的字段的值。
- 而在 Megastore 里,你可以通过一个 STORING 语句,指定索引里存储下对应的数据记录的某一个字段的值。
- 这个优化听起来微不足道,但是在分布式数据库里其实作用很大。在一般的单机数据库里,索引和数据都是在同一台服务器上,所以索引里不存储数据,只是多了硬盘随机访问的压力。但是在分布式数据库里,如果我们的索引和数据不存储在一个节点上,就意味着还会多一次网络往返,进一步会拉低整个集群的性能。(666)
- 第二点,是 Megastore 支持为 repeated 类型的字段建立索引(Repeated Indexes)
- 而 Megastore 会为里面的每一个 tag 都记录一条索引。这样,我们就可以通过索引,反向查询到某一个 tag 关联到的所有的 Photo 记录。Megastore 这种支持 repeated 字段的索引,使得我们不需要为这样的单个 repeated 字段,去单独建立一张子表。无论这张子表是一张独立的表,还是像 Megastore 的实体组一样挂载在 Root 表上,都很浪费存储空间,也让这个数据表结构变得过于复杂,不容易理解。
- tag是表中的一个字段。
- 最后一点,是 Megastore 提供了对于内联索引(Inline Indexes)的支持。
- 这个索引类型,是为了帮助父实体(Parent Entity)能够快速访问子实体(Child Entity)的某些字段。
- 当有了内联索引之后,我们在第二步查询子实体数据的时候,就可以少一次索引的访问了。不需要在通过索引访问父实体,再访问父实体的某个字段去访问数据,减少了一个步骤。
索引实现
1 | CREATE LOCAL INDEX PhotosByTime |
- PhotosByTime 这个索引由 user_id 和 time 这两个字段组成;
- 并且它索引的是 Photo 这个表,对应的 Photo 表的主键就是 user_id 和 photo_id;
- 那么,索引这一行的行键,就是 ((user_id, time), (user_id, photo_id)) 这样的一个组合;
- 而如果我们的索引,指向的是一个 repeated 的字段,比如 tags,那么每一个 tag 都会有一行数据。比如有三个 tag,分别是 [tag1, tag2, tag3],我们的索引,就会有三条记录,分别是 (tag1, (user_id, time)), (tag2, (user_id, time)) 和 (tag3, (user_id, time))。
Megastore 的事务与隔离性(666,好厉害)
Megastore 只支持同一个实体组下的一阶段事务,那我们就可以把同一个实体组下的所有数据行,看成是一个抽象的“迷你数据库”。在这个迷你数据库上,Megastore 也支持了“可串行化”的 ACID 语义。
现代的关系型数据库,都是采用一种叫做 MVCC(Multiversion Concurrency Control)的机制来实现,中文名称叫做多版本并发控制。这个机制,通俗来讲,就是数据库中的数据会有多个历史版本。你的每一次事务请求,都会拿到当前最新已经提交的那个版本的快照,在整个事务提交的时候,会检查当前数据库里数据的最新版本,是否和你拿到的快照版本一致。
看完这个对于 MVCC 机制的描述,不知道你是不是和我一样,发现 Bigtable 的底层数据读写机制,和它非常匹配。因为 Bigtable 天然会存储数据的多个版本,每一次的数据写入,都是追加了一个新版本,而不是把原来的数据覆盖掉。这样我们就可以把每一个事务提交时的时间戳,用作 MVCC 机制里面的版本。
我们在提交事务的时候,需要指定一个时间戳,而不是让每一行的数据更新都使用当前的时间戳。然后我们在读取数据的时候,只需要找到最后成功提交的事务的时间戳,我们读取这个时间戳版本的数据,就是最新的版本。
正是因为这个时间戳机制的存在,Megastore 对于读取数据提供了 current,snapshot 以及 inconsistent 三种模式。这三种模式其实看名字就很明白了:
- current 就是读取最新版本的数据。在读数据之前,Megastore 的事务系统,会先确认已经提交的事务都已经应用成功。然后,事务系统会读取最新事务对应的时间戳的数据版本。
- snapshot,则不会等待当前是否有已经提交的事务应用完成,而是直接返回上一个完全应用的事务对应的数据版本。
- inconsistent,则是完全忽视事务系统的日志信息,直接获取到 Bigtable 里面最新的数据就好了。自然,在这个机制下,我们就会读到“不一致”的数据,因为我们可能在事务提交到一半的时候,读取到不同行的不同版本的数据。
Megastore 在 Bigtable 本身的存储系统之外,添加了一个独立的事务系统。而这个事务系统,其实就是我们在 Chubby 里面所说的,是一个复制日志的状态机。而我们的事务提交,是通过下面这样 5 个步骤来进行的:
- 读(Read):我们先要获取到时间戳,以及最后一次提交的事务的日志的位置。
- 应用层的逻辑(Application Logic):我们要从 Bigtable 读取数据,并且把所有需要的写操作,收集到一条日志记录(log entry)中。
- 提交事务(Commit):通过 Paxos 算法,我们要和其他数据中心对应的副本,达成一致,把这个日志记录追加到日志的最后。
- 应用事务(Apply):也就是把实际对于实体和索引的修改,写入到 Bigtable 里。
- 清理工作(Clean UP):也就是把不需要的数据删除掉。
而第 3、4 步之间的这个时间差,也是为什么我们的数据需要区分是读取 current 版本,还是读取 snapshot 版本。current 版本,就是预写日志已经完成,但是数据还没有更新到 Bigtable 里,那我们就等待数据更新完到 Bigtable 里,再获取这个最新的数据。snapshot 版本,则不会等待预写日志已经完成,但是数据还没有更新到 Bigtable 里的数据,而是直接获取上一个已经更新到 Bigtable 的数据版本。
所以,如果我们所有的数据读,都是用 current 读,我们就能保障“可线性化”,但是它在有些情况下的延时,会比 snapshot 读长一些,性能会差一些。
那么,当我们出现并发写的时候怎么办呢?请你回头去看一下前面的第 1 步,其中会获取到最新的日志位置。两个并发写入,会在第 3 步,去竞争写入同一个日志位置,但是只有一个会成功。而失败的那个,就会从头来过,重新拿到新的最新日志位置,来发起事务。
小结
实体组的设计,其实是把多张数据表存放在一个 Bigtable 的表的方式,来让根据一个主键能够关联起来的数据,在物理上连续排布在一起。这样无论是读写数据,都有很强的局部性,数据读写的性能都会大大增强。而对应的索引,其实也是 Bigtable 里一行行的记录,同样是根据 Bigtable 行键连续分布的特性,使得根据索引的范围查找和随机查找都变得很容易。
Megastore 与其说是一个数据库系统,不如说是对 Bigtable 的特性进行了合理封装后的一个数据应用层。
Megastore(三):让Paxos跨越“国界”(好厉害的设计思路)
可能会记录很多,但真的不好删减,很精彩的系统整合和设计。
Megastore 面临的 Paxos 挑战
基于 Master 的 Paxos 算法,既有缺点,就是所有的读写都要通过 Master,又有优点,那就是能够通过合并两次 Paxos 算法的 Accept 和 Prepare 阶段来减少网络请求。那么,我们能不能只享受这个办法的优点,又不用去承担这个算法的缺点呢?这个,就是 Megastore 想到的办法,也就是基于 Leader 的 Paxos 算法。接下来,我们分别就从读和写两方面,来看一看 Megastore 是怎么做的。
数据的快速读
Megastore 在每个数据中心,都引入了一个叫做协同服务器(Coordinator Server)的节点,这个节点是用来追踪一个当前数据中心的副本里,已经观察到的最新的实体组的集合。对于所有在这个集合里的实体组,我们只需要从本地读数据就好了。如果实体组不在这个集合里,我们就需要有一个“追赶共识”(catch up)的过程。
- 第一步,是查询本地的协同服务器,看看想要查询的实体组是否是最新的。
- 第二步,是根据查询的结果,来判断是从本地副本还是其他数据中心的副本,找到最新的事务日志位置。
- 我们就从本地副本,拿到最新的日志位置,以及时间戳就好了。在实践当中,Megastore 不会等待查询到本地是不是最新版本,再来启动这个查询。而是会在查询协同服务器的同时就从 BigTable 里面获取这个事务日志的数据。这样通过并行查询的方式来缩短网络延时,即使本地不是最新版本,也无非浪费一次 BigTable 的数据读取而已
- 如果本地副本不是最新的,那么我们就要用到 Paxos 的投票特性了,我们会向 Paxos 其他的节点发起请求,让它们告诉我们最新的事务日志位置是什么。根据多数意见,我们就知道此时最新的事务日志的位置了。然后,我们可以挑一个响应最及时的,或者拥有最新更新的副本,从它那里来开始“追赶共识”。因为我们的本地节点往往是响应最快的,所以从本地副本去“追赶共识”,往往也会是一种常用策略,但这不是我们的必然选择。
- 接着第三步,就是“追赶共识”的过程了。一旦从哪个副本启动“追赶共识”的过程确定了,我们就只要这样操作就好了:
- 第四步,如果我们在“追赶”的过程中,是通过本地副本来发起整个追赶过程的。那么一旦这个追赶的过程完成,意味着本地的数据已经更新到了最新的状态。那么它会向本地的协同服务器发起一个 Validate 的消息,让协同服务器知道这个实体组的数据已经是最新的了。这个消息不需要等待协同服务器确认返回,因为即使它失败了,无非是下一次读数据的时候,把前面的整个过程再走一遍就是了
- 等到前面的这个过程完成,在第五步,我们就根据使用了哪一个“副本”来“追赶共识”,通过我们拿到的日志位置和时间戳,去问它的 Bigtable 要数据。如果这个时候,这个副本不可用了,那么我们就要再找一个副本,从前面的第三步“追赶共识”开始重复一遍。
数据的快速写
Megastore 读数据的思路很直接,其实就是尽可能从本地读取。但是到写数据的时候,问题就没有那么容易了。因为我们前面刚刚说过,我们既希望像基于 Master 的 Paxos 那样,可以把两阶段的 Prepare-Accept 过程,合并成一个过程。但是我们又不希望,单个 Master 成为我们写入数据的瓶颈。所以,Megastore 采用了一个叫做基于 Leader(Leader-Based)的实现方式。
这个思路,首先是借鉴基于 Master 的合并策略,把前一次的 Paxos 算法的 Accept 和后一次的 Prepare 的请求合并。不过,相比于 Master 节点一旦故障,我们就有一段不可用时间。而基于 Leader 的算法,仍然可以在 Leader 节点故障的时候,正常完成整个算法过程。我们只要在 Leader 故障或者不可用的时候,直接退回到原始的 Paxos 算法就好了。
这个 Leader 是谁,我们是怎么知道的呢?其实这是我们在每次提交事务的时候,“写”进去的。我们的每一次事务写入的“值”(也就是事务内容),除了原本要写入到数据库里的数据之外,还会加上对于 Leader 节点的“提名”,一旦事务写入成功,我们就认为提名通过了,下一次 Paxos 算法的 Leader 也就确定了。
总体而言就是先直接accept,如果发现处理accept的leader挂掉了,再使用Paxos共识算法。
系统整体架构
因为事务提交成功,和数据在实际存放的 Bigtable 里可见,其实是分开的两个步骤。这一下子就凸显出协同服务器的重要性出来了。
这也意味着,我们为整个系统引入了一个新的“故障点”。不过,好在协同服务器的这些数据,都只维护在内存里,而且只需要维护本地数据中心的实体组是否是最新的这样一个状态。并且它也没有任何外部依赖,即使节点故障,重新启动一个新的节点,里面的数据也可以通过 Paxos 的“多数投票”过程,恢复出来。
至于协同处理器还是用了类似Chubby的思路,去对应文件上获取相关的锁。
整个 Megastore 系统的整体架构横向来看,是这样的:
- 每一个数据中心里,都有我们的应用服务器,应用服务器本身会通过一个 Megastore 的库,来完成和 Megastore 的所有交互操作。所有的外部请求,比如用户数据更新、上传照片,都是先到应用服务器,再由应用服务器调用它所包含的 Megastore 的库,来进行数据库操作。
- 每一个数据中心里的数据,都是存储在底层的 Bigtable 里的。无论是事务日志,还是实际的数据,都是通过 Bigtable 存储。所以我们也就不用操心数据存储层面的灾备、容错、监控等一系列问题了。
- 中间层,Megastore 有两类服务器。一类是我们刚刚说过的协同服务器,用来维护本地数据是否是最新版本。另一类,叫做复制服务器(Replication Server)。我们所有的数据写入请求,如果是写到本地数据中心里的,直接写 Bigtable。如果是要告诉远端的另外一个数据中心,则是发送给那个数据中心的复制服务器。这个复制服务器,本质上就是起到一个代理的作用。此外,复制服务器还要负责定期扫描本地的副本里,因为网络故障、硬件故障,导致没有完整写入或者应用的数据库事务,然后通过 Paxos 里的 no-op 操作去同步到最新的完整数据。
而为了进一步提升性能和服务器使用的效率,Megastore 对于每一个数据中心的副本,还分成了三种不同的类型:
- 第一种叫做完全副本(Full Replica),也就是拥有前面我们说的所有的这些服务。
- 第二种叫做见证者副本(Witness Replica),这样的副本只参与投票,并且记录事务日志。但是它不会保留实际的数据库数据,所以我们也不能从这些副本查询数据。如果我们的完整副本比较少,比如只有北京和上海两个数据中心,这样我们无法完成 Paxos 需要的至少三个节点的投票机制。那么,我们就可以随便在哪个数据中心,再增加一个见证者副本,确保可以完成“多数通过”的投票机制。
- 第三种叫做只读副本(Read-Only Replica),它恰恰和见证者副本相反,它不会参与投票,但是会包含完整的数据库数据。读取这个副本,可以拿到特定时间点的数据快照,我们可以把它当作是一个异步的数据备份。在明确知道我们读取数据的时候,不需要保障“可线性化”的时候,也可以直接去读取这个数据。因为大部分的数据库都是读多写少的,这个副本也可以减轻我们的完全副本的负载。
Spanner(一):“重写”Bigtable和Megastore
Spanner 的整体架构设计
部署好的一个 Spanner 数据库,在论文里被称之为是一个“宇宙”(Universe)。在这个“宇宙”里,会有很多很多个区域(Zone)。每一个 Zone 里,都有一套类似于 Bigtable 这样的分布式数据库的系统,你可以把它看成是在一个内网内的一套类 Bigtable 的部署。而很多套这样的类 Bigtable 的部署,就共同组成了一个 Spanner 宇宙。注意,尽管多个“Zone”可能是在一个数据中心里的,但是它们互相之间是物理隔离的。最佳的策略,就是让数据副本,尽量放在离会读写它的用户近的数据中心就好。
而一个 Zone 里的服务器,主要分成了三种类型,分别是:
- Zonemaster,负责把数据分配给 Spanserver;
- Spanserver,负责把数据提供给客户端;
- Location Proxy,用来让客户端定位哪一个 Spanserver 可以提供自己想要的数据。
其实这个组合和我们之前学习的 Bigtable 非常类似。Zonemaster 就好像 Bigtable 里的 Master 节点,负责分配 Tablet;Spanserver 就好像 Tablet Server,负责提供数据和在线服务。而 Location Proxy 则是新出现的节点,它承担了原先 Chubby 和 METADATA 表的功能。
一个 Zone 里,只会有一个 Zonemaster,但是会有多个 Location Proxy,但是又有成百上千个 Spanserver。而每个 Spanserver 里,又和 Bigtable 里的 Tablet Server 一样,会负责管理 100~1000 个 Tablet。
而在很多个 Zone 之上,整个 Spanner 宇宙里,还会有两个服务器。一个叫做 Universemaster,也就是这个宇宙的 Master,它只是提供一系列的控制台信息,主要是 Zone 的各种状态信息。这些信息就像我们在 MapReduce 的论文里见过的JobTracker那样,通过暴露各类信息,方便我们去监控和调试整个“宇宙”。还有一个服务器,叫做 Placement Driver,它的作用就是调度数据,也就是把数据分配到不同的 Zone 里。这个分配策略,是使用数据库的应用程序就可以定义的。
Spanserver 的实现
Spanserver 的架构
首先是数据模型。Spanner 的底层数据存储,是一个 B 树这样的数据结构,以及对应的预写日志(WAL)。Spanner 没有像 Bigtable 那样,采用 SSTable 这样的 LSM 树。所以 Spanner 的数据模型,更像是一个关系型数据库,而不是一个 KV 数据库。
在 Bigtable 里,我们看到每一个列的值,都有一个时间戳,在 Megastore 里我们也利用这个时间戳,来实现数据的多版本和 MVCC 下的隔离性。但是,作为一个数据库,其实我们的“版本”应该是在整条数据上,而不是某一列的数据上,我们的数据库事务,针对的也是整条数据记录。
Spanner 的数据调度
一个 Tablet 和它的所有副本,在 Spanner 里组成了一个 Paxos 组(Paxos Group)。Spanner 的一个重要的设计,是可以实现“数据调度”,也就是把数据记录,调度到不同的数据中心里。而这个调度,就是在不同的 Paxos 组之间进行的。
而整个数据在不同 Paxos 组之间的转移,则是通过一个 movedir 的后台任务。这个任务也不是一个事务,这是为了避免在转移数据的过程里,阻塞了正常的数据库事务读写。它的策略是先在后台转移数据,而当所有数据快要转移完的时候,再启动一个事务转移最后剩下的数据,来减少可能的阻塞时间。
Spanner 的数据模型
1 | CREATE TABLE Users { |
User 表,通过一个 uid 作为主键,也就是数据表的行键;Albums 表,也就代表一个相册,使用 uid 和 aid 作为联合主键,并且通过 INTERLEAVE IN PARENT Users ON DELETE CASCADE 指明了它和 Users 表的关系。
在这其中,INTERLEAVE IN 是告诉数据库,父表的行键和子表所有相同行键的数据,共同构成了一个目录。而 ON DELETE CASCADE 则更进一步,不仅申明了关系,而且让 Spanner 可以去做级联删除,一旦父表的 User 的某一行数据删除了,对应的子表里的记录也就删除了。
Spanner(二):时间的悖论
分布式事务
Spanner 的架构就是这样的,每个 Spanserver 上,会有一个事务管理器,用来支持分布式事务。
如果是一个两阶段的跨 Paxos Group 的事务,那么我们就需要通过前面的事务管理器,和其他 Paxos Group 里的事务管理器互相协调,来实现一个两阶段提交。而如果这个事务只在当前的 Paxos Group 里就可以完成,我们就不需要通过事务管理器,直接从锁表里面去获取锁,在本地通过 Paxos 算法进行事务日志的提交就好了。
分布式事务下的可线性化
为了保障可串行化,我们需要给数据打上“版本”,然后多个事务并发的时候。确保读取的数据,是最新版本的快照。然后在事务提交的时候,判断你读写的数据,是否已经有新的版本了,来避免错误地覆盖了已经更新的数据,这也就是所谓的“乐观锁”的并发机制。
所以,一般来说,在分布式系统里我们会使用一个天然的信息作为版本,那就是时间戳。我们直接用事务提交的时间戳,来作为事务的版本信息,提交得早的,事务 ID 就小。如果两个事务提交的时间戳一样怎么办呢?其实这个问题也并不困难,我们只要把所有的服务器编个号,然后把时间戳 + 服务器编号组合在一起,作为事务 ID 就好了。
分布式事务的时间悖论
每个服务器的时间是不能保持全部同步的,为了解决这个问题。采用了原子钟和置信区间。
本质上也只有两个核心点。第一个,是尽可能缩短服务器之间的时钟差异;第二个,则是在缩短了时钟差异之后,让所有的数据写入,不再是有一个时间戳,而是给出一个时间戳的范围。
Spanner(三):严格串行化的分布式系统
深入来讲解 Spanner 的这个分布式事务具体是怎么实现的。
- 第一个,自然是 Spanner 的分布式事务,具体是怎么实现的。这个也有助于你了解最新一代的分布式数据库的实现,现在已经有不少新的数据库,都是师从 Spanner 的论文了。
- 第二个,是深入理解分布式数据库的可串行化和可线性化概念的相关性和差别。分布式数据库的主要挑战,就来自这两方面。
可串行化
可串行化是一个数据库事务隔离性的一个概念,它本身和分布式系统没有关系。它的定义是,一个事务的集合 S,可串行化是该集合中事务的一个满足全序关系的排列。换句话说,可串行化,相当于在外部看起来,这些事务是按照某一个顺序,一个一个执行下来的。
可线性化
而可线性化是一个分布式系统中的概念。它的含义是,对单个对象上的操作,是“实时”的。也就是你对一个数据写入操作成功了,那么立刻去读取它,就会读到刚刚写入的值。
可线性化的本质,是需要我们在分布式系统上的操作符合常识。我们写入成功的数据,之后去读取,必须要能读取到刚刚写入的新数据,而不是有可能会读到一个旧版本的数据。即使写入和读取的动作,可能分别是从两个相聚几千公里的不同数据中心发起的。
不过,你需要注意,可线性化是针对单个对象上的操作。如果两个操作,或者是两个数据库事务,互相之间没有任何交集,那它们在系统里记录的事务的时间,是不需要保障先后顺序的。
严格串行化
而在 Spanner 这个数据库里,我们需要同时保障可串行化和可线性化。而可串行化和可线性化的组合,在分布式数据库领域里,被称之为严格串行化(Strict Serializability)。也就是在外部看来,事务是串行执行的,并且一旦一个事务执行成功,我们立刻去读取数据的话,无论是从分布式系统的哪个节点发起,我们一定能够读取到刚才的最新数据。
我们的数据库,不仅因为支持可串行化的事务隔离性,在内部看起来是一致的,对于所有外部使用数据库的人来说,也是一致并且正确的。这也是为什么 Spanner 说它自己是一个“外部一致”的数据库。
Spanner 事务的具体实现
TrueTime API
tt = TT.now()
tt.ealierst < tabs(enow) < tt.latest
Spanner 为了要保障可线性化,本质上就是要确保任何一个在绝对时间 t 发生的数据读取,一定只能看到在 t 这个时间节点之前已经提交成功的数据写入,之后的数据写入它是看不到的。并且,只要是在绝对时间 t 之前已经提交成功的事务写入,它也一定要能看到,而不能有些看不到。
Spanner 支持的读写类型
Spanner 一共支持 3 种操作,分别是读写事务、快照读事务、以及普通的快照读。
读写事务
Spanner 的数据库事务面临的第一个挑战,就是一个事务写入的时间戳,到底应该是什么。Spanner 的读写事务是这样的:
- 首先,我们先把每一个 Paxos 组(Paxos Group)看成是一个单元,一个 Paxos 组内,数据就是完全通过 Paxos 协议来做到同步复制的,我们先不用去关心它内部的细节,可以先把它当成是一台服务器来看。
- 我们的分布式事务,其实是多个 Paxos 组参与的二阶段事务提交。其中,我们会选定一个 Paxos 组是协调者,其余 Paxos 组都是参与者。
那么整个读写过程会是这样的:
第一步,客户端写入数据库的请求,会先到达分布式事务中的协调者。而且在 Spanner 里,这个协调者本身也是一个事务的参与者。
第二步,和一般的两阶段事务一样,协调者会向参与者的 Paxos 的 Leader 发起一个提交请求,也就是 Prepare 请求。这个时候,参与者会为要更新的数据获取一个写锁。参与者会为事务生成一个 Prepare 请求的时间戳,并且向 Paxos 里,写入一条 Prepare 请求的记录。这个时间戳,会比这个 Paxos 组之前分配给所有事务的时间戳都要晚。
第三步,协调者本身不会给自己发 Prepare 请求,它会等待所有的参与者,返回事务是否可以提交的应答。这些应答中,参与者会把分配给 Prepare 请求的时间戳,一并返回给协调者。这个时候,协调者会为这次事务生成一个事务提交的时间戳。
第四步,在有了这个时间戳之后,协调者会进行一个有条件的等待。它会等到 TT.after(s),也就是在考虑误差的情况下,绝对时间一定已经晚于这个选择的事务提交时间之后,再提交事务。
提交事务的时候,作为一个参与者,它会在本地应用这个事务。作为协调者,它会向其他的参与者发起请求,让它们也提交事务。所有参与者写入的数据都有相同的时间戳,并且在事务应用完成之后,参与者会把对应的写锁释放掉。
第五步,在整个事务提交完成之后,协调者会返回结果给到客户端。
快照事务和快照读取
既然我们已经能够保障,Spanner 的事务写入的时候的时间戳没有问题。那么,下一个挑战就来自于数据库读了。因为 Spanner 允许从任何一个副本读取数据,所以我们就会面临另一个挑战,就是 Paxos 下,我们并不能保障每一个副本都拥有最新的数据。因为数据的写入只需要多数通过就好了,我们完全可能因为网络分区、数据延时等问题,访问了一个没有最新版本数据的副本。
Spanner 是这样解决这个问题的。Spanner 里的数据库读取,都会带上一个时间戳,如果客户端自己没有指定,那么系统会为它分配一个时间戳,这个时间戳我们记为 sread,这个值是由对应记录的 Paxos 组的 Leader 生成的。最简单直接的方式,就是使用 Leader 的 TT.now().latest,也就是考虑时间误差下,当前可能的最晚时间。
而当一个读取数据的请求到达某一个数据副本的时候,这个副本需要回答一个问题,就是它本地的最新数据是什么版本的呢?它是可以直接返回本地的数据,还是要让这个读取请求,等待到它把数据同步完成?
所以,每个副本会在本地维护一个称之为 tsafe 的值。只要请求带来的 sread 比这个值小,那就说明本地副本的数据足够新,可以返回 sread 这个时间节点下最新的数据快照。这个 tsafe 是这样取值的:
$$
tsafe=min(tsafepaxos,tsafeTM)
$$
本文链接: http://woaixiaoyuyu.github.io/2021/12/21/%E5%A4%A7%E6%95%B0%E6%8D%AE%E7%BB%8F%E5%85%B8%E8%AE%BA%E6%96%87%E8%A7%A3%E8%AF%BB%E7%9A%842/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!