引言

不论是在分布式环境下还是在计算机内部,每时每刻都在发生事件。例如,单机环境下,用户输入属于事件;进程内,各个线程之间的唤醒也属于事件。在分布式系统中,事件的种类又包括了其他系统发送的消息,而根据场景的不同,消息的含义也不一样。所以本文就从一个宏观的角度,对当前处理事件的系统做一个分类,并介绍一下这些系统各自的特点。

事件的定义

在讨论基于事件处理的系统之前,我们首先对事件做一个定义。一般来说,事件可以分成以下三类:

  • 通知类事件,通知相关系统某件事发生了。
  • 持久化事件,事件发生后将其永久保存下来,但不通知相关系统。
  • 持久化通知,既将事件保存下来,又通知相关系统。

通知类事件

通知类事件是说,当某一个事件发生时,相关系统会执行某段代码或某个函数,响应这个事件,对发生的事件进行处理。这类事件可以说在各个应用中都会存在。例如,运行在浏览器中的JavaScript代码,就会对用户点击浏览器中某一个Button或链接做出反应,这里的点击操作就属于一种通知事件。在一些UI框架如QT中,提供了信号槽机制用于对象之间的通信,当某个事件发生时,系统中的对象就会发射一个信号,订阅这个信号的槽函数就会得到这个通知,然后完成对应的逻辑。许多操作系统通常会提供异步I/O的功能,也即线程在一个EventLoop中等待,当IO完成后,操作系统就会通知该线程,该线程就会被唤醒处理该事件。另外在反应式编程框架中,通常会提供一些高层的API来处理流式事件,例如RxJava,Vert.x等。上述的这些事件通常都是在进程的内存空间中存放,不会写入到硬盘中,因此事件的生命周期小于进程的生命周期。

持久化事件

与通知类事件相反,持久类事件通常不会进行通知,而仅仅是将发生的事情记录下来。例如,时间序列类型的数据库通常会记录每个时间点发生的事件,比如定时从一个温度传感器上读取数值并存储,或者监控CPU的状态,或者记录股票基金的价格等等。另外,数仓中的星状模型或雪花模型,通常用一个事实表来记录发生了什么事件,用一些维表来解释事实表。这些事件通常都会被写入到磁盘中用于日后的分析,但是事件发生时并不会调用处理代码对收到的消息进行处理。

上证指数

持久化通知

有时我们希望在收到事件时既要立刻做出反应,又要事件永久的存储下来,这种事件就是持久化通知。例如消息队列Kafka,Kafka收到事件后会在服务端将事件持久化存储,消费者会周期性地拉取事件,对事件进行处理。对于处理这类事件的系统,称之为流处理系统(Stream Processing)。本文也将围绕流处理系统展开。

事件处理系统的分类

基于窗口的事件处理系统

对于持久化通知,我们可以根据处理事件的方式,对这流处理系统进一步分类。

最简单的处理方式是将事件看成独立的个体,彼此之间独立,例如,在数据库中的单条数据查询操作,不同的select之间不会产生影响。而一个事件通常包含的信息量比较少,整个系统提供的能力也比较弱。

在更多情况下,我们需要同时处理一批数据得到一个结果。例如,在数据库中,我们通常会对查询结果加上group语句或多个表之间进行join操作。

那么对于需要处理多个事件的系统来说,还有一个分类依据是数据处理的范围。在某些系统中,需要处理的数据仅仅是某一段时间内的。例如,股票基金交易系统,通常需要某小时或某一天基金价格的最大值或最小值;在监控服务器性能时,我们通常也是查看某个时间段内报警的数量来评估该时间段内服务的质量。这类系统可以称为基于时间窗口的流处理系统。

但是有的时候我们不能给处理的事件设定一个时间范围,也就是需要处理的事件之间可能间隔了非常长的时间。以微博为例,我们看一下微博中的关注和取消关注这个场景。如果微博是基于事件驱动的,那么关注事件相当于向关注表中插入一条数据,取消关注事件相当于在关注表中删除一条数据。由于用户会在任意一个时刻关注或取关某一个用户,那么当一个用户查看他关注的用户时,就需要检索从当前用户创建以来的所有的关注取关事件,才能得到当前关注人的列表。这时就不能给处理的事件划定时间范围。我们将这类系统称为无窗口的流处理系统。

