Lambda 関数のログを CloudWatch Logs サブスクリプションフィルター経由で Amazon Data Firehose に流し S3 Tables(Iceberg)へ書き込む構成を CDK で実装してみた

Lambda 関数のログを CloudWatch Logs サブスクリプションフィルター経由で Amazon Data Firehose に流し S3 Tables(Iceberg)へ書き込む構成を CDK で実装してみた

ただし、1レコードに複数のログがまとめられる等、課題あり。
2026.05.11

こんにちは、クラスメソッドの若槻です。

先日、CloudWatch Logs の AWS マネージドな S3 テーブル統合機能を CDK で実装する記事を書きました。

ただ、AWS マネージドな S3 テーブル統合機能には「データ保持期間がロググループの保持ポリシーに連動する」「テーブルスキーマを自前で決められない」といった制約があります。用途に応じてスキーマやデータ保持期間をコントロールしたい場合は、CloudWatch Logs から Amazon Data Firehose(旧 Kinesis Data Firehose)を経由して Amazon S3 Tables に直接書き込む構成のほうが向いています。

本記事では、AWS CDK(TypeScript)でこの構成を実装してみました。

実装した構成

実装した主なリソースは以下です。

リソース 役割
Lambda 関数 ログの発生源。loggingFormat: JSON で JSON 形式のログを出力する
サブスクリプションフィルター CloudWatch Logs から Firehose へログを転送
Amazon Data Firehose Apache Iceberg destination で S3 Tables へ書き込む配信ストリーム
Lambda 変換関数 Firehose 内で gzip 解凍・CWL ラッパー除去・プラットフォームログ除外を実行
S3 Tables テーブルバケット・名前空間・Iceberg テーブル
Glue s3tablescatalog Athena から S3 Tables にアクセスするためのフェデレーテッドカタログ
Athena ワークグループ クエリ実行環境

アーキテクチャは以下の通りです。

Lambda 関数(loggingFormat: JSON)

CloudWatch Logs(/aws/lambda/xxx)
  ↓ サブスクリプションフィルター
Amazon Data Firehose
  ├─ Lambda 変換: gzip 解凍 + CWL ラッパー除去 + プラットフォームログ除外
  └─ Iceberg destination
S3 Tables(Apache Iceberg テーブル)

Athena(s3tablescatalog 経由)

本構成では Lambda 関数ごとに専用の Iceberg テーブルとパイプライン(Firehose・サブスクリプションフィルター)を設けています。主な理由は以下の通りです。

  • スキーマ・保持期間を関数単位で独立管理できる — 関数ごとにログ出力フィールドが異なる場合にも、テーブルスキーマを個別に定義できる
  • 複数関数のログが混在しない — テーブルが分かれているためクエリがシンプルになる
  • 拡張しやすいLambdaAConstruct が S3 テーブル・Firehose・サブスクリプションフィルターを内包しているため、Lambda 関数を追加する際は LambdaBConstruct などを同じパターンで追加するだけで済む

コンストラクトは lib/constructs/ 配下にサブディレクトリとして分割し、CDK コンストラクトツリーとディレクトリ階層を一致させています。

lib/
├── constructs/
│   ├── lambda-a/
│   │   ├── index.ts                                       # LambdaAConstruct
│   │   └── logging/
│   │       ├── index.ts                                   # LoggingConstruct
│   │       ├── log-group/index.ts                         # LogGroupConstruct
│   │       ├── s3-table/index.ts                          # S3TableConstruct
│   │       └── log-s3table-pipeline/
│   │           ├── index.ts                               # LogS3TablePipelineConstruct
│   │           ├── firehose/
│   │           │   ├── index.ts                           # FirehoseConstruct
│   │           │   └── transform-lambda/index.ts          # TransformLambdaConstruct
│   │           └── subscription-filter/index.ts           # SubscriptionFilterConstruct
│   ├── s3-tables-bucket/
│   │   ├── index.ts                                       # S3TablesBucketConstruct
│   │   └── lambda-logs-namespace/index.ts                 # LambdaLogsNamespaceConstruct
│   ├── s3-tables-catalog/index.ts                         # S3TablesCatalogConstruct
│   └── athena-workgroup/
│       ├── index.ts                                       # AthenaWorkGroupConstruct
│       └── result-bucket/index.ts                         # ResultBucketConstruct
└── sample-stack.ts                                        # スタック本体

CDK コンストラクトツリーは以下の構造です。

SampleStack
├── S3TablesBucketConstruct
│   └── LambdaLogsNamespaceConstruct
├── LambdaAConstruct
│   ├── LoggingConstruct
│   │   ├── LogGroupConstruct
│   │   ├── S3TableConstruct
│   │   └── LogS3TablePipelineConstruct
│   │       ├── FirehoseConstruct
│   │       └── SubscriptionFilterConstruct
│   └── Default (Lambda Function)
├── S3TablesCatalogConstruct
└── AthenaWorkGroupConstruct

やってみた

0. 前提

CDK バージョン

S3 Tables の名前空間とテーブルを作成する @aws-cdk/aws-s3tables-alpha を使用します。各パッケージは最新版をインストールしてください。

npm install aws-cdk-lib@latest constructs@latest @aws-cdk/aws-s3tables-alpha@latest
npm install -D aws-cdk@latest

1. スタック本体

スタック本体のコードです。

sample-stack.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";

import { AthenaWorkGroupConstruct } from "./constructs/athena-workgroup";
import { LambdaAConstruct } from "./constructs/lambda-a";
import { S3TablesBucketConstruct } from "./constructs/s3-tables-bucket";
import { S3TablesCatalogConstruct } from "./constructs/s3-tables-catalog";

