Aurora DSQL CDC を動かしてみる

Aurora DSQL CDC を動かしてみる

2026.05.25

Introduction

Amazon Aurora DSQL の Change Data Capture (CDC) が
2026 年 5 月に preview で公開されました。
CDC は DB の committed write を(ほぼ)リアルタイム に Amazon Kinesis Data Streams へ流す仕組みで、
以下のような用途に使えます。

  • 検索 index / キャッシュ / 分析基盤などにおけるシステム同期
  • event-driven なワークフロー駆動(Lambda/SQSなど)
  • Producer-Consumer の疎結合化

公式ドキュメントにはペイロード仕様の詳細がまとまっているので参照してください。

本記事では、以下の内容を確認しました。

  1. 同一トランザクションのCDC
  2. 並行にcommitしたときのCDC処理
  3. PKなしテーブルで試してみる
  4. DDL 境界(ADD/RENAME COLUMN)+ NULL 列 omit の挙動

Amazon Aurora DSQL - Change Data Capture

DSQL

Aurora DSQL は、サーバレスの分散型RDBです。
PostgreSQL互換なので普通のSQLや既存ツールを使いやすく、複数リージョンにも対応しています。

本記事ではSingle-Regionで使います。

Kinesis Data Streams

マネージドストリーミングデータサービスです。
CDCに関連するところは以下。

Change Data Capture

DB の committed change を near real-time に外部へ流す仕組みです。
Aurora DSQL の CDC は managed&bring-your-own-target(BYOT)モデルです。
※現状ターゲットは Kinesis Data Streams のみ

CDC payload

公式ドキュメント からpayloadのおさらい。

envelope 構造

検証で実際に届いたレコードは以下。

{
  "type": "full",
  "op": "c",
  "before": null,
  "after": { "id": 1, "v": "hello-cdc" },
  "source": {
    "version": "1.0",
    "ts_ms": 1779259861157,
    "ts_ns": 1779259861157725700,
    "txId": "ubtzccn2ehtmtplzjbhyavlkrm",
    "schema": "public",
    "table": "smoke_test",
    "db": "postgres",
    "cluster": "..."
  },
  "ts_ms": 1779259861153,
  "ts_ns": 1779259861153444900
}

主要フィールド

field 公式の意味
type full / chunked / fragment
op INSERT/UPDATE は現状 c、DELETE は d。将来 UPDATE は u に分離予定
source.txId 同一 tx 内の全 record で 共有(grouping 用)
source.ts_ns commit timestamp。全順序キーとして公式が推奨
source.cluster DSQL cluster identifier
before DELETE で PK のみ。INSERT/UPDATE / PK なしテーブルの DELETE では null
after 公式上は post-change row state の全カラム。DELETE では null
ts_ms / ts_ns CDC system が record を処理した時刻

afterフィールドですが、検証時は NULL 列が omit される挙動を確認しました。

delivery / ordering

  • at-least-once。同じ record が複数回届く可能性あり
  • 公式が推奨する dedup key = (source.ts_ns, primary key)
  • 9 MiB 超のレコードは chunked + fragment に分割。chunk_id で再構成、crc32c で整合性検証可
  • ordering モードは UNORDERED で random partition key 配信 → 全 shard 消費が必須
  • 異なる tx 間で record の順序は前後し得る

詳細はここここ を参照してください。

Environment

項目
AWS Region us-east-1
DSQL cluster Single-Region
Kinesis Data Streams on-demand mode / max record size = 10240 KiB
psql libpq 18.4
Node.js v20 (Consumer は @aws-sdk/client-kinesis 3.1050 )
Aurora DSQL CDC Preview(検証時点ではCDC が preview)

Setup

ではDSQL CDCを試してみます。

1. DSQL クラスター作成

まずは DSQL クラスターを作成します。

% aws dsql create-cluster \
  --region us-east-1 \
  --no-deletion-protection-enabled \
  --tags Project=dsql-cdc-verify,Purpose=blog-article-verification
# → identifier と endpoint 取得

