Kinesis Data Firehose をゼロからざっくり理解する

Kinesis Data Firehose をゼロから勉強してみたので、学んだことをまとめました。
2020.06.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

コンサル部@大阪オフィスのYui(@MayForBlue)です。

最近、CloudWatch Logs から S3 へのログデータの転送に Kinesis Data Firehose を初めて使ってみたので、勉強したことをまとめてみました。
ドキュメントを読んだり概要を学んだだけではなかなか理解しにくかったので、実際に触ってみた内容が中心となります。

Kinesis Data Firehose とは

リアルタイムのストリーミングデータをS3やRedShift、Elasticsearchなどのデータストア、分析ツールに配信するAWSのマネージドサービス。

複雑な設定をすることなく、データ送信元から送信先へのデータの転送を実現することができます。

イメージ

Kinesis Data Firehose は、Kinesis ストリーミングデータプラットフォームの一部で、他に、Kinesis Data Streams、Kinesis ビデオストリーム、Amazon Kinesis Data Analytics があります。

ストリーミングデータとは

大量のデータソースによって継続的に生成されるデータのこと(ログ、IoTデータ、etc...)

主な構成要素

  • 配信ストリーム(Delivery Stream)
    • データ配信の単位
  • データプロデューサー
    • Kinesis Data Firehose へのデータの送信元
  • バッファサイズ/バッファ間隔
    • Kinesis Data Firehose が送信元から送信先にデータを配信するまでのバッファのサイズ、期間

配信ストリームのイメージ

データ変換

送信元のデータを変換してから送信先に配信したい場合、オプションでLambda関数を呼び出すことが可能です。 Amazon Kinesis Data Firehose のデータ変換

配信エラー時のログ

配信ストリームの作成時にエラーログを有効化することで、配信エラー時のログが CloudWatch Logs に送信されます。
エラーログはKinesis Data Firehose / CloudWatch Logs のマネジメントコンソールで確認することができます。
CloudWatch Logs を使用した Kinesis Data Firehose のモニタリング

料金体系

Kinesis Data Firehose では、取り込まれたデータの量に基づいて料金が発生します。
東京リージョンの場合の料金は以下のようになります。

Amazon Kinesis Data Firehose 料金

実際に触ってみた

RDS でエラーログを有効化し、CloudWatch Logs に保存されたログを Kinesis Data Firehose 経由で S3 に保存してみました。

参考にしたドキュメントは以下です。
CloudWatch Logs サブスクリプションフィルタの使用 - Amazon Kinesis Data Firehose のサブスクリプションフィルタ

前提条件

  • 構築済のRDSでエラーログが有効化されていること(今回はMySQLを使用しました。)
  • CloudWatch Logs にエラーログが送信されていることが確認済であること
  • データ送信先のS3バケットが作成済であること
  • AWS CLI が使用できること

RDSでエラーログが有効であることを確認

CloudWatch Logs で対象のロググループが作成され、ログが送信されていることを確認

Kinesis Data Firehose のIAMロールを作成する

まずは Kinesis Data Firehose が S3 にデータを置くためのIAMロールを作成します。 任意のフォルダに下記を記述したファイルを用意します。(例:~/TrustPolicyForFirehose.json

{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "firehose.amazonaws.com" },
    "Action": "sts:AssumeRole",
    "Condition": { "StringEquals": { "sts:ExternalId":"【アカウントID】" } }
  }
}

上記の信頼ポリシーのファイルを使用して、AWS CLI でIAMロールを作成します。

aws iam create-role \
      --role-name FirehosetoS3Role \
      --assume-role-policy-document file://~/TrustPolicyForFirehose.json

次に、作成したロールに権限を与えるために、下記を記述した権限ポリシーファイルを用意します。(例:~/PermissionsForFirehose.json

{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [ 
          "s3:AbortMultipartUpload", 
          "s3:GetBucketLocation", 
          "s3:GetObject", 
          "s3:ListBucket", 
          "s3:ListBucketMultipartUploads", 
          "s3:PutObject" ],
      "Resource": [ 
          "arn:aws:s3:::my-bucket", 
          "arn:aws:s3:::my-bucket/*" ]
    }
  ]
}

以下のコマンドで、権限ポリシーをアタッチします。

aws iam put-role-policy --role-name FirehosetoS3Role --policy-name Permissions-Policy-For-Firehose --policy-document file://~/PermissionsForFirehose.json

Kinesis Data Firehose の配信ストリームを作成する

Kinesis Data Firehose の配信ストリームを作成します。

Kinesis のダッシュボードに移動し、左側ペインのData Firehoseから「Create delivery stream」をクリックします。

Delivery stream name に任意の配信ストリーム名を入力します。
他はデフォルト値のままで「Next」をクリックします。

次の画面でデータ変換を設定できるのですが、今回は使用せずに「Next」をクリックします。

配信先にAmazon S3を選択し、保存先のバケットを選択します。

今回はプレフィックスの配下にログを保存したいので、Prefixにerror/を入力しました。
「Next」をクリックします。

