DynamoDB ストリームのイベントをEventBridge経由で別アカウントのSQSに配信してみる

2023.09.25

以下の構成を試す機会があったので、手順をブログにしてみます。

SNSを使ったパターンは以下をご覧ください。

EventBridgeのクロスアカウント部分だけ確認したい場合は、以下がおすすめです。

やってみた

PublisherアカウントとSubscriberアカウントでAWSアカウントが2つ必要になります。

作業は基本的にAWS CLIで行います。

作業の流れは以下になります。

  1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)
  2. EventBridgeイベントバス・ルールの作成(Publisherアカウント)
  3. Lambda用IAMロールの作成(Publisherアカウント)
  4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)
  5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)
  6. SQSキューの作成・ルールのターゲットにSQSキューを指定(Subscriberアカウント)
  7. ルールのターゲットにSubscriberアカウントのイベントバス指定(Publisherアカウント)

AWSアカウントIDが出てきますが、長くなってしまうので略称にしています。

略称 名称
AWSアカウントID(Pub) Publisherアカウント AWSアカウントID
AWSアカウントID(Sub) Subscriberアカウント AWSアカウントID

1. DynamoDB テーブル・ストリームの作成(Publisherアカウント)

DynamoDBストリームが有効になったDynamoDBテーブルを作成します。

 $ aws dynamodb create-table \
     --table-name pubsub-eventbridge-table \
     --attribute-definitions \
         AttributeName=id,AttributeType=S \
     --key-schema \
         AttributeName=id,KeyType=HASH \
     --stream-specification \
        StreamEnabled=true,StreamViewType=KEYS_ONLY \
     --provisioned-throughput \
         ReadCapacityUnits=1,WriteCapacityUnits=1 \
     --table-class STANDARD

2. EventBridgeイベントバス・ルールの作成(Publisherアカウント)

イベントバスを作成します。

$ aws events create-event-bus --name pubsub-eventbridge-publisher-eventbus

ルールを作成します。

イベントバスのパターン用のjsonファイルを用意します。

全てのイベントを受け取るようにしても動作はしますが、意図せず呼び出しが増えて課金が発生することを懸念してsourceを絞っておきます。

event-pattern.json

{
    "source": ["my.lambda"],
}

用意したファイルを使ってルールを作ります。

$ aws events put-rule --name pubsub-eventbridge-publisher-rule --event-pattern file://event-pattern.json --event-bus-name pubsub-eventbridge-sample-publisher-eventbus

この時点では、ルールのターゲットは設定していません。

Subscriber側でイベントバスを作成次第、このルールのターゲットに追加します。

3. Lambda用IAMロールの作成(Publisherアカウント)

信頼ポリシーの設定のために、以下のファイルを用意します。

lambda-trust-policy.json

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
}

Lambda用のIAMロールを作成します。DynamoDBストリームのデータを取得して、CloudWatch Logsにログを出力できるようにマネージドポリシーAWSLambdaDynamoDBExecutionRoleをアタッチします。(SNSを使用するパターンと同様)

$ aws iam create-role --role-name pubsub-eventbridge-function-role --assume-role-policy-document file://lambda-trust-policy.json
$ aws iam attach-role-policy --role-name pubsub-eventbridge-function-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole

LambdaからEventBridgeイベントバスへアクセスを可能にするために、LambdaのIAMロールにポリシーを追加します。

以下のファイルを用意します。

eventbus-putevents-policy.json

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "events:PutEvents",
            "Resource": "<イベントバスARN>"
        }
    ]
}

LambdaのIAMロールにポリシーを追加します。

$ aws iam put-role-policy --role-name pubsub-eventbridge-function-role --policy-name eventbus-putevents-policy --policy-document file://eventbus-putevents-policy.json

4. DynamoDB ストリームをトリガーとするLambdaの作成(Publisherアカウント)

DynamoDBストリームのイベントを受け取って、EventBridge イベントバスに送信するLambdaを用意します。

DynamoDBに新しいアイテムが追加された際に、イベントバスにアイテムのIDを送るという内容です。

TODO:コードの内容を考える

index.py

import boto3

def lambda_handler(event, context):
    eventbridge = boto3.client('events')

    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            ddb_item_id = record['dynamodb']['Keys']['id']['S']

            message = f'New item with id {ddb_item_id}'

            response = eventbridge.put_events(
                Entries=[
                    {
                        'Source': 'lambda',
                        'DetailType': 'MessageFromLambda',
                        'Detail': message,
                        'EventBusName': 'pubsub-eventbridge-publisher-eventbus'
                    }
                ]
            )

            print(response)
    print('Successfully processed %s records.' % str(len(event['Records'])))

Lambdaにアップロードするために、.pyのファイルをzipにします。

$ zip function.zip index.py

Lambda関数を作成します。

$ aws lambda create-function --function-name pubsub-eventbridge-function \
--zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.9 \
--role arn:aws:iam::<AWSアカウント>:role/pubsub-eventbridge-function-role