% aws dsql wait cluster-active \
  --identifier "$CLUSTER_ID" --region us-east-1

2. Kinesis Data Streams (on-demand)

次にKinesisを作成。

% aws kinesis create-stream \
  --stream-name dsql-cdc-verify-kinesis \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --tags Project=dsql-cdc-verify \
  --region us-east-1

--max-record-size-in-ki-b 10240(10 MiB)を明示し、
DSQL CDC の chunked record 閾値(9 MiB)より上にする。

3. CDC service role (IAM)

DSQL が Kinesis に書き込むための role を作成します。
Trust policy / permission policy の正確な JSON 雛形はドキュメント にそのまま使える形で載っているので、
そちらをコピーして trust-policy.json / inline-policy.json として保存しましょう。

要点は以下。

  • Trust policy:
    • Service: dsql.amazonaws.comsts:AssumeRole を許可
    • confused deputy 対策で aws:SourceAccountaws:SourceArn (cluster/<CLUSTER_ID>/stream/*) を付与
  • Permission policy:
    • kinesis:PutRecord / PutRecords / DescribeStreamSummary / ListShards をKinesis stream ARN に許可
    • Kinesis stream が KMS CMK で暗号化されている場合は kms:GenerateDataKey も追加
% aws iam create-role \
  --role-name dsql-cdc-verify-role \
  --assume-role-policy-document file://trust-policy.json \
  --tags Key=Project,Value=dsql-cdc-verify

% aws iam put-role-policy \
  --role-name dsql-cdc-verify-role \
  --policy-name KinesisAccess \
  --policy-document file://inline-policy.json

4. CDC stream

% aws dsql create-stream \
  --cluster-identifier "$CLUSTER_ID" \
  --target-definition "kinesis={streamArn=$KINESIS_STREAM_ARN,roleArn=$CDC_ROLE_ARN}" \
  --ordering UNORDERED \
  --format JSON \
  --tags Project=dsql-cdc-verify \
  --region us-east-1

# ACTIVE になるのを待ちましょう
% aws dsql get-stream \
  --cluster-identifier "$CLUSTER_ID" \
  --stream-identifier "$STREAM_ID" \
  --region us-east-1

preview 時点で --orderingUNORDERED のみ、
--formatJSON のみが選択可能。

5. DSQL へ接続(psql + IAM auth)

DSQL は IAM token を PostgreSQL パスワードとして渡す方式です。

# 都度トークンを取り直して psql 実行
dsql_psql() {
  PGPASSWORD="$(aws dsql generate-db-connect-admin-auth-token \
    --hostname "$CLUSTER_ENDPOINT" --region us-east-1)" \
    psql -h "$CLUSTER_ENDPOINT" -U admin -d postgres "$@"
}

# 使い方
% dsql_psql -c "SELECT now();"
% dsql_psql -f some-script.sql

psql-c "DDL; DML;" は複数文を 1トランザクションで投げます。
DSQL は DDL と DML を同一 transaction にできないため、
CREATE TABLE foo(...); INSERT INTO foo ...;-c で一度に実行しようとするとエラーになります。

ERROR: ddl and dml are not supported in the same transaction
DETAIL: ddl count: 1, dml count: 1

同じ -c 文字列に DDL と DML を混ぜないようにしましょう。

6. ローカル Consumer

Consumer は Node で作成します。
依存は @aws-sdk/client-kinesis を使用。
ListShardsGetShardIteratorGetRecords を全 shard で並列にポーリングして、
1 record = 1 行の JSON で標準出力に流します。
ログには以下のフィールドを含めます。

フィールド 内容
receivedAt Consumer が GetRecords で受信した時刻
arrival Kinesis ApproximateArrivalTimestamp
shardId / seq shard 内 sequence number
type / op / table CDC envelope
txId / src_ts_ns transaction identifier / commit timestamp
cdc_ts_ns CDC system が record を処理した時刻
pk after(INSERT/UPDATE)または before(DELETE)から抽出
raw 元の JSON payload 全体

検証ごとに test_run_<n>_<sub>_ のような table prefix を使い、
Consumer 側に --filter <prefix> 引数を入れて他検証の record と混ざらないようにします。

7. Cleanup

検証完了時は忘れずに削除。

# 1. CDC stream を削除
% aws dsql delete-stream \
  --cluster-identifier "$CLUSTER_ID" \
  --stream-identifier "$STREAM_ID" \
  --region us-east-1

# 消えたのを確認
while aws dsql get-stream \
        --cluster-identifier "$CLUSTER_ID" \
        --stream-identifier "$STREAM_ID" \
        --region us-east-1 >/dev/null 2>&1; do
  sleep 3
done

# 2. CDC stream が完全に消えてから Kinesis を消す
% aws kinesis delete-stream \
  --stream-name dsql-cdc-verify-kinesis \
  --region us-east-1

% aws kinesis wait stream-not-exists \
  --stream-name dsql-cdc-verify-kinesis \
  --region us-east-1

# 3. cluster と IAM を削除
% aws dsql delete-cluster \
  --identifier "$CLUSTER_ID" --region us-east-1

% aws iam delete-role-policy \
  --role-name dsql-cdc-verify-role --policy-name KinesisAccess

% aws iam delete-role --role-name dsql-cdc-verify-role

Try

では実際に確認してみましょう。

同一トランザクションの CDC 粒度

1 つの tx を commit すると CDC に何件の record が流れ、
それらのフィールド(txId / source.ts_ns)がどのようになっているか確認します。

以下テーブルで確認。

CREATE TABLE test_run_1_kv (id INT PRIMARY KEY, v TEXT);

同一 tx 内の複数行で source.ts_ns は同一値か

INSERT INTO test_run_1_kv VALUES (1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e');
を 1 文(=1 tx)で流します。
txId が 5 record で共有されるのは公式明記どおり。
source.ts_ns も「commit timestamp」なら同一値になるはずなので確認してみましょう。

結果、5 record すべて
txId=grtzcfcecctfoso4t6cklkhxwmsrc_ts_ns=1779262623863649300 が一致しました。
一方 top-level の cdc_ts_ns(CDC system の処理時刻)は record 個別で、約 11μs程度ばらつきありです。partition key はランダムなので 5 行は 2 shard(shardId-000000000000...02)に分散し、
PK の受信順も 3 → 2 → 4 → 5 → 1 で VALUES の記述順とも PK 順とも無関係でした。

この結果から、同一 tx 内の record をまとめて扱いたいときは txId で グループ化すればよく、
source.ts_ns は tx 単位の commit 時刻として使えます(行単位の production order には使えない)。

同一 PK を同一 tx 内で複数回 UPDATE したときの中間状態

psql の -c "..."(1 回の呼び出し = 1 トランザクション)の中に 3 つの UPDATE を入れ、
UPDATE id=1 SET v='UPDATED_1'; UPDATE id=1 SET v='UPDATED_2'; UPDATE id=1 SET v='UPDATED_3';
を実行します。最終状態 1 件なのか中間状態込みで複数件流れるのか確認してみます。

結果は1 record のみでした。(after.v="UPDATED_3" / before:null / op:"c"
同一 tx 内の同一 PK 多重 UPDATE は commit 時点の最終状態 1 件に畳まれ、
中間状態は CDC からは観測できません。

LWW(per-PK の source.ts_ns)を使うなら漏れた中間状態を追う設計は不要です。
ただし「同じ tx 内で何回更新したか」を後段で集計したい(audit / step replay)場合は
DSQL CDC 単独では足りず、アプリ側で別途対応が必要です。

並行 commit と CDC 到着順

多数の tx を同時に commit したとき、source.ts_ns が順序キーとして使えるか(衝突せず commit 順に並ぶか)を確認します。別 tx に分けた連続 UPDATE は低並行なら source.ts_ns が commit 順に単調増加しますが、これは保証ではないので、高並行でも崩れないかを以下で見ます。

検証用テーブル:

CREATE TABLE test_run_2_load (id INT PRIMARY KEY, v TEXT, ts_client_ms BIGINT);

負荷は Node + pg の connection pool で N 並列に独立した tx の INSERT を投げます。
PK は並列度ごとに別 range(100 → 0..99、500 → 200..699、1000 → 1000..1999)にして衝突を避けます。
Consumer は --filter test_run_2_ で LATEST から待機させておき、
INSERT 後 10〜25 秒待って全 record の到着を確認します。

並行 commit で source.ts_ns は順序キーとして問題ない?

並列度 ok / ng CDC 受信 record src_ts_ns ユニーク数 重複 4 shard 分布 commit p50 / p99 (client)
100 100 / 0 100 100 0 23 / 23 / 27 / 27 1765 / 2013 ms
500 500 / 0 500 500 0 137 / 125 / 120 / 118 2044 / 2300 ms
1000 1000 / 0 1000 1000 0 236 / 262 / 256 / 246 1986 / 2559 ms

1000 並列まで上げても src_ts_ns 衝突は観測されません。
さらに 1000 並列の record を src_ts_ns でソートすると2ns 間隔の連続値が並びました。

1779264828731646281
1779264828731646283
1779264828731646285
1779264828731646287
1779264828731646289
...

公式は source.ts_ns を「commit timestamp in nanoseconds」と説明していますが、
本検証範囲ではcommit 順序を反映する単調な値と見えました。
(「衝突を避けるよう間隔を空けて発行されているように見える」挙動)

なので、以下のことが言えそうです。

  • source.ts_ns は transaction 単位で全順序が取得可能(同一 tx 内の record は同値なので、record 単位の厳密な全順序ではない)
  • 公式が dedup key として推奨する (source.ts_ns, primary key) は(確認内では)衝突する組み合わせが出ない

公式推奨の dedup key (source.ts_ns, primary key) は 1000程度の並列なら有効でした。
LWW では per-PK の source.ts_ns 最大値を採用すれば commit 順を保ったまま重複を弾くことができそうです。

PK なしテーブルのケース

公式 docs は PK なしテーブルの DELETE について「beforenull」と書いています。

以下のようにテーブルを作成しましょう。

-- PK 無しテーブル
CREATE TABLE test_run_3_nopk (logical_id INT, v TEXT, marker TEXT);

このテーブルに データ(logical_idv は同じで marker だけ違う)投入。

INSERT INTO test_run_3_nopk VALUES (1,'same','left'),(1,'same','right');

↓のSQLで片方だけ消します。

DELETE FROM test_run_3_nopk WHERE logical_id=1 AND marker='left';

INSERT は 2 record(同一 tx・同一 src_ts_nsafter に全カラム)。
DELETE は 1 record ですが op:"d" / before:null / after:nullで、
source.tablesource.ts_ns は付くものの、logical_idmarker を識別できる情報は
payload にでてきません。
「どの論理行が消えたか」を知る手段がなく、同じ値の行が複数あると Consumer はレコードを特定できず、
downstream(検索 index / キャッシュ / 集約テーブル)との同期は不可能となります。

このため、CDC 対象テーブルには PRIMARY KEY を必ず付けるようにしましょう。
(どうしても PK が無い場合は、DELETE を「テーブル単位の再フェッチ」にフォールバックさせるくらいしかなさそう)

実際の DELETE payload:


{
  "type": "full",
  "op": "d",
  "before": null,
  "after": null,
  "source": {
    "version": "1.0",
    "ts_ns": "1779265893296385464",
    "txId": "uvtzcif42zbncboxlpe4hza7ni",
    "schema": "public",
    "table": "test_run_3_nopk",
    "...": "..."
  }
}

※(logical_id, v, marker)のどれも payload に出てこない

DDL 境界

ADD COLUMN 前後の payload

以下を 1 文ずつ別トランザクションで 実行します(DSQL は DDL と DML を同一 tx にできないため、まとめて 1 回の -c に渡さない)。

CREATE TABLE test_run_4_ddl (id INT PRIMARY KEY, v TEXT);
INSERT INTO test_run_4_ddl VALUES (1, 'before_add');
ALTER TABLE test_run_4_ddl ADD COLUMN extra TEXT;
INSERT INTO test_run_4_ddl VALUES (2, 'after_add', 'extra_val');

pre-ADD record は after={id:1, v:"before_add"}
post-ADD record は after={id:2, v:"after_add", extra:"extra_val"} で、
txId / src_ts_ns も別です。
公式どおり ADD COLUMN の commit timestamp を境に after の列構成がクリーンに切り替わり、record が混ざることはありませんでした。

Consumer は record ごとに after のキーを読んで現在 schema を判断します。
ただし後述の「NULL 列 omit 仕様」と組み合わさると「キーが無い = そのカラムが NULL」とも解釈できるため、
列の存在チェックだけでは ADD 前/後を区別できない点に注意です。

RENAME COLUMN 前後の payload

続けて以下SQLを順に実行します。

ALTER TABLE test_run_4_ddl RENAME COLUMN v TO new_v;
INSERT INTO test_run_4_ddl (id, new_v, extra) VALUES (3, 'after_rename', 'extra_3');

RENAME後の record は after={id:3, new_v:"after_rename", extra:"extra_3"} で、
変更前の名前であるv は消え、混在期間はありませんでした。
RENAME も DDL commit 境界でクリーンに切り替わります。

また、downstream に列名でマッピングしている場合は RENAME 前後で sink 側のマッピングを切り替える必要があり、
切替時点は source.ts_ns で判断できます。

DDL commit と別セッションの DML を時間的に重ねた時の境界

並列に DML を流しつつ途中で ALTER を入れたところ、ALTER の commit が全 DML より後ろに着地し、
src_ts_ns 順に並べれば DDL/DML の境界はクリーンに判定できました(ADD/RENAME COLUMN と同じく)。
この過程で、DSQL は「1 トランザクションに複数 DDL を置けない」ことも確認。
Setup で踏んだ「DDL + DML を同一 tx に置けない」と合わせ、
DSQL の DDL 制約は 「1 tx あたり DDL 1 文まで、DML との混在も不可」 と整理できます。

補足: NULL 列は after から omit される

NULL 列の扱いを確認します。col_4c という nullable 列を持つテーブルを用意し、
値あり / NULL 明示 / 列省略 の 3 パターンを INSERT しました。

CREATE TABLE test_run_4_null (id INT PRIMARY KEY, v TEXT, col_4c TEXT);
INSERT 文 観測された after
INSERT INTO test_run_4_null (id, v, col_4c) VALUES (1001, 'with_value', 'present'); {id:1001, v:"with_value", col_4c:"present"}
INSERT INTO test_run_4_null (id, v, col_4c) VALUES (1002, 'with_null', NULL); {id:1002, v:"with_null"}col_4c キーごと無し
INSERT INTO test_run_4_null (id, v) VALUES (1003, 'unspecified'); {id:1003, v:"unspecified"} ← 同じく無し

公式For any data type, NULL column values are represented as JSON null と明記していますが、
実機では NULL 値の列が after から omit され、"col_4c": null という JSON キーは現れませんでした。

(2026年5月時点では)CDC が preview のため、実装と docs のどちらかが GA までに変わる可能性があります。
今後どちらに寄せられるか(実装が "col": null を出すよう変わる or docs が「NULL 列は omit」と更新される)は
要確認です。

なお、Consumer 側で当面取れる対策は次の通り。

  • after に列キーが 無い = NULL と解釈する("col" in after での存在チェックを基本にする)
  • 万一 GA で "col": null が来るようになっても壊れないよう、キー無し も null 値 もどちらも NULL として扱うような ロジックにしておく

ADD COLUMN 直前の record と「ADD COLUMN 後だがその列が NULL の record」は payload 上区別できません。
(両方ともキーが無い)。
区別したい場合は source.ts_ns を ALTER の commit timestamp と比較するなどの方法を。

Consumer 設計指針

公式 には以下2つの Consumer 設計指針が示されています。

  • Last-writer-wins (LWW。後勝ち): PK ごとに source.ts_ns の最大値を持ち、それ以下の record は破棄。DELETE は tombstone(削除済みマーク) として source.ts_ns 付きで保持。(キャッシュ向け)
  • Every-change processing: (source.ts_ns, primary key) で dedup(重複排除) したあと source.ts_ns で sort してから処理。(監査ログやイベントソーシング向け)

検証から、これらをさらに以下の点で補強できます。

設計ポイント 内容
dedup key は (source.ts_ns, primary key) 1000 並列まで src_ts_ns 衝突は観測されず、公式推奨の dedup key で十分。txId は dedup の補助・tx grouping 用途に留める
同一 tx 内の record は source.ts_ns が同値 tx 単位で扱いたい処理は txId で グルーピング
同一 PK の同 tx 内多重更新は最終状態のみ LWW で漏れる中間状態は無いが、tx 内ステップ replay が必要な場合などは別途アプリ側で実装
CDC 対象テーブルには PK必須 PK なし DELETE は payload に何ものらない
schema evolution は source.ts_ns を境に切り替える Consumer は record ごとに after のキー集合と source.ts_ns の両方を見て schema を判定
NULL 列は after から omit される after にキーが無い = NULL。ADD COLUMN 前後の record を payload 構造だけで区別するのは不可能で、source.ts_ns と ALTER の commit timestamp を比較する必要あり
source.ts_ns の値域は 64-bit。Number で扱うと丸まる 今回の検証で json-bigint を使った理由。TypeScript / JavaScript の Consumer では BigInt 必須

Summary

今回確認したAurora DSQL CDC の動作は以下です。

  • preview のため将来変わる可能性あり
  • 同一トランザクション内の record は source.ts_nssource.txId を完全に共有する。Consumer は txId で tx 単位の grouping ができる
  • 同一 PK の同 tx 内多重 UPDATE は最終状態のみ流れる。LWW で漏れる中間状態は無い
  • 本検証範囲(最大 1000 並列)では source.ts_ns の衝突を観測せず、ナノ秒精度の単調値として振る舞った。公式推奨の dedup key (source.ts_ns, primary key) でOK
  • UNORDERED + random partition key で配信されるため、Kinesis の到着順は commit 順と一致しない。順序が必要なら source.ts_ns で再ソートが必要
  • PK なしテーブルの DELETE は payload に何も乗らず同期不能。(CDC を有効化するテーブルはPK必須)
  • ADD / RENAME COLUMN は commit を境に payload schema がクリーンに切り替わる。旧名と新名が混在するタイミングは観測されなかった
  • NULL 列は after から omit される("col":null ではなくキーごと無し)。公式 docs の「NULL = JSON null」記述と違う。
  • 1 トランザクションに DDL + DML、または複数 DDL を混在させることは不可

繰り返しになりますが、これらは preview の挙動として確認したものです。
GA 時の挙動と差異がある可能性があるため、実際に使用するときは
最新の公式ドキュメントと挙動を再確認してください。

Memo

検証の実測値メモ。

観測
IsImpaired (Max) 常時 0
BehindSourceLag 開始直後に max 2,648 ms のスパイク、以降ほぼ 0 ms
PublishedRecords ピーク 1,500 records/min(1000 並列)
BytesStreamed / StreamDPU 累計 ~865 KiB / 2.06
throttle 系 (Write/ReadProvisionedThroughputExceeded, PutRecords.ThrottledRecords) すべて 0(on-demand)
  • 最大 1000 並列・累計 ~2,500 record の規模では backlog もスロットルも出なかった
  • DSQL 側の BehindSourceLag と Consumer 側の GetRecords.IteratorAgeMilliseconds を別々に確認
    (前者が伸びれば CDC/Kinesis 書き込み側、後者が伸びれば Consumer 読み取り側)

メトリクス一覧は 公式1 / 公式2 を参照。

References

この記事をシェアする

関連記事