[CDK] クロスアカウントでSNSからKinesisDataFirehoseに配信する

2022.11.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWSアカウントを跨いでSNSからKinesisDataFirehoseにメッセージ配信する構成を紹介します。

やりたいこと

構成図をMermaidで書いてNotionで描画しています:

flowchart LR

subgraph AWSアカウントB
 firehose(KinesisDataFirehose配信ストリーム)--バッファリングして出力-->s3(S3バケット)
end

subgraph AWSアカウントA
  sns(SNSトピック)--配信-->firehose
end

AWSアカウントAからAWSアカウントBに対してSNSを介したデータ連携を行い、アカウントB側のS3バケットにメッセージを溜めておきたいとします。この要件は SNS-->Lambda-->S3バケット でも実現可能ですが、Lambdaの代わりにFirehoseを使うことでアプリケーションコードを1行も書かずに済むようになります。

本記事ではこの構成をCDKで構築していきたいと思います。

環境

  • node 16.15.0
  • typescript 4.7.4
  • aws-cdk-lib 2.46.0
  • constructs 10.1.43

やり方

[AWSアカウントA] SNSトピックを作成

まず、AWSアカウントAでSNSトピックを作成します。また、AWSアカウントBのリソースが本トピックを購読できるようにアクセスポリシーを追加します。

import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"

export class SnsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    // SNSトピック
    const topic = new cdk.aws_sns.Topic(this, "topic", {})

    // AWSアカウントBのFirehoseを許可するアクセスポリシーを設定
    topic.addToResourcePolicy(
      new cdk.aws_iam.PolicyStatement({
        effect: cdk.aws_iam.Effect.ALLOW,
        principals: [
          new cdk.aws_iam.AccountPrincipal("{AWS_ACCOUNT_B_ID}"),
        ],
        actions: ["SNS:Subscribe"],
        resources: [topic.topicArn],
      })
    )
  }
}

上記デプロイするとアクセスポリシーに以下のJSONが設定されます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "0",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::{AWS_ACCOUNT_ID_B}:root"
      },
      "Action": "SNS:Subscribe",
      "Resource": "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_ID_A}:{SNS_TOPIC_NAME}"
    }
  ]
}

[AWSアカウントB] SNSサブスクリプション+Firehoseストリーム+S3バケットを作成

続いて、AWSアカウントBでSNSサブスクリプション+Firehoseストリーム+S3バケット等々を作成します。

import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"

export class KinesisFirehoseStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)

    const region = cdk.Stack.of(this).region
    const accountId = cdk.Stack.of(this).account

    // S3バケット
    const streamDestinationBucket = new cdk.aws_s3.Bucket(
      this,
      "streamDestinationBucket",
      {
        bucketName: `stream-destination-bucket-${region}-${accountId}`,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
        blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL,
        encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED,
      }
    )

    // KinesisFirehose配信ストリーム用ロググループ
    const deliveryStreamFailLogGroup = new cdk.aws_logs.LogGroup(
      this,
      "deliveryStreamFailLogGroup",
      {
        logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
      }
    )

    // KinesisFirehose配信ストリーム用ログストリーム
    const deliveryStreamLogStream = new cdk.aws_logs.LogStream(
      this,
      "deliveryStreamLogStream",
      {
        logGroup: deliveryStreamFailLogGroup,
        logStreamName: "logs",
      }
    )

    // KinesisFirehose配信ストリーム用ロール
    const deliveryStreamRole = new cdk.aws_iam.Role(
      this,
      "deliveryStreamRole",
      {
        assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"),
      }
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: [
          "kinesis:DescribeStream",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
        ],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
      })
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
        ],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [
          streamDestinationBucket.bucketArn,
          `${streamDestinationBucket.bucketArn}/*`,
        ],
      })
    )
    deliveryStreamRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: ["logs:PutLogEvents"],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [
          `arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
        ],
      })
    )

    // KinesisFirehose配信ストリーム
    const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(
      this,
      "deliveryStream",
      {
        deliveryStreamName: "deliveryStream",
        deliveryStreamType: "DirectPut",
        s3DestinationConfiguration: {
          bucketArn: streamDestinationBucket.bucketArn,
          roleArn: deliveryStreamRole.roleArn,
          //S3出力失敗時のログ記録設定
          cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: deliveryStreamFailLogGroup.logGroupName,
            logStreamName: "logs",
          },
          compressionFormat: "GZIP",
          prefix: "items",
          errorOutputPrefix: "errorOutput",
          bufferingHints: {
            intervalInSeconds: 60,
          },
        },
      }
    )
    deliveryStream.addDependsOn(
      streamDestinationBucket.node.defaultChild as cdk.CfnResource
    )
    deliveryStream.addDependsOn(
      deliveryStreamFailLogGroup.node.defaultChild as cdk.CfnResource
    )
    deliveryStream.addDependsOn(
      deliveryStreamLogStream.node.defaultChild as cdk.CfnResource
    )

    // SNSサブスクリプション用ロール
    const subscriptionRole = new cdk.aws_iam.Role(this, "subscriptionRole", {
      assumedBy: new cdk.aws_iam.ServicePrincipal("sns.amazonaws.com"),
    })
    subscriptionRole.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: ["firehose:PutRecord"],
        effect: cdk.aws_iam.Effect.ALLOW,
        resources: [deliveryStream.attrArn],
      })
    )

    // AWSアカウントAのSNSトピックARN
    const topicArn = "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_A_ID}:{SNS_TOPIC_NAME}"

    // SNSサブスクリプション
    const subscription = new cdk.aws_sns.CfnSubscription(this, "subscription", {
      topicArn: topicArn, // AWSアカウントAのSNSトピックと紐付ける
      protocol: "firehose",
      endpoint: deliveryStream.attrArn,
      subscriptionRoleArn: subscriptionRole.roleArn,
    })
    subscription.addDependsOn(deliveryStream)
    subscription.addDependsOn(
      subscriptionRole.node.defaultChild as cdk.CfnResource
    )
  }
}

これでアカウントAのSNSからアカウントBのFirehoseにメッセージ配信できる構成となります。

デバッグ時はSNS配信ステータスのログ記録をONにすると作業しやすい

検証時はSNSトピックの「配信ステータスのログ記録」をONにするとメッセージ配信の成否、そして失敗時はその理由を確認できるようになるため、非常に作業しやすくなります。

ただし、CloudFormation/CDKではまだこの設定を行うことができないようです(2022年11月)。

そのためAWSマネコンから設定します。

Amazon SNSのSMS配信のロギングを有効にしてみた | DevelopersIO

SNSトピックの編集ボタンを押下し、

  • 配信ステータスのログ記録
    • これらのプロトコルの配信ステータスをログに記録します:「Amazon Kinesis Data Firehose」にチェック
    • 成功サンプルレート: 「100」%
    • サービスロール: 新しいサービスロールの作成

と設定して保存します。

あとはSNSトピックでテスト用のメッセージを発行すると、CloudWatchロググループ sns/ap-northeast-1/{AWS_ACCOUNT_ID}/{SNS_TOPIC_NAME} にログ出力がなされるようになります。

以上、参考になれば幸いです。

参考