5. DynamoDB ストリームを Lambda 関数に関連付ける(Publisherアカウント)

DynamoDB ストリームをトリガーにLambdaを実行するために、DynamoDBストリームとLambda関数を関連付けます。

$ STREAM_ARN=`aws dynamodb describe-table --table-name pubsub-eventbridge-table --query "Table.LatestStreamArn" --output text`
$ aws lambda create-event-source-mapping --function-name pubsub-eventbridge-function \
 --batch-size 100 --starting-position LATEST --event-source $STREAM_ARN

6. EventBridgeイベントバス・ルールの作成(Subscriberアカウント)

Subscriber側のアカウントにログインして作業を行います。

$ aws events create-event-bus --name pubsub-eventbridge-subscriber-eventbus

デフォルトではイベントバスは他のアカウントからイベントを受け取れないため、Publisherアカウントからのアクセスを許可します。

$ aws events put-permission --event-bus-name pubsub-eventbridge-subscriber-eventbus \ 
--statement-id sample --action "events:PutEvents" --principal <AWSアカウントID(Pub)>

Publisherアカウント側と同様にルールを作成します。

イベントバスのパターン用のjsonファイルは、Subscriberのものと同様です。

$ aws events put-rule --name pubsub-eventbridge-subscriber-rule --event-pattern file://event-pattern.json --event-bus-name pubsub-eventbridge-subscriber-sample-eventbus

7. SQSキューの作成・ルールのターゲットにSQSキューを指定(Subscriberアカウント)

SQSキューを作成します。

$ aws sqs create-queue --queue-name pubsub-eventbridge-queue

SQSキューにEventbridgeルールからアクセス許可するために、アクセスポリシーを設定します。

以下のファイルを用意します。

sqs-access-policy.json

{
    "Version": "2012-10-17",
    "Id": "pubsub-eventbridge-subscriber-queue-policy",
    "Statement": [
      {
        "Sid": "AWSEvents_pubsub-eventbridge-subscriber-ule",
        "Effect": "Allow",
        "Principal": {
          "Service": "events.amazonaws.com"
        },
        "Action": "sqs:SendMessage",
        "Resource": "arn:aws:sqs:ap-northeast-1:<AWSアカウントID(Sub)>1:pubsub-eventbridge-queue-manual",
        "Condition": {
          "ArnEquals": {
            "aws:SourceArn": "arn:aws:events:ap-northeast-1:<AWSアカウントID(Sub)>1:rule/pubsub-eventbridge-subscriber-eventbus/pubsub-eventbridge-subscriber-sample-rule"
          }
        }
      }
    ]
  }

用意したファイルを使って、ポリシーを設定します。

$ QUEUE_URL=`aws sqs get-queue-url --queue-name pubsub-eventbridge-queue --output text`
$ aws sqs set-queue-attributes --queue-url $QUEUE_URL --attributes file://sqs-access-policy.json

前の手順で作ったルールのターゲットに、SQSキューを指定します。

$ aws events put-targets --rule pubsub-eventbridge-subscriber-rule \
  --event-bus-name pubsub-eventbridge-subscriber-eventbus \
  --targets "Id"="1","Arn"="arn:aws:sqs:ap-northeast-1:<AWSアカウントID(Sub)>1:pubsub-eventbridge-queue"

これでSubscriber側の設定は完了です。

補足: SQSアクセスポリシーの設定でエラーがでる

私の環境ではSQSのアクセスポリシーのダブルクォートをエスケープしてあげないと、aws sqs set-queue-attributesコマンド実行時にエラーになりました。

Parameter validation failed:
Invalid type for parameter Attributes.Statement, value: [OrderedDict([('Effect', 'Allow'), ('Principal', OrderedDict([('AWS', '*')])), ('Action', 'sqs:SendMessage'), ('Resource', 'arn:aws:sqs:us-east-1:<AWSアカウントID(Pub)>12:MyQueue')])], type: <class 'list'>, valid types: <class 'str'>

同様の事象が発生する場合は、以下のファイルを使用してください。

sqs-access-policy-json

{
    "Policy": "{\"Version\": \"2012-10-17\",\"Id\": \"sample-policy-id\",\"Statement\": [{\"Sid\": \"AWSEvents_pubsub-eventbridge-subscriber-rule\",\"Effect\": \"Allow\",\"Principal\": {\"Service\": \"events.amazonaws.com\"},\"Action\": \"SQS:SendMessage\",\"Resource\": \"arn:aws:sqs:ap-northeast-1:<AWSアカウントID(Sub)>1:pubsub-eventbridge-sample-queue\",\"Condition\": {\"ArnEquals\": {\"aws:SourceArn\": \"arn:aws:events:ap-northeast-1:<AWSアカウントID(Sub)>1:rule/pubsub-eventbridge-subscriber-sample-eventbus/pubsub-eventbridge-subscriber-sample-rule\"}}}]}"
}