数据库备份中的事件

下面我们重点关注无窗口的流处理系统。数据库系统的备份机制就是其中很典型的例子。数据库备份的目的是,将同一份数据在多个节点上保存,以保证数据安全或提高读数据时的性能。数据库备份最终目标是保证多个节点上的数据会收敛到同一个状态,并且所有提交的事务在所有的节点上都成功执行了。在数据库备份的过程中,备份机制保证每个事务都能在每一个节点正确地执行。那么我们就可以将事务看作是一个事件,数据的复制过程可以看做是事件在这个分布式系统中的传播的过程,每当数据有更新时,主库就会发送一个备份事件,从库收到后就立刻对数据做对应修改。

数据库的备份机制通常分成两类:

  • 基于日志的备份,将事务以日志的形式写入到日志文件中。
  • 基于一些备份算法,如gossip protocols,anti-entropy等。

基于日志的备份算法有两个关键的要素,首先在日志文件中记录日志的顺序必须是有序的;另外,日志文件必须是仅追加的,也即写入文件后就不能更改了。这种日志的构造过程,要么是基于一种一致性算法,如Raft或Multi-Paxos;要么是将某一个节点指定为主库,其他为从库。事实上大部分一致性算法本质上也是基于主从模式,只不过增加了当主库down之后,通过某种方式指定一个新的主库。

那么在主从模式下,仅由主库决定写入日志的内容和写入日志的顺序,而从库只能读取日志并执行操作。

对于备份日志的内容也有很多不同的实现。例如,许多数据库通常使用Write Ahead Log的方式,在日志文件中记录,数据库中哪些表发生了改变。

数据库备份过程

上图是MySql数据库的备份过程,其中从库在发出备份请求时,会创建一个I/O线程与主库建立连接,然后主库会创建一个Log Dump线程读取BinLog中的值发送给从库,从库收到的日志写入到本地的Relay Log中。由从库这边的SQL线程执行。

现在也有很多监听数据变化的工具,比如DataBus来监控备份日志文件的变化,并通过消息队列发送给别的系统处理,例如我们的业务中通常利用Databus监听数据变更,将变更发送到ES进行数据备份,加快查询速度。

事件溯源(Event Sourcing)

对于基于日志的事件处理系统来说,他们的数据模型是数据本身,数据发生变化的这个事件只是在修改数据时的副产物。如果我们将二者的角色进行交换,将修改数据的事件作为我们的数据模型,而数据的状态为处理事件的副产物,我们可以得到一类新的事件处理系统,其中典型的代表是State machine replication及Event Sourcing。

在SMR和EventSourcing中,首先会定义一系列的事件,当某个事件发生时,就将这个事件存储下来,然后所有订阅这个日志的节点都会收到这个事件,然后调用事件处理函数进行处理。其中事件处理函数可以以任意的方式处理这些事件。对于所有节点来说,如果初始状态相同、处理事件的顺序相同、处理事件的逻辑相同,那么最终所有的节点都可以到达相同的状态。

下图是一个Event Sourcing的例子,其中用户做的动作都可以转换为事件,将事件永久存储之后,就可以将其发布出去。订阅这些事件的系统可以将事件转换为当前系统的状态,或发给外部系统分析,或对事件进行溯源。

Event Souring举例

那么这种事件处理系统有什么好处呢,或者说与数据库备份那种系统有什么不同呢?

首先,事件定义本身是有其意义的。例如在领域驱动编程中,我们就可以将事件定义为领域内的某个事件。例如,我们在大学线上报名参加某一个课程,对于基于数据库的系统,产生的事件可能就是“向参加表中新插入一行,课程表中的空闲位置减一”。如果使用SMR或Event Sourcing,我们就可以定义一个报名事件,事件的内容就是“XXX报名了YYY课程”,由事件处理程序对数据库进行操作。

其次,可以对事件进行溯源。有时我们不仅仅需要当前系统的状态,还想知道是如何达到当前状态。例如,我最近在做审批中心相关的需求,审批这个过程就比较适合使用以消息为中心的系统。因为我们不仅仅需要知道当前在哪一个节点,还需要知道是如何达到当前节点,在达到当前节点之前都经过了那些人的审批。

