CDKでやってみた版はこちらをご参照ください。
はじめに
DynamoDBのクエリ柔軟性を補うために、OpenSearch Serviceの検索機能を利用するパターンは知られていましたが、以前はそのアーキテクチャのためにLambdaなどを用意する必要がありました。(参考1, 参考2)
参考1:
参考2:
今回のアップデートにより、Lambdaなどのグルーコードを用いずに、DynamoDBのテーブルをOpenSearch Serviceに同期することが可能になりました。
まずはDynamoDBを準備
以下のブログにあるCDKでDynamoDBを用意します。
やってみる
Ingestion Pipelineのコンソールを開く
DynamoDBのコンソールのサイドメニューに「Integrations」が増えているので、
Integrations
> 任意のテーブル > Create
の順でクリックしていきます。
以下のような画面が出てくるので、まずはロールなど、必要なリソースを作成します。 画面の手順に書かれている内容を確認してみましょう。
1: DynamoDBテーブルを選択する
OpenSearchのindexにレプリケートするDynamoDBテーブルを作成または選択します。PITR(ポイントインタイムリカバリ)とDynamoDBストリームは、あらかじめ有効にしておく必要があります。
こちらはすでに作成されています。また用意に用いたCDKの中でPITRとStreamの有効化も完了しています。
2: パーミッションの設定
pipelineがテーブルとそのstreamにアクセスするためのIAMロールと、テーブルのフルエクスポートに使用するAmazon S3バケットを作成します。
以下のドキュメントに従って、IAMロールを作成します。
参考3:
参考4:
3: OpenSearchの保存先を選択する
OpenSearchの保存先を選択します。デフォルトでは、テーブルをもとに名付けられたインデックスに、OpenSearchはDynamoDBのアイテムを動的にマッピングします。
この手順では、表示されているIngestion Pipelineのコンソールに対して入力していきます。 名前などは適当に入力していきますが、yamlを入力する箇所があるので、これについては以下のように入力する必要があります。
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
# 必須: 用意したDynamoDBテーブルのARNをします。おそらく既に入力されているので、そのままでOKです。
- table_arn: <<dynamodbのARNを入力します>>
# DynamoDB Streamsを用いてニアリアルタイムでデータを統合する場合は、以下のように設定します。
# 不要である場合は行ごと削除してください。
stream:
start_position: "LATEST"
# 既存データをまるごと統合する場合には、以下のように設定します。
# 不要である場合は行ごと削除してください。
export:
# 必須: DynamoDBのデータをエクスポートするs3 bucketを指定します。入力する内容はbucket名です。(ARNではありません)
s3_bucket: "<<my-bucket>>"
# 必須:バケットのリージョンを指定します。
s3_region: "<<us-east-1>>"
# 任意: DynamoDBからS3バケットにデータをエクスポートする際のプレフィックスを指定します。
s3_prefix: "ddb-to-opensearch-export/"
aws:
# 必須: 上記の手順2で作成したIAMロールのARNを指定します。
sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
# pipelineこのロールを使用する際のクレデンシャルのregionを指定します。pipelineのリージョンと揃えておけば大丈夫です。
region: "ap-northeast-1"
sink:
- opensearch:
# 必須: AWS OpenSearch endpointを指定します。
hosts:
[
`https`から始まるAWS OpenSearch endpointを指定します。 OpenSearchドメインのwebコンソールで手に入ります。
]
index: "table-index"
index_type: custom
document_id: "${getMetadata(\"primary_key\")}"
action: "${getMetadata(\"opensearch_action\")}"
document_version: "${getMetadata(\"document_version\")}"
document_version_type: "external"
aws:
# 必須: 上記の手順2で作成したIAMロールのARNを指定します。
sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
# OpenSearchドメインのリージョンを指定します。
region: "<<us-east-1>>"
# 以下の機能は筆者がまだ試せていません。追ってブログ化したいと考えています。。。
# Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
# serverless: true
# serverless_options:
# Specify a name here to create or update network policy for the serverless collection
# network_policy_name: "network-policy-name"
# Enable the S3 DLQ to capture any failed requests in an S3 bucket. This is recommended as a best practice for all pipelines.
# dlq:
# s3:
# Provide an S3 bucket
# bucket: "your-dlq-bucket-name"
# Provide a key path prefix for the failed requests
# key_path_prefix: "dynamodb-pipeline/dlq"
# Provide the region of the bucket.
# region: "us-east-1"
# Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
# sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
以上を設定して、Next
> Create Pipeline
の順でクリックします。
動作確認してみる
pipelineがActiveになっても、すぐにはデータ連携されないようです。 体感ですが10数分ほど待つと、データが連携が完了しました。データが多いほど時間がかかると思います。
既存データを確認する
まずは既存のデータが入力されていることを確認してみます。 OpenSearchのデータの確認にはOpenSearch Cliを利用します。 OpenSearch Dashboardが整備できている場合はそちらで確認するのがスムーズだと思います。
> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
"_index" : "table-index-1",
"_id" : "205",
"_version" : 1701916967207000,
"_seq_no" : 2,
"_primary_term" : 1,
"found" : true,
"_source" : {
"Id" : 205,
"Title" : "18-Bike-204",
"Price" : 500.0,
"Brand" : "Brand-Company C",
"Description" : "205 Description",
"Color" : [
"Red",
"Black"
],
"ProductCategory" : "Bicycle",
"BicycleType" : "Hybrid"
}
}
データが確認できました!
既存データを削除してみる
DynamoDBのデータの操作にはdyneinを利用します。
> dy del -t ProductCatalog 205
Successfully deleted an item from the table 'ProductCatalog'.
> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
"_id": "205",
"_index": "table-index-2",
"found": false
}
データが削除されていることが確認できました! この連携は数秒で完了しているようです。連携時間にどれほどのばらつきがあるかは未検証です。
新規データを追加してみる
先程削除したデータを再投入してみます。
> dy put -t ProductCatalog 205 -i '{
"Brand": "Brand-Company C",
"ProductCategory": "Bicycle",
"Description": "205 Description",
"Price": 500,
"Id": 205,
"BicycleType": "Hybrid",
"Color": [
"Red",
"Black"
],
"Title": "18-Bike-204"
}'
Successfully put an item to the table 'ProductCatalog'.
> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
"_index" : "table-index-2",
"_id" : "205",
"_version" : 1701919401000000,
"_seq_no" : 4,
"_primary_term" : 1,
"found" : true,
"_source" : {
"Brand" : "Brand-Company C",
"Description" : "205 Description",
"Price" : 500,
"Color" : [
"Black",
"Red"
],
"ProductCategory" : "Bicycle",
"Title" : "18-Bike-204",
"Id" : 205,
"BicycleType" : "Hybrid"
}
}
データが追加されていることが確認できました!
まとめ
以上の通り、既存データも新規操作もOpenSearchに連携されることが確認できました。
OpenSearchはとても多機能な検索エンジンなので、この機にいろいろ触ってみてはいかがでしょうか。(筆者はいろいろ勉強が必要であると痛感しています。。。)
以上でした!
余談: 設定したyamlは何者?
今回の設定に使用したyamlの仕様はどこに書いてあるのかというと、OSSとしてのOpenSearchのドキュメントに書いてありました。
OSSのOpenSearchの機能の中ではData Prepperと呼ばれる機能で、DynamoDB連携を実現する前から様々なデータソースからOpenSearchへとデータを統合する機能として存在していたようです。
yamlの書き方について不明な点がある場合は、上記のドキュメントを参照すると良いと思います。