export class SampleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    /** S3 Tables(テーブルバケット・Lambda ログ用名前空間) */
    const s3TablesBucketConstruct = new S3TablesBucketConstruct(
      this,
      "S3TablesBucket",
    );

    /** Glue s3tablescatalog フェデレーテッドカタログ(Firehose / Athena から S3 Tables へアクセスするために必要) */
    const s3TablesCatalogConstruct = new S3TablesCatalogConstruct(
      this,
      "S3TablesCatalog",
    );

    /** Lambda 関数 A: サンプルの Lambda 関数(S3 Tables ログパイプライン含む) */
    const lambdaAConstruct = new LambdaAConstruct(this, "LambdaA", {
      namespace: s3TablesBucketConstruct.lambdaLogsNamespace,
      tableBucketCatalogArn: s3TablesBucketConstruct.tableBucketCatalogArn,
    });
    // Lambda 関数が増える場合はここに LambdaBConstruct などを追加していく

    /** Firehose はカタログ経由で Iceberg テーブルにアクセスするため、s3tablescatalog の作成完了を待つ */
    lambdaAConstruct.node.addDependency(s3TablesCatalogConstruct);

    /** Athena ワークグループ(クエリ実行環境) */
    new AthenaWorkGroupConstruct(this, "AthenaWorkGroup");
  }
}

2. S3TablesBucketConstruct / LambdaLogsNamespaceConstruct

S3 Tables のテーブルバケットと Lambda ログ用名前空間("lambda_logs")をそれぞれ独立したコンストラクトで管理します。

関連するハマったポイント:その1(テーブルバケットの削除・再作成)

s3-tables-bucket/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";

import { LambdaLogsNamespaceConstruct } from "./lambda-logs-namespace";

/** S3 Tables のテーブルバケットを管理するコンストラクト。 */
export class S3TablesBucketConstruct extends Construct {
  readonly tableBucket: s3tables.TableBucket;
  /** Firehose Iceberg destination に渡す Glue カタログ ARN(バケットレベル) */
  readonly tableBucketCatalogArn: string;
  readonly lambdaLogsNamespace: s3tables.Namespace;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    const stack = cdk.Stack.of(this);

    // S3 Tables バケット名はアカウント内でユニークであれば良いため、construct パスから自動導出する
    const tableBucketName = cdk.Names.uniqueId(this).toLowerCase();

    /** S3 Tables テーブルバケット */
    this.tableBucket = new s3tables.TableBucket(this, "Default", {
      tableBucketName, // 指定必須
      removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
    });

    this.tableBucketCatalogArn = cdk.Arn.format(
      {
        service: "glue",
        resource: "catalog",
        resourceName: `s3tablescatalog/${tableBucketName}`,
      },
      stack,
    );

    /** Lambda 関数ログ用名前空間("lambda_logs") */
    const lambdaLogsNamespaceConstruct = new LambdaLogsNamespaceConstruct(
      this,
      "LambdaLogsNamespace",
      { tableBucket: this.tableBucket },
    );
    this.lambdaLogsNamespace = lambdaLogsNamespaceConstruct.namespace;

    // 名前空間を追加する場合は、ここに XxxNamespaceConstruct を追加していく
  }
}
s3-tables-bucket/lambda-logs-namespace/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import { Construct } from "constructs";

interface LambdaLogsNamespaceConstructProps {
  tableBucket: s3tables.TableBucket;
}

/** Lambda 関数ログ用の S3 Tables 名前空間("lambda_logs")を管理するコンストラクト。 */
export class LambdaLogsNamespaceConstruct extends Construct {
  readonly namespace: s3tables.Namespace;

  constructor(
    scope: Construct,
    id: string,
    props: LambdaLogsNamespaceConstructProps,
  ) {
    super(scope, id);

    const { tableBucket } = props;

    this.namespace = new s3tables.Namespace(this, "Default", {
      namespaceName: "lambda_logs",
      tableBucket,
    });
  }
}

3. LambdaAConstruct / LoggingConstruct

LambdaAConstruct は Lambda 関数本体と、ログ関連リソースをまとめた LoggingConstruct を管理します。

lambda-a/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { Construct } from "constructs";

import { LoggingConstruct } from "./logging";

interface LambdaAConstructProps {
  namespace: s3tables.Namespace;
  tableBucketCatalogArn: string;
}

/** Lambda 関数 A・ロググループ・S3 Tables ログパイプラインを管理するコンストラクト。 */
export class LambdaAConstruct extends Construct {
  readonly function: lambda.Function;

  constructor(scope: Construct, id: string, props: LambdaAConstructProps) {
    super(scope, id);

    const { namespace, tableBucketCatalogArn } = props;

    /** ロググループ・Iceberg テーブル・配信パイプラインをまとめたログ基盤 */
    const loggingConstruct = new LoggingConstruct(this, "Logging", {
      namespace,
      tableBucketCatalogArn,
    });

    /** ログ発生源となる Lambda 関数 */
    this.function = new lambda.Function(this, "Default", {
      runtime: lambda.Runtime.NODEJS_24_X,
      handler: "index.handler",
      code: lambda.Code.fromInline(`
exports.handler = async (event) => {
  const jst = () => new Date(Date.now() + 9 * 60 * 60 * 1000).toISOString().replace('Z', '+09:00');
  const count = event && typeof event.count === 'number' ? event.count : 3;
  const messages = ['start', 'invoked', 'end'];
  for (let i = 0; i < count; i++) {
    console.log(JSON.stringify({ timestamp: jst(), level: 'INFO', message: messages[i] || 'log' }));
  }
  return { statusCode: 200 };
};`),
      logGroup: loggingConstruct.logGroup,
      loggingFormat: lambda.LoggingFormat.JSON, // S3 テーブルで扱いやすい構造化ログにするため
    });
  }
}
lambda-a/logging/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";

import { LogGroupConstruct } from "./log-group";
import { LogS3TablePipelineConstruct } from "./log-s3table-pipeline";
import { S3TableConstruct } from "./s3-table";

