[AWS CDK] Kinesis Data FirehoseのデータのS3出力失敗時のログを記録してみた

2021.06.29

こんにちは、CX事業本部の若槻です。

Amazon Kinesis Data Firehoseでは、CloudWatch Logsを使用してデータ配信失敗時にエラーログの記録をすることができます。

今回は、AWS CDKでAmazon Kinesis Data FirehoseのデータのS3出力失敗時のログを記録する仕組みを作成して、実際に動作を確認してみました。

CDKスタック

CDKスタックは下記を使用します。

lib/kinesis-data-firehose-to-s3-stack.ts

import * as cdk from "@aws-cdk/core";
import * as iam from "@aws-cdk/aws-iam";
import * as s3 from "@aws-cdk/aws-s3";
import * as firehose from "@aws-cdk/aws-kinesisfirehose";
import * as logs from "@aws-cdk/aws-logs";

export class KinesisDataFirehoseToS3Stack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const region = cdk.Stack.of(this).region;
    const accountId = cdk.Stack.of(this).account;

    const streamDestinationBucket = new s3.Bucket(
      this,
      "streamDestinationBucket",
      {
        bucketName: `stream-destination-bucket-${region}-${accountId}`,
      }
    );

    const deliveryStreamFailLogGroup = new logs.LogGroup(
      this,
      "deliveryStreamFailLogGroup",
      {
        logGroupName: `/aws/kinesisfirehose/sample-stream-fail-log`,
      }
    );

    //LogStreamを予め作成しないとイベントログが出力されない
    new logs.LogStream(this, "deliveryStreamLogStream", {
      logGroup: deliveryStreamFailLogGroup,
      logStreamName: "logs",
    });

    const deliveryStreamRole = new iam.Role(this, "deliveryStreamRole", {
      assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
    });

    deliveryStreamRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          "kinesis:DescribeStream",
          "kinesis:GetShardIterator",
          "kinesis:GetRecords",
        ],
        effect: iam.Effect.ALLOW,
        resources: [`arn:aws:kinesis:${region}:${accountId}:stream/*`],
      })
    );

    deliveryStreamRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
        ],
        effect: iam.Effect.ALLOW,
        resources: [
          streamDestinationBucket.bucketArn,
          `${streamDestinationBucket.bucketArn}/*`,
        ],
      })
    );

    deliveryStreamRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ["logs:PutLogEvents"],
        effect: iam.Effect.ALLOW,
        resources: [
          `arn:aws:logs:${region}:${accountId}:log-group:/aws/kinesisfirehose/*`,
        ],
      })
    );

    new firehose.CfnDeliveryStream(this, "deliveryStream", {
      deliveryStreamName: "deliveryStream",
      deliveryStreamType: "DirectPut",
      s3DestinationConfiguration: {
        bucketArn: streamDestinationBucket.bucketArn,
        roleArn: deliveryStreamRole.roleArn,
        //S3出力失敗時のログ記録設定
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: deliveryStreamFailLogGroup.logGroupName,
          logStreamName: "logs",
        },
        compressionFormat: "GZIP",
        prefix: "logs/",
        errorOutputPrefix: "errorOutput",
        bufferingHints: {
          intervalInSeconds: 60,
        },
      },
    });
  }
}

Kinesis Data Firehoseでは、配信ストリームのS3出力失敗時のログ記録設定はCloudWatchLoggingOptionsを使用する必要があるので設定しています。

動作確認

正常時の動作

配信ストリームにレコードをPutします。

% aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --record '{"Data":"SGVsbG8gd29ybGQ="}'

レコードがS3バケットに出力されました。

% aws s3 ls stream-destination-bucket-${REGION}-${ACCOUNT_ID} --recursive
2021-06-29 23:28:39         31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-27-36-1107f8cb-4bda-46a5-95be-0ed2bec84ffa.gz

CloudWatch LogsのStreamには何も出力されていません。

失敗時の動作

CDKスタックのコードで配信ストリームがS3バケットにPutするポリシーの追加をコメントアウトしてCDKデプロイしRoleから削除します。

lib/kinesis-data-firehose-to-s3-stack.ts

    /*
    deliveryStreamRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
        ],
        effect: iam.Effect.ALLOW,
        resources: [
          streamDestinationBucket.bucketArn,
          `${streamDestinationBucket.bucketArn}/*`,
        ],
      })
    );
    */

この状態で配信ストリームにレコードをPutします。

% aws firehose put-record \
  --delivery-stream-name deliveryStream \
  --record '{"Data":"SGVsbG8gd29ybGQ="}'

数分するとLogStreamに下記のようなエラーログのイベントが作成されました。

FirehoseがS3にアクセスを拒否されているためエラーとなったことが分かりますね。

{
    "deliveryStreamARN": "arn:aws:firehose:ap-northeast-1:XXXXXXXXXXXX:deliverystream/deliveryStream",
    "destination": "arn:aws:s3:::stream-destination-bucket-ap-northeast-1-XXXXXXXXXXXX",
    "deliveryStreamVersionId": 1,
    "message": "Access was denied. Ensure that the trust policy for the provided IAM role allows Firehose to assume the role, and the access policy allows access to the S3 bucket.",
    "errorCode": "S3.AccessDenied"
}

Kinesis Data Firehoseの配信ストリームは、配信が失敗すると24時間のあいだリトライを行い続けます。

よって数分待つと2つ目のエラーログが出力されました。

ここでCDKスタックのコードを元に戻し再度デプロイして権限を戻してみます。

すると配信ストリーム内に滞留していたレコードが正常にS3に出力されました。

% aws s3 ls stream-destination-bucket-${REGION}-${ACCOUNT_ID} --recursive
2021-06-29 23:28:39         31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-27-36-1107f8cb-4bda-46a5-95be-0ed2bec84ffa.gz
2021-06-30 00:03:56         31 logs/2021/06/29/14/deliveryStream-1-2021-06-29-14-38-45-0dd923b0-54b7-47f3-8479-670a0637caa4.gz

おわりに

AWS CDKでAmazon Kinesis Data FirehoseのデータのS3出力失敗時のログを記録する仕組みを作成して実際に動作を確認してみました。

はじめCloudWatchLoggingOptionsの用途がS3出力失敗時に使われるものだと分からなかったですが、確認が出来て良かったです。

参考

以上