この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の若槻です。
Amazon Kinesis Data Firehoseでは、CloudWatch Logsを使用してデータ配信失敗時にエラーログの記録をすることができます。
今回は、AWS CDKでAmazon Kinesis Data FirehoseのデータのS3出力失敗時のログを記録する仕組みを作成して、実際に動作を確認してみました。
CDKスタック
CDKスタックは下記を使用します。
lib/kinesis-data-firehose-to-s3-stack.ts
import * as cdk from "@aws-cdk/core";
import * as iam from "@aws-cdk/aws-iam";
import * as s3 from "@aws-cdk/aws-s3";
import * as firehose from "@aws-cdk/aws-kinesisfirehose";
import * as logs from "@aws-cdk/aws-logs";
export class KinesisDataFirehoseToS3Stack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const region = cdk.Stack.of(this).region;
const accountId = cdk.Stack.of(this).account;
const streamDestinationBucket = new s3.Bucket(
this,
"streamDestinationBucket",
{
bucketName: `stream-destination-bucket-${region}-${accountId}`,
}
);
const deliveryStreamFailLogGroup = new logs.LogGroup(
this,
"deliveryStreamFailLogGroup",
{
logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
}
);
//LogStreamを予め作成しないとイベントログが出力されない
new logs.LogStream(this, "deliveryStreamLogStream", {
logGroup: deliveryStreamFailLogGroup,
logStreamName: "logs",
});
const deliveryStreamRole = new iam.Role(this, "deliveryStreamRole", {
assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});
deliveryStreamRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
],
effect: iam.Effect.ALLOW,
resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
})
);
deliveryStreamRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
],
effect: iam.Effect.ALLOW,
resources: [
streamDestinationBucket.bucketArn,
`${streamDestinationBucket.bucketArn}/*`,
],
})
);
deliveryStreamRole.addToPolicy(
new iam.PolicyStatement({
actions: ["logs:PutLogEvents"],
effect: iam.Effect.ALLOW,
resources: [
`arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
],
})
);
new firehose.CfnDeliveryStream(this, "deliveryStream", {
deliveryStreamName: "deliveryStream",
deliveryStreamType: "DirectPut",
s3DestinationConfiguration: {
bucketArn: streamDestinationBucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
//S3出力失敗時のログ記録設定
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: deliveryStreamFailLogGroup.logGroupName,
logStreamName: "logs",
},
compressionFormat: "GZIP",
prefix: "logs/",
errorOutputPrefix: "errorOutput",
bufferingHints: {
intervalInSeconds: 60,
},
},
});
}
}
Kinesis Data Firehoseでは、配信ストリームのS3出力失敗時のログ記録設定はCloudWatchLoggingOptionsを使用する必要があるので設定しています。
動作確認
正常時の動作
配信ストリームにレコードをPutします。
% aws firehose put-record \
--delivery-stream-name deliveryStream \
--record '{"Data":"SGVsbG8gd29ybGQ="}'
レコードがS3バケットに出力されました。
% aws s3 ls stream-destination-bucket-${REGION}-${ACCOUNT_ID} --recursive
2021-06-29 23:28:39 31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-27-36-1107f8cb-4bda-46a5-95be-0ed2bec84ffa.gz
CloudWatch LogsのStreamには何も出力されていません。
失敗時の動作
CDKスタックのコードで配信ストリームがS3バケットにPutするポリシーの追加をコメントアウトしてCDKデプロイしRoleから削除します。
lib/kinesis-data-firehose-to-s3-stack.ts
/*
deliveryStreamRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
],
effect: iam.Effect.ALLOW,
resources: [
streamDestinationBucket.bucketArn,
`${streamDestinationBucket.bucketArn}/*`,
],
})
);
*/
この状態で配信ストリームにレコードをPutします。
% aws firehose put-record \
--delivery-stream-name deliveryStream \
--record '{"Data":"SGVsbG8gd29ybGQ="}'
数分するとLogStreamに下記のようなエラーログのイベントが作成されました。
FirehoseがS3にアクセスを拒否されているためエラーとなったことが分かりますね。
{
"deliveryStreamARN": "arn:aws:firehose:ap-northeast-1:XXXXXXXXXXXX:deliverystream/deliveryStream",
"destination": "arn:aws:s3:::stream-destination-bucket-ap-northeast-1-XXXXXXXXXXXX",
"deliveryStreamVersionId": 1,
"message": "Access was denied. Ensure that the trust policy for the provided IAM role allows Firehose to assume the role, and the access policy allows access to the S3 bucket.",
"errorCode": "S3.AccessDenied"
}
Kinesis Data Firehoseの配信ストリームは、配信が失敗すると24時間のあいだリトライを行い続けます。
よって数分待つと2つ目のエラーログが出力されました。
ここでCDKスタックのコードを元に戻し再度デプロイして権限を戻してみます。
すると配信ストリーム内に滞留していたレコードが正常にS3に出力されました。
% aws s3 ls stream-destination-bucket-${REGION}-${ACCOUNT_ID} --recursive
2021-06-29 23:28:39 31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-27-36-1107f8cb-4bda-46a5-95be-0ed2bec84ffa.gz
2021-06-30 00:03:56 31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-38-45-0dd923b0-54b7-47f3-8479-670a0637caa4.gz
おわりに
AWS CDKでAmazon Kinesis Data FirehoseのデータのS3出力失敗時のログを記録する仕組みを作成して実際に動作を確認してみました。
はじめCloudWatchLoggingOptions
の用途がS3出力失敗時に使われるものだと分からなかったですが、確認が出来て良かったです。
参考
- CloudWatch Logs を使用した Kinesis Data Firehose モニタリング - Amazon Kinesis Data Firehose
- put-record — AWS CLI 1.19.101 Command Reference
- Amazon Kinesis Data Firehose トラブルシューティング - Amazon Kinesis Data Firehose
- データ配信の失敗の処理 | Amazon Kinesis Data Firehose データ配信 - Amazon Kinesis Data Firehose
- よくある質問 - Amazon Kinesis Data Firehose | AWS
以上