另外,由于我们存储的是事件本身,那么事件的消费就是可以重复的。如果我们之前处理事件逻辑有问题,我们可以新创建一个节点,用新的逻辑重新消费一遍即可。对于关系型数据库来说就很难做到这一点。

这种事件处理系统也有其短板的。

  1. 事件的发布和处理之间是有延迟的,所以如果用户A发送了一个修改的事件的同时,用户B查询当前数据的状态,可能就是不准确的。
  2. 由于事件是永久写入到日志中的,所以没办法删除回滚,如果要回滚,只能发送一个与修改事件相反的事件。
  3. 以事件本身为核心的事件处理系统,其事件的设计必须要谨慎,在之后如果进行升级,那么使用之前版本消息的客户端可能就不能用了,兼容比较困难。
  4. 对于发送信息比较频繁的系统,如果需要更改事件处理逻辑,那么回溯的成本通常会特别高。
  5. 由于发送的事件还需要进行处理,所以事件和最终数据的状态之间的关系对开发者来说并不是那么直观。

基于偏序的事件处理系统

在数据库复制和SMR这类系统处理事件时有一个核心要求就是,事件发生的顺序和事件处理顺序必须是一致有序的。这种有序性如果是生产者和消费者在同一数据中心中,是可以保证的。但是如果机房位于不同的位置,或节点之间的网络不稳定,那么就无法保证事件的有序性。如果要求在极端情况,也就是在网络断开的情况下,仍可以产生和消费消息,也即仅满足CAP理论中的可用性和分区容错性,那么就不能满足一致性,也即不能满足消息是有序到达的。

在理论计算机科学中,CAP定理(CAP theorem)指出对于一个分布式计算系统来说,不可能同时满足以下三点:

  1. 一致性(Consistency) (等同于所有节点访问同一份最新的数据副本)
  2. 可用性(Availability)(每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据)
  3. 分区容错性(Partition tolerance)(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。)

我们下面来看一下,在无法保证消息有序到达的情况下的事件处理系统如何处理消息的。

完全有序、偏序以及因果序

在介绍存在无序消息的事件处理系统之前,我先介绍一下完全有序、偏序以及因果序的概念。完全有序比较好理解,就是在这个系统中的所有事件中,任意两个事件都可以比较先后或大小关系。偏序的要求就相对弱一些,只需要部分事件可以比较大小或先后关系即可。

偏序是指给定集合S,若S满足:

  • 自反性:∀a∈S,有a≤a;
  • 反对称性:∀a,b∈S,a≤b且b≤a,则a=b;
  • 传递性:∀a,b,c∈S,a≤b且b≤c,则a≤c; 则称S为偏序集合。 全序则比偏序的要求更为严格一些, 在偏序的基础上,多了一个条件:
  • 完全性: 对于 S 中的任意 a、b 元素,必然有 a≤b 或 b≤a.

因果序是指,用户发送了事件A,那么系统中:

  • 在处理A之前已经完成的操作集合X,称作因果先序,计作X->A
  • 在处理A之后进行的操作集合Y,称作因果后序,计作A->Y
  • 既不在A之前也不在A之后的集合Z,称A与Z之间是并发关系。

基于偏序的事件处理系统

由于网络延迟的存在,使得在分布式条件下事件顺序可满足的最强的有序性只能是因果序。因为网络延迟导致不同系统之间构造一个全局统一的时钟是非常困难的,所以我们不能仅根据不同系统本地时间戳对事件发生顺序做排序。如果按照该时间戳,那么位于不同的地区的系统处理事件的顺序是不一样的,各个节点也就无法收敛到同一个状态。

解决这个问题的办法有两种,一种是让两个并发发生的事件满足交换律,也即处理事件的顺序不影响最终结果,那么最终节点的状态也可以保证是一致的。另一种是设计一种排序规则,让不同系统的事件有序。我们这里先介绍第二种方法,第一种方法在下一小节讨论。

在数据库备份和SMR中,日志的写入是仅有主节点控制的,因此事件发生的时间戳也可以按照主节点的本地时间设置。但是在分布式系统中,位于不同地区的主机的时间戳无法达成统一,那么我们就不能根据主机的本地时间戳对事件进行排序。那么对于部分有序的系统,我们可不可以设计一种数据结构,让事件可以排序呢?

