[アップデート]Amazon Data Firehose から S3 Tables へ直接データ配信できるようになりました

[アップデート]Amazon Data Firehose から S3 Tables へ直接データ配信できるようになりました

Firehoseの設定時にリソースリンクを指定することに気づくのにめちゃくちゃ時間がかかりました
Clock Icon2025.03.19

お疲れさまです。とーちです。

Amazon Data Firehose がリアルタイムのストリーミングデータを Amazon S3 テーブルに配信できるようになったというアップデートがあったので試してみました。

https://aws.amazon.com/about-aws/whats-new/2025/03/amazon-data-firehose-real-time-streaming-data-s3-tables/

とりあえずまとめ

  • Amazon Data Firehose がリアルタイムのストリーミングデータを S3 Tables に配信できるようになりました
  • 受信レコードの内容に基づいて、データを異なる S3 テーブルに動的にルーティングできます
  • Firehose と S3 Tables の連携には、Glue データカタログのリソースリンクとアクセス権限の設定が必要です

Firehose から S3 Tables への配信について

今回のアップデートにより、Firehose では以下の宛先に配信できるようになりました

  1. S3 Tables(Amazon S3 テーブルバケット内のテーブル)
  2. セルフマネージドな Apache Iceberg テーブル(通常の S3 バケットを使って AWS Glue データカタログで管理)

Firehose のソースとしては、Amazon Kinesis Data Streams、Amazon MSK、Direct PUT(アプリケーションからの直接書き込み)の他、AWS WAF ウェブ ACL ログ、Amazon VPC フローログなどからも直接 Firehose に書き込めるようになっています。これらのいずれのソースでも S3 Tables への配信をサポートしています。

また、What's New によると「受信レコードの内容に基づいて、レコードをさまざまな Amazon S3 テーブルにルーティングできる」とのことなので、一つのデータソースから複数の S3 テーブルへの配信が可能です。

前提条件や注意事項

公式ドキュメントを見ると以下のような注意事項がありました。他の注意事項についても記載があるので、ぜひ公式ドキュメントをご一読ください。

列について

  • Firehose は多層構造を持つ JSON データを処理する際に、基本的に最上位レベルのフィールドを認識します
  • ネストされたフィールドは対応するテーブル側で適切なデータ型(struct型またはmap型)が定義されているときのみ処理可能です

例えば以下のようなデータが input として入ってきた場合

{
   "version":"2016-04-01",
   "deviceId":"<solution_unique_device_id>",
   "sensorId":"<device_sensor_id>",
   "timestamp":"2024-01-11T20:42:45.000Z",
   "value":"<actual_value>",
   "position":{
      "x":143.595901,
      "y":476.399628,
      "z":0.24234876
   }
}

Firehose で処理するには、Iceberg テーブル側で position カラムを STRUCT<x:DOUBLE, y:DOUBLE, z:DOUBLE> のような型で定義する必要があります。

レコードフォーマットについて

レコードごとに 1 つの JSON オブジェクト:1 つの Firehose レコードで送信できる JSON オブジェクトは 1 つだけです。レコード内で複数の JSON オブジェクトを集約して送信すると、Firehose はエラーになります。

やってみる

それでは Firehose を使って S3 Tables に配信してみようと思います。

1. S3 Tables テーブルバケットとテーブルの作成

まずは配信先となるテーブルバケットを作成します。詳細については以下の記事をご参照ください。

https://dev.classmethod.jp/articles/aws-s3-tables-getting-started-tutorial-with-console/

今回は、上記の記事で作成した my-s3-table-bucket という名前のテーブルバケットと、my_s3_namespaceという名前空間をそのまま使います。

テーブルは以下のクエリでfirehosetestというテーブルを作成しました

CREATE TABLE `my_s3_namespace`.firehosetest (
sale_date date, 
product_category string, 
sales_amount double)
PARTITIONED BY (month(sale_date))
TBLPROPERTIES ('table_type' = 'iceberg')

2. 配信失敗データ格納用の S3 バケットの作成

Firehose ではデータ配信に失敗したとき、またはすべての配信データについて、配信先とは別に用意したS3 バケットにデータを配信できます。

このための S3 バケットを作成しておきます。このバケットは通常のバケットで特に特殊な設定も不要なので作り方は割愛します。

3. S3 テーブルを宛先として使用するための Firehose のロールを作成する

Firehose には、AWS Glue テーブルにアクセスし、S3 テーブルにデータを書き込むための権限をもたせる必要があります。

そのための IAM ロールを事前に作成しておきます。今回はfirehose-s3-tables-role という名前で IAM ロールを作成しました。

具体的なポリシーについては以下をご参照ください。信頼ポリシーの書き方等も記載があります。

今回は、少し緩めに以下のポリシーで作成しました。実際に使用する場合には最小権限の原則に則り、もう少し厳しくすることを推奨します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3TableAccessViaGlueFederation",
      "Effect": "Allow",
      "Action": [
        "glue:GetTable",
        "glue:GetDatabase",
        "glue:UpdateTable"
      ],
      "Resource": [
        "arn:aws:glue:ap-northeast-1:account-id:catalog/s3tablescatalog/*",
        "arn:aws:glue:ap-northeast-1:account-id:catalog/s3tablescatalog",
        "arn:aws:glue:ap-northeast-1:account-id:catalog",
        "arn:aws:glue:ap-northeast-1:account-id:database/*",
        "arn:aws:glue:ap-northeast-1:account-id:table/*/*"
      ]
    },
    {
      "Sid": "S3DeliveryErrorBucketPermission",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::*",
        "arn:aws:s3:::*/*"
      ]
    },
    {
      "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation",
      "Effect": "Allow",
      "Action": [
        "lakeformation:GetDataAccess"
      ],
      "Resource": "*"
    },
    {
      "Sid": "LoggingInCloudWatch",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:ap-northeast-1:account-id:log-group:*:log-stream:log-*"
      ]
    }
  ]
}

4. リソースリンクの作成

テーブルにアクセスするには、Amazon Data Firehose にテーブルの名前空間をターゲットとするリソースリンクが必要とのこと。

リソースリンクってなんぞや?という話ですが、Glue データカタログの1オブジェクトで、Glue データカタログ上で管理するデータベースやテーブルに別の名前(エイリアス)を割り当てるためのものだそうです。詳細は以下の公式ドキュメントをご参照ください。

https://docs.aws.amazon.com/lake-formation/latest/dg/resource-links-about.html

Firehose で S3 Tables を使用するにあたり重要な点として覚えておきたいのが、Firehose の設定を行う際に指定する Database 名はここで作成したリソースリンク名になるということです。(私はこれに気付かずに結構な時間を溶かしました)

公式ドキュメントにも以下のように記載があります。

  • https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-prereq.html
  • Firehose は、AWS Glue データカタログのデフォルトカタログに登録されているデータベース内のテーブルにデータをストリーミングします。S3 テーブルバケット内のテーブルにデータをストリーミングするには、テーブルバケット内の名前空間を指すリソースリンクをデフォルトカタログに作成します。

また以下のページでは For delivery to tables in Amazon S3 table buckets, Firehose supports only the default AWS Glue catalog.と記載があり、Firehose ではデフォルトの AWS Glue カタログのみをサポートしていることを明記しています。

ここがとても分かりづらい部分なのですが、S3 Tables はデフォルトの AWS Glue カタログ(下記の絵の Default Catalog)でメタデータを管理するのではなく、デフォルトカタログとは別の s3tablescatalog というカタログでメタデータを管理しています。

そのため Firehose でアクセスするにはデフォルトの AWS Glue カタログ(Default Catalog)にリソースリンクを作ることで S3 Tables のメタデータにアクセスしているというわけです。そのため Firehose で Database 名を指定する際は S3 Tables に作った名前空間ではなくリソースリンク名を指定する必要があります。

alt text

リソースリンクはコマンドで作成できます。今回は以下のコマンドで作りました。コマンドからも分かるとおり、リソースリンクはGlueのデータベースオブジェクトという扱いです。

aws glue create-database --region ap-northeast-1 \
--catalog-id "<カタログID>" \
--database-input \
'{
  "Name": "my_rsclink",
  "TargetDatabase": {
    "CatalogId": "<カタログID>:s3tablescatalog/my-s3-table-bucket",
    "DatabaseName": "my_s3_namespace"
  },
  "CreateTableDefaultPermissions": []
}'

カタログ ID というのがよくわからなかったのですが、どうやらアカウント ID のようです。

テーブルが作成されている状態であれば、Athena で以下の操作をすることでカタログ ID を確認できます

alt text

alt text

リソースリンクが作成されると、Lake Formation の画面からもその存在を確認できます。

alt text

5. リソースリンクへの権限付与

リソースリンクを使用してテーブルにアクセスする場合は、リソースリンクリンク先の名前空間両方にアクセス許可を付与する必要があります。(後述しますが、私の勘違いでなければテーブルにもアクセス許可が必要)

これらは Lake Formation で管理されているリソースになるので、Lake Formation を使って、作成したfirehose-s3-tables-role に許可を与えていきます。

Lake Formation の画面で Data permissions を開き、Grant を押します

alt text

プリンシパルに作成したロールを指定します

alt text

まずはリソースリンクに対する許可から作成します。以下のようにデフォルトカタログ(アカウント ID になります)を選び、Databases には作成したリソースリンクを指定します

alt text

許可については Resource link permissionsDescribe のみにチェックをつけます

alt text

同様に、再度、Data permissions から、Grant ボタンを押してリンク先の名前空間に対してもfirehose-s3-tables-role に許可を与えます

alt text

Prerequisites to use Apache Iceberg Tables as a destination - Amazon Data Firehoseによると読み込み・書き込みアクセス許可が必要とのことなので以下のような権限にしました

alt text

自分が確認した限りではテーブルに対しても権限付与が必要でした。テーブル権限がない場合は以下のエラーがでます。

{"attemptsMade":0,"arrivalTimestamp":1742373964000,"errorCode":"Glue.AccessDenied","errorMessage":"Access was denied. Ensure that the trust policy for the provided IAM role allows Firehose to assume the role, and the access policy allows access to the Glue table. Table: my_rsclink.firehosetest"<以下テーブルバケットに送信失敗した実データが続く>

テーブル権限を追加するには以下のようにします。

vscode-paste-1742374295448-kopoh066gtb.png

vscode-paste-1742374308937-4fdddbodrpw.png

6. Firehose ストリームの作成

ようやく前準備が終わったので Firehose ストリームを作成していきます。今回はシンプルに AWS CLI を使ってレコードを Firehose に put しようと思うので、ソースには Direct Put を使用します。また宛先は Apache Iceberg テーブルを指定します

alt text

Apache Iceberg テーブルにデータを書き込むにはテーブルスキーマ(列名、データ型)等の情報を管理する台帳(データカタログ)が必要です。そのため、以下の送信先の設定で Glue データカタログが存在するアカウントを指定します。別アカウントにあるデータカタログも参照できるようです

alt text

またここが重要な設定なのですが、**ルーティング情報のインライン解析** というチェックボックスを有効にすることで以下の入力欄が出てきます

alt text

このチェックボックスは Firehose が受信したレコードに対してJSONQuery 式を使った制御を有効とするためのものになっています。

JSONQuery 式とは

JSONQuery 式とは、受信したレコードを どの Databaseどのテーブルどのように処理(Insert,Update,Delete) するかを指定するものとなっています。

分かりやすいように、ここでは例として Firehose が以下のような JSON レコードを受け取ったと仮定します

{
  "deviceId": "Device1234",
  "timestamp": "2024-11-28T11:30:00Z",
  "data": {
    "temperature": 21.5,
    "location": {
      "latitude": 37.3324,
      "longitude": -122.0311
    }
  },
  "powerlevel": 84,
  "action": "insert",
  "status": "online"
}

JSONQuery 式では、データベース式(どのデータベース)、テーブル式(どのテーブル)、オペレーション式(どういう処理)の3つを指定するのですが、各値については、静的な値を指定するか、または受信レコードの値を元に動的な値を指定することができます。

例えば my_rsclink という Database に記録したい、という場合は、静的な指定となるので "my_rsclink" という感じでダブルクォーテーションで囲みます。

例えばテーブルについては受信した JSON レコードの中の deviceId の名前を持つテーブルに記録したい、という場合は、.deviceId とテーブル式に書くことで、受信レコードの deviceId の値(上記の例では Device1234)の名前のテーブルに書き込むことができるといった形です。

なおオペレーション式には、insertupdatedelete のいずれかの値を指定する必要があります。受信レコードの中にこれらの値のいずれかが値として入っている項目があればその値を参照することで動的に動作を変更することもできます。(上記の場合、.actionと記載すれば action の値に沿った動作をさせることが可能)

今回は、Database は事前に作成したリソースリンクの"my_rsclink"、テーブルは"firehosetest"、アクションは"insert"でやってみます。

バックアップの設定で事前に作成した 配信失敗データ格納用の S3 バケット を指定します

alt text

詳細設定の箇所で、作成した IAM ロール firehose-s3-tables-role を指定しましょう。ここまで設定できたら「Firehose ストリームを作成」を押します

alt text

7. Firehose にレコードを送信する

正常に作成されたら、以下のコマンドで Firehose に直接レコードを put してみましょう

aws firehose put-record \
        --delivery-stream-name PUT-ICE-MNcRq \
        --record '{"Data":"eyJzYWxlX2RhdGUiOiIyMDI0LTAzLTE5IiwicHJvZHVjdF9jYXRlZ29yeSI6ImVsZWN0cm9uaWNzIiwic2FsZXNfYW1vdW50IjoxOTkuOTl9"}'

--record のところが分かりづらいですが、以下の JSON を Base64 エンコードしたものです

{
  "sale_date": "2024-03-19",
  "product_category": "electronics",
  "sales_amount": 199.99
}

8. 結果の確認

Athena から S3 Tables を select してみると以下のようにレコードが出力されていました

SELECT * FROM my_s3_namespace.firehosetest
ORDER BY sale_date DESC
LIMIT 10;

alt text

(何回か実行したので複数レコードが出ています)

まとめ

今回は Amazon Data Firehose から S3 Tables へのリアルタイムデータ配信を試してみました。設定の流れとしては以下のようになります

  1. S3 Tables のテーブルバケットと配信先テーブルを作成する
  2. 配信失敗データ格納用の S3 バケットを作成する
  3. Firehose 用の IAM ロールを作成するß
  4. Glue データカタログにリソースリンクを作成する
  5. Lake Formation で必要な権限を付与する
  6. Firehose ストリームを作成する
  7. データを送信してテストする

特に注意すべき点としては、S3 Tables のテーブルにアクセスするためには Glue データカタログのリソースリンクが必要であり、Firehose の設定では Database 名としてこのリソースリンク名を指定する必要があるということです。

最後にトラブルシューティング向けの情報をいくつか

  • S3Tablesに出力されないなというときは、Amazon S3 のソースレコードのバックアップ で指定した S3 バックアップバケット に入ったファイルを確認しましょう。ファイルを見ると、どのレコードがどういう理由で失敗したかが書いてあります。
  • 権限エラーのときはIAMロールのポリシーだけでなく、LakeFormationの権限も確認しましょう
  • どうしてもうまくいかないときは一度、Firehoseのストリームを作り直すのもありかもしれません。実際、私の経験でも、同じ設定で再作成したことで問題が解決したケースがありました。

以上、とーちでした。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.