[AWS] Amazon DocumentDB の変更ストリームを試す

2020.01.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、菊池です。

少し前ですが、Amazon DocumentDB(MongoDB互換)で変更ストリームのサポートが追加されていました。

DocumentDBに対するデータの更新をキャッチすることで、それをトリガーとする処理を実装可能になります。

変更ストリーム

DocumentDBおよびMongoDBのドキュメントを参考に試してみます。

まずはDocumentDBをサクッと起動して、mongoshellで接続します。

$ mongo --host docdb.xxxxxxxxxxxxx.ap-northeast-1.docdb.amazonaws.com:27017 --username skikuchi --password

注意点としては、変更ストリームを取得できるのはPrimaryノードのみということです。DocumentDBの書き込みエンドポイントを使って接続します。

testdbにDBを移動し、modifyChangeStreamsで変更ストリームを有効化します。

rs0:PRIMARY> use testdb
switched to db testdb
rs0:PRIMARY> db.adminCommand({modifyChangeStreams: 1,
...     database: "testdb",
...     collection: "",
...     enable: true});
{ "ok" : 1 }

変更ストリームを取得するためには、db.collection.watch()メソッドを使用します。

rs0:PRIMARY> watchCursor = db.coll.watch()
rs0:PRIMARY> while (!watchCursor.isExhausted()){
...    if (watchCursor.hasNext()){
...       printjson(watchCursor.next());
...    }
... }

これで変更ストリームを監視している状態となりましたので、別のセッションでDBに接続し、ドキュメントをインサートします。

rs0:PRIMARY> db.coll.insert({'y': 1})
WriteResult({ "nInserted" : 1 })

すると、 変更ストリームとして以下のように取得されました。

{
	"_id" : {
		"_data" : "015e33cda40000000601000000060000500b"
	},
	"operationType" : "insert",
	"clusterTime" : Timestamp(1580453284, 6),
	"ns" : {
		"db" : "testdb",
		"coll" : "coll"
	},
	"documentKey" : {
		"_id" : ObjectId("5e33cda48a335b7a8fc8119a")
	},
	"fullDocument" : {
		"_id" : ObjectId("5e33cda48a335b7a8fc8119a"),
		"y" : 1
	}
}

まとめ

DocumentDBの変更ストリームを試してみました。変更ストリームを監視することで、別のデータストアに同期して分析や検索などの用途に使ったり、後続のデータ処理を実行することが容易になるでしょう。

他にも、以下のre:Inventセッションにてユースケースが紹介されています。