以下の構成を試す機会があったので、手順をブログにしてみます。
やってみた
PublisherアカウントとSubscriberアカウントでAWSアカウントが2つ必要になります。
作業は基本的にAWS CLIで行います。
作業の流れは以下になります。
- DynamoDB テーブル・ストリームの作成(Publisherアカウント)
- SNSの作成(Publisherアカウント)
- Lambda用IAMロールの作成(Publisherアカウント)
- DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
- DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
- SQSの作成(Subscriberアカウント)
1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)
以下コマンドでDynamoDBストリームが有効になったDynamoDBテーブルを作成します。
$ aws dynamodb create-table \
--table-name sqs-dynamodb-table \
--attribute-definitions \
AttributeName=id,AttributeType=S \
--key-schema \
AttributeName=id,KeyType=HASH \
--stream-specification \
StreamEnabled=true,StreamViewType=KEYS_ONLY \
--provisioned-throughput \
ReadCapacityUnits=1,WriteCapacityUnits=1 \
--table-class STANDARD
2. SNSの作成(Publisherアカウント)
SNSトピックを作成します。出力でSNSトピックARNが表示されるため、控えておきます。
$ aws sns create-topic --name sqs-dynamodb-topic
別アカウントのSQSからサブスクライブを可能にする必要があります。
そのため、SNSアクセスポリシーで別アカウントからのサブスクライブを許可します。
以下のファイルを用意します。アカウントIDの部分は各自の環境に合わせて置き換えてください。
sns-access-policy.json
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "AllowFromPublisherAccounts",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish"
],
"Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic",
"Condition": {
"StringEquals": {
"AWS:SourceOwner": "1234567890" # Publisherアカウント アカウントID
}
}
},
{
"Sid": "AllowFromSubscriberAccounts",
"Effect": "Allow",
"Principal": {
"AWS": "2345678901" # Subscriberアカウント アカウントID
},
"Action": "sns:Subscribe",
"Resource": "arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic"
}
]
}
以下のコマンドでアクセスポリシーを更新します。
$ TOPIC_ARN="<作成したSNSトピックのARN>"
$ aws sns set-topic-attributes --topic-arn $TOPIC_ARN --attribute-name Policy --attribute-value file://sns-access-policy.json
3. Lambda用IAMロールの作成(Publisherアカウント)
信頼ポリシーの設定のために、以下のファイルを用意します。
lambda-trust-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Lambda用のIAMロールを用意します。DynamoDBストリームのデータを取得して、CloudWatch Logsにログを出力できるようにマネージドポリシーAWSLambdaDynamoDBExecutionRole
をアタッチします。
$ aws iam create-role --role-name sqs-dynamodb-function-role --assume-role-policy-document file://lambda-trust-policy.json
$ aws iam attach-role-policy --role-name sqs-dynamodb-function-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole
LambdaからSNSトピックへのアクセスを可能にするために、LambdaのIAMロールにポリシーを追加します。
以下のファイルを用意します。SNSトピックARNのところは、先程出力されたSNSトピックARNに置き換えます。
sns-publish-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "<SNSトピックARN>"
}
]
}
LambdaのIAMロールにポリシーを追加します。
$ aws iam put-role-policy --role-name sqs-dynamodb-function-role --policy-name sns-publish-policy --policy-document file://sns-publish-policy.json
4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
DynamoDBストリームのイベントを受け取って、SNSに送信するLambdaを用意します。
DynamoDBに新しいアイテムが追加された際に、SNSにアイテムのIDを送るという内容です。
index.py
import boto3
def lambda_handler(event, context):
sns = boto3.client('sns')
for record in event['Records']:
if record['eventName'] == 'INSERT':
ddb_item_id = record['dynamodb']['Keys']['id']['S']
message = f'New item with id {ddb_item_id}'
response = sns.publish(
TopicArn='arn:aws:sns:ap-northeast-1:1234567890:sqs-dynamodb-topic', # 置き換えが必要
Message=message
)
print(response)
print('Successfully processed %s records.' % str(len(event['Records'])))
Lambdaにアップロードするために、.py
のファイルをzipにします。
$ zip function.zip index.py
Lambda関数を作成します。
$ aws lambda create-function --function-name sqs-dynamodb-function \
--zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.9 \
--role arn:aws:iam::1234567890:role/lambda-dynamodb-role
5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
DynamoDB ストリームをトリガーにLambdaを実行するために、DynamoDBストリームとLambda関数を関連付けます。
$ STREAM_ARN=`aws dynamodb describe-table --table-name sqs-dynamodb-table --query "Table.LatestStreamArn" --output text`
$ aws lambda create-event-source-mapping --function-name sqs-dynamodb-function \
--batch-size 100 --starting-position LATEST --event-source $STREAM_ARN
6. SQSの作成(Subscriberアカウント)
Subscriberアカウントの作業です。キューを作成して、SNSトピックにサブスクライブします。
$ aws sqs create-queue --queue-name sqs-dynamodb-queue
$ aws sns subscribe --topic-arn <SNSトピックARN> --protocol sqs --notification-endpoint <SQSキューARN>
このままでは、SNSからSQSにアクセスができないため、SQSアクセスポリシーを設定します。
以下の、ファイルを用意します。
sqs-access-policy.json
{
"Version": "2012-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "AllowFromSubscriberAccounts",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::2345678901:root" # Subscriberアカウント アカウントID
},
"Action": "SQS:*",
"Resource": "<SQSキューARN>"
},
{
"Sid": "AllowFromPublisherAccounts",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "SQS:SendMessage",
"Resource": "<SQSキューARN>",
"Condition": {
"ArnLike": {
"aws:SourceArn": "<SNSトピックARN>"
}
}
}]
}
作成したファイルを使って、ポリシーを設定します。
$ QUEUE_URL=`aws sqs get-queue-url --queue-name sqs-dynamodb-queue --output text`
$ aws sqs set-queue-attributes --queue-url $QUEUE_URL --attributes file://sqs-access-policy.json
ちなみにマネジメントコンソール経由でサブスクライブすると、AllowFromPublisherAccounts
相当の部分が自動的に追加されます。AWS CLIだと自動では追加されません。
これで一通りの設定が完了です。
動作確認
マネジメントコンソールから動作確認します。
Publisherアカウントにログインして、DynamoDB
-> sqs-dynamodb-table
-> 項目を作成
を選択します。
値を「hoge」として項目を作成します。
ちなみにAWS CLIだと以下のコマンドでアイテムを追加できます。
$ aws dynamodb put-item --table-name sqs-dynamodb-table --item '{"id": {"S": "hoge"}}
続いてSubscriberアカウントにログインして、SQS
-> sqs-dynamodb-queue
-> メッセージを送受信
->メッセージをポーリング
を選択します。
メッセージが受信されて、中身が先程DynamoDB Tableに追加したid hogeがメッセージに含まれていることが分かります。
これで、DynamoDB ストリームから別アカウントのSQSキューにメッセージが配信できたことを確認しました。
おわりに
DynamoDBストリーム -> SNS -> SQSのクロスアカウント構成についてでした。
実際にやってみるとコンポーネント数が多いため意外と設定が大変でした、本番運用するならIaC化したいところです。
参考になれば幸いです。
以上、AWS事業本部の佐藤(@chari7311)でした。