当然是可以的!我们可以通过给每个事件加一个逻辑时间戳,让事件根据逻辑时间戳进行排序。最简单的逻辑时间戳是Lamport 时间戳。由于这部分内容展开来讲还是很多的,所以这部分我会新开一篇博客介绍。这里直接丢出最终结论。我们首先给每个节点都分配一个唯一UUID,每个节点的时间戳不采用本地时间,而是一个初始值为0的计数器,当有事件发生时,计数器+1。在传播事件时,除了传每个节点本身的时间戳也即计数器外,还要传自身的UUID,在收到事件时,利用对方的计数器更新本地计数器。当两个计数器相等的时候,在按照UUID进行排序,由此可以让具有偏序关系的事件变成全序事件。

如下图所示,假设有两个Site A和B,初始状态相同,二者同时向最后追加内容。当二者进行数据交换时就会出现冲突,此时就可以先按照时间戳排序,再按照UUID排序,让二者收敛到相同的结果。

是用逻辑时间戳进行排序

虽然基于逻辑时间戳,我们得到了一个事件的全序关系,但是这个关系和SMR,数据库复制有一个明显的不同之处在于,这个全序关系并不满足仅追加的。例如,如果这时候A与B收到另外一个节点C发送的事件p,其事件戳为(3,C),那么A与B就要将事件p插入到x与d之间。所以如果要基于这种全序关系构造一个SMR,那么这个SMR还需要提供一种,事件回放的能力。也即将p插入到x与d之间之后,重新处理d与y。一般来说,回放的事件数量比较少。但是如果某一个节点长时间离线,当再上线时,事件回放的时间复杂度会接近$O(n^2)$。

另外,在有的时候,我们不仅仅要考虑事件回放的时间复杂度,还需要考虑回放对业务造成的影响。比如说,在秒杀系统中,用户在有库存的时候抢到了该物品,系统给该用户发送通知说抢到了,但是如果有另一位用户比该用户更先抢到,但是由于网络,该事件到达的时间比较晚,此时进行事件回放就会对第一个用户产生严重影响。

无冲突可复制数据类型(Conflict-free Replicated Data Type,CRDTs)

在基于偏序的事件处理系统中,我们是以事件为核心数据模型,而最终的系统状态是处理事件的副产物。那么我们仍然可以交换一下角色,我们可以定义一种数据结构,以该数据结构为核心,当数据结构中的内容发生更改时,就产生一个事件。这类事件处理系统的典型代表是无冲突可复制数据类型,简称CRDTs。

现在业界已经有很多这种数据结构的实现,如map,set,list等等,应用通过使用这些数据结构提供的方法,修改这些数据结构内的数据。例如对set我们可以添加或删除一个元素,map可以增加一个键值对等等。在基于操作的CRDT中,如果当数据被修改的时候,CRDT算法就会发出一个事件描述这个更改,事件与事件之间仍然是只满足偏序关系的。那么当一个节点收到另一个节点的消息时,通过CRDT算法将该事件的更改合并到当前数据上,其中CRDT算法在处理事件时要满足交换律,也即处理不同顺序的事件,最终的结果是一致的。通过CRDT算法满足交换律,最终数据的状态也可以收敛到一致。

相比于基于偏序的事件处理系统,CRDT有一个优点是,不需要事件的回放,直接进行处理即可,因此其效率通常比较高。但是,CRDT的一个缺点是,只能使用数据结构提供的操作。比如list结构,一般只满足插入删除,通常不满足对list进行重新排序。

CRDT算法最典型的应用场景就是多人协作软件。下面我就以多人合作编辑系统为例,简单介绍一下CRDT算法的实现。

假设我们现在有如下场景:

初始状态,数字为Lamport时间戳

Site1首先插入了CMD,然后传播给Site2和Site3,然后Site1,2,3同时对当前文本进行编辑,最终可能生成的合并结果是CTRLALTDEL或CTRLDELALT。

