DynamoDB ストリームのイベントをSNS経由で別アカウントのSQSに配信してみる

2023.09.19

以下の構成を試す機会があったので、手順をブログにしてみます。

やってみた

PublisherアカウントとSubscriberアカウントでAWSアカウントが2つ必要になります。

作業は基本的にAWS CLIで行います。

作業の流れは以下になります。

  1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)
  2. SNSの作成(Publisherアカウント)
  3. Lambda用IAMロールの作成(Publisherアカウント)
  4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
  5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
  6. SQSの作成(Subscriberアカウント)

1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)

以下コマンドでDynamoDBストリームが有効になったDynamoDBテーブルを作成します。

 $ aws dynamodb create-table \
     --table-name sqs-dynamodb-table \
     --attribute-definitions \
         AttributeName=id,AttributeType=S \
     --key-schema \
         AttributeName=id,KeyType=HASH \
     --stream-specification \
        StreamEnabled=true,StreamViewType=KEYS_ONLY \
     --provisioned-throughput \
         ReadCapacityUnits=1,WriteCapacityUnits=1 \
     --table-class STANDARD

2. SNSの作成(Publisherアカウント)

SNSトピックを作成します。出力でSNSトピックARNが表示されるため、控えておきます。

$ aws sns create-topic --name sqs-dynamodb-topic

別アカウントのSQSからサブスクライブを可能にする必要があります。

そのため、SNSアクセスポリシーで別アカウントからのサブスクライブを許可します。

以下のファイルを用意します。アカウントIDの部分は各自の環境に合わせて置き換えてください。

sns-access-policy.json

{
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
      {
        "Sid": "AllowFromPublisherAccounts",
        "Effect": "Allow",
        "Principal": {
          "AWS": "*"
        },
        "Action": [
          "SNS:GetTopicAttributes",
          "SNS:SetTopicAttributes",
          "SNS:AddPermission",
          "SNS:RemovePermission",
          "SNS:DeleteTopic",
          "SNS:Subscribe",
          "SNS:ListSubscriptionsByTopic",
          "SNS:Publish"
        ],
        "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic",
        "Condition": {
          "StringEquals": {
            "AWS:SourceOwner": "1234567890" # Publisherアカウント アカウントID
          }
        }
      },
      {
        "Sid": "AllowFromSubscriberAccounts",
        "Effect": "Allow",
        "Principal": {
           "AWS": "2345678901" # Subscriberアカウント アカウントID
        },
        "Action": "sns:Subscribe",
        "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic"
     }
    ]
  }

以下のコマンドでアクセスポリシーを更新します。

$ TOPIC_ARN="<作成したSNSトピックのARN>"
$ aws sns set-topic-attributes --topic-arn $TOPIC_ARN --attribute-name Policy --attribute-value file://sns-access-policy.json

3. Lambda用IAMロールの作成(Publisherアカウント)

信頼ポリシーの設定のために、以下のファイルを用意します。

lambda-trust-policy.json

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
}

Lambda用のIAMロールを用意します。DynamoDBストリームのデータを取得して、CloudWatch Logsにログを出力できるようにマネージドポリシーAWSLambdaDynamoDBExecutionRoleをアタッチします。

$ aws iam create-role --role-name sqs-dynamodb-function-role --assume-role-policy-document file://lambda-trust-policy.json
$ aws iam attach-role-policy --role-name sqs-dynamodb-function-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole

LambdaからSNSトピックへのアクセスを可能にするために、LambdaのIAMロールにポリシーを追加します。

以下のファイルを用意します。SNSトピックARNのところは、先程出力されたSNSトピックARNに置き換えます。

sns-publish-policy.json

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "<SNSトピックARN>"
        }
    ]
}

LambdaのIAMロールにポリシーを追加します。

$ aws iam put-role-policy --role-name sqs-dynamodb-function-role --policy-name sns-publish-policy --policy-document file://sns-publish-policy.json

4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)

DynamoDBストリームのイベントを受け取って、SNSに送信するLambdaを用意します。

DynamoDBに新しいアイテムが追加された際に、SNSにアイテムのIDを送るという内容です。

index.py

import boto3

def lambda_handler(event, context):
    sns = boto3.client('sns')

    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            ddb_item_id = record['dynamodb']['Keys']['id']['S']

            message = f'New item with id {ddb_item_id}'

            response = sns.publish(
                TopicArn='arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic', # 置き換えが必要
                Message=message
            )

            print(response)
    print('Successfully processed %s records.' % str(len(event['Records'])))

Lambdaにアップロードするために、.pyのファイルをzipにします。

$ zip function.zip index.py

Lambda関数を作成します。

$ aws lambda create-function --function-name sqs-dynamodb-function \
--zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.9 \
--role arn:aws:iam::1234567890:role/lambda-dynamodb-role

5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)

DynamoDB ストリームをトリガーにLambdaを実行するために、DynamoDBストリームとLambda関数を関連付けます。

$ STREAM_ARN=`aws dynamodb describe-table --table-name sqs-dynamodb-table --query "Table.LatestStreamArn" --output text`
$ aws lambda create-event-source-mapping --function-name sqs-dynamodb-function \
 --batch-size 100 --starting-position LATEST --event-source $STREAM_ARN

6. SQSの作成(Subscriberアカウント)

Subscriberアカウントの作業です。キューを作成して、SNSトピックにサブスクライブします。

$ aws sqs create-queue --queue-name sqs-dynamodb-queue
$ aws sns subscribe --topic-arn <SNSトピックARN> --protocol sqs --notification-endpoint <SQSキューARN>

このままでは、SNSからSQSにアクセスができないため、SQSアクセスポリシーを設定します。

以下の、ファイルを用意します。

sqs-access-policy.json

{
  "Version": "2012-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "AllowFromSubscriberAccounts",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::2345678901:root" # Subscriberアカウント アカウントID
      },
      "Action": "SQS:*",
      "Resource": "<SQSキューARN>"
    },
    {
      "Sid": "AllowFromPublisherAccounts",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<SQSキューARN>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<SNSトピックARN>"
      }
    }
  }]
}

作成したファイルを使って、ポリシーを設定します。

$ QUEUE_URL=`aws sqs get-queue-url --queue-name sqs-dynamodb-queue --output text`
$ aws sqs set-queue-attributes --queue-url $QUEUE_URL --attributes file://sqs-access-policy.json

ちなみにマネジメントコンソール経由でサブスクライブすると、AllowFromPublisherAccounts相当の部分が自動的に追加されます。AWS CLIだと自動では追加されません。

これで一通りの設定が完了です。

動作確認

マネジメントコンソールから動作確認します。

Publisherアカウントにログインして、DynamoDB -> sqs-dynamodb-table -> 項目を作成を選択します。

値を「hoge」として項目を作成します。

ちなみにAWS CLIだと以下のコマンドでアイテムを追加できます。

$ aws dynamodb put-item --table-name sqs-dynamodb-table --item '{"id": {"S": "hoge"}}

続いてSubscriberアカウントにログインして、SQS -> sqs-dynamodb-queue -> メッセージを送受信->メッセージをポーリングを選択します。

メッセージが受信されて、中身が先程DynamoDB Tableに追加したid hogeがメッセージに含まれていることが分かります。

これで、DynamoDB ストリームから別アカウントのSQSキューにメッセージが配信できたことを確認しました。

おわりに

DynamoDBストリーム -> SNS -> SQSのクロスアカウント構成についてでした。

実際にやってみるとコンポーネント数が多いため意外と設定が大変でした、本番運用するならIaC化したいところです。

参考になれば幸いです。

以上、AWS事業本部の佐藤(@chari7311)でした。

参考