AWSアカウントを跨いでSNSからKinesisDataFirehoseにメッセージ配信する構成を紹介します。
やりたいこと
構成図をMermaidで書いてNotionで描画しています:
flowchart LR
subgraph AWSアカウントB
firehose(KinesisDataFirehose配信ストリーム)--バッファリングして出力-->s3(S3バケット)
end
subgraph AWSアカウントA
sns(SNSトピック)--配信-->firehose
end
AWSアカウントAからAWSアカウントBに対してSNSを介したデータ連携を行い、アカウントB側のS3バケットにメッセージを溜めておきたいとします。この要件は SNS-->Lambda-->S3バケット
でも実現可能ですが、Lambdaの代わりにFirehoseを使うことでアプリケーションコードを1行も書かずに済むようになります。
本記事ではこの構成をCDKで構築していきたいと思います。
環境
- node 16.15.0
- typescript 4.7.4
- aws-cdk-lib 2.46.0
- constructs 10.1.43
やり方
[AWSアカウントA] SNSトピックを作成
まず、AWSアカウントAでSNSトピックを作成します。また、AWSアカウントBのリソースが本トピックを購読できるようにアクセスポリシーを追加します。
import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"
export class SnsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props)
// SNSトピック
const topic = new cdk.aws_sns.Topic(this, "topic", {})
// AWSアカウントBのFirehoseを許可するアクセスポリシーを設定
topic.addToResourcePolicy(
new cdk.aws_iam.PolicyStatement({
effect: cdk.aws_iam.Effect.ALLOW,
principals: [
new cdk.aws_iam.AccountPrincipal("{AWS_ACCOUNT_B_ID}"),
],
actions: ["SNS:Subscribe"],
resources: [topic.topicArn],
})
)
}
}
上記デプロイするとアクセスポリシーに以下のJSONが設定されます。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "0",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{AWS_ACCOUNT_ID_B}:root"
},
"Action": "SNS:Subscribe",
"Resource": "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_ID_A}:{SNS_TOPIC_NAME}"
}
]
}
[AWSアカウントB] SNSサブスクリプション+Firehoseストリーム+S3バケットを作成
続いて、AWSアカウントBでSNSサブスクリプション+Firehoseストリーム+S3バケット等々を作成します。
import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"
export class KinesisFirehoseStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props)
const region = cdk.Stack.of(this).region
const accountId = cdk.Stack.of(this).account
// S3バケット
const streamDestinationBucket = new cdk.aws_s3.Bucket(
this,
"streamDestinationBucket",
{
bucketName: `stream-destination-bucket-${region}-${accountId}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: cdk.aws_s3.BlockPublicAccess.BLOCK_ALL,
encryption: cdk.aws_s3.BucketEncryption.S3_MANAGED,
}
)
// KinesisFirehose配信ストリーム用ロググループ
const deliveryStreamFailLogGroup = new cdk.aws_logs.LogGroup(
this,
"deliveryStreamFailLogGroup",
{
logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
}
)
// KinesisFirehose配信ストリーム用ログストリーム
const deliveryStreamLogStream = new cdk.aws_logs.LogStream(
this,
"deliveryStreamLogStream",
{
logGroup: deliveryStreamFailLogGroup,
logStreamName: "logs",
}
)
// KinesisFirehose配信ストリーム用ロール
const deliveryStreamRole = new cdk.aws_iam.Role(
this,
"deliveryStreamRole",
{
assumedBy: new cdk.aws_iam.ServicePrincipal("firehose.amazonaws.com"),
}
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
})
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [
streamDestinationBucket.bucketArn,
`${streamDestinationBucket.bucketArn}/*`,
],
})
)
deliveryStreamRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["logs:PutLogEvents"],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [
`arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
],
})
)
// KinesisFirehose配信ストリーム
const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(
this,
"deliveryStream",
{
deliveryStreamName: "deliveryStream",
deliveryStreamType: "DirectPut",
s3DestinationConfiguration: {
bucketArn: streamDestinationBucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
//S3出力失敗時のログ記録設定
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: deliveryStreamFailLogGroup.logGroupName,
logStreamName: "logs",
},
compressionFormat: "GZIP",
prefix: "items",
errorOutputPrefix: "errorOutput",
bufferingHints: {
intervalInSeconds: 60,
},
},
}
)
deliveryStream.addDependsOn(
streamDestinationBucket.node.defaultChild as cdk.CfnResource
)
deliveryStream.addDependsOn(
deliveryStreamFailLogGroup.node.defaultChild as cdk.CfnResource
)
deliveryStream.addDependsOn(
deliveryStreamLogStream.node.defaultChild as cdk.CfnResource
)
// SNSサブスクリプション用ロール
const subscriptionRole = new cdk.aws_iam.Role(this, "subscriptionRole", {
assumedBy: new cdk.aws_iam.ServicePrincipal("sns.amazonaws.com"),
})
subscriptionRole.addToPolicy(
new cdk.aws_iam.PolicyStatement({
actions: ["firehose:PutRecord"],
effect: cdk.aws_iam.Effect.ALLOW,
resources: [deliveryStream.attrArn],
})
)
// AWSアカウントAのSNSトピックARN
const topicArn = "arn:aws:sns:ap-northeast-1:{AWS_ACCOUNT_A_ID}:{SNS_TOPIC_NAME}"
// SNSサブスクリプション
const subscription = new cdk.aws_sns.CfnSubscription(this, "subscription", {
topicArn: topicArn, // AWSアカウントAのSNSトピックと紐付ける
protocol: "firehose",
endpoint: deliveryStream.attrArn,
subscriptionRoleArn: subscriptionRole.roleArn,
})
subscription.addDependsOn(deliveryStream)
subscription.addDependsOn(
subscriptionRole.node.defaultChild as cdk.CfnResource
)
}
}
これでアカウントAのSNSからアカウントBのFirehoseにメッセージ配信できる構成となります。
デバッグ時はSNS配信ステータスのログ記録をONにすると作業しやすい
検証時はSNSトピックの「配信ステータスのログ記録」をONにするとメッセージ配信の成否、そして失敗時はその理由を確認できるようになるため、非常に作業しやすくなります。
ただし、CloudFormation/CDKではまだこの設定を行うことができないようです(2022年11月)。
- aws-sns: Support setting of delivery status logging with the CDK · Issue #21971 · aws/aws-cdk
- [sns] add sns service trust to key when subscribing to an encrypted queue · Issue #2504 · aws/aws-cdk
- AWS::SNS::Topic - DeliveryStatusLogging (new property) · Issue #66 · aws-cloudformation/cloudformation-coverage-roadmap
そのためAWSマネコンから設定します。
Amazon SNSのSMS配信のロギングを有効にしてみた | DevelopersIO
SNSトピックの編集ボタンを押下し、
- 配信ステータスのログ記録
- これらのプロトコルの配信ステータスをログに記録します:「Amazon Kinesis Data Firehose」にチェック
- 成功サンプルレート: 「100」%
- サービスロール: 新しいサービスロールの作成
と設定して保存します。
あとはSNSトピックでテスト用のメッセージを発行すると、CloudWatchロググループ sns/ap-northeast-1/{AWS_ACCOUNT_ID}/{SNS_TOPIC_NAME}
にログ出力がなされるようになります。
以上、参考になれば幸いです。
参考
- [AWS CDK] Kinesis Data FirehoseのデータのS3出力失敗時のログを記録してみた | DevelopersIO
- [CDK] SNSメッセージをKinesisFirehoseでバッファリングしてS3へ出力する
- クロスアカウントなLambdaをSNS TopicにSubscribeする | DevelopersIO
- How to subscribe Kinesis Data Firehose to SNS in other account - DEV Community 👩💻👨💻
- チュートリアル: Amazon Simple Notification Service での AWS Lambda の使用 - AWS Lambda