对于上面这个场景,我们先给定义一下用户操作事件的数据结构。一开始想到的最简单的方法就是,将这个文本编辑器看作是一个大型的字符数组,然后每次记录用户在某一个位置操作了什么。在上面这个例子中,Site2的操作序列就可以是{op:insert, position:3, char:D},{op:insert, position:4, char:E},{op:insert, position:5, char:L}。

那么在有了用户的操作序列后,剩下的问题就是如何对这些操作进行排序,我们这里仍然使用Lamport逻辑时间戳。我们用SX@TY来表示一个时间戳,其中X表示第几个站点,Y表示发生的时间。有了排序的依据之后,我们将所有站点的事件进行归并排序,就可以得到如下的事件序列:

使用Lamport时间戳排序

我们先观察这个序列,如果我们按照这个序列进行操作,那么最终得到的字符串是CTRLDATLEL,和我们需要的完全不一样。另外,每次更新时,我们都要将各个站点的变化按时间戳合并到一起,然后重新生成内容,此时的时间复杂度为O(n^2)

我们首先来解决第一个问题,为什么得到的结果和我们预期的不一样?原因在于我们使用index,也即数组下标来对字符进行更新,使用这种数据结构无法满足我们前面说的CRDT算法要保证操作满足交换律,比如两个事件都要在0这个位置分别插入A和B这两个字符,哪个事件先进行与后进行对最终结果的影响是完全不一样的。

那么我们使用下标是在干什么呢?我们是用下标来唯一的确定一个字符,利用下标找到这字符,在这个字符后进行操作。比如Site2在4这个位置插入D,但其实用户真正想做的是在D后面插入D,而不是在4这个位置插入。那么也就是说,如果我们先给文档中每一个字符添加一个唯一标识,然后修改我们事件的数据结构,index改为插入字符的唯一标识,那问题就迎刃而解了。

那么接下来,我们要解决,如何给每个字符添加一个唯一标识。这个标识其实我们已经定义好了,我们可以直接用Lamport时间戳来标识一个字符。那么按照这种事件的定义,我们就可以得到一个新的序列:

使用Lamport标识一个字符

其中箭头指向的就是要在哪一个字符后面插入。根据这个序列,我们就可以得到我们需要的CTRLALTDEL结果了!

ok,那接下来我们再分析第二个问题。在使用新的事件定义之后,我们的时间复杂度仍然不变,因为我们仍然需要归并排序使用逻辑时间戳进行排序,然后根据事件顺序进行操作。这里面最耗时的操作利用时间戳进行归并排序的过程。那么我们既然有了每个字符的唯一id,我们其实可以将每个字符放在他最终会出现的位置。我们知道,当前字符的位置,只依赖于他前一个字符的位置,所以,我们只需要将当前操作放在他前一个字符后面即可。对于目标字符为同一个字符的操作,按照时间戳进行排序。那么我们新的序列就变为:

更改排序方法后

如果我们把这个结构展开,我们其实就可以得到一个树,对这个树进行深度优先搜索就可以得到我们需要的结果。在这个过程中,我们不必对操作按照时间戳进行归并排序,只需定位字符位置并插入即可。

因果树

总结

本文首先根据事件是否具有通知属性和是否进行持久化存储进行分类,然后对持久化通知,继续根据是否给予窗口进行分类。然后根据消息到达的顺序是否有序,数据模型是数据本身还是事件本身,得到最终的4类处理系统。

总结

后记

本文是参考文献1的阅读笔记,以文献1的内容为主线,添加了一些自己的理解。由于我在写这篇博客时,我刚刚开始接触后端开发,因此难免会有不足和疏漏之处,欢迎大家指正讨论。

参考文献

  1. Thinking in Events: From Databases to Distributed Collaboration Software
  2. 逻辑时钟 - 如何刻画分布式中的事件顺序
  3. Yjs——一个基于CRDT的数据协同框架
  4. Data Laced with History: Causal Trees & Operational CRDTs
  5. 协同编辑冲突处理算法综述
  6. State machine replication
  7. 分布式领域最重要的一篇论文,到底讲了什么?
  8. Event Sourcing pattern
  9. CQRS Event Sourcing介绍
  10. MySQL 5.7 Reference Manual::Replication
  11. 看完这篇还不懂 MySQL 主从复制,可以回家躺平了~