TypeScriptでCQRS & Event Sourcing

2020.12.11

Introduction

一般的になってきたCQRSとEvent Sourcing。
これらのアーキテクチャは、RDBMSやNoSQLデータベースをデータストアとして持っている、
昔ながらのCRUDデータモデルの代案として近年採用が進んでいます。

本稿ではCQRS・EventSourcingの簡単な説明と、
それらを用いたサンプルをTypeScriptで実装してみます。

What is CQRS?

CRUDと違い、CQRS(Command and Query Responsibility Segregation)は
データの読み込みと書き込みは本来違うものである、という前提に基づく考え方で、
2010年にGreg yang氏が考案していたものです。

あらゆるメソッドは、アクションを実行するコマンドか、
呼び出し元にデータを返すクエリかのいずれかであって、両方を行ってはならない。
これは、質問をすることで回答を変化させてはならないということだ。
by Greg yang

CQRSでは、データストアの操作をコマンド(登録・更新・削除)と
クエリ(参照)に分類します。
コマンドは基本、システムの状態を変更して、操作の結果を返します。
クエリは冪等であり、システムの状態を変更せずに結果を返します。
間にコマンドが実行されない限り、同じクエリであれば結果は同じになります。

クエリとコマンド、2つの役割を分離することでシンプルなアプリケーションを構築でき、
例えば書き込み用・読み込み用に
それぞれ異なるデータベースを持つケースなどで使われています。

CQRSにするメリットとして、コマンドとクエリを分離しないとQueryが複雑になったりとか、
オブジェクトデータストア間のインピーダンスミスマッチがあったりとかがありますが、
詳しくはこのへん を参照してください。

ちなみに、弊社で開発してるEC/CRMプラットフォーム、Prismatixにおいても
一部CQRSを取り入れており、RDBMS更新時に参照用データストア(Elasticsearch)へ
データを同期して検索はそちらで対応させる、というようなことを行っています。
※Prismatixについての詳細はこちら

What is Event Sourcing?

Event Sourcingでは、状態(ステート)ではなくイベントを中心としてデータを扱います。
データストアにデータの「現在の状態」だけを格納するのではなく、
そのデータに対して実行された一連のすべての実行を記録しておきます。

RDBMSをデータストアとしたCRUDモデルを例にすると、

  • 登録時にはinsertでDBにレコードを作成
  • 更新時はupdateでDBのレコードを更新
  • 削除時はdeleteでDBからレコードを削除

というような処理が普通だと思います。
1件データ登録後にそのデータの更新が行われれた場合、
最新状態のデータがDBに1件存在します。

これに対してEvent Sourcingだと、
すべての実行を追記形式で記録していきます。
「1件データが登録された」という記録と、
「対象データが○○に更新された」という記録が保存されるわけです。
そのため、対象データの最新状態を取得するには、
登録された記録と更新された記録が必要になります。

Event Sourcingのメリット・デメリットは下記。

メリット

  • 完全な履歴を保存可能
  • 任意の過去の時点に戻れる

デメリット

  • 現在の状態を得るためにログをたどる必要がある
  • イベントを全て保存するため、ログが大きくなりがち
  • イベント欠損時のリカバリが難しい
  • バージョンアップ時の整合性を考慮する

また、Event Sourcing単体で使うすると、
クエリメソッドが複雑になったり、インピーダンスミスマッチが発生したりとかの
問題が発生しやすいのですが、CQRS + Event Sourcingにすることで、
それらの問題も解決できます。

Event Sourcingについての詳細はこのへんとかこのへんを参照してください。

Sample Architecture

CQRSとEvent Sourcingの簡単な説明をしたところで、
表題のサンプルアプリを実装してみます。 構成は下記。

User情報の登録・更新・取得用のWeb APIを実装します。 表の左(Create、Update)がコマンド、右(Query)がクエリです。
コマンドが実行されるとUser情報がredis-streamsを通じてSqliteに同期されます。

Environment

  • OS : MacOS 10.15.7
  • node : v14.4.0
  • sqlite3 : 3.28.0
  • redis : 6.0.9

node、redis、sqlite3はHomebrewでインストール済み。

setup

