DynamoDB ストリームのイベントをSNS経由で別アカウントのSQSに配信してみる

DynamoDB ストリームのイベントをSNS経由で別アカウントのSQSに配信してみる

Clock Icon2023.09.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

以下の構成を試す機会があったので、手順をブログにしてみます。

やってみた

PublisherアカウントとSubscriberアカウントでAWSアカウントが2つ必要になります。

作業は基本的にAWS CLIで行います。

作業の流れは以下になります。

  1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)
  2. SNSの作成(Publisherアカウント)
  3. Lambda用IAMロールの作成(Publisherアカウント)
  4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
  5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
  6. SQSの作成(Subscriberアカウント)

1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)

以下コマンドでDynamoDBストリームが有効になったDynamoDBテーブルを作成します。

 $ aws dynamodb create-table \
     --table-name sqs-dynamodb-table \
     --attribute-definitions \
         AttributeName=id,AttributeType=S \
     --key-schema \
         AttributeName=id,KeyType=HASH \
     --stream-specification \
        StreamEnabled=true,StreamViewType=KEYS_ONLY \
     --provisioned-throughput \
         ReadCapacityUnits=1,WriteCapacityUnits=1 \
     --table-class STANDARD

2. SNSの作成(Publisherアカウント)

SNSトピックを作成します。出力でSNSトピックARNが表示されるため、控えておきます。

$ aws sns create-topic --name sqs-dynamodb-topic

別アカウントのSQSからサブスクライブを可能にする必要があります。

そのため、SNSアクセスポリシーで別アカウントからのサブスクライブを許可します。

以下のファイルを用意します。アカウントIDの部分は各自の環境に合わせて置き換えてください。

{
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
      {
        "Sid": "AllowFromPublisherAccounts",
        "Effect": "Allow",
        "Principal": {
          "AWS": "*"
        },
        "Action": [
          "SNS:GetTopicAttributes",
          "SNS:SetTopicAttributes",
          "SNS:AddPermission",
          "SNS:RemovePermission",
          "SNS:DeleteTopic",
          "SNS:Subscribe",
          "SNS:ListSubscriptionsByTopic",
          "SNS:Publish"
        ],
        "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic",
        "Condition": {
          "StringEquals": {
            "AWS:SourceOwner": "1234567890" # Publisherアカウント アカウントID
          }
        }
      },
      {
        "Sid": "AllowFromSubscriberAccounts",
        "Effect": "Allow",
        "Principal": {
           "AWS": "2345678901" # Subscriberアカウント アカウントID
        },
        "Action": "sns:Subscribe",
        "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic"
     }
    ]
  }

以下のコマンドでアクセスポリシーを更新します。

$ TOPIC_ARN="<作成したSNSトピックのARN>"
$ aws sns set-topic-attributes --topic-arn $TOPIC_ARN --attribute-name Policy --attribute-value file://sns-access-policy.json

3. Lambda用IAMロールの作成(Publisherアカウント)

信頼ポリシーの設定のために、以下のファイルを用意します。

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
}

Lambda用のIAMロールを用意します。DynamoDBストリームのデータを取得して、CloudWatch Logsにログを出力できるようにマネージドポリシーAWSLambdaDynamoDBExecutionRoleをアタッチします。

$ aws iam create-role --role-name sqs-dynamodb-function-role --assume-role-policy-document file://lambda-trust-policy.json
$ aws iam attach-role-policy --role-name sqs-dynamodb-function-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole

LambdaからSNSトピックへのアクセスを可能にするために、LambdaのIAMロールにポリシーを追加します。

以下のファイルを用意します。SNSトピックARNのところは、先程出力されたSNSトピックARNに置き換えます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "<SNSトピックARN>"
        }
    ]
}

LambdaのIAMロールにポリシーを追加します。

$ aws iam put-role-policy --role-name sqs-dynamodb-function-role --policy-name sns-publish-policy --policy-document file://sns-publish-policy.json

4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)

DynamoDBストリームのイベントを受け取って、SNSに送信するLambdaを用意します。

DynamoDBに新しいアイテムが追加された際に、SNSにアイテムのIDを送るという内容です。

import boto3

def lambda_handler(event, context):
    sns = boto3.client('sns')

    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            ddb_item_id = record['dynamodb']['Keys']['id']['S']

            message = f'New item with id {ddb_item_id}'

            response = sns.publish(
                TopicArn='arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic', # 置き換えが必要
                Message=message
            )

            print(response)
    print('Successfully processed %s records.' % str(len(event['Records'])))

Lambdaにアップロードするために、.pyのファイルをzipにします。

$ zip function.zip index.py

Lambda関数を作成します。

$ aws lambda create-function --function-name sqs-dynamodb-function \
--zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.9 \
--role arn:aws:iam::1234567890:role/lambda-dynamodb-role

5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)

DynamoDB ストリームをトリガーにLambdaを実行するために、DynamoDBストリームとLambda関数を関連付けます。

$ STREAM_ARN=`aws dynamodb describe-table --table-name sqs-dynamodb-table --query "Table.LatestStreamArn" --output text`
$ aws lambda create-event-source-mapping --function-name sqs-dynamodb-function \
 --batch-size 100 --starting-position LATEST --event-source $STREAM_ARN

6. SQSの作成(Subscriberアカウント)

Subscriberアカウントの作業です。キューを作成して、SNSトピックにサブスクライブします。

$ aws sqs create-queue --queue-name sqs-dynamodb-queue
$ aws sns subscribe --topic-arn <SNSトピックARN> --protocol sqs --notification-endpoint <SQSキューARN>

このままでは、SNSからSQSにアクセスができないため、SQSアクセスポリシーを設定します。

以下の、ファイルを用意します。

{
  "Version": "2012-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "AllowFromSubscriberAccounts",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::2345678901:root" # Subscriberアカウント アカウントID
      },
      "Action": "SQS:*",
      "Resource": "<SQSキューARN>"
    },
    {
      "Sid": "AllowFromPublisherAccounts",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<SQSキューARN>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<SNSトピックARN>"
      }
    }
  }]
}

作成したファイルを使って、ポリシーを設定します。

$ QUEUE_URL=`aws sqs get-queue-url --queue-name sqs-dynamodb-queue --output text`
$ aws sqs set-queue-attributes --queue-url $QUEUE_URL --attributes file://sqs-access-policy.json

ちなみにマネジメントコンソール経由でサブスクライブすると、AllowFromPublisherAccounts相当の部分が自動的に追加されます。AWS CLIだと自動では追加されません。

これで一通りの設定が完了です。

動作確認

マネジメントコンソールから動作確認します。

Publisherアカウントにログインして、DynamoDB -> sqs-dynamodb-table -> 項目を作成を選択します。

値を「hoge」として項目を作成します。

ちなみにAWS CLIだと以下のコマンドでアイテムを追加できます。

$ aws dynamodb put-item --table-name sqs-dynamodb-table --item '{"id": {"S": "hoge"}}

続いてSubscriberアカウントにログインして、SQS -> sqs-dynamodb-queue -> メッセージを送受信->メッセージをポーリングを選択します。

メッセージが受信されて、中身が先程DynamoDB Tableに追加したid hogeがメッセージに含まれていることが分かります。

これで、DynamoDB ストリームから別アカウントのSQSキューにメッセージが配信できたことを確認しました。

おわりに

DynamoDBストリーム -> SNS -> SQSのクロスアカウント構成についてでした。

実際にやってみるとコンポーネント数が多いため意外と設定が大変でした、本番運用するならIaC化したいところです。

参考になれば幸いです。

以上、AWS事業本部の佐藤(@chari7311)でした。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.