Amazon DynamoDB zero-ETL integration with Amazon OpenSearch Service をCDKで書いてみた

公開されたばかりの機能だけど、CDKで書けたので紹介します。
2023.12.07

はじめに

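以下のブログで紹介したDynamoDBからOpenSearchへのデータ連携機能について、CDKでの書き方を紹介します。なお、連携元のDynamoDBテーブルでは、既存データを初回エクスポートするためのポイントインタイムリカバリ(PITR)と、変更を取り込むためのDynamoDB Streams(ストリームビュータイプは NEW_IMAGE または NEW_AND_OLD_IMAGES)をあらかじめ有効にしておく必要があります。本記事のスタックはテーブルをPropsとして外部から受け取るため、これらの設定はテーブルを定義する側で行ってください。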

コード全量

import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as iam from "aws-cdk-lib/aws-iam";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import * as osis from "aws-cdk-lib/aws-osis";

/**
 * Construction properties for {@link SimpleStack}: the regular stack props
 * plus the DynamoDB table whose items are replicated into OpenSearch.
 */
interface Props extends cdk.StackProps {
  table: dynamodb.ITable;
}
/**
 * Provisions an OpenSearch Service domain and an OpenSearch Ingestion (OSIS)
 * pipeline that replicates the given DynamoDB table into that domain
 * (DynamoDB zero-ETL integration with OpenSearch Service).
 *
 * The pipeline exports the table's existing items through an S3 bucket and
 * then follows the table's stream starting at the LATEST position.
 *
 * NOTE(review): the OSIS DynamoDB source needs point-in-time recovery (for
 * the export) and DynamoDB Streams (for change capture) enabled on
 * `props.table`. This stack only receives an `ITable` and does not enable
 * either; confirm they are configured where the table is defined.
 */
export class SimpleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: Props) {
    super(scope, id, props);
    const { table } = props;

    // Target domain for the replicated documents. Multi-AZ with Standby is
    // disabled and the domain is deleted together with the stack.
    const domain = new opensearch.Domain(this, "Domain", {
      version: opensearch.EngineVersion.OPENSEARCH_2_11,
      capacity: {
        multiAzWithStandbyEnabled: false,
      },
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /**
     * Destination bucket for the pipeline's initial export, which syncs the
     * items that already exist in the DynamoDB table to OpenSearch.
     * Objects are auto-deleted and the bucket is destroyed with the stack.
     */
    const bucket = new s3.Bucket(this, "Bucket", {
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      encryption: s3.BucketEncryption.S3_MANAGED,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /**
     * IAM role assumed by the OSIS pipeline for both its DynamoDB source and
     * its OpenSearch sink. The trust policy only admits OSIS pipelines that
     * belong to this account.
     */
    const pipelineRole = new iam.Role(this, "IngestionRole", {
      assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com", {
        conditions: {
          StringEquals: {
            "aws:SourceAccount": this.account,
          },
          ArnLike: {
            "aws:SourceArn": this.formatArn({
              service: "osis",
              resource: "pipeline",
              resourceName: "*",
            }),
          },
        },
      }),
      inlinePolicies: {
        /**
         * Lets the pipeline describe the domain and send HTTP requests to it.
         * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
         */
        ingestionPipeline: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ["es:DescribeDomain"],
              resources: [domain.domainArn],
            }),
            new iam.PolicyStatement({
              actions: ["es:ESHttp*"],
              resources: [`${domain.domainArn}/*`],
            }),
          ],
        }),
        /**
         * Lets the pipeline export the table to S3, track the export job and
         * read the table's stream.
         * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/configure-client-ddb.html#ddb-pipeline-role
         */
        dynamodbIngestion: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              sid: "allowRunExportJob",
              actions: [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime",
              ],
              resources: [table.tableArn],
            }),
            new iam.PolicyStatement({
              sid: "allowCheckExportjob",
              actions: ["dynamodb:DescribeExport"],
              resources: [`${table.tableArn}/export/*`],
            }),
            new iam.PolicyStatement({
              sid: "allowReadFromStream",
              actions: [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
              ],
              resources: [`${table.tableArn}/stream/*`],
            }),
            new iam.PolicyStatement({
              sid: "allowReadAndWriteToS3ForExport",
              actions: [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl",
              ],
              resources: [`${bucket.bucketArn}/*`],
            }),
          ],
        }),
      },
    });

    /**
     * Domain resource policy that allows the pipeline role to reach the domain.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-domain
     */
    domain.addAccessPolicies(
      new iam.PolicyStatement({
        principals: [pipelineRole],
        actions: ["es:DescribeDomain", "es:ESHttp*"],
        resources: [`${domain.domainArn}/*`],
      }),
    );

    /**
     * OSIS pipeline: DynamoDB source (initial export + stream) -> OpenSearch sink.
     * `\${getMetadata(...)}` is escaped so that it reaches Data Prepper verbatim
     * instead of being interpolated by the TypeScript template literal.
     * NOTE(review): the pipeline name is fixed, so presumably only one copy of
     * this stack can exist per account/region; parameterize it if needed.
     */
    const pipeline = new osis.CfnPipeline(this, "OSISPipeline", {
      pipelineName: "simple-osis-pipeline",
      minUnits: 1,
      maxUnits: 4,
      pipelineConfigurationBody: `
        version: "2"
        dynamodb-pipeline:
          source:
            dynamodb:
              acknowledgments: true
              tables:
                - table_arn: ${table.tableArn}
                  stream:
                    start_position: LATEST
                  export:
                    s3_bucket: ${bucket.bucketName}
                    s3_region: ${this.region}
              aws:
                sts_role_arn: ${pipelineRole.roleArn}
                region: ${this.region}
          sink:
            - opensearch:
                hosts:
                  - https://${domain.domainEndpoint}
                index: table-index
                index_type: custom
                document_id: \${getMetadata("primary_key")}
                action: \${getMetadata("opensearch_action")}
                document_version: \${getMetadata("document_version")}
                document_version_type: external
                aws:
                  sts_role_arn: ${pipelineRole.roleArn}
                  region: ${this.region}
      `,
    });
    // The tokens in pipelineConfigurationBody only create an implicit
    // dependency on the domain resource itself. The access policy added by
    // addAccessPolicies() is applied by a separate resource inside the domain
    // construct (a custom resource in aws-cdk-lib). Depend on the whole domain
    // construct so the pipeline is created only after that policy is in place.
    pipeline.node.addDependency(domain);
  }
}

解説

IAM Role

DynamoDBからOpenSearchへのデータ連携を行うための権限を持ったIAM Roleを作成します。

const pipelineRole = new iam.Role(this, "IngestionRole", {
  /**
   * Trust `osis-pipelines.amazonaws.com` as the service principal, as the
   * documentation below describes. The conditions restrict the role to OSIS
   * pipelines owned by this account.
   * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
   */
  assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com", {
    conditions: {
      StringEquals: {
        "aws:SourceAccount": this.account,
      },
      ArnLike: {
        "aws:SourceArn": this.formatArn({
          service: "osis",
          resource: "pipeline",
          resourceName: "*",
        }),
      },
    },
  }),
  inlinePolicies: {
    /**
     * Permissions needed to ingest data into OpenSearch.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
     */
    ingestionPipeline: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ["es:DescribeDomain"],
          resources: [domain.domainArn],
        }),
        new iam.PolicyStatement({
          actions: ["es:ESHttp*"],
          resources: [`${domain.domainArn}/*`],
        }),
      ],
    }),
    /**
     * Permissions needed to read data from DynamoDB (export job and stream)
     * and to stage the export in the S3 bucket.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/configure-client-ddb.html#ddb-pipeline-role
     */
    dynamodbIngestion: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          sid: "allowRunExportJob",
          actions: [
            "dynamodb:DescribeTable",
            "dynamodb:DescribeContinuousBackups",
            "dynamodb:ExportTableToPointInTime",
          ],
          resources: [table.tableArn],
        }),
        new iam.PolicyStatement({
          sid: "allowCheckExportjob",
          actions: ["dynamodb:DescribeExport"],
          resources: [`${table.tableArn}/export/*`],
        }),
        new iam.PolicyStatement({
          sid: "allowReadFromStream",
          actions: [
            "dynamodb:DescribeStream",
            "dynamodb:GetRecords",
            "dynamodb:GetShardIterator",
          ],
          resources: [`${table.tableArn}/stream/*`],
        }),
        new iam.PolicyStatement({
          sid: "allowReadAndWriteToS3ForExport",
          actions: [
            "s3:GetObject",
            "s3:AbortMultipartUpload",
            "s3:PutObject",
            "s3:PutObjectAcl",
          ],
          resources: [`${bucket.bucketArn}/*`],
        }),
      ],
    }),
  },
});

OpenSearch Domainのリソースポリシー

ドキュメントの内容に従って、以下のようにリソースポリシーを設定します。

// Resource-based policy on the domain that lets the pipeline role describe
// the domain and call its HTTP APIs.
const pipelineAccessStatement = new iam.PolicyStatement({
  actions: ["es:DescribeDomain", "es:ESHttp*"],
  principals: [pipelineRole],
  resources: [`${domain.domainArn}/*`],
});
domain.addAccessPolicies(pipelineAccessStatement);

OSIS Pipeline

以下がOpenSearch Ingestion Pipelineを設定するためのCDKのコードです。osisにはまだL2 Constructが提供されていないため、L1 Constructであるosis.CfnPipelineクラスで作成しています。

それでもDynamoDB TableのARNや、S3 Bucketの名前や、OpenSearchドメインのエンドポイントなどを簡単に参照できるので、CDKで書くメリットはあると思います。

// `\${getMetadata(...)}` is escaped so that Data Prepper receives it verbatim
// instead of the TypeScript template literal interpolating it.
const pipeline = new osis.CfnPipeline(this, "OSISPipeline", {
  pipelineName: "simple-osis-pipeline",
  minUnits: 1,
  maxUnits: 4,
  pipelineConfigurationBody: `
    version: "2"
    dynamodb-pipeline:
      source:
        dynamodb:
          acknowledgments: true
          tables:
            - table_arn: ${table.tableArn}
              stream:
                start_position: LATEST
              export:
                s3_bucket: ${bucket.bucketName}
                s3_region: ${this.region}
          aws:
            sts_role_arn: ${pipelineRole.roleArn}
            region: ${this.region}
      sink:
        - opensearch:
            hosts:
              - https://${domain.domainEndpoint}
            index: table-index
            index_type: custom
            document_id: \${getMetadata("primary_key")}
            action: \${getMetadata("opensearch_action")}
            document_version: \${getMetadata("document_version")}
            document_version_type: external
            aws:
              sts_role_arn: ${pipelineRole.roleArn}
              region: ${this.region}
  `,
});
// Referencing domain.domainEndpoint only makes the pipeline depend on the
// domain resource itself. Depend on the whole domain construct so that the
// access policy added via addAccessPolicies() is applied before the pipeline
// is created.
pipeline.node.addDependency(domain);

まとめ

個人的にCDKが手に馴染んでいるのもありますが、CDKで書くことで何のリソースが必要でそれぞれどのように依存しているのかが俯瞰できるのが魅力だと思います。

このデータ連携を担っているOpenSearch Ingestionの基盤であるData Prepperには、まだまだ多くの機能があるようなので、引き続き検証していきたいと思っています。

以上でした!