まずは必要なモジュールのインストールです。
TypeScriptに加え、RedisやSqlite用のモジュールをインストールします。
Webサーバとしての機能はnode最速といわれるfastifyを使用します。

% npm install --save fastify
% npm install --save ioredis
% npm install --save sqlite3

% npm install --save-dev @types/node 
% npm install --save-dev @types/ioredis
% npm install --save-dev@types/sqlite3
% npm install --save-dev typescript
# tsc --initしておく

sqlite3でテーブル作成しておきます。

% sqlite3 cqrs.db
sqlite > create table user (code text primary key,name text, mail text,create_timestamp text,update_timestamp text);

redisも起動しておきましょう。

% redis-server

Implement Web API

まずはコマンドやクエリで使用するUser用のクラスや共通インターフェイスを定義します。
※余計なimportとかは省略してます

export interface Model {}
export interface Query {}

//コマンド用Userモデル
export class User implements Model {
  constructor(public code: string, public name: string, public mail: string) {}
}

//クエリ用Userモデル
export class QueryUser implements Query {
  constructor(
    public code: string,
    public name: string,
    public mail: string,
    public create_timestamp: string,
    public update_timestamp: string
  ) {}
}

コマンド実行時にRedisにpublishするクラスを定義します。

import Redis from 'ioredis'

export class RedisUtil {
  private static instance: RedisUtil
  private redis: any

  private constructor() {
    this.redis = new Redis()
  }
  static getInstance() {
    if (!RedisUtil.instance) {
      RedisUtil.instance = new RedisUtil()
    }
    return RedisUtil.instance
  }

  async publish(
    stream: string,
    eventName: string,
    code: string,
    model: Model
  ): Promise<string> {
    const id = await this.redis.xadd(
      stream,
      '*',
      'eventName',
      eventName,
      'code',
      code,
      'json',
      JSON.stringify(model)
    )
    return id.toString()
  }
}

xadd関数を使えば指定したstreamに対してデータを追加します。
イベント名とJson形式のデータをredis-streamsに送ります。

サーバのエントリポイントとなるindex.tsの定義です。
POST(Create)、PUT(Update) GET(Query)用の処理をそれぞれ定義します。

import fastify from 'fastify'
import sqlite3 from 'sqlite3'

interface ICreateUserBody {
  code: string
  name: string
  mail: string
}

interface IUpdateUserBody {
  code: string
  name: string
  mail: string
}

interface IGetUserParam {
  code: string
}

const server = fastify()

server.get<{ Params: IGetUserParam }>('/user/:code', async (request, reply) => {
  const code: string = request.params.code
  const db = new sqlite3.Database('cqrs.db')
  //get data from sqlite3 by code
  db.serialize(function () {
    const stmt = db.prepare(
      'select code,name, mail, create_timestamp,update_timestamp from user where code =?'
    )
    stmt.get(code, function (err, row) {
      if (err) {
        throw err
      }
      const qu: QueryUser = new QueryUser(
        row.code,
        row.name,
        row.mail,
        row.create_timestamp,
        row.update_timestamp
      )
      reply
        .code(200)
        .header('Content-Type', 'application/json; charset=utf-8')
        .send(qu)
    })
  })
})

server.post<{ Body: ICreateUserBody }>('/user', async (request, reply) => {
  const ins: RedisUtil = RedisUtil.getInstance()
  const user: User = request.body
  const publishId: string = await ins.publish(
    'user-stream',
    'UserCreated',
    user.code,
    user
  )
  reply
    .code(200)
    .header('Content-Type', 'application/json; charset=utf-8')
    .send({ eventType: 'UserCreated', id: publishId })
})

server.put<{ Body: IUpdateUserBody }>('/user', async (request, reply) => {
  const ins: RedisUtil = RedisUtil.getInstance()
  const user: User = request.body
  const publishId: string = await ins.publish(
    'user-stream',
    'UserUpdated',
    user.code,
    user
  )
  reply
    .code(200)
    .header('Content-Type', 'application/json; charset=utf-8')
    .send({ eventType: 'UserUpdated', id: publishId })
})

server.listen(8080, (err, address) => {
  if (err) {
    console.error(err)
    process.exit(1)
  }
  console.log(`Server listening at ${address}`)
})