interface LoggingConstructProps {
  namespace: s3tables.Namespace;
  tableBucketCatalogArn: string;
}

/** Lambda 関数のログ関連リソース(ロググループ・Iceberg テーブル・配信パイプライン)を管理するコンストラクト。 */
export class LoggingConstruct extends Construct {
  readonly logGroup: logs.LogGroup;

  constructor(scope: Construct, id: string, props: LoggingConstructProps) {
    super(scope, id);

    const { namespace, tableBucketCatalogArn } = props;

    /** Lambda 関数のロググループ */
    const { logGroup } = new LogGroupConstruct(this, "LogGroup");
    this.logGroup = logGroup;

    /** ログを格納する Iceberg テーブル */
    const s3TableConstruct = new S3TableConstruct(this, "S3Table", {
      namespace,
    });

    /** CloudWatch Logs → Firehose → S3 Tables のログ配信パイプライン */
    new LogS3TablePipelineConstruct(this, "LogS3TablePipeline", {
      logGroup,
      namespace,
      table: s3TableConstruct.table,
      tableBucketCatalogArn,
    });
  }
}

4. S3TableConstruct

Lambda 関数ログを格納する S3 Tables の Iceberg テーブルを管理するコンストラクトです。スキーマは log_events の1カラムのみを定義しています。

関連するハマったポイント:その2(1レコードに JSON オブジェクトは1つのみという制約)

lambda-a/logging/s3-table/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";

interface S3TableConstructProps {
  namespace: s3tables.Namespace;
}

/**
 * Lambda 関数ログを格納する S3 Tables の Iceberg テーブルを管理するコンストラクト。
 *
 * スキーマは Firehose の Lambda 変換で出力される JSON フィールドに合わせて定義する。
 */
export class S3TableConstruct extends Construct {
  readonly table: s3tables.Table;

  constructor(scope: Construct, id: string, props: S3TableConstructProps) {
    super(scope, id);

    const { namespace } = props;

    // S3 Tables のテーブル名はコンストラクトパスから自動導出する(小文字・数字・アンダースコアのみ)
    const tableName = cdk.Names.uniqueId(this)
      .toLowerCase()
      .replace(/[^a-z0-9]/g, "_");

    this.table = new s3tables.Table(this, "Default", {
      tableName,
      namespace,
      openTableFormat: s3tables.OpenTableFormat.ICEBERG,
      removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
      icebergMetadata: {
        icebergSchema: {
          schemaFieldList: [
            /**
             * logEvents 配列全体を JSON 文字列として格納する。
             * Iceberg destination は1レコードにつき JSON オブジェクトを1つしか受け付けないため、
             * logEvents を個別レコードに分割せず配列ごと1カラムに詰める。
             * S3 Tables (Iceberg) はカラム名を小文字に正規化するため、フィールド名は小文字+アンダースコアで定義する。
             * @see https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-considerations.html
             */
            { name: "log_events", type: "string" },
          ],
        },
      },
    });
  }
}

5. S3TablesCatalogConstruct

関連するハマったポイント:その3(Glue カタログ削除時の Lake Formation 権限)、その4(federatedCatalog.identifierbucket/* 固定で変更不可)

Athena から S3 Tables にアクセスするための Glue フェデレーテッドカタログです。createDatabaseDefaultPermissions / createTableDefaultPermissionsIAM_ALLOWED_PRINCIPALS:ALL を設定して IAM アクセス制御モードにしている点は前回の記事と同じです。これにより、Lake Formation の明示的な権限付与なしで Firehose と Athena 双方からアクセスできるようになります。

s3-tables-catalog/index.ts
import * as cdk from "aws-cdk-lib";
import * as glue from "aws-cdk-lib/aws-glue";
import { Construct } from "constructs";

/**
 * Glue s3tablescatalog フェデレーテッドカタログ。
 * @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-aws.html#table-integration-procedures
 */
export class S3TablesCatalogConstruct extends Construct {
  /** Glue s3tablescatalog カタログの ARN */
  readonly catalogArn: string;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    /** IAM アクセス制御モードで統合する設定 */
    const iamAllowedPrincipalsAll: glue.CfnCatalog.PrincipalPermissionsProperty =
      {
        principal: { dataLakePrincipalIdentifier: "IAM_ALLOWED_PRINCIPALS" },
        permissions: ["ALL"],
      };

    const stack = cdk.Stack.of(this);

    this.catalogArn = cdk.Arn.format(
      { service: "glue", resource: "catalog", resourceName: "s3tablescatalog" },
      stack,
    );

    new glue.CfnCatalog(this, "Default", {
      name: "s3tablescatalog",
      federatedCatalog: {
        identifier: cdk.Arn.format(
          {
            service: "s3tables",
            resource: "bucket",
            resourceName: "*", // ワイルドカード必須。特定バケット指定・作成後の変更ともに不可
          },
          stack,
        ),
        connectionName: "aws:s3tables",
      },
      createDatabaseDefaultPermissions: [iamAllowedPrincipalsAll], // IAM アクセス制御モード:新規 DB に IAM_ALLOWED_PRINCIPALS:ALL を自動付与
      createTableDefaultPermissions: [iamAllowedPrincipalsAll], // IAM アクセス制御モード:新規テーブルに IAM_ALLOWED_PRINCIPALS:ALL を自動付与
      allowFullTableExternalDataAccess: "True", // 外部クエリエンジン(Athena 等)からのフルアクセスを許可
    });
  }
}

6. LogS3TablePipelineConstruct

CloudWatch Logs → Firehose → S3 Tables のログ配信パイプラインを管理するコンストラクトです。FirehoseConstructSubscriptionFilterConstruct をまとめています。

lambda-a/logging/log-s3table-pipeline/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";