次に、以下を設定します。今回は全てデフォルト値としました。

  • S3に保存するバッファサイズ/期間
  • 圧縮するかどうか
  • 暗号化するかどうか
  • S3への配信が失敗したときにエラーロギングするかどうか
  • タグ付け

最後に、Kinesis Data Firehose にアタッチするIAMロールを選択します。
「Create new or choose」をクリックします。

先ほど作成したロール、ポリシー名を選択して、「許可」をクリックします。

設定画面に戻って、IAMロールが選択できていることが確認できたら「Next」をクリックします。

設定を確認して問題なければ「Create delivery stream」をクリックします。

CloudWatch Logs のIAMロールを作成する

次に、CloudWatch Logs が Kinesis Data Firehose にデータを送信するためのIAMロールを作成します。 任意のフォルダに下記を記述したファイルを用意します。(例:~/TrustPolicyForCWL.json

{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.【リージョンID】.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}

上記の信頼ポリシーのファイルを使用して、AWS CLI でIAMロールを作成します。

aws iam create-role \
      --role-name CWLtoKinesisFirehoseRole \
      --assume-role-policy-document file://~/TrustPolicyForCWL.json

次に、作成したロールに権限を与えるために、下記を記述した権限ポリシーファイルを用意します。(例:~/PermissionsForCWL.json

{
    "Statement":[
      {
        "Effect":"Allow",
        "Action":["firehose:*"],
        "Resource":["arn:aws:firehose:【リージョンID】:【アカウントID】:*"]
      },
      {
        "Effect":"Allow",
        "Action":["iam:PassRole"],
        "Resource":["arn:aws:iam::【アカウントID】:role/CWLtoKinesisFirehoseRole"]
      }
    ]
}

以下のコマンドで、権限ポリシーをアタッチします。

aws iam put-role-policy --role-name CWLtoKinesisFirehoseRole --policy-name Permissions-Policy-For-CWL --policy-document file://~/PermissionsForCWL.json

CloudWatch Logs から Kinesis Data Firehose にデータを送信するためのサブスクリプションフィルタを作成する

次に、以下のコマンドでCloudWatch Logs サブスクリプションフィルタを作成します。
サブスクリプションフィルタの作成はマネジメントコンソールからは行えず、AWS CLIからのみ可能な操作になります。

aws logs put-subscription-filter \
    --log-group-name "CloudTrail" \
    --filter-name "Destination" \
    --filter-pattern "" \
    --destination-arn "【Kinesis Data Firehose の配信ストリームのARN】" \
    --role-arn "arn:aws:iam::【アカウントID】:role/CWLtoKinesisFirehoseRole"

S3に配信されていることを確認する

全ての設定が終わり、しばらく待つと、指定したS3バケットのプレフィックス配下にファイルが保存されていることが確認できました!

このデータはgzip形式で圧縮されているので、.gzの拡張子をつけた状態でダウンロードし、そのファイルを解凍することで中身を確認することができます。
中身をのぞくとログが保存されていることが確認できました。
(以下抜粋)

{"messageType":"DATA_MESSAGE","owner":"xxxxxxx","logGroup":"/aws/rds/instance/database-2/error","logStream":"database-2","subscriptionFilters":["Destination"],"logEvents":[{"id":"35522925049899681854890615869074390491557551650244853760","timestamp":1592903050264,"message":"2020-06-23T09:04:10.264534Z 0 [Note] Giving 1 client threads a chance to die gracefully"},
{"id":"35522925049899681854890615869074390491557551650244853761","timestamp":1592903050264,"message":"2020-06-23T09:04:10.264557Z 0 [Note] Shutting down slave threads"},
{"id":"35522925094501172251951862152145827036854274662205685762","timestamp":1592903052264,"message":"2020-06-23T09:04:12.264723Z 0 [Note] Forcefully disconnecting 1 remaining clients"},
{"id":"35522925094501172251951862152145827036854274662205685764","timestamp":1592903052264,"message":"2020-06-23T09:04:12.264810Z 0 [Note] Event Scheduler: Purging the queue. 0 events"},
{"id":"35522925094523472997150392775287362755126923023711666181","timestamp":1592903052265,"message":"2020-06-23T09:04:12.265043Z 0 [Note] Binlog end"},
{"id":"35522925094523472997150392775287362755126923023711666182","timestamp":1592903052265,"message":"2020-06-23T09:04:12.265598Z 0 [Note] Shutting down plugin 'RDS_EVENTS_THREADS_WAITS_CURRENT'"},
{"id":"35522925094523472997150392775287362755126923023711666183","timestamp":1592903052265,"message":"2020-06-23T09:04:12.265607Z 0 [Note] Shutting down plugin 'RDS_PROCESSLIST'"},

さいごに

Kinesis Data Firehose の概要と実際に触ってみた手順を書いてみました。
かいつまんだ内容にはなりますが、これから勉強してみたいと思っている方の参考になれば幸いです。

以上、コンサル部@大阪オフィスのYui(@MayForBlue)でしたっ(`・ω・´)

参考リンク

Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose とは?

CloudWatch Logs サブスクリプションフィルタの使用

今回の検証手順について、以下のブログでも詳細に解説されているので、参考にしていただければと思います。