Aurora DSQL CDC を動かしてみる
Introduction
Amazon Aurora DSQL の Change Data Capture (CDC) が
2026 年 5 月に preview で公開されました。
CDC は DB の committed write を(ほぼ)リアルタイム に Amazon Kinesis Data Streams へ流す仕組みで、
以下のような用途に使えます。
- 検索 index / キャッシュ / 分析基盤などにおけるシステム同期
- event-driven なワークフロー駆動(Lambda/SQSなど)
- Producer-Consumer の疎結合化
公式ドキュメントにはペイロード仕様の詳細がまとまっているので参照してください。
本記事では、以下の内容を確認しました。
- 同一トランザクションのCDC
- 並行にcommitしたときのCDC処理
- PKなしテーブルで試してみる
- DDL 境界(ADD/RENAME COLUMN)+ NULL 列 omit の挙動
Amazon Aurora DSQL - Change Data Capture
DSQL
Aurora DSQL は、サーバレスの分散型RDBです。
PostgreSQL互換なので普通のSQLや既存ツールを使いやすく、複数リージョンにも対応しています。
本記事ではSingle-Regionで使います。
Kinesis Data Streams
マネージドストリーミングデータサービスです。
CDCに関連するところは以下。
- shard単位で record を保持する
- DSQL CDC は ランダムな partition key で record を全 shard に分散する
- ※Consumer は 全 shard を消費しないと record を漏らすので注意(公式: "consume all shards on the Kinesis data stream")
- record サイズ上限は default 1 MiB、パラメータで 10 MiB まで引き上げ可能
Change Data Capture
DB の committed change を near real-time に外部へ流す仕組みです。
Aurora DSQL の CDC は managed&bring-your-own-target(BYOT)モデルです。
※現状ターゲットは Kinesis Data Streams のみ
CDC payload
公式ドキュメント からpayloadのおさらい。
envelope 構造
検証で実際に届いたレコードは以下。
{
"type": "full",
"op": "c",
"before": null,
"after": { "id": 1, "v": "hello-cdc" },
"source": {
"version": "1.0",
"ts_ms": 1779259861157,
"ts_ns": 1779259861157725700,
"txId": "ubtzccn2ehtmtplzjbhyavlkrm",
"schema": "public",
"table": "smoke_test",
"db": "postgres",
"cluster": "..."
},
"ts_ms": 1779259861153,
"ts_ns": 1779259861153444900
}
主要フィールド
| field | 公式の意味 |
|---|---|
type |
full / chunked / fragment |
op |
INSERT/UPDATE は現状 c、DELETE は d。将来 UPDATE は u に分離予定 |
source.txId |
同一 tx 内の全 record で 共有(grouping 用) |
source.ts_ns |
commit timestamp。全順序キーとして公式が推奨 |
source.cluster |
DSQL cluster identifier |
before |
DELETE で PK のみ。INSERT/UPDATE / PK なしテーブルの DELETE では null |
after |
公式上は post-change row state の全カラム。DELETE では null |
ts_ms / ts_ns |
CDC system が record を処理した時刻 |
afterフィールドですが、検証時は NULL 列が omit される挙動を確認しました。
delivery / ordering
- at-least-once。同じ record が複数回届く可能性あり
- 公式が推奨する dedup key =
(source.ts_ns, primary key) - 9 MiB 超のレコードは
chunked+fragmentに分割。chunk_idで再構成、crc32cで整合性検証可 - ordering モードは
UNORDEREDで random partition key 配信 → 全 shard 消費が必須 - 異なる tx 間で record の順序は前後し得る
Environment
| 項目 | 値 |
|---|---|
| AWS Region | us-east-1 |
| DSQL cluster | Single-Region |
| Kinesis Data Streams | on-demand mode / max record size = 10240 KiB |
| psql | libpq 18.4 |
| Node.js | v20 (Consumer は @aws-sdk/client-kinesis 3.1050 ) |
| Aurora DSQL CDC | Preview(検証時点ではCDC が preview) |
Setup
ではDSQL CDCを試してみます。
1. DSQL クラスター作成
まずは DSQL クラスターを作成します。
% aws dsql create-cluster \
--region us-east-1 \
--no-deletion-protection-enabled \
--tags Project=dsql-cdc-verify,Purpose=blog-article-verification
# → identifier と endpoint 取得
% aws dsql wait cluster-active \
--identifier "$CLUSTER_ID" --region us-east-1
2. Kinesis Data Streams (on-demand)
次にKinesisを作成。
% aws kinesis create-stream \
--stream-name dsql-cdc-verify-kinesis \
--stream-mode-details StreamMode=ON_DEMAND \
--max-record-size-in-ki-b 10240 \
--tags Project=dsql-cdc-verify \
--region us-east-1
--max-record-size-in-ki-b 10240(10 MiB)を明示し、
DSQL CDC の chunked record 閾値(9 MiB)より上にする。
3. CDC service role (IAM)
DSQL が Kinesis に書き込むための role を作成します。
Trust policy / permission policy の正確な JSON 雛形はドキュメント にそのまま使える形で載っているので、
そちらをコピーして trust-policy.json / inline-policy.json として保存しましょう。
要点は以下。
- Trust policy:
Service: dsql.amazonaws.comにsts:AssumeRoleを許可- confused deputy 対策で
aws:SourceAccountとaws:SourceArn(cluster/<CLUSTER_ID>/stream/*) を付与
- Permission policy:
kinesis:PutRecord/PutRecords/DescribeStreamSummary/ListShardsをKinesis stream ARN に許可- Kinesis stream が KMS CMK で暗号化されている場合は
kms:GenerateDataKeyも追加
% aws iam create-role \
--role-name dsql-cdc-verify-role \
--assume-role-policy-document file://trust-policy.json \
--tags Key=Project,Value=dsql-cdc-verify
% aws iam put-role-policy \
--role-name dsql-cdc-verify-role \
--policy-name KinesisAccess \
--policy-document file://inline-policy.json
4. CDC stream
% aws dsql create-stream \
--cluster-identifier "$CLUSTER_ID" \
--target-definition "kinesis={streamArn=$KINESIS_STREAM_ARN,roleArn=$CDC_ROLE_ARN}" \
--ordering UNORDERED \
--format JSON \
--tags Project=dsql-cdc-verify \
--region us-east-1
# ACTIVE になるのを待ちましょう
% aws dsql get-stream \
--cluster-identifier "$CLUSTER_ID" \
--stream-identifier "$STREAM_ID" \
--region us-east-1
preview 時点で --ordering は UNORDERED のみ、
--format は JSON のみが選択可能。
5. DSQL へ接続(psql + IAM auth)
DSQL は IAM token を PostgreSQL パスワードとして渡す方式です。
# 都度トークンを取り直して psql 実行
dsql_psql() {
PGPASSWORD="$(aws dsql generate-db-connect-admin-auth-token \
--hostname "$CLUSTER_ENDPOINT" --region us-east-1)" \
psql -h "$CLUSTER_ENDPOINT" -U admin -d postgres "$@"
}
# 使い方
% dsql_psql -c "SELECT now();"
% dsql_psql -f some-script.sql
psql の -c "DDL; DML;" は複数文を 1トランザクションで投げます。
DSQL は DDL と DML を同一 transaction にできないため、
CREATE TABLE foo(...); INSERT INTO foo ...; を -c で一度に実行しようとするとエラーになります。
ERROR: ddl and dml are not supported in the same transaction
DETAIL: ddl count: 1, dml count: 1
同じ -c 文字列に DDL と DML を混ぜないようにしましょう。
6. ローカル Consumer
Consumer は Node で作成します。
依存は @aws-sdk/client-kinesis を使用。
ListShards → GetShardIterator → GetRecords を全 shard で並列にポーリングして、
1 record = 1 行の JSON で標準出力に流します。
ログには以下のフィールドを含めます。
| フィールド | 内容 |
|---|---|
receivedAt |
Consumer が GetRecords で受信した時刻 |
arrival |
Kinesis ApproximateArrivalTimestamp |
shardId / seq |
shard 内 sequence number |
type / op / table |
CDC envelope |
txId / src_ts_ns |
transaction identifier / commit timestamp |
cdc_ts_ns |
CDC system が record を処理した時刻 |
pk |
after(INSERT/UPDATE)または before(DELETE)から抽出 |
raw |
元の JSON payload 全体 |
検証ごとに test_run_<n>_<sub>_ のような table prefix を使い、
Consumer 側に --filter <prefix> 引数を入れて他検証の record と混ざらないようにします。
7. Cleanup
検証完了時は忘れずに削除。
# 1. CDC stream を削除
% aws dsql delete-stream \
--cluster-identifier "$CLUSTER_ID" \
--stream-identifier "$STREAM_ID" \
--region us-east-1
# 消えたのを確認
while aws dsql get-stream \
--cluster-identifier "$CLUSTER_ID" \
--stream-identifier "$STREAM_ID" \
--region us-east-1 >/dev/null 2>&1; do
sleep 3
done
# 2. CDC stream が完全に消えてから Kinesis を消す
% aws kinesis delete-stream \
--stream-name dsql-cdc-verify-kinesis \
--region us-east-1
% aws kinesis wait stream-not-exists \
--stream-name dsql-cdc-verify-kinesis \
--region us-east-1
# 3. cluster と IAM を削除
% aws dsql delete-cluster \
--identifier "$CLUSTER_ID" --region us-east-1
% aws iam delete-role-policy \
--role-name dsql-cdc-verify-role --policy-name KinesisAccess
% aws iam delete-role --role-name dsql-cdc-verify-role
Try
では実際に確認してみましょう。
同一トランザクションの CDC 粒度
1 つの tx を commit すると CDC に何件の record が流れ、
それらのフィールド(txId / source.ts_ns)がどのようになっているか確認します。
以下テーブルで確認。
CREATE TABLE test_run_1_kv (id INT PRIMARY KEY, v TEXT);
同一 tx 内の複数行で source.ts_ns は同一値か
INSERT INTO test_run_1_kv VALUES (1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e');
を 1 文(=1 tx)で流します。
txId が 5 record で共有されるのは公式明記どおり。
source.ts_ns も「commit timestamp」なら同一値になるはずなので確認してみましょう。
結果、5 record すべて
txId=grtzcfcecctfoso4t6cklkhxwm と src_ts_ns=1779262623863649300 が一致しました。
一方 top-level の cdc_ts_ns(CDC system の処理時刻)は record 個別で、約 11μs程度ばらつきありです。partition key はランダムなので 5 行は 2 shard(shardId-000000000000 と ...02)に分散し、
PK の受信順も 3 → 2 → 4 → 5 → 1 で VALUES の記述順とも PK 順とも無関係でした。
この結果から、同一 tx 内の record をまとめて扱いたいときは txId で グループ化すればよく、
source.ts_ns は tx 単位の commit 時刻として使えます(行単位の production order には使えない)。
同一 PK を同一 tx 内で複数回 UPDATE したときの中間状態
psql の -c "..."(1 回の呼び出し = 1 トランザクション)の中に 3 つの UPDATE を入れ、
UPDATE id=1 SET v='UPDATED_1'; UPDATE id=1 SET v='UPDATED_2'; UPDATE id=1 SET v='UPDATED_3';
を実行します。最終状態 1 件なのか中間状態込みで複数件流れるのか確認してみます。
結果は1 record のみでした。(after.v="UPDATED_3" / before:null / op:"c")
同一 tx 内の同一 PK 多重 UPDATE は commit 時点の最終状態 1 件に畳まれ、
中間状態は CDC からは観測できません。
LWW(per-PK の source.ts_ns)を使うなら漏れた中間状態を追う設計は不要です。
ただし「同じ tx 内で何回更新したか」を後段で集計したい(audit / step replay)場合は
DSQL CDC 単独では足りず、アプリ側で別途対応が必要です。
並行 commit と CDC 到着順
多数の tx を同時に commit したとき、source.ts_ns が順序キーとして使えるか(衝突せず commit 順に並ぶか)を確認します。別 tx に分けた連続 UPDATE は低並行なら source.ts_ns が commit 順に単調増加しますが、これは保証ではないので、高並行でも崩れないかを以下で見ます。
検証用テーブル:
CREATE TABLE test_run_2_load (id INT PRIMARY KEY, v TEXT, ts_client_ms BIGINT);
負荷は Node + pg の connection pool で N 並列に独立した tx の INSERT を投げます。
PK は並列度ごとに別 range(100 → 0..99、500 → 200..699、1000 → 1000..1999)にして衝突を避けます。
Consumer は --filter test_run_2_ で LATEST から待機させておき、
INSERT 後 10〜25 秒待って全 record の到着を確認します。
並行 commit で source.ts_ns は順序キーとして問題ない?
| 並列度 | ok / ng | CDC 受信 record | src_ts_ns ユニーク数 |
重複 | 4 shard 分布 | commit p50 / p99 (client) |
|---|---|---|---|---|---|---|
| 100 | 100 / 0 | 100 | 100 | 0 | 23 / 23 / 27 / 27 | 1765 / 2013 ms |
| 500 | 500 / 0 | 500 | 500 | 0 | 137 / 125 / 120 / 118 | 2044 / 2300 ms |
| 1000 | 1000 / 0 | 1000 | 1000 | 0 | 236 / 262 / 256 / 246 | 1986 / 2559 ms |
1000 並列まで上げても src_ts_ns 衝突は観測されません。
さらに 1000 並列の record を src_ts_ns でソートすると2ns 間隔の連続値が並びました。
1779264828731646281
1779264828731646283
1779264828731646285
1779264828731646287
1779264828731646289
...
公式は source.ts_ns を「commit timestamp in nanoseconds」と説明していますが、
本検証範囲ではcommit 順序を反映する単調な値と見えました。
(「衝突を避けるよう間隔を空けて発行されているように見える」挙動)
なので、以下のことが言えそうです。
source.ts_nsは transaction 単位で全順序が取得可能(同一 tx 内の record は同値なので、record 単位の厳密な全順序ではない)- 公式が dedup key として推奨する
(source.ts_ns, primary key)は(確認内では)衝突する組み合わせが出ない
公式推奨の dedup key (source.ts_ns, primary key) は 1000程度の並列なら有効でした。
LWW では per-PK の source.ts_ns 最大値を採用すれば commit 順を保ったまま重複を弾くことができそうです。
PK なしテーブルのケース
公式 docs は PK なしテーブルの DELETE について「before も null」と書いています。
以下のようにテーブルを作成しましょう。
-- PK 無しテーブル
CREATE TABLE test_run_3_nopk (logical_id INT, v TEXT, marker TEXT);
このテーブルに データ(logical_id・v は同じで marker だけ違う)投入。
INSERT INTO test_run_3_nopk VALUES (1,'same','left'),(1,'same','right');
↓のSQLで片方だけ消します。
DELETE FROM test_run_3_nopk WHERE logical_id=1 AND marker='left';
INSERT は 2 record(同一 tx・同一 src_ts_ns、after に全カラム)。
DELETE は 1 record ですが op:"d" / before:null / after:nullで、
source.table と source.ts_ns は付くものの、logical_id や marker を識別できる情報は
payload にでてきません。
「どの論理行が消えたか」を知る手段がなく、同じ値の行が複数あると Consumer はレコードを特定できず、
downstream(検索 index / キャッシュ / 集約テーブル)との同期は不可能となります。
このため、CDC 対象テーブルには PRIMARY KEY を必ず付けるようにしましょう。
(どうしても PK が無い場合は、DELETE を「テーブル単位の再フェッチ」にフォールバックさせるくらいしかなさそう)
実際の DELETE payload:
{
"type": "full",
"op": "d",
"before": null,
"after": null,
"source": {
"version": "1.0",
"ts_ns": "1779265893296385464",
"txId": "uvtzcif42zbncboxlpe4hza7ni",
"schema": "public",
"table": "test_run_3_nopk",
"...": "..."
}
}
※(logical_id, v, marker)のどれも payload に出てこない
DDL 境界
ADD COLUMN 前後の payload
以下を 1 文ずつ別トランザクションで 実行します(DSQL は DDL と DML を同一 tx にできないため、まとめて 1 回の -c に渡さない)。
CREATE TABLE test_run_4_ddl (id INT PRIMARY KEY, v TEXT);
INSERT INTO test_run_4_ddl VALUES (1, 'before_add');
ALTER TABLE test_run_4_ddl ADD COLUMN extra TEXT;
INSERT INTO test_run_4_ddl VALUES (2, 'after_add', 'extra_val');
pre-ADD record は after={id:1, v:"before_add"}、
post-ADD record は after={id:2, v:"after_add", extra:"extra_val"} で、
txId / src_ts_ns も別です。
公式どおり ADD COLUMN の commit timestamp を境に after の列構成がクリーンに切り替わり、record が混ざることはありませんでした。
Consumer は record ごとに after のキーを読んで現在 schema を判断します。
ただし後述の「NULL 列 omit 仕様」と組み合わさると「キーが無い = そのカラムが NULL」とも解釈できるため、
列の存在チェックだけでは ADD 前/後を区別できない点に注意です。
RENAME COLUMN 前後の payload
続けて以下SQLを順に実行します。
ALTER TABLE test_run_4_ddl RENAME COLUMN v TO new_v;
INSERT INTO test_run_4_ddl (id, new_v, extra) VALUES (3, 'after_rename', 'extra_3');
RENAME後の record は after={id:3, new_v:"after_rename", extra:"extra_3"} で、
変更前の名前であるv は消え、混在期間はありませんでした。
RENAME も DDL commit 境界でクリーンに切り替わります。
また、downstream に列名でマッピングしている場合は RENAME 前後で sink 側のマッピングを切り替える必要があり、
切替時点は source.ts_ns で判断できます。
DDL commit と別セッションの DML を時間的に重ねた時の境界
並列に DML を流しつつ途中で ALTER を入れたところ、ALTER の commit が全 DML より後ろに着地し、
src_ts_ns 順に並べれば DDL/DML の境界はクリーンに判定できました(ADD/RENAME COLUMN と同じく)。
この過程で、DSQL は「1 トランザクションに複数 DDL を置けない」ことも確認。
Setup で踏んだ「DDL + DML を同一 tx に置けない」と合わせ、
DSQL の DDL 制約は 「1 tx あたり DDL 1 文まで、DML との混在も不可」 と整理できます。
補足: NULL 列は after から omit される
NULL 列の扱いを確認します。col_4c という nullable 列を持つテーブルを用意し、
値あり / NULL 明示 / 列省略 の 3 パターンを INSERT しました。
CREATE TABLE test_run_4_null (id INT PRIMARY KEY, v TEXT, col_4c TEXT);
| INSERT 文 | 観測された after |
|---|---|
INSERT INTO test_run_4_null (id, v, col_4c) VALUES (1001, 'with_value', 'present'); |
{id:1001, v:"with_value", col_4c:"present"} |
INSERT INTO test_run_4_null (id, v, col_4c) VALUES (1002, 'with_null', NULL); |
{id:1002, v:"with_null"} ← col_4c キーごと無し |
INSERT INTO test_run_4_null (id, v) VALUES (1003, 'unspecified'); |
{id:1003, v:"unspecified"} ← 同じく無し |
公式 は For any data type, NULL column values are represented as JSON null と明記していますが、
実機では NULL 値の列が after から omit され、"col_4c": null という JSON キーは現れませんでした。
(2026年5月時点では)CDC が preview のため、実装と docs のどちらかが GA までに変わる可能性があります。
今後どちらに寄せられるか(実装が "col": null を出すよう変わる or docs が「NULL 列は omit」と更新される)は
要確認です。
なお、Consumer 側で当面取れる対策は次の通り。
afterに列キーが 無い = NULL と解釈する("col" in afterでの存在チェックを基本にする)- 万一 GA で
"col": nullが来るようになっても壊れないよう、キー無し もnull値 もどちらも NULL として扱うような ロジックにしておく
ADD COLUMN 直前の record と「ADD COLUMN 後だがその列が NULL の record」は payload 上区別できません。
(両方ともキーが無い)。
区別したい場合は source.ts_ns を ALTER の commit timestamp と比較するなどの方法を。
Consumer 設計指針
公式 には以下2つの Consumer 設計指針が示されています。
- Last-writer-wins (LWW。後勝ち): PK ごとに
source.ts_nsの最大値を持ち、それ以下の record は破棄。DELETE は tombstone(削除済みマーク) としてsource.ts_ns付きで保持。(キャッシュ向け) - Every-change processing:
(source.ts_ns, primary key)で dedup(重複排除) したあとsource.ts_nsで sort してから処理。(監査ログやイベントソーシング向け)
検証から、これらをさらに以下の点で補強できます。
| 設計ポイント | 内容 |
|---|---|
dedup key は (source.ts_ns, primary key) |
1000 並列まで src_ts_ns 衝突は観測されず、公式推奨の dedup key で十分。txId は dedup の補助・tx grouping 用途に留める |
同一 tx 内の record は source.ts_ns が同値 |
tx 単位で扱いたい処理は txId で グルーピング |
| 同一 PK の同 tx 内多重更新は最終状態のみ | LWW で漏れる中間状態は無いが、tx 内ステップ replay が必要な場合などは別途アプリ側で実装 |
| CDC 対象テーブルには PK必須 | PK なし DELETE は payload に何ものらない |
schema evolution は source.ts_ns を境に切り替える |
Consumer は record ごとに after のキー集合と source.ts_ns の両方を見て schema を判定 |
NULL 列は after から omit される |
after にキーが無い = NULL。ADD COLUMN 前後の record を payload 構造だけで区別するのは不可能で、source.ts_ns と ALTER の commit timestamp を比較する必要あり |
source.ts_ns の値域は 64-bit。Number で扱うと丸まる |
今回の検証で json-bigint を使った理由。TypeScript / JavaScript の Consumer では BigInt 必須 |
Summary
今回確認したAurora DSQL CDC の動作は以下です。
- preview のため将来変わる可能性あり
- 同一トランザクション内の record は
source.ts_nsとsource.txIdを完全に共有する。Consumer はtxIdで tx 単位の grouping ができる - 同一 PK の同 tx 内多重 UPDATE は最終状態のみ流れる。LWW で漏れる中間状態は無い
- 本検証範囲(最大 1000 並列)では
source.ts_nsの衝突を観測せず、ナノ秒精度の単調値として振る舞った。公式推奨の dedup key(source.ts_ns, primary key)でOK - UNORDERED + random partition key で配信されるため、Kinesis の到着順は commit 順と一致しない。順序が必要なら
source.ts_nsで再ソートが必要 - PK なしテーブルの DELETE は payload に何も乗らず同期不能。(CDC を有効化するテーブルはPK必須)
- ADD / RENAME COLUMN は commit を境に payload schema がクリーンに切り替わる。旧名と新名が混在するタイミングは観測されなかった
- NULL 列は
afterから omit される("col":nullではなくキーごと無し)。公式 docs の「NULL = JSON null」記述と違う。 - 1 トランザクションに DDL + DML、または複数 DDL を混在させることは不可
繰り返しになりますが、これらは preview の挙動として確認したものです。
GA 時の挙動と差異がある可能性があるため、実際に使用するときは
最新の公式ドキュメントと挙動を再確認してください。
Memo
検証の実測値メモ。
| 観測 | 値 |
|---|---|
IsImpaired (Max) |
常時 0 |
BehindSourceLag |
開始直後に max 2,648 ms のスパイク、以降ほぼ 0 ms |
PublishedRecords ピーク |
1,500 records/min(1000 並列) |
BytesStreamed / StreamDPU 累計 |
~865 KiB / 2.06 |
throttle 系 (Write/ReadProvisionedThroughputExceeded, PutRecords.ThrottledRecords) |
すべて 0(on-demand) |
- 最大 1000 並列・累計 ~2,500 record の規模では backlog もスロットルも出なかった
- DSQL 側の
BehindSourceLagと Consumer 側のGetRecords.IteratorAgeMillisecondsを別々に確認
(前者が伸びれば CDC/Kinesis 書き込み側、後者が伸びれば Consumer 読み取り側)
References
- AWS What's New — Amazon Aurora DSQL now supports change data capture (Preview)
- Aurora DSQL CDC streams
- Understanding CDC records
- Getting started with CDC streams
- Configuring IAM for CDC
- Monitoring streams
- ALTER TABLE syntax support
- Migrating from PostgreSQL to Aurora DSQL
- AWS Database Blog — Getting started with Change Data Capture in Amazon Aurora DSQL
- Monitoring Amazon Kinesis Data Streams with CloudWatch








