浅谈查询职责分离(CQRS)模式

作者 周蕾 (ArcBlock 后端工程师)

最近几年,在DDD的领域,我们经常会看到CQRS架构的概念, CQRS 是查询职责分离模式(Command Query Responsibility Segregation)的缩写。正好这些日子Arcblock的后端的服务有考虑使用CQRS 的架构,所以今天和大家一起分享一下最近的研读收获。今天文章会从Event Sourcing出发 介绍CQRS,以及通过Commanded (Elixir 的库),一起看一看如何遵循ES/CQRS 的概念开发应用程序。

什么是Event Sourcing (事件溯源)?

Event Sourcing本质来说是保存了发生事件的本身,而不是当前的事物的状态。在Event Sourcing的概念里,Event作为既定的发生之后的事情,也是最小的单位。 比如:

1

Event Sourcing 的工作模式:在下面这条数据流里面,由4个发生的事件(event)组成,进而每一次改变当前的状态,同时事件们的相对顺序也是我们需要保证的。

1

我们会得到这样的总结:

Sn = apply(Sn-1, En) 或者 Sn = reduce(E, S0, apply)

其中: (S: state, E: Event)

现在我们可以发现Event Sourcing 的优点:

  • 每个状态发生的改变都有完备的日志记录,可追溯
  • 优化了的写的操作,提高了性能

我们身边的Event Sourcing

  • 每个程序员的每天都离不开的 Github。 在Git 的世界里, Events(事件) 是Commits, State(状态) 是文件系统。
  • Blockchain 每次保存的是 transaction 而不是一个现在的状态,从这个角度出发, Events(事件) 是transaction, State(状态) 是用户的账户信息。
  • WAL: 是Write-ahead logging, 在数据写入到数据库之前,先写入到日志, 再将日志记录变更到存储器中。Events(事件) 是每一个操作, State(状态) 是数据库。

对于Event Sourcing 来说,想做查询(query) 怎么办?

试想一下,在一个银行系统里面,如果我们想要查询账户余额在1000块以上的用户,那我们难道需要把每个账户的按照 Sn = reduce(E, S0, apply) 这个公式在重新计算一遍吗? 如果我们考虑用一个DataStore 来保存 Event,再用另外一个DataStore 去专门为Query 提供数据,同时两个Datastore 通过发送消息 进行信息同步,如何? CQRS 某种程度上就解决了这样的问题。

CQRS 是什么?

CQRS 全称是 Command Query Rsponsibility Segregation,将应用程序分为两部分:命令端(Command) 和查询端 (Query)。命令端处理程序创建,更新和删除请求,并在数据更改时发出事件。查询端通过执行查询来处理查询,并且通过订阅数据更改时发出的事件流而保持最新。CQRS使用分离的接口将数据查询操作(Queries) 和数据修改操作(Commands) 分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。

1

CQRS 里面的一些概念:

  • Command (命令): 不返回任何结果,被校验成功后但会改变对象的状态。
  • Query (查询): 有返回结果,但是不会改变对象的状态。
  • Aggregate (聚合): 保存状态, 处理command,和改变状态。
  • Event Store: 存储Events。

怎么遵循CQRS 模式建立应用程序?

首先我们会基于一个Commanded, 一个Elixir 遵循CQRS/ES 模式 实现 Command side 的库。

1. Commands

Commands 是用户发送给应用程序的指令,表示用户的一种请求,当然请求是有可能失败的,如果想在余额有10的账户里面取出1000块这样的操作。每一个指令对应是一个module,然后使用defstruct定义域,命名方式是MineCoin,动名结构。

2. Events

Events 是由Command产生,最终导致状态改变。会最终在eventstore 里面序列化存储,可以用于日后想要恢复状态。命名方式相比于Command 来说发生了变化,CoinMined, 过去式表达一种过去发生的事实。

3. Aggregates

Aggregates 作为接受处理Command,产生或者引起对应事件的发生,以及一些改变状态的处理器。

里面包含两个函数:execute方法使用来添加我们的校验Command的一些逻辑,输入时状态和command,如果成功输出就是Event。 apply 函数用来更改状态,注意这里的对象是已经是生成出来的event。 ` @spec execute(state, command) ::{ok, [event]} | {:error, term()}

@spec apply(state, event) ::state `

现在我们有了Command, Event, Aggregates ...

那我们还需要一个派遣的角色帮助我们把Command 走向对应的Aggregates。Commanded 库提供了Router: ` defmodule Coins.Router do use Commanded.Commands.Router