import { FirehoseConstruct } from "./firehose";
import { SubscriptionFilterConstruct } from "./subscription-filter";

interface LogS3TablePipelineConstructProps {
  logGroup: logs.ILogGroup;
  namespace: s3tables.Namespace;
  table: s3tables.Table;
  tableBucketCatalogArn: string;
}

/**
 * CloudWatch Logs → Firehose → S3 Tables(Iceberg)のログ配信パイプラインを管理するコンストラクト。
 */
export class LogS3TablePipelineConstruct extends Construct {
  readonly firehoseConstruct: FirehoseConstruct;

  constructor(
    scope: Construct,
    id: string,
    props: LogS3TablePipelineConstructProps,
  ) {
    super(scope, id);

    const { logGroup, namespace, table, tableBucketCatalogArn } = props;

    /** Iceberg テーブルへの配信ストリーム */
    this.firehoseConstruct = new FirehoseConstruct(this, "Firehose", {
      tableBucketCatalogArn,
      namespace,
      table,
    });

    /** CloudWatch Logs から Firehose へのサブスクリプションフィルター */
    new SubscriptionFilterConstruct(this, "SubscriptionFilter", {
      logGroup,
      deliveryStream: this.firehoseConstruct.deliveryStream,
    });
  }
}

7. FirehoseConstruct

関連するハマったポイント:その5(プロセッサ非対応)、その6(DefaultPolicy の依存関係)、その7(バッファ間隔)、その8(IAM 権限)

Apache Iceberg destination の Firehose 配信ストリームを作成するコンストラクトです。tableBucketCatalogArnS3TablesBucketConstruct から受け取ります。

lambda-a/logging/log-s3table-pipeline/firehose/index.ts
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as logs from "aws-cdk-lib/aws-logs";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";

import { TransformLambdaConstruct } from "./transform-lambda";

interface FirehoseConstructProps {
  /** Firehose の Iceberg destination に渡す Glue カタログ ARN(バケットレベル) */
  tableBucketCatalogArn: string;
  namespace: s3tables.Namespace;
  table: s3tables.Table;
}

/**
 * Kinesis Data Firehose 配信ストリームを管理するコンストラクト。
 *
 * - 入力: CloudWatch Logs サブスクリプションフィルター(gzip 圧縮された CWL ペイロード)
 * - 処理: Lambda 変換で gzip 解凍 + CWL ラッパー除去 + プラットフォームログ除外
 * - 出力: Apache Iceberg テーブル(S3 Tables)
 *
 * Iceberg destination は AWS マネージドの Decompression / CloudWatchLogProcessing プロセッサに
 * 非対応のため、Lambda 変換で gzip 解凍と JSON 抽出をまとめて行う。
 */
export class FirehoseConstruct extends Construct {
  readonly deliveryStream: firehose.CfnDeliveryStream;

  constructor(scope: Construct, id: string, props: FirehoseConstructProps) {
    super(scope, id);

    const { tableBucketCatalogArn, namespace, table } = props;

    /** 配信失敗時のバックアップ用 S3 バケット */
    const backupBucket = new s3.Bucket(this, "BackupBucket", {
      removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
      autoDeleteObjects: true,
    });

    /** Firehose 配信エラー用 CloudWatch Logs */
    const errorLogGroup = new logs.LogGroup(this, "ErrorLogGroup", {
      removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
    });
    const errorLogStream = new logs.LogStream(this, "ErrorLogStream", {
      logGroup: errorLogGroup,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /** Lambda 変換関数(CWL gzip 解凍 + ラッパー除去) */
    const { function: transformFunction } = new TransformLambdaConstruct(
      this,
      "TransformLambda",
    );

    /** Firehose 用サービスロール */
    const role = new iam.Role(this, "Role", {
      assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
    });

    /**
     * Glue Federation 経由で S3 Tables に書き込むための許可。
     * @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-firehose.html#firehose-role-s3tables
     *
     * 以下の試行錯誤の結果、上記ドキュメントのサンプルよりも権限を広く取る必要があった:
     * - glue:GetTable / glue:UpdateTable を個別指定 → Firehose ストリーム作成時の検証で AccessDenied
     * - glue:* + lakeformation:* → Firehose ストリーム作成には成功するが、s3tables:* を省くと
     *   データ配信が失敗するケースがあった
     * - glue:* + lakeformation:* + s3tables:* → 作成・配信ともに成功(現在の設定)
     * - アクションを絞っても、リソースをバケット・テーブル ARN に限定しても AccessDenied になるため、
     *   いずれも resources: ["*"] が必要
     */
    role.addToPolicy(
      new iam.PolicyStatement({
        actions: ["glue:*", "lakeformation:*", "s3tables:*"],
        resources: ["*"],
      }),
    );

    backupBucket.grantReadWrite(role);
    errorLogGroup.grantWrite(role);
    transformFunction.grantInvoke(role);

    this.deliveryStream = new firehose.CfnDeliveryStream(this, "Default", {
      deliveryStreamType: "DirectPut",
      icebergDestinationConfiguration: {
        roleArn: role.roleArn,
        catalogConfiguration: { catalogArn: tableBucketCatalogArn },
        s3Configuration: {
          bucketArn: backupBucket.bucketArn,
          roleArn: role.roleArn,
          errorOutputPrefix: "errors/",
        },
        // ログは追記のみのため INSERT に限定してスループットを最適化する。UPSERT が必要な場合は false に変更する
        appendOnly: true,
        // バッファ間隔が長いほど1レコードに含まれる logEvents が増えログサイズが大きくなるため、
        // ログ量に応じて intervalInSeconds / sizeInMBs を調整すること
        bufferingHints: {
          intervalInSeconds: 60, // 複数レコードをまとめて Iceberg テーブルに書き込むためのバッファ間隔
          sizeInMBs: 1, // 最小バッファサイズ
        },
        destinationTableConfigurationList: [
          {
            destinationDatabaseName: namespace.namespaceName,
            destinationTableName: table.tableName,
          },
        ],
        /**
         * Iceberg destination ではネイティブ Decompression / CloudWatchLogProcessing が
         * サポートされないため、Lambda 変換で gzip 解凍と Lambda JSON ログの抽出を
         * 一括で行う。
         */
        processingConfiguration: {
          enabled: true,
          processors: [
            {
              type: "Lambda",
              parameters: [
                {
                  parameterName: "LambdaArn",
                  parameterValue: transformFunction.functionArn,
                },
              ],
            },
          ],
        },
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: errorLogGroup.logGroupName,
          logStreamName: errorLogStream.logStreamName,
        },
      },
    });

    /** Iceberg テーブル作成完了後に配信ストリームを作成 */
    this.deliveryStream.node.addDependency(table);

    /**
     * Firehose は作成時に roleArn を引数に glue:GetTable を呼ぶため、ロール本体だけでなく
     * インラインポリシー(DefaultPolicy)の作成完了も待つ必要がある。CDK は Ref で参照される
     * Role に対する依存関係しか自動付与しないため、ここで明示的に追加する。
     */
    const defaultPolicy = role.node
      .findChild("DefaultPolicy")
      .node.defaultChild as cdk.CfnResource;
    this.deliveryStream.node.addDependency(defaultPolicy);
  }
}

8. TransformLambdaConstruct

関連するハマったポイント:その5(プロセッサ非対応)、その2(1レコードに JSON オブジェクトは1つのみという制約)

Firehose の Lambda 変換用関数を作成するコンストラクトです。

lambda-a/logging/log-s3table-pipeline/firehose/transform-lambda/index.ts
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";
import { Construct } from "constructs";

/**
 * Firehose の Lambda 変換用の関数。
 *
 * - CWL gzip ペイロードを解凍
 * - プラットフォームログを除外し、アプリケーションログのみを抽出
 * - logEvents 配列全体を JSON 文字列として log_events カラムに格納
 */
export class TransformLambdaConstruct extends Construct {
  readonly function: lambda.IFunction;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    this.function = new nodejs.NodejsFunction(this, "Default", {
      entry: "../server/src/lambda/handlers/firehose-transform.ts",
      runtime: lambda.Runtime.NODEJS_24_X,
      architecture: lambda.Architecture.ARM_64,
      logGroup: new cdk.aws_logs.LogGroup(this, "LogGroup", {
        removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
      }),
    });
  }
}