これでWeb APIの準備はOKです。

Implement redis-streams to sqlite program

次はredis-streamsを見てsqliteへ更新するプログラムです。
redis-streamsにデータが追加されるたびにsqliteへinsertかupdateを実行します。

//read-redis-stream.ts
import Redis from 'ioredis'
const readRedis = new Redis()
import sqlite3 from 'sqlite3'

//UserCreatedイベント時にsqliteへinsert
function insert(dbName: string, json: string) {
  const db = new sqlite3.Database(dbName)
  db.serialize(function () {
    const now = "datetime('now', '+9 hours')"
    const sql =
      'insert into user (code, name,mail,create_timestamp,update_timestamp) VALUES (?,?,?,' +
      now +
      ',' +
      now +
      ');'
    const stmt = db.prepare(sql)
    const jsonObj = JSON.parse(json)
    stmt.run(jsonObj.code, jsonObj.name, jsonObj.mail)
    stmt.finalize()
    console.log('insert to sqlite : ' + jsonObj.code)
  })
  db.close()
}

//UserUpdatedイベント時にsqliteへupdate
function update(dbName: string, json: string) {
  const db = new sqlite3.Database(dbName)
  db.serialize(function () {
    const now = "datetime('now', '+9 hours')"
    const sql =
      'update user set name=?,mail=?,update_timestamp=' + now + 'where code=?;'
    const stmt = db.prepare(sql)
    const jsonObj = JSON.parse(json)
    stmt.run(jsonObj.name, jsonObj.mail, jsonObj.code)
    stmt.finalize()
    console.log('update to sqlite : ' + jsonObj.code)
  })
  db.close()
}

//xreadでredis-streamからデータを読み取る
async function subscribeStream(stream: string) {
  let lastID = '$'
  while (true) {
    const reply = await readRedis.xread('BLOCK', 10000, 'STREAMS', stream, '$')
    if (!reply) {
      continue;
    }
    const results = reply[0][1]
    const { length } = results
    if (!results.length) {
      continue;
    }
    lastID = results[length - 1][0]
    //update sqlite
    for (const row of results) {
      console.log('eventName :' + row[1][1])
      if (row[1][1] === 'UserCreated') {
        insert('cqrs.db', row[1][5])
      } else if (row[1][1] === 'UserUpdated') {
        update('cqrs.db', row[1][5])
      }
    }
  }
}

subscribeStream('user-stream');

このプログラムは、Webサーバとは別に起動しておく必要があります。

Execute Program

プログラムの記述ができたので動作確認をします。
まずはpackage.jsonに↓のscriptsを追記します。

・・・ 
 "scripts": {
    "build": "tsc -p tsconfig.json",
    "start": "node index.js"
 },
・・・

buildコマンドでTypescriptのコンパイル。

% npm run build

startコマンドでWebサーバを起動します。

% npm run start

redis-streamsからデータを取得してsqliteへ更新するプログラムの起動もしておきます。

% node read-redis-streams.js

curlを使ってAPIにアクセスしてみます。
POST APIでユーザーの登録を実行。

% curl -X POST -H "Content-Type: application/json" -d '{"code" : "USER0001","name":"syuta", "mail":"syuta@cm.com"}' localhost:8080/user
{"eventType":"UserCreated","id":"1607687729494-0"}

PUT APIで登録したユーザーの更新。

% curl -X PUT -H "Content-Type: application/json" -d '{"code" : "USER0001","name":"syuta-update", "mail":"syuta@cm.com"}' localhost:8080/user
{"eventType":"UserUpdated","id":"1607687906732-0"}%

クエリを実行すると最新状態のユーザー情報が取得できます。

% curl -X GET localhost:8080/user/USER0001
{"code":"USER0001","name":"syuta-update","mail":"syuta@cm.com","create_timestamp":"2020-12-11 20:55:29","update_timestamp":"2020-12-11 21:04:46"}

Summary

今回はCQRS + Event Sourcingの簡単なサンプルを実装してみました。
実際にCQRS+ESで実装できるフレームワークとして、
AxonAkkatectureLagomなどがあるので、
興味があるかたはそれらも確認してみてください。

Reference