DynamoDB ストリームのイベントをSNS経由で別アカウントのSQSに配信してみる
以下の構成を試す機会があったので、手順をブログにしてみます。
やってみた
PublisherアカウントとSubscriberアカウントでAWSアカウントが2つ必要になります。
作業は基本的にAWS CLIで行います。
作業の流れは以下になります。
- DynamoDB テーブル・ストリームの作成(Publisherアカウント)
- SNSの作成(Publisherアカウント)
- Lambda用IAMロールの作成(Publisherアカウント)
- DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
- DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
- SQSの作成(Subscriberアカウント)
1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)
以下コマンドでDynamoDBストリームが有効になったDynamoDBテーブルを作成します。
$ aws dynamodb create-table \ --table-name sqs-dynamodb-table \ --attribute-definitions \ AttributeName=id,AttributeType=S \ --key-schema \ AttributeName=id,KeyType=HASH \ --stream-specification \ StreamEnabled=true,StreamViewType=KEYS_ONLY \ --provisioned-throughput \ ReadCapacityUnits=1,WriteCapacityUnits=1 \ --table-class STANDARD
2. SNSの作成(Publisherアカウント)
SNSトピックを作成します。出力でSNSトピックARNが表示されるため、控えておきます。
$ aws sns create-topic --name sqs-dynamodb-topic
別アカウントのSQSからサブスクライブを可能にする必要があります。
そのため、SNSアクセスポリシーで別アカウントからのサブスクライブを許可します。
以下のファイルを用意します。アカウントIDの部分は各自の環境に合わせて置き換えてください。
{ "Version": "2008-10-17", "Id": "__default_policy_ID", "Statement": [ { "Sid": "AllowFromPublisherAccounts", "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": [ "SNS:GetTopicAttributes", "SNS:SetTopicAttributes", "SNS:AddPermission", "SNS:RemovePermission", "SNS:DeleteTopic", "SNS:Subscribe", "SNS:ListSubscriptionsByTopic", "SNS:Publish" ], "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic", "Condition": { "StringEquals": { "AWS:SourceOwner": "1234567890" # Publisherアカウント アカウントID } } }, { "Sid": "AllowFromSubscriberAccounts", "Effect": "Allow", "Principal": { "AWS": "2345678901" # Subscriberアカウント アカウントID }, "Action": "sns:Subscribe", "Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic" } ] }
以下のコマンドでアクセスポリシーを更新します。
$ TOPIC_ARN="<作成したSNSトピックのARN>" $ aws sns set-topic-attributes --topic-arn $TOPIC_ARN --attribute-name Policy --attribute-value file://sns-access-policy.json
3. Lambda用IAMロールの作成(Publisherアカウント)
信頼ポリシーの設定のために、以下のファイルを用意します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
Lambda用のIAMロールを用意します。DynamoDBストリームのデータを取得して、CloudWatch Logsにログを出力できるようにマネージドポリシーAWSLambdaDynamoDBExecutionRole
をアタッチします。
$ aws iam create-role --role-name sqs-dynamodb-function-role --assume-role-policy-document file://lambda-trust-policy.json $ aws iam attach-role-policy --role-name sqs-dynamodb-function-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole
LambdaからSNSトピックへのアクセスを可能にするために、LambdaのIAMロールにポリシーを追加します。
以下のファイルを用意します。SNSトピックARNのところは、先程出力されたSNSトピックARNに置き換えます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "sns:Publish", "Resource": "<SNSトピックARN>" } ] }
LambdaのIAMロールにポリシーを追加します。
$ aws iam put-role-policy --role-name sqs-dynamodb-function-role --policy-name sns-publish-policy --policy-document file://sns-publish-policy.json
4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
DynamoDBストリームのイベントを受け取って、SNSに送信するLambdaを用意します。
DynamoDBに新しいアイテムが追加された際に、SNSにアイテムのIDを送るという内容です。
import boto3 def lambda_handler(event, context): sns = boto3.client('sns') for record in event['Records']: if record['eventName'] == 'INSERT': ddb_item_id = record['dynamodb']['Keys']['id']['S'] message = f'New item with id {ddb_item_id}' response = sns.publish( TopicArn='arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic', # 置き換えが必要 Message=message ) print(response) print('Successfully processed %s records.' % str(len(event['Records'])))
Lambdaにアップロードするために、.py
のファイルをzipにします。
$ zip function.zip index.py
Lambda関数を作成します。
$ aws lambda create-function --function-name sqs-dynamodb-function \ --zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.9 \ --role arn:aws:iam::1234567890:role/lambda-dynamodb-role
5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
DynamoDB ストリームをトリガーにLambdaを実行するために、DynamoDBストリームとLambda関数を関連付けます。
$ STREAM_ARN=`aws dynamodb describe-table --table-name sqs-dynamodb-table --query "Table.LatestStreamArn" --output text` $ aws lambda create-event-source-mapping --function-name sqs-dynamodb-function \ --batch-size 100 --starting-position LATEST --event-source $STREAM_ARN
6. SQSの作成(Subscriberアカウント)
Subscriberアカウントの作業です。キューを作成して、SNSトピックにサブスクライブします。
$ aws sqs create-queue --queue-name sqs-dynamodb-queue $ aws sns subscribe --topic-arn <SNSトピックARN> --protocol sqs --notification-endpoint <SQSキューARN>
このままでは、SNSからSQSにアクセスができないため、SQSアクセスポリシーを設定します。
以下の、ファイルを用意します。
{ "Version": "2012-10-17", "Id": "__default_policy_ID", "Statement": [ { "Sid": "AllowFromSubscriberAccounts", "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::2345678901:root" # Subscriberアカウント アカウントID }, "Action": "SQS:*", "Resource": "<SQSキューARN>" }, { "Sid": "AllowFromPublisherAccounts", "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "SQS:SendMessage", "Resource": "<SQSキューARN>", "Condition": { "ArnLike": { "aws:SourceArn": "<SNSトピックARN>" } } }] }
作成したファイルを使って、ポリシーを設定します。
$ QUEUE_URL=`aws sqs get-queue-url --queue-name sqs-dynamodb-queue --output text` $ aws sqs set-queue-attributes --queue-url $QUEUE_URL --attributes file://sqs-access-policy.json
ちなみにマネジメントコンソール経由でサブスクライブすると、AllowFromPublisherAccounts
相当の部分が自動的に追加されます。AWS CLIだと自動では追加されません。
これで一通りの設定が完了です。
動作確認
マネジメントコンソールから動作確認します。
Publisherアカウントにログインして、DynamoDB
-> sqs-dynamodb-table
-> 項目を作成
を選択します。
値を「hoge」として項目を作成します。
ちなみにAWS CLIだと以下のコマンドでアイテムを追加できます。
$ aws dynamodb put-item --table-name sqs-dynamodb-table --item '{"id": {"S": "hoge"}}
続いてSubscriberアカウントにログインして、SQS
-> sqs-dynamodb-queue
-> メッセージを送受信
->メッセージをポーリング
を選択します。
メッセージが受信されて、中身が先程DynamoDB Tableに追加したid hogeがメッセージに含まれていることが分かります。
これで、DynamoDB ストリームから別アカウントのSQSキューにメッセージが配信できたことを確認しました。
おわりに
DynamoDBストリーム -> SNS -> SQSのクロスアカウント構成についてでした。
実際にやってみるとコンポーネント数が多いため意外と設定が大変でした、本番運用するならIaC化したいところです。
参考になれば幸いです。
以上、AWS事業本部の佐藤(@chari7311)でした。