ハンドラー本体は下記です。CWL から流れてくる gzip 圧縮ペイロードを解凍し、logEvents 配列全体を JSON 文字列として1カラムに格納します。

packages/server/src/lambda/handlers/firehose-transform.ts
import { gunzipSync } from "zlib";

/**
 * CloudWatch Logs サブスクリプションフィルターから Firehose 経由で届く gzip 圧縮された
 * CWL ペイロードを解凍し、1レコードとして Iceberg テーブルに書き込む JSON を返す。
 *
 * Iceberg destination は AWS マネージドの Decompression プロセッサに非対応のため、Lambda 変換でログ行を取り出す。
 */

/**
 * CWL サブスクリプションフィルター経由で Firehose に届く gzip 解凍後の JSON ペイロード形式。
 * @see https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
 */
interface CwlPayload {
  messageType: string;
  logEvents: { id: string; timestamp: number; message: string }[];
}

interface FirehoseRecord {
  recordId: string;
  data: string; // base64-encoded gzip
}

interface FirehoseEvent {
  records: FirehoseRecord[];
}

export const handler = async (
  event: FirehoseEvent,
): Promise<{
  /** @see https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html */
  records: { recordId: string; result: "Ok" | "Dropped"; data: string }[];
}> => {
  const records = event.records.map((record) => {
    const decompressed = gunzipSync(
      new Uint8Array(Buffer.from(record.data, "base64")),
    ).toString("utf-8");
    const payload: CwlPayload = JSON.parse(decompressed);

    /**
     * 制御メッセージ(CONTROL_MESSAGE)など DATA_MESSAGE 以外はスキップ。
     * @see https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
     */
    if (payload.messageType !== "DATA_MESSAGE") {
      return {
        recordId: record.recordId,
        result: "Dropped" as const,
        data: "",
      };
    }

    /**
     * アプリケーションログのみを残し、message を Lambda JSON エンベロープの内側の値に差し替える。
     *
     * loggingFormat: JSON を設定すると Lambda は各ログ行を
     * { timestamp, level, requestId, message } のエンベロープで包む。
     * 発生源 Lambda は message 部に { level, message } 形式の JSON を出力することを前提とする。
     * プラットフォームログ(platform.initStart 等)は level フィールドを持たないため除外される。
     */
    const appLogs = payload.logEvents.flatMap((e) => {
      try {
        const envelope = JSON.parse(e.message) as Record<string, unknown>;
        if (typeof envelope.level !== "string") return [];
        // CWL の id / timestamp ラッパーを除去し、console.log の出力内容だけを返す
        return [
          JSON.parse(envelope.message as string) as Record<string, unknown>,
        ];
      } catch {
        return [];
      }
    });

    if (appLogs.length === 0) {
      return {
        recordId: record.recordId,
        result: "Dropped" as const,
        data: "",
      };
    }

    /**
     * Iceberg destination は1レコードにつき JSON オブジェクトを1つしか受け付けないため、
     * logEvents 配列全体を JSON 文字列として1カラムに格納する。
     * これにより CWL ペイロード内の全 logEvent をロストせず保持できる。
     *
     * クエリ時は Athena の json_parse / json_extract を使って logEvents を展開する。
     *
     * @see https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-considerations.html
     */
    const output = JSON.stringify({
      log_events: JSON.stringify(appLogs),
    });

    return {
      recordId: record.recordId,
      result: "Ok" as const,
      data: Buffer.from(output).toString("base64"),
    };
  });

  return { records };
};

