Amazon DynamoDB zero-ETL integration with Amazon OpenSearch Service を試してみた

Amazon DynamoDB と Amazon OpenSearch Service のゼロ ETL 統合が利用可能になったので試してみました。
2023.12.07

CDKでやってみた版はこちらをご参照ください。

はじめに

DynamoDBのクエリ柔軟性を補うために、OpenSearch Serviceの検索機能を利用するパターンは知られていましたが、以前はそのアーキテクチャのためにLambdaなどを用意する必要がありました。(参考1, 参考2)

参考1:

参考2:

今回のアップデートにより、Lambdaなどのグルーコードを用いずに、DynamoDBのテーブルをOpenSearch Serviceに同期することが可能になりました。

まずはDynamoDBを準備

以下のブログにあるCDKでDynamoDBを用意します。

やってみる

Ingestion Pipelineのコンソールを開く

DynamoDBのコンソールのサイドメニューに「Integrations」が増えているので、

Integrations > 任意のテーブル > Create

の順でクリックしていきます。

以下のような画面が出てくるので、まずはロールなど、必要なリソースを作成します。 画面の手順に書かれている内容を確認してみましょう。

1: DynamoDBテーブルを選択する

OpenSearchのindexにレプリケートするDynamoDBテーブルを作成または選択します。PITR(ポイントインタイムリカバリ)とDynamoDBストリームは、あらかじめ有効にしておく必要があります。

こちらはすでに作成されています。また用意に用いたCDKの中でPITRとStreamの有効化も完了しています。

2: パーミッションの設定

pipelineがテーブルとそのstreamにアクセスするためのIAMロールと、テーブルのフルエクスポートに使用するAmazon S3バケットを作成します。

以下のドキュメントに従って、IAMロールを作成します。

参考3:

参考4:

3: OpenSearchの保存先を選択する

OpenSearchの保存先を選択します。デフォルトでは、テーブルをもとに名付けられたインデックスに、OpenSearchはDynamoDBのアイテムを動的にマッピングします。

この手順では、表示されているIngestion Pipelineのコンソールに対して入力していきます。 名前などは適当に入力していきますが、yamlを入力する箇所があるので、これについては以下のように入力する必要があります。

version: "2"
dynamodb-pipeline:
  source:
    dynamodb:
      acknowledgments: true
      tables:
        # 必須: 用意したDynamoDBテーブルのARNをします。おそらく既に入力されているので、そのままでOKです。
        - table_arn: <<dynamodbのARNを入力します>>
          # DynamoDB Streamsを用いてニアリアルタイムでデータを統合する場合は、以下のように設定します。
          # 不要である場合は行ごと削除してください。
          stream:
            start_position: "LATEST"
          # 既存データをまるごと統合する場合には、以下のように設定します。
          # 不要である場合は行ごと削除してください。
          export:
            # 必須: DynamoDBのデータをエクスポートするs3 bucketを指定します。入力する内容はbucket名です。(ARNではありません)
            s3_bucket: "<<my-bucket>>"
            # 必須:バケットのリージョンを指定します。
            s3_region: "<<us-east-1>>"
            # 任意: DynamoDBからS3バケットにデータをエクスポートする際のプレフィックスを指定します。
            s3_prefix: "ddb-to-opensearch-export/"
      aws:
        # 必須: 上記の手順2で作成したIAMロールのARNを指定します。
        sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
        # pipelineこのロールを使用する際のクレデンシャルのregionを指定します。pipelineのリージョンと揃えておけば大丈夫です。
        region: "ap-northeast-1"
  sink:
    - opensearch:
        # 必須: AWS OpenSearch endpointを指定します。
        hosts:
          [
            `https`から始まるAWS OpenSearch endpointを指定します。 OpenSearchドメインのwebコンソールで手に入ります。
          ]
        index: "table-index"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          # 必須: 上記の手順2で作成したIAMロールのARNを指定します。
          sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
          # OpenSearchドメインのリージョンを指定します。
          region: "<<us-east-1>>"

          # 以下の機能は筆者がまだ試せていません。追ってブログ化したいと考えています。。。
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          # serverless: true
          # serverless_options:
          # Specify a name here to create or update network policy for the serverless collection
          # network_policy_name: "network-policy-name"
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket. This is recommended as a best practice for all pipelines.
          # dlq:
          # s3:
          # Provide an S3 bucket
          # bucket: "your-dlq-bucket-name"
          # Provide a key path prefix for the failed requests
          # key_path_prefix: "dynamodb-pipeline/dlq"
          # Provide the region of the bucket.
          # region: "us-east-1"
          # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"