ちなみに変換は以下のコマンドでできました。

$ new_policy=$(jq -c '.Policy' sqs-access-policy.json | sed 's/"/\\"/g'); echo "{ \"Policy\": \"$new_policy\" }" > sqs-access-policy.json

8. ルールのターゲットにSubscriberアカウントのイベントバス指定(Publisherアカウント)

Publisherアカウントに戻ります。

イベントルールで別のアカウントにイベントを送信するために、IAMロールが必要です。

IAMロールを作成して、別アカウントのイベントバスにイベントを送信するポリシーをアタッチします。

eventbridge-trust-policy.json

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Effect": "Allow",
          "Principal": {
              "Service": "events.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
      }
  ]
}

event-rule-eventbus-putevents-policy.json

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "events:PutEvents"
            ],
            "Resource": [
                "arn:aws:events:ap-northeast-1:<AWSアカウントID(Sub)>:event-bus/pubsub-eventbridge-subscriber-eventbus"
            ]
        }
    ]
}
# IAMロールの作成
$ aws iam create-role --role-name pubsub-eventbridge-rule-role --assume-role-policy-document file://eventbridge-trust-policy.json
# ポリシーのアタッチ
$ aws iam put-role-policy --role-name pubsub-eventbridge-rule-role --policy-name event-rule-eventbus-putevents-policy --policy-document file://event-rule-eventbus-putevents-policy.json

ルールのターゲットにSubscriberアカウントのイベントバスを設定します。

$ aws events put-targets --rule pubsub-eventbridge-publisher-rule \
  --event-bus-name pubsub-eventbridge-publisher-eventbus \
  --targets "Id"="1","Arn"="arn:aws:events:ap-northeast-1:<AWSアカウントID(Sub)>:event-bus/pubsub-eventbridge-subscriber-eventbus","RoleArn"="arn:aws:iam::<AWSアカウントID(Pub)>:role/pubsub-eventbridge-sample-rule-role"

動作確認

AWS CLIで動作確認を行います。マネジメントコンソールで確認したい場合は、こちらを参照ください。

PublisherアカウントのDynamoDBテーブルにアイテムを追加して、SubscriberアカウントのSQSキューでメッセージを受信できるか確認します。

Publisherアカウントでコマンドを実行して、DynamoDBテーブルにアイテムを追加します。

作成するアイテムはid: testとします。

$ aws dynamodb put-item --table-name pubsub-eventbridge-table --item '{"id": {"S": "test"}}'

Subscriberアカウントでコマンドを実行しまして、SQSキューのメッセージの受信を確認します。

{\"message\":\"New item with id test\"}}"からid: testのメッセージの受信を確認しました。

$ aws sqs receive-message --queue-url $QUEUE_URL

{
    "Messages": [
        {
            "MessageId": "33d08fd9-0323-45f9-8c6b-c28a507abd70",
            "ReceiptHandle": "AQEBx82UucW1WNOdPp3g1p0l1MjBy+lbGEiJesqN/wRQdwD+QYy/4xGbtSP6uqzbuWXk36L/zA3iXNw/CiO5zT2veSK9ZhVXdmrmH5UoDO1paPnRJ4RmE6phkMWUWs4Y9f2jWql8Rpp0rEEIz5szOhe7/UsU4dXeP0+Bj7fQpaXH6U4AH0l3zUwYI+dUm/7EfUbQNTizW8c/e3DCcTByFsKwkkpvkQvgZK5nML8yrhhLoXLH2XbbdA0TUwrIFW2+UV5MIg0Ov9aYa7I2sCEb6TB8nm2XwQqrBxfGeR5OAlNGbPzlv52kSM67U+/gAsV4WbL8Vy12IIqOWDtdoh+vWzGIwv0FIN2psb2oWp9r/H7JCU/mHH8IFhEpeAq70l/fm6+rKelnhjaEX53vy/c++9Tfz2fEyl2jurOfmyd05hFs23I=",
            "MD5OfBody": "b75785b2250882872640eea17d986ed4",
            "Body": "{\"version\":\"0\",\"id\":\"ce20e81b-dcb9-f432-e841-799c0a10f448\",\"detail-type\":\"MessageFromLambda\",\"source\":\"my.lambda\",\"account\":\"<AWSアカウントID(Pub)>\",\"time\":\"2023-09-25T02:17:05Z\",\"region\":\"ap-northeast-1\",\"resources\":[],\"detail\":{\"message\":\"New item with id test\"}}"
        }
    ]
}

おわりに

DynamoDBストリーム -> EventBridge -> SQSのクロスアカウント構成についてでした。

EventBridgeでクロスアカウントを行う場合、クロスアカウントの設定がEventBridge内で完結するのがいいですね。

誰かの参考になれば幸いです。

以上、AWS事業本部の佐藤(@chari7311)でした。

参考