9. SubscriptionFilterConstruct

関連するハマったポイント:その6(DefaultPolicy の依存関係)

CloudWatch Logs から Firehose へログを送るサブスクリプションフィルターです。Firehose に PutRecord する際に CloudWatch Logs が assume するロールも合わせて作成します。

lambda-a/logging/log-s3table-pipeline/subscription-filter/index.ts
import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";

interface SubscriptionFilterConstructProps {
  logGroup: logs.ILogGroup;
  deliveryStream: firehose.CfnDeliveryStream;
}

/**
 * CloudWatch Logs から Kinesis Data Firehose へログを送るサブスクリプションフィルター。
 *
 * - CloudWatch Logs が Firehose に PutRecord する際に assume するロールを作成
 * - 全ログを Firehose に送るために filterPattern は空文字を指定
 *
 * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample
 */
export class SubscriptionFilterConstruct extends Construct {
  constructor(
    scope: Construct,
    id: string,
    props: SubscriptionFilterConstructProps,
  ) {
    super(scope, id);

    const { logGroup, deliveryStream } = props;

    const stack = cdk.Stack.of(this);

    /** CWL → Firehose の PutRecord に使われるロール */
    const role = new iam.Role(this, "Role", {
      assumedBy: new iam.ServicePrincipal(`logs.${stack.region}.amazonaws.com`),
    });
    role.addToPolicy(
      new iam.PolicyStatement({
        actions: ["firehose:PutRecord", "firehose:PutRecordBatch"],
        resources: [deliveryStream.attrArn],
      }),
    );

    const subscriptionFilter = new logs.CfnSubscriptionFilter(this, "Default", {
      logGroupName: logGroup.logGroupName,
      destinationArn: deliveryStream.attrArn,
      roleArn: role.roleArn,
      filterPattern: "", // すべてのログを送信
    });

    /**
     * サブスクリプションフィルター作成時に Firehose へテストメッセージを PUT して権限検証するため、
     * インラインポリシーの作成完了を待つ。
     */
    const defaultPolicy = role.node.findChild("DefaultPolicy").node
      .defaultChild as cdk.CfnResource;
    subscriptionFilter.node.addDependency(defaultPolicy);
  }
}

10. AthenaWorkGroupConstruct

Athena のワークグループとクエリ結果バケットを作成します。前回の記事と構造は同じで、ワークグループ名のみ変えています。

athena-workgroup/index.ts
import * as athena from "aws-cdk-lib/aws-athena";
import { Construct } from "constructs";

import { ResultBucketConstruct } from "./result-bucket";

/**
 * Athena ワークグループとクエリ結果バケットを管理するコンストラクト。
 */