以上を設定して、Next > Create Pipelineの順でクリックします。

動作確認してみる

pipelineがActiveになっても、すぐにはデータ連携されないようです。 体感ですが10数分ほど待つと、データが連携が完了しました。データが多いほど時間がかかると思います。

既存データを確認する

まずは既存のデータが入力されていることを確認してみます。 OpenSearchのデータの確認にはOpenSearch Cliを利用します。 OpenSearch Dashboardが整備できている場合はそちらで確認するのがスムーズだと思います。

> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
  "_index" : "table-index-1",
  "_id" : "205",
  "_version" : 1701916967207000,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "Id" : 205,
    "Title" : "18-Bike-204",
    "Price" : 500.0,
    "Brand" : "Brand-Company C",
    "Description" : "205 Description",
    "Color" : [
      "Red",
      "Black"
    ],
    "ProductCategory" : "Bicycle",
    "BicycleType" : "Hybrid"
  }
}

データが確認できました!

既存データを削除してみる

DynamoDBのデータの操作にはdyneinを利用します。

> dy del -t ProductCatalog 205
Successfully deleted an item from the table 'ProductCatalog'.

> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
  "_id": "205",
  "_index": "table-index-2",
  "found": false
}

データが削除されていることが確認できました! この連携は数秒で完了しているようです。連携時間にどれほどのばらつきがあるかは未検証です。

新規データを追加してみる

先程削除したデータを再投入してみます。

> dy put -t ProductCatalog 205 -i '{
  "Brand": "Brand-Company C",
  "ProductCategory": "Bicycle",
  "Description": "205 Description",
  "Price": 500,
  "Id": 205,
  "BicycleType": "Hybrid",
  "Color": [
    "Red",
    "Black"
  ],
  "Title": "18-Bike-204"
}'
Successfully put an item to the table 'ProductCatalog'.

> opensearch-cli curl get -p play-opensearch -P table-index/_doc/205 --pretty
{
  "_index" : "table-index-2",
  "_id" : "205",
  "_version" : 1701919401000000,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "Brand" : "Brand-Company C",
    "Description" : "205 Description",
    "Price" : 500,
    "Color" : [
      "Black",
      "Red"
    ],
    "ProductCategory" : "Bicycle",
    "Title" : "18-Bike-204",
    "Id" : 205,
    "BicycleType" : "Hybrid"
  }
}

データが追加されていることが確認できました!

まとめ

以上の通り、既存データも新規操作もOpenSearchに連携されることが確認できました。

OpenSearchはとても多機能な検索エンジンなので、この機にいろいろ触ってみてはいかがでしょうか。(筆者はいろいろ勉強が必要であると痛感しています。。。)

以上でした!

余談: 設定したyamlは何者?

今回の設定に使用したyamlの仕様はどこに書いてあるのかというと、OSSとしてのOpenSearchのドキュメントに書いてありました。

OSSのOpenSearchの機能の中ではData Prepperと呼ばれる機能で、DynamoDB連携を実現する前から様々なデータソースからOpenSearchへとデータを統合する機能として存在していたようです。

yamlの書き方について不明な点がある場合は、上記のドキュメントを参照すると良いと思います。