SQLite3データをData Firehose経由でS3 TablesのIcebergテーブルに格納する

SQLite3データをData Firehose経由でS3 TablesのIcebergテーブルに格納する

Clock Icon2025.02.14

データ事業本部の笠原です。

今回は、SQLite3のデータベースファイルを読み込んで、Data Firehose経由でS3 TablesのIcebergテーブルに格納してみたいと思います。

なるべくCDKで実装してみましたが、一部CDK (CloudFormation)では対応していない箇所がありますので、その部分は別途AWS CLIを使ったシェルスクリプトを起動して設定します。

環境

以下の環境で確認しています。

  • AWS CDK: 2.178.1
  • AWS CLI: 2.23.10
  • Node: 22.13.1
  • TypeScript: 5.6.3
  • Python: 3.12.8

サンプルコード

以下のGitHubに格納しております。

https://github.com/kasacchiful/sqlite2iceberg_sample

概要図

今回の構成は以下のとおりです。
オレゴンリージョン (us-west-2) で試していますが、東京リージョン (ap-northeast-1) でも動作するはずです。

firehose-s3tables-arch

S3バケットにSQLite3のデータベースファイルをアップロードしてもらうと、Lambda関数が起動してデータベースファイルを読み込みます。
Lambda関数内で読み込んだデータをData Firehoseのストリームに「Direct PUT」します。
Data Firehoseのストリームの送信先にIcebergテーブルを指定できますので、今回はS3 Tablesで作成したIcebergテーブルに送信します。

構成要素

事前にS3のテーブルバケットにて、「AWS 分析サービスとの統合 - プレビュー」を有効にしておきます。
AWSマネジメントコンソール上で「統合を有効にする」をクリックして、有効化しておきましょう。

s3tables-analytics-service-integration

SQLite3 データベースファイル

SQLite3のデータベースファイルを作成します。
今回はPython3のスクリプトで作成しました。

データベースファイルには、2つのテーブルが入っていて、その中の1つのテーブル temperature_humidity_histories をIcebergテーブルに配信することを想定します。
データの中身は適当なランダム数値を割り当てています。

data/generate_sqlite3_data.py
import sqlite3
import random
from datetime import datetime, timedelta