export class AthenaWorkGroupConstruct extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    /** クエリ結果を保存する S3 バケット */
    const { bucket } = new ResultBucketConstruct(this, "ResultBucket");

    /** Athena ワークグループ */
    new athena.CfnWorkGroup(this, "Default", {
      name: "lambda-firehose-s3tables",
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${bucket.bucketName}/results/`,
        },
      },
      recursiveDeleteOption: true, // スタック削除時にクエリ結果バケットも削除する(検証用スタックのため。本番では false を推奨)
    });
  }
}

11. 動作確認

デプロイ

npx cdk deploy

Lambda 関数を呼び出してログを発生させる

Lambda のイベントに count を渡すと、その数だけログを出力します(デフォルトは 3 件)。

aws lambda invoke \
  --function-name <関数> \
  --payload '{"count": 3}' \
  --cli-binary-format raw-in-base64-out \
  /dev/null

バッファ間隔(60 秒)経過後、Athena でクエリするとデータが書き込まれていることを確認できます。log_events カラムには CWL のバッファリング単位で logEvents 配列が JSON 文字列として格納されます。

count の値を変えながら実行間隔を 90 秒以上空けると、1レコードに含まれるログ件数が異なる複数のレコードを作成できます。以下は count=3count=1count=2 の順に呼び出した結果です。

SELECT
  json_array_length(json_parse(log_events)) AS log_count,
  log_events
FROM "s3tablescatalog/<テーブルバケット名>"."lambda_logs"."<テーブル名>"
ORDER BY json_extract_scalar(json_parse(log_events), '$[0].timestamp');
log_count | log_events
3         | [{"timestamp":"2026-05-11T09:37:04.918+09:00","level":"INFO","message":"start"},{"timestamp":"2026-05-11T09:37:04.950+09:00","level":"INFO","message":"invoked"},{"timestamp":"2026-05-11T09:37:04.950+09:00","level":"INFO","message":"end"}]
1         | [{"timestamp":"2026-05-11T09:38:35.694+09:00","level":"INFO","message":"start"}]
2         | [{"timestamp":"2026-05-11T09:40:06.302+09:00","level":"INFO","message":"start"},{"timestamp":"2026-05-11T09:40:06.302+09:00","level":"INFO","message":"invoked"}]

log_events の中身を展開したい場合は CROSS JOIN UNNEST を使います。

SELECT
  json_extract_scalar(log_event, '$.timestamp') AS timestamp,
  json_extract_scalar(log_event, '$.level')     AS level,
  json_extract_scalar(log_event, '$.message')   AS message
FROM "s3tablescatalog/<テーブルバケット名>"."lambda_logs"."<テーブル名>"
CROSS JOIN UNNEST(CAST(json_parse(log_events) AS array(json))) AS t(log_event)
ORDER BY timestamp;
timestamp                         | level | message
2026-05-11T09:37:04.918+09:00     | INFO  | start
2026-05-11T09:37:04.950+09:00     | INFO  | invoked
2026-05-11T09:37:04.950+09:00     | INFO  | end
2026-05-11T09:38:35.694+09:00     | INFO  | start
2026-05-11T09:40:06.302+09:00     | INFO  | start
2026-05-11T09:40:06.302+09:00     | INFO  | invoked

1レコードに複数のログが含まれていても CROSS JOIN UNNEST で1行1ログに展開できます。timestamplevelmessage がすべて console.log で出力した値そのままで格納されていることが確認できます。

ハマったポイント

その1:S3 Tables のテーブルバケットは削除直後に同名で再作成できない

removalPolicy: DESTROY のテーブルバケットを削除すると、AWS 内部では非同期的に削除処理が進みます。同じ名前でデプロイし直そうとすると下記のエラーになります。

Resource handler returned message: "The bucket is in a transitional state because of
a previous deletion attempt. Try again later." HandlerErrorCode: AlreadyExists

削除が完了するまで待機するか、コンストラクト ID を変えて別名で再作成するようにしましょう。この挙動により検証時に結構時間を取られました。

その2:Iceberg destination の1レコードには JSON オブジェクトを1つしか含められない

Iceberg destination には「1レコードにつき JSON オブジェクトは1つだけ」という制約があります。

CWL サブスクリプションフィルターが Firehose に送る1ペイロードには複数の logEvents が含まれる場合があります。そのため、最初は各 logEvent を個別の JSON に変換して改行区切り(NDJSON)で結合する実装を試みましたが、制約違反でエラーになりました。

// NG: 複数の logEvent を1レコードに詰めるとエラーになる
return {
  recordId: record.recordId,
  result: "Ok" as const,
  data: Buffer.from(lines.join("\n")).toString("base64"), // 複数 JSON → 制約違反
};

また、最初の logEvent だけを返す方法もありますが、この場合 2 件目以降の logEvent がロストします。

// NG: データロストが発生する
return {
  recordId: record.recordId,
  result: "Ok" as const,
  data: Buffer.from(lines[0]).toString("base64"), // 2件目以降がロスト
};

解決策として、logEvents 配列全体を JSON 文字列として1カラムに格納する方法を採用しました。1レコード = 1 CWL ペイロード となるため制約に違反せず、全 logEvent を保持できます。クエリ時は Athena の json_parse / json_extract で logEvents を展開します。

// OK: logEvents 配列全体を1カラムに格納する
const output = JSON.stringify({
  log_events: JSON.stringify(appLogs),
});

テーブルスキーマも合わせて1カラムのみ定義しています。

lambda-a/logging/s3-table/index.ts
schemaFieldList: [
  { name: "log_events", type: "string" }, // logEvents 配列を JSON 文字列として格納
],

その3:Glue カタログの削除には Lake Formation の DROP 権限が必要

スタック削除時に Glue カタログの削除が AccessDeniedException になる場合は、Lake Formation の DROP 権限が不足しています。CDK の CloudFormation 実行ロールに権限を付与してください。

# CDK 実行ロール(cdk-hnb659fds-cfn-exec-role)に DROP を付与
aws lakeformation grant-permissions \
  --principal '{"DataLakePrincipalIdentifier":"arn:aws:iam::<ACCOUNT_ID>:role/cdk-hnb659fds-cfn-exec-role-<ACCOUNT_ID>-ap-northeast-1"}' \
  --permissions DROP \
  --resource '{"Catalog":{"Id":"<ACCOUNT_ID>:s3tablescatalog"}}' \
  --region ap-northeast-1

なお、この grant-permissions コマンド自体も Lake Formation 管理者権限が必要です。CLI で AccessDeniedException になる場合は AWS コンソールの Lake Formation → Data permissions から付与してください。

その4:federatedCatalog.identifierbucket/* 固定で変更不可

s3tablescatalog という名前の Glue フェデレーテッドカタログは、federatedCatalog.identifierarn:aws:s3tables:{region}:{account}:bucket/* でなければ作成できません。特定バケットの ARN を指定すると次のエラーになります。

s3tablescatalog is a reserved name for federated Amazon S3 Tables catalogs
with the federation identifier arn:aws:s3tables:...:bucket/*
and aws:s3tables connection name.

また、作成後に変更することもできません。Lake Formation の Alter on Catalog 権限を付与した上でデプロイしても、Glue API 自体が更新を拒否します。

Update on FederateCatalog is not allowed.

コード中の resourceName: "*" はワイルドカードですが、特定バケットに変更することはできない必須値です。

その5:Iceberg destination は Decompression / CloudWatchLogProcessing プロセッサ非対応

CWL から Firehose に流れてくるレコードは gzip 圧縮された JSON で、logEvents 配列の中にログ行が入れ子になっています。これを S3 destination で配信する場合は AWS マネージドな Decompression + CloudWatchLogProcessing プロセッサを組み合わせれば Lambda 変換不要で平坦化できます(参考)。

しかし、Iceberg destination ではこれらのプロセッサに非対応です。デプロイ時に下記のエラーになります。

Resource handler returned message: "DecompressionProcessor is currently only supported for destination type S3, Splunk and Snowflake"

そのため、CWL → Firehose → Iceberg ルートでは gzip 解凍と JSON 抽出を Lambda 変換で行う必要があります

その6:Firehose の依存関係に DefaultPolicy を含める必要がある

Firehose は作成時にロールの glue:GetTable を呼んでテーブルの存在と権限を検証します。このとき CDK の自動依存関係は ロール本体 までしか辿らないため、ロールにアタッチされる DefaultPolicyAWS::IAM::Policy)の作成と Firehose の作成が並行で走ってしまい、検証時にポリシーが未付与で Firehose 作成が失敗することがあります。

Resource handler returned message: "Role ... is not authorized to perform: glue:GetTable
for the given table or the table does not exist."

そのため、明示的に DefaultPolicy を依存関係に追加して、ポリシーアタッチ完了後に Firehose を作成させる必要があります。

lambda-a/log-s3table-pipeline/firehose/index.ts
const defaultPolicy = role.node
  .findChild("DefaultPolicy")
  .node.defaultChild as cdk.CfnResource;
this.deliveryStream.node.addDependency(defaultPolicy);

これは SubscriptionFilterConstruct 側の Firehose PutRecord 検証でも同様に必要です。

その7:Iceberg destination のバッファ間隔に 0 を指定してもデプロイ・配信ともに成功する

bufferingHints.intervalInSeconds0 を指定した場合、デプロイも配信も問題なく成功しました。内部的に最小値(60 秒)として扱われているものと思われます。本構成では意図を明示するために 60 を指定しています。

lambda-a/log-s3table-pipeline/firehose/index.ts
bufferingHints: {
  intervalInSeconds: 60, // 複数レコードをまとめて Iceberg テーブルに書き込むためのバッファ間隔
  sizeInMBs: 1,
},

その8:Firehose ロールの IAM 権限はドキュメントのサンプルより広く取る必要がある

AWS ドキュメントのサンプルでは glue:GetTable / glue:UpdateTable などを個別に指定する例が示されていますが、実際には以下の問題が発生しました。

  • アクションを個別指定 → Firehose ストリーム作成時に内部で行われる glue:GetTable の権限検証で AccessDenied
  • glue:* + lakeformation:* のみs3tables:* 省略)→ ストリーム作成は成功するが、データ配信が失敗するケースがあった
  • アクションを広げても、リソースをバケット・テーブル ARN に限定AccessDenied

結果として、glue:* + lakeformation:* + s3tables:*resources: ["*"] で付与する必要がありました。

なお、lakeformation:* は Lake Formation の API を呼び出すための IAM レベルの権限であり、Lake Formation コンソールや CLI で付与するデータ権限(テーブルへの SELECT 等)とは別物です。S3TablesCatalogConstructcreateTableDefaultPermissions: IAM_ALLOWED_PRINCIPALS:ALL を設定しているため、Lake Formation のデータ権限は付与不要で IAM 権限だけでアクセス制御できます。

課題と今後の改善案

今回の構成を試すにあたり、次のような知見とフィードバックを得ることができました。

課題

  • Lambda ログと S3 Tables の相性は良くない — Lambda ログはそもそも非構造化ログであり、S3 Tables が得意とする構造化データとは性質が異なる。加えて CWL が複数の logEvents をまとめて Firehose に送るため Iceberg destination の「1レコード1 JSON オブジェクト」制約と相性が悪く、Firehose の Iceberg destination が gzip 解凍プロセッサに非対応なため変換 Lambda も必須となり、コストと管理コストが増える
  • クエリに UNNEST が必要でやや煩雑 — 1レコードに JSON 文字列として logEvents 配列を詰めているため、個々のログ行にアクセスするには CROSS JOIN UNNEST(CAST(json_parse(log_events) AS array(json))) が必要。SELECT * ではそのままクエリできない
  • Lake Formation による列レベルのアクセス制御ができない — 全データを log_events の1カラムに詰めているため、特定のフィールドだけを隠すといった列レベルのセキュリティを適用できない

今後の改善案

  • S3 Tables は統計分析・他データソースとの突き合わせ用途に絞る — そもそも、単に何が起きたかを調べたい場合は CloudWatch Logs Insights の方が手軽。S3 Tables は集計・分析・列レベルセキュリティが求められるユースケースに向いている
  • Kinesis Data Streams への切り替えで1イベント1レコード化できる可能性がある — AWS ドキュメントによると、KPL でレコードを集約し Amazon Kinesis Data Streams 経由で Firehose に取り込む場合は自動的に集約解除されて1レコード1 JSON オブジェクトとして扱われる。ソースを CWL サブスクリプションフィルターから Kinesis Data Streams に切り替えることで、各ログイベントを個別レコードとして書き込める可能性がある
  • CWL 経由ではなく Firehose へダイレクトプットする — CWL を経由せず、ログ発生源から直接 Firehose へプットする構成にすれば、送信時点で適切な JSON 形式に整形できる。CWL サブスクリプションフィルターによる gzip 圧縮がなくなるため変換 Lambda での解凍処理も不要になる。またバッファ間隔を 0 秒に設定することで複数データが1レコードにまとめられない可能性もある

おわりに

Lambda 関数のログを CloudWatch Logs サブスクリプションフィルター経由で Amazon Data Firehose に流し、S3 Tables(Iceberg)に書き込む構成を CDK で実装してみました。

S3 テーブル統合機能を使った構成と比べると、

  • テーブルスキーマを自前で決められる(格納するフィールドや型を用途に合わせて定義できる)
  • データ保持期間を CWL の保持ポリシーから切り離せる(Iceberg 側で別途管理)
  • JST 表記でタイムスタンプを保持できる(クエリ時のタイムゾーン変換が不要)

といった自由度があります。一方で Lambda 変換の実装と運用が増えるクエリの煩雑さ列レベルアクセス制御の制限 といったデメリットが引き続きあります。次回以降で改善案を試してみたいと思います。

参考

https://dev.classmethod.jp/articles/lambda-logs-s3-tables-cdk/

https://dev.classmethod.jp/articles/aws-cdk-s3tables-namespace-and-table-l2-construct/

https://dev.classmethod.jp/articles/aws-firehose-s3-tables-integration-tutorial/

https://aws.amazon.com/blogs/big-data/deliver-decompressed-amazon-cloudwatch-logs-to-amazon-s3-and-splunk-using-amazon-data-firehose/

https://zenn.dev/dataheroes/articles/20250728-cwlogs-firehose-iceberg-s3tables

以上

この記事をシェアする

関連記事