## create database.
with sqlite3.connect("main.db") as con:
    cur = con.cursor()

    ## create tables.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS temperature_humidity_histories (
            sensor_id,
            created_at,
            temperature,
            humidity
        )
    """)

    cur.execute("""
        CREATE TABLE IF NOT EXISTS thermohygrometers (
            sensor_id,
            sensor_name
        )
    """)

    ## insert data
    sensor_cnt = 10
    sensors = [ (i, f'meter{i}',) for i in range(1, sensor_cnt+1) ]
    cur.executemany("""
        INSERT INTO thermohygrometers VALUES (?, ?)
    """, sensors)

    days_cnt = 60
    base_date = datetime.now() - timedelta(days=days_cnt)
    datelist = [
        (base_date + timedelta(days=d)).strftime('%Y-%m-%d %H:%M:%S')
        for d in range(days_cnt)
    ]
    histories = [
        (random.choice(range(1, sensor_cnt+1)),
        date,
        random.normalvariate(20.0, 2),
        random.normalvariate(40.0, 10),)
        for date in datelist
    ]
    cur.executemany("""
        INSERT INTO temperature_humidity_histories VALUES (?, ?, ?, ?)
    """, histories)

    con.commit()

実行すると、 main.db というデータベースファイルが作成されます。
このファイルをS3のソース用バケットに格納します。

S3

最初にS3バケットを用意します。
今回は以下の3つを作成します。

  • ソース用バケット
    • SQLite3のデータベースファイルをアップロードしていただく汎用バケット
  • ストリーム用バケット
    • Firehoseの配信失敗時にバックアップデータを格納する汎用バケット
  • ターゲットIcebergテーブルバケット
    • Firehose配信先となるテーブルバケット

S3 TablesはまだL1コンストラクトしか用意されていませんので、
今回はこのL1コンストラクトの CfnTableBucket クラスで作成しています。

また、ソース用のS3バケットにEventBridgeへの通知を有効化しています。
EventBridgeを介してLambda関数を起動します。

lib/sqlite2iceberg_s3bucket-stack.ts
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { CfnTableBucket } from 'aws-cdk-lib/aws-s3tables';

    // S3 Bucket
    // ソース用バケット
    this.srcBucket = new Bucket(this, 'SourceBucket', {
      removalPolicy: cdk.RemovalPolicy.RETAIN,
      eventBridgeEnabled: true,   // EventBridgeへのイベント通知を有効化
    });

    // ストリーム用バケット (配信失敗時のバックアップデータ格納先)
    this.deliveryStreamBucket = new Bucket(this, 'DeliveryStreamBucket', {
      removalPolicy: cdk.RemovalPolicy.RETAIN,
    });

    // S3 Table Bucket
    // ターゲットIcebergテーブルバケット
    this.targetTableBucket = new CfnTableBucket(this, 'TableBucket', {
      tableBucketName: 'firehose-s3table',
    });

Icebergテーブル作成

ターゲットとなるS3 Tablesのテーブルバケットを作成したら、
S3 TablesのIcebergに対して、namespaceとtableを作成します。

namespaceとtableの作成には、今回はAWS CLIを利用します。
その際、Lake FormationのData Parmissionsにて、操作するプリンシパル(IAMユーザやIAMロール)に対して、利用するカタログとデータベースを操作する権限をAWS CLIで付与します。
今回は、実際にAWS CLIを操作するIAMユーザ/IAMロールのARNを指定します。

## テーブルバケットのカタログに対する権限追加
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Catalog\": {\"Id\": \"${AWS_ACCOUNT_ID}:s3tablescatalog/${TABLE_BUCKET_NAME}\"}}" \
  --permissions "ALL" \
  --permissions-with-grant-option "ALL"

## namespace (database) 作成
## namespace名は 'main' としています。
aws s3tables create-namespace \
  --table-bucket-arn ${TABLE_BUCKET_ARN} \
  --namespace main

## テーブルバケットのnamespace (database)に対する権限追加
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Database\": { \"CatalogId\": \"${AWS_ACCOUNT_ID}:s3tablescatalog/${TABLE_BUCKET_NAME}\", \"Name\": \"main\" }}" \
  --permissions "ALL" \
  --permissions-with-grant-option "ALL"

## テーブル作成
## テーブル名は 'temperature_humidity_histories' としています。
aws s3tables create-table \
  --table-bucket-arn ${TABLE_BUCKET_ARN} \
  --namespace main \
  --name temperature_humidity_histories \
  --format 'ICEBERG' \
  --metadata '{"iceberg":{"schema":{"fields":[{"name":"sensor_id","type":"int","required":true},{"name":"created_at","type":"timestamp"},{"name":"temperature","type":"double"},{"name":"humidity","type":"double"}]}}}'

IAMロール

今回利用するIAMロールは以下です。

  • Lambda関数用ロール
  • Data Firehose配信ストリーム用ロール
  • Data Firehose配信失敗時のバックアップS3バケットにアクセスするためのロール

配信失敗時のS3バケットにアクセスするためのロールは作らずに、配信ストリーム用のロールと兼務して使ってもOKですが、
今回は分けてみました。

Lambda関数は、 service-role/AWSLambdaBasicExecutionRole の他、
ソースのS3バケットとData Firehoseの配信ストリームにアクセスできればOKです。

Data Firehose配信ストリーム用ロールは、AWSドキュメントを参考にしつつ、
ドキュメントに記載されていた Glueへのアクセスリソースでは権限不足だったので、今回はリソースを "*" にしています。

https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html#using-s3-tables

lib/sqlite2iceberg_target-stack.ts
import { Role, ServicePrincipal, ManagedPolicy, PolicyStatement } from 'aws-cdk-lib/aws-iam';

    // IAM Role
    // Lambda 関数用ロール
    const loadFunctionRole = new Role(this, 'LoadFunctionRole', {
      assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [
        ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole'),
        ManagedPolicy.fromAwsManagedPolicyName('AmazonS3FullAccess'),
        ManagedPolicy.fromAwsManagedPolicyName('AmazonKinesisFirehoseFullAccess'),
      ],
    });

    // Data Firehose 配信ストリーム用ロール
    const deliveryStreamIcebergRole = new Role(this, 'DeliveryStreamIcebergRole', {
      assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
    });
    deliveryStreamIcebergRole.addToPolicy(new PolicyStatement({
      actions: [
        "glue:GetTable",
        "glue:GetDatabase",
        "glue:UpdateTable",
      ],
      resources: ["*"],
    }));
    deliveryStreamIcebergRole.addToPolicy(new PolicyStatement({
      actions: [
        "lakeformation:GetDataAccess",
      ],
      resources: [
        "*",
      ],
    }));
    deliveryStreamIcebergRole.addToPolicy(new PolicyStatement({
      actions: [
        "logs:PutLogEvents",
      ],
      resources: [
        "*"
      ],
    }));

    // Data Firehose の配信失敗時のバックアップ先S3にアクセスするロール
    const deliveryStreamS3Role = new Role(this, 'DeleveryStreamS3Role', {
      assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
    });
    deliveryStreamS3Role.addToPolicy(new PolicyStatement({
      actions: [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject",
      ],
      resources: [
        `${props?.deliveryStreamBucket.bucketArn}`,
        `${props?.deliveryStreamBucket.bucketArn}/*`,
      ],
    }));
    deliveryStreamS3Role.addToPolicy(new PolicyStatement({
      actions: [
        "logs:PutLogEvents",
      ],
      resources: [
        "*"
      ],
    }));

Lake Formation

1. Icebergデータベースへのリソースリンクの作成

Lake FormationのDatabases内にある「Create database」からリソースリンクを作成することができます。これによって、S3 Tablesのカタログ {AWS_ACCOUNT_ID}:s3tablescatalog/{TABLE_BUCKET_NAME} にあるIcebergのデータベースを、Glueのデフォルトカタログ {AWS_ACCOUNT_ID} から参照することができます。

Data Firehoseの考慮事項には、以下のようにデフォルトのAWS Glueカタログのみをサポートするとの記載がありますので、このためにリソースリンクを作成します。

Amazon S3 テーブルバケット内のテーブルへの配信の場合、Firehose はデフォルトの AWS Glue カタログのみをサポートします。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/apache-iceberg-considerations.html

AWS CLIやAWS CDK (CloudFormation)では、Glue Databaseでデータベースのリソースリンクを定義できます。

lib/sqlite2iceberg_target-stack.ts
import { CfnDatabase } from 'aws-cdk-lib/aws-glue';

    // Resource Link for S3 Tables Database
    const s3tablesResourceLink = new CfnDatabase(this, 'S3TablesResourceLink', {
      catalogId: `${props?.env?.account}`,
      databaseInput: {
        name: dbResourceLinkName,
        targetDatabase: {
          catalogId: `${props?.env?.account}:s3tablescatalog/${props?.targetTableBucket.tableBucketName}`,
          databaseName: databaseName,
        },
      },
    });

設定したリソースリンクは、AWSマネジメントコンソール上では Lake Formation の Databases 項目に一覧が表示されます。

lakeformation_resource_link_db

2. 権限付与

Firehoseの配信ストリームにアタッチするIAMロールに対して、Icebergデータベースやテーブルを操作する権限を付与します。

私が試したところ、以下3つの権限設定が必要でした。

  • データベースのリソースリンクへの権限付与
  • データベースのリソースリンク先のテーブルへの権限付与
  • 実際のS3Tablesのテーブルへの権限付与

データベースのリソースリンクへの権限付与については、AWS CDK (Cloudformation)でも設定可能でした。
permissionsDESCRIBE で十分です。

lib/sqlite2iceberg_target-stack.ts
import { CfnPrincipalPermissions } from 'aws-cdk-lib/aws-lakeformation';

    // Lake Formation
    new CfnPrincipalPermissions(this, 'ResourceLinkPermission', {
      principal: {
        dataLakePrincipalIdentifier: deliveryStreamIcebergRole.roleArn,
      },
      resource: {   // リソースリンク(DB)に対する権限付与
        database: {
          catalogId: s3tablesResourceLink.catalogId,
          name: dbResourceLinkName,
        },
      },
      permissions: ["DESCRIBE"],
      permissionsWithGrantOption: [],
    });

他2つは、AWS CDK (Cloudformation)で設定しようとするとエラーとなったため、
今回はAWS CLIで設定します。

--principal オプションの DataLakePrincipalIdentifier は、権限付与対象のIAMユーザ/IAMロールのARNを指定する必要があり、今回はData Firehose配信ストリームにアタッチするIAMロールのARNを指定します。

また、 --resource オプションの "TableWildcard": {} は、すべてのテーブル(ALL_TABLES) を対象とする場合の設定方法になります。

## データベースリソースリンク先のテーブルへの権限付与
## AWS_PRINCIPAL_ARN は、Data Firehoseの配信ストリームにアタッチするIAMロールのARN
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Table\": {\"CatalogId\": \"${AWS_ACCOUNT_ID}\", \"DatabaseName\": \"resource_link_to_s3tables_main\", \"TableWildcard\": {} }}" \
  --permissions "ALL"

## 実際のS3Tablesのテーブルへの権限付与
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Table\": {\"CatalogId\": \"${AWS_ACCOUNT_ID}:s3tablescatalog/${TABLE_BUCKET_NAME}\", \"DatabaseName\": \"main\", \"TableWildcard\": {} }}" \
  --permissions "ALL"

Data Firehose

Data Firehoseの配信先にIcebergテーブルを指定することができます。
icebergDestinationConfiguration にて、適宜設定を行います。

AWS CDKではL2コンストラクトで用意されている配信先は、現時点でS3バケットのみなので、
今回はL1コンストラクト CfnDeliveryStream で定義します。

lib/sqlite2iceberg_target-stack.ts
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

    // Data Firehose
    const deliveryStream = new CfnDeliveryStream(this, 'DeliveryStream', {
      deliveryStreamType: 'DirectPut',
      icebergDestinationConfiguration: {
        roleArn: deliveryStreamIcebergRole.roleArn,
        bufferingHints: {
          intervalInSeconds: 30,    // 開発向けに早くデータを反映してもらうため
          sizeInMBs: 5,
        },
        retryOptions: {
          durationInSeconds: 30,    // 再試行時間も短めにした
        },
        catalogConfiguration: {
          catalogArn: `arn:aws:glue:${props?.env?.region}:${props?.env?.account}:catalog`,
        },
        destinationTableConfigurationList: [{
          destinationDatabaseName: dbResourceLinkName,
          destinationTableName: tableName,
          uniqueKeys: ['sensor_id', 'created_at'],    // UpdateやDeleteの際に使われるが、今回はInsertしかしないので無視される
          s3ErrorOutputPrefix: `dstErrIceberg/${dbResourceLinkName}/${tableName}`,
        }],
        s3Configuration: {
          // 配信エラー発生時のバックアップ先S3バケットの設定
          bucketArn: `${props?.deliveryStreamBucket.bucketArn}`,
          roleArn: deliveryStreamS3Role.roleArn,
          cloudWatchLoggingOptions: {
            // バックアップエラー発生時のCloudWatch Logs設定
            enabled: true,
            logGroupName: backupLogGroup.logGroupName,
            logStreamName: backupLogStream.logStreamName,
          },
        },
        cloudWatchLoggingOptions: {
          // 配信エラー発生時のCloudWatch Logs設定
          enabled: true,
          logGroupName: streamLogGroup.logGroupName,
          logStreamName: streamLogStream.logStreamName,
        },
      },
    });

ちなみに、配信先のテーブルは複数可能です。
その際、配信元のデータ内に配信先のテーブルを指定することで、Data Firehose側で振り分けてくれます。
今回は配信先を1つにして単一テーブルに対してデータを送るようにしているので、複数宛の振り分け機能は試していません。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/apache-iceberg-format-input-record-different.html

Lambda

Lambda関数では、S3のソースバケットに配置されたSQLite3データベースファイルを読み込んで、
Data Firehoseの配信ストリームにデータレコードを投入します。

lambda/load/lambda_function.py
import os
import shutil
import sqlite3
import boto3
import botocore
import json
from datetime import datetime
from pprint import pprint

def handler(event, _):
    if event.get("Records"):
        ## S3イベント通知
        src_bucket = event["Records"][0]["s3"]["bucket"]["name"]
        src_objkey = event["Records"][0]["s3"]["object"]["key"]
    else:
        ## EventBridgeイベント通知
        src_bucket = event["detail"]["bucket"]["name"]
        src_objkey = event["detail"]["object"]["key"]

    delivery_stream = os.environ.get('TGT_STREAM')
    target_table = os.environ.get('TGT_TABLE')

    s3_client = boto3.client('s3')
    firehose_client = boto3.client('firehose')

    ## download sqlite database file.
    tmp_dir = '/tmp/load'
    local_path = os.path.join(tmp_dir, os.path.dirname(src_objkey))
    if not os.path.exists(local_path):
        os.makedirs(local_path)
    local_objkey = os.path.join(local_path, os.path.basename(src_objkey))
    s3_client.download_file(src_bucket, src_objkey, local_objkey)

    with sqlite3.connect(local_objkey) as con:
        cur = con.cursor()

        ## scan "temperature_humidity_histories" table data
        batch_size = 10
        cur.execute(f"""
            SELECT sensor_id,
                   created_at,
                   temperature,
                   humidity
            FROM {target_table}
        """)
        rows = cur.fetchmany(batch_size)
        while (rows is not None) and rows:
            data = [ {
                "sensor_id": row[0],
                "created_at": datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%dT%H:%M:%SZ'),
                "temperature": row[2],
                "humidity": row[3] } for row in rows ]
            records = [ { "Data": json.dumps(record).encode() } for record in data ]
            res = firehose_client.put_record_batch(
                DeliveryStreamName=delivery_stream,
                Records=records,
            )
            print("\nstream response:")
            pprint(res)
            rows = cur.fetchmany(batch_size)

        cur.close()

    shutil.rmtree(local_path)
    return {
        "src_bucket": src_bucket,
    }

CDK上での定義は以下のとおりです。

今回は、S3バケット作成用のスタックとそれ以外のスタックで分けた影響で、
S3イベント通知をLambda関数のトリガーにするとスタックの循環参照でエラーになるため、
EventBridgeを介してLambda関数を起動します。
ソース用のS3バケットにEventBridgeへの通知を有効化したのは、このためです。

lib/sqlite2iceberg_target-stack.ts
import { Function, Runtime, Code } from 'aws-cdk-lib/aws-lambda';
import { Rule } from 'aws-cdk-lib/aws-events';
import { LambdaFunction as targetLambdaFunction } from 'aws-cdk-lib/aws-events-targets';

    // Lambda Function
    const loadFunction = new Function(this, 'LoadFunction', {
      runtime: Runtime.PYTHON_3_12,
      code: Code.fromAsset('lambda/load'),
      handler: 'lambda_function.handler',
      memorySize: 256,
      timeout: cdk.Duration.seconds(10),
      role: loadFunctionRole,
      environment: {
        SRC_BUCKET: `${props?.sourceBucket.bucketName}`,
        TGT_STREAM: deliveryStream.ref,
        TGT_TABLE: tableName,
      },
    });

    // EventBridge Rule
    const onCreateS3Object = new Rule(this, 'onCreateS3ObjectRule', {
      eventPattern: {
        source: ["aws.s3"],
        detailType: ["Object Created"],
        detail: {
          bucket: {
            name: [props?.sourceBucket.bucketName]
          },
        },
      },
      targets: [
        new targetLambdaFunction(loadFunction, {}),
      ]
    });

動作確認

最初に作成したSQLite3データベースファイル main.db をソース用S3バケットにアップロードします。
その後しばらくすると、S3 TablesのIcebergテーブルに格納されるはずなので、Athenaで確認してみましょう。

Athenaで、データベースにリソースリンク resource_link_to_s3tables_main を指定して、以下のクエリを実行すると、結果が返るはずです。

select *
from temperature_humidity_histories
;

firehose_s3tables_athena_result

まとめ

今回は、以下のことを意識して試してみました。

  • なるべくAWS CDKで動作環境を構築する
  • AWS CDKで現状対応できない部分はAWS CLIを利用する
  • Data Firehoseの配信先のIcebergは、S3 Tablesのテーブルバケットとする
  • SQLite3のデータベースファイルをPythonで扱う

参考になれば、幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.