Lambda 関数のログを CloudWatch Logs サブスクリプションフィルター経由で Amazon Data Firehose に流し S3 Tables(Iceberg)へ書き込む構成を CDK で実装してみた
こんにちは、クラスメソッドの若槻です。
先日、CloudWatch Logs の AWS マネージドな S3 テーブル統合機能を CDK で実装する記事を書きました。
ただ、AWS マネージドな S3 テーブル統合機能には「データ保持期間がロググループの保持ポリシーに連動する」「テーブルスキーマを自前で決められない」といった制約があります。用途に応じてスキーマやデータ保持期間をコントロールしたい場合は、CloudWatch Logs から Amazon Data Firehose(旧 Kinesis Data Firehose)を経由して Amazon S3 Tables に直接書き込む構成のほうが向いています。
本記事では、AWS CDK(TypeScript)でこの構成を実装してみました。
実装した構成
実装した主なリソースは以下です。
| リソース | 役割 |
|---|---|
| Lambda 関数 | ログの発生源。loggingFormat: JSON で JSON 形式のログを出力する |
| サブスクリプションフィルター | CloudWatch Logs から Firehose へログを転送 |
| Amazon Data Firehose | Apache Iceberg destination で S3 Tables へ書き込む配信ストリーム |
| Lambda 変換関数 | Firehose 内で gzip 解凍・CWL ラッパー除去・プラットフォームログ除外を実行 |
| S3 Tables | テーブルバケット・名前空間・Iceberg テーブル |
| Glue s3tablescatalog | Athena から S3 Tables にアクセスするためのフェデレーテッドカタログ |
| Athena ワークグループ | クエリ実行環境 |
アーキテクチャは以下の通りです。
Lambda 関数(loggingFormat: JSON)
↓
CloudWatch Logs(/aws/lambda/xxx)
↓ サブスクリプションフィルター
Amazon Data Firehose
├─ Lambda 変換: gzip 解凍 + CWL ラッパー除去 + プラットフォームログ除外
└─ Iceberg destination
S3 Tables(Apache Iceberg テーブル)
↑
Athena(s3tablescatalog 経由)
本構成では Lambda 関数ごとに専用の Iceberg テーブルとパイプライン(Firehose・サブスクリプションフィルター)を設けています。主な理由は以下の通りです。
- スキーマ・保持期間を関数単位で独立管理できる — 関数ごとにログ出力フィールドが異なる場合にも、テーブルスキーマを個別に定義できる
- 複数関数のログが混在しない — テーブルが分かれているためクエリがシンプルになる
- 拡張しやすい —
LambdaAConstructが S3 テーブル・Firehose・サブスクリプションフィルターを内包しているため、Lambda 関数を追加する際はLambdaBConstructなどを同じパターンで追加するだけで済む
コンストラクトは lib/constructs/ 配下にサブディレクトリとして分割し、CDK コンストラクトツリーとディレクトリ階層を一致させています。
lib/
├── constructs/
│ ├── lambda-a/
│ │ ├── index.ts # LambdaAConstruct
│ │ └── logging/
│ │ ├── index.ts # LoggingConstruct
│ │ ├── log-group/index.ts # LogGroupConstruct
│ │ ├── s3-table/index.ts # S3TableConstruct
│ │ └── log-s3table-pipeline/
│ │ ├── index.ts # LogS3TablePipelineConstruct
│ │ ├── firehose/
│ │ │ ├── index.ts # FirehoseConstruct
│ │ │ └── transform-lambda/index.ts # TransformLambdaConstruct
│ │ └── subscription-filter/index.ts # SubscriptionFilterConstruct
│ ├── s3-tables-bucket/
│ │ ├── index.ts # S3TablesBucketConstruct
│ │ └── lambda-logs-namespace/index.ts # LambdaLogsNamespaceConstruct
│ ├── s3-tables-catalog/index.ts # S3TablesCatalogConstruct
│ └── athena-workgroup/
│ ├── index.ts # AthenaWorkGroupConstruct
│ └── result-bucket/index.ts # ResultBucketConstruct
└── sample-stack.ts # スタック本体
CDK コンストラクトツリーは以下の構造です。
SampleStack
├── S3TablesBucketConstruct
│ └── LambdaLogsNamespaceConstruct
├── LambdaAConstruct
│ ├── LoggingConstruct
│ │ ├── LogGroupConstruct
│ │ ├── S3TableConstruct
│ │ └── LogS3TablePipelineConstruct
│ │ ├── FirehoseConstruct
│ │ └── SubscriptionFilterConstruct
│ └── Default (Lambda Function)
├── S3TablesCatalogConstruct
└── AthenaWorkGroupConstruct
やってみた
0. 前提
CDK バージョン
S3 Tables の名前空間とテーブルを作成する @aws-cdk/aws-s3tables-alpha を使用します。各パッケージは最新版をインストールしてください。
npm install aws-cdk-lib@latest constructs@latest @aws-cdk/aws-s3tables-alpha@latest
npm install -D aws-cdk@latest
1. スタック本体
スタック本体のコードです。
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { AthenaWorkGroupConstruct } from "./constructs/athena-workgroup";
import { LambdaAConstruct } from "./constructs/lambda-a";
import { S3TablesBucketConstruct } from "./constructs/s3-tables-bucket";
import { S3TablesCatalogConstruct } from "./constructs/s3-tables-catalog";
export class SampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
/** S3 Tables(テーブルバケット・Lambda ログ用名前空間) */
const s3TablesBucketConstruct = new S3TablesBucketConstruct(
this,
"S3TablesBucket",
);
/** Glue s3tablescatalog フェデレーテッドカタログ(Firehose / Athena から S3 Tables へアクセスするために必要) */
const s3TablesCatalogConstruct = new S3TablesCatalogConstruct(
this,
"S3TablesCatalog",
);
/** Lambda 関数 A: サンプルの Lambda 関数(S3 Tables ログパイプライン含む) */
const lambdaAConstruct = new LambdaAConstruct(this, "LambdaA", {
namespace: s3TablesBucketConstruct.lambdaLogsNamespace,
tableBucketCatalogArn: s3TablesBucketConstruct.tableBucketCatalogArn,
});
// Lambda 関数が増える場合はここに LambdaBConstruct などを追加していく
/** Firehose はカタログ経由で Iceberg テーブルにアクセスするため、s3tablescatalog の作成完了を待つ */
lambdaAConstruct.node.addDependency(s3TablesCatalogConstruct);
/** Athena ワークグループ(クエリ実行環境) */
new AthenaWorkGroupConstruct(this, "AthenaWorkGroup");
}
}
2. S3TablesBucketConstruct / LambdaLogsNamespaceConstruct
S3 Tables のテーブルバケットと Lambda ログ用名前空間("lambda_logs")をそれぞれ独立したコンストラクトで管理します。
関連するハマったポイント:その1(テーブルバケットの削除・再作成)
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { LambdaLogsNamespaceConstruct } from "./lambda-logs-namespace";
/** S3 Tables のテーブルバケットを管理するコンストラクト。 */
export class S3TablesBucketConstruct extends Construct {
readonly tableBucket: s3tables.TableBucket;
/** Firehose Iceberg destination に渡す Glue カタログ ARN(バケットレベル) */
readonly tableBucketCatalogArn: string;
readonly lambdaLogsNamespace: s3tables.Namespace;
constructor(scope: Construct, id: string) {
super(scope, id);
const stack = cdk.Stack.of(this);
// S3 Tables バケット名はアカウント内でユニークであれば良いため、construct パスから自動導出する
const tableBucketName = cdk.Names.uniqueId(this).toLowerCase();
/** S3 Tables テーブルバケット */
this.tableBucket = new s3tables.TableBucket(this, "Default", {
tableBucketName, // 指定必須
removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
});
this.tableBucketCatalogArn = cdk.Arn.format(
{
service: "glue",
resource: "catalog",
resourceName: `s3tablescatalog/${tableBucketName}`,
},
stack,
);
/** Lambda 関数ログ用名前空間("lambda_logs") */
const lambdaLogsNamespaceConstruct = new LambdaLogsNamespaceConstruct(
this,
"LambdaLogsNamespace",
{ tableBucket: this.tableBucket },
);
this.lambdaLogsNamespace = lambdaLogsNamespaceConstruct.namespace;
// 名前空間を追加する場合は、ここに XxxNamespaceConstruct を追加していく
}
}
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import { Construct } from "constructs";
interface LambdaLogsNamespaceConstructProps {
tableBucket: s3tables.TableBucket;
}
/** Lambda 関数ログ用の S3 Tables 名前空間("lambda_logs")を管理するコンストラクト。 */
export class LambdaLogsNamespaceConstruct extends Construct {
readonly namespace: s3tables.Namespace;
constructor(
scope: Construct,
id: string,
props: LambdaLogsNamespaceConstructProps,
) {
super(scope, id);
const { tableBucket } = props;
this.namespace = new s3tables.Namespace(this, "Default", {
namespaceName: "lambda_logs",
tableBucket,
});
}
}
3. LambdaAConstruct / LoggingConstruct
LambdaAConstruct は Lambda 関数本体と、ログ関連リソースをまとめた LoggingConstruct を管理します。
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { Construct } from "constructs";
import { LoggingConstruct } from "./logging";
interface LambdaAConstructProps {
namespace: s3tables.Namespace;
tableBucketCatalogArn: string;
}
/** Lambda 関数 A・ロググループ・S3 Tables ログパイプラインを管理するコンストラクト。 */
export class LambdaAConstruct extends Construct {
readonly function: lambda.Function;
constructor(scope: Construct, id: string, props: LambdaAConstructProps) {
super(scope, id);
const { namespace, tableBucketCatalogArn } = props;
/** ロググループ・Iceberg テーブル・配信パイプラインをまとめたログ基盤 */
const loggingConstruct = new LoggingConstruct(this, "Logging", {
namespace,
tableBucketCatalogArn,
});
/** ログ発生源となる Lambda 関数 */
this.function = new lambda.Function(this, "Default", {
runtime: lambda.Runtime.NODEJS_24_X,
handler: "index.handler",
code: lambda.Code.fromInline(`
exports.handler = async (event) => {
const jst = () => new Date(Date.now() + 9 * 60 * 60 * 1000).toISOString().replace('Z', '+09:00');
const count = event && typeof event.count === 'number' ? event.count : 3;
const messages = ['start', 'invoked', 'end'];
for (let i = 0; i < count; i++) {
console.log(JSON.stringify({ timestamp: jst(), level: 'INFO', message: messages[i] || 'log' }));
}
return { statusCode: 200 };
};`),
logGroup: loggingConstruct.logGroup,
loggingFormat: lambda.LoggingFormat.JSON, // S3 テーブルで扱いやすい構造化ログにするため
});
}
}
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";
import { LogGroupConstruct } from "./log-group";
import { LogS3TablePipelineConstruct } from "./log-s3table-pipeline";
import { S3TableConstruct } from "./s3-table";
interface LoggingConstructProps {
namespace: s3tables.Namespace;
tableBucketCatalogArn: string;
}
/** Lambda 関数のログ関連リソース(ロググループ・Iceberg テーブル・配信パイプライン)を管理するコンストラクト。 */
export class LoggingConstruct extends Construct {
readonly logGroup: logs.LogGroup;
constructor(scope: Construct, id: string, props: LoggingConstructProps) {
super(scope, id);
const { namespace, tableBucketCatalogArn } = props;
/** Lambda 関数のロググループ */
const { logGroup } = new LogGroupConstruct(this, "LogGroup");
this.logGroup = logGroup;
/** ログを格納する Iceberg テーブル */
const s3TableConstruct = new S3TableConstruct(this, "S3Table", {
namespace,
});
/** CloudWatch Logs → Firehose → S3 Tables のログ配信パイプライン */
new LogS3TablePipelineConstruct(this, "LogS3TablePipeline", {
logGroup,
namespace,
table: s3TableConstruct.table,
tableBucketCatalogArn,
});
}
}
4. S3TableConstruct
Lambda 関数ログを格納する S3 Tables の Iceberg テーブルを管理するコンストラクトです。スキーマは log_events の1カラムのみを定義しています。
関連するハマったポイント:その2(1レコードに JSON オブジェクトは1つのみという制約)
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
interface S3TableConstructProps {
namespace: s3tables.Namespace;
}
/**
* Lambda 関数ログを格納する S3 Tables の Iceberg テーブルを管理するコンストラクト。
*
* スキーマは Firehose の Lambda 変換で出力される JSON フィールドに合わせて定義する。
*/
export class S3TableConstruct extends Construct {
readonly table: s3tables.Table;
constructor(scope: Construct, id: string, props: S3TableConstructProps) {
super(scope, id);
const { namespace } = props;
// S3 Tables のテーブル名はコンストラクトパスから自動導出する(小文字・数字・アンダースコアのみ)
const tableName = cdk.Names.uniqueId(this)
.toLowerCase()
.replace(/[^a-z0-9]/g, "_");
this.table = new s3tables.Table(this, "Default", {
tableName,
namespace,
openTableFormat: s3tables.OpenTableFormat.ICEBERG,
removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
icebergMetadata: {
icebergSchema: {
schemaFieldList: [
/**
* logEvents 配列全体を JSON 文字列として格納する。
* Iceberg destination は1レコードにつき JSON オブジェクトを1つしか受け付けないため、
* logEvents を個別レコードに分割せず配列ごと1カラムに詰める。
* S3 Tables (Iceberg) はカラム名を小文字に正規化するため、フィールド名は小文字+アンダースコアで定義する。
* @see https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-considerations.html
*/
{ name: "log_events", type: "string" },
],
},
},
});
}
}
5. S3TablesCatalogConstruct
関連するハマったポイント:その3(Glue カタログ削除時の Lake Formation 権限)、その4(federatedCatalog.identifier は bucket/* 固定で変更不可)
Athena から S3 Tables にアクセスするための Glue フェデレーテッドカタログです。createDatabaseDefaultPermissions / createTableDefaultPermissions に IAM_ALLOWED_PRINCIPALS:ALL を設定して IAM アクセス制御モードにしている点は前回の記事と同じです。これにより、Lake Formation の明示的な権限付与なしで Firehose と Athena 双方からアクセスできるようになります。
import * as cdk from "aws-cdk-lib";
import * as glue from "aws-cdk-lib/aws-glue";
import { Construct } from "constructs";
/**
* Glue s3tablescatalog フェデレーテッドカタログ。
* @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-aws.html#table-integration-procedures
*/
export class S3TablesCatalogConstruct extends Construct {
/** Glue s3tablescatalog カタログの ARN */
readonly catalogArn: string;
constructor(scope: Construct, id: string) {
super(scope, id);
/** IAM アクセス制御モードで統合する設定 */
const iamAllowedPrincipalsAll: glue.CfnCatalog.PrincipalPermissionsProperty =
{
principal: { dataLakePrincipalIdentifier: "IAM_ALLOWED_PRINCIPALS" },
permissions: ["ALL"],
};
const stack = cdk.Stack.of(this);
this.catalogArn = cdk.Arn.format(
{ service: "glue", resource: "catalog", resourceName: "s3tablescatalog" },
stack,
);
new glue.CfnCatalog(this, "Default", {
name: "s3tablescatalog",
federatedCatalog: {
identifier: cdk.Arn.format(
{
service: "s3tables",
resource: "bucket",
resourceName: "*", // ワイルドカード必須。特定バケット指定・作成後の変更ともに不可
},
stack,
),
connectionName: "aws:s3tables",
},
createDatabaseDefaultPermissions: [iamAllowedPrincipalsAll], // IAM アクセス制御モード:新規 DB に IAM_ALLOWED_PRINCIPALS:ALL を自動付与
createTableDefaultPermissions: [iamAllowedPrincipalsAll], // IAM アクセス制御モード:新規テーブルに IAM_ALLOWED_PRINCIPALS:ALL を自動付与
allowFullTableExternalDataAccess: "True", // 外部クエリエンジン(Athena 等)からのフルアクセスを許可
});
}
}
6. LogS3TablePipelineConstruct
CloudWatch Logs → Firehose → S3 Tables のログ配信パイプラインを管理するコンストラクトです。FirehoseConstruct と SubscriptionFilterConstruct をまとめています。
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";
import { FirehoseConstruct } from "./firehose";
import { SubscriptionFilterConstruct } from "./subscription-filter";
interface LogS3TablePipelineConstructProps {
logGroup: logs.ILogGroup;
namespace: s3tables.Namespace;
table: s3tables.Table;
tableBucketCatalogArn: string;
}
/**
* CloudWatch Logs → Firehose → S3 Tables(Iceberg)のログ配信パイプラインを管理するコンストラクト。
*/
export class LogS3TablePipelineConstruct extends Construct {
readonly firehoseConstruct: FirehoseConstruct;
constructor(
scope: Construct,
id: string,
props: LogS3TablePipelineConstructProps,
) {
super(scope, id);
const { logGroup, namespace, table, tableBucketCatalogArn } = props;
/** Iceberg テーブルへの配信ストリーム */
this.firehoseConstruct = new FirehoseConstruct(this, "Firehose", {
tableBucketCatalogArn,
namespace,
table,
});
/** CloudWatch Logs から Firehose へのサブスクリプションフィルター */
new SubscriptionFilterConstruct(this, "SubscriptionFilter", {
logGroup,
deliveryStream: this.firehoseConstruct.deliveryStream,
});
}
}
7. FirehoseConstruct
関連するハマったポイント:その5(プロセッサ非対応)、その6(DefaultPolicy の依存関係)、その7(バッファ間隔)、その8(IAM 権限)
Apache Iceberg destination の Firehose 配信ストリームを作成するコンストラクトです。tableBucketCatalogArn は S3TablesBucketConstruct から受け取ります。
import * as s3tables from "@aws-cdk/aws-s3tables-alpha";
import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as logs from "aws-cdk-lib/aws-logs";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
import { TransformLambdaConstruct } from "./transform-lambda";
interface FirehoseConstructProps {
/** Firehose の Iceberg destination に渡す Glue カタログ ARN(バケットレベル) */
tableBucketCatalogArn: string;
namespace: s3tables.Namespace;
table: s3tables.Table;
}
/**
* Kinesis Data Firehose 配信ストリームを管理するコンストラクト。
*
* - 入力: CloudWatch Logs サブスクリプションフィルター(gzip 圧縮された CWL ペイロード)
* - 処理: Lambda 変換で gzip 解凍 + CWL ラッパー除去 + プラットフォームログ除外
* - 出力: Apache Iceberg テーブル(S3 Tables)
*
* Iceberg destination は AWS マネージドの Decompression / CloudWatchLogProcessing プロセッサに
* 非対応のため、Lambda 変換で gzip 解凍と JSON 抽出をまとめて行う。
*/
export class FirehoseConstruct extends Construct {
readonly deliveryStream: firehose.CfnDeliveryStream;
constructor(scope: Construct, id: string, props: FirehoseConstructProps) {
super(scope, id);
const { tableBucketCatalogArn, namespace, table } = props;
/** 配信失敗時のバックアップ用 S3 バケット */
const backupBucket = new s3.Bucket(this, "BackupBucket", {
removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
autoDeleteObjects: true,
});
/** Firehose 配信エラー用 CloudWatch Logs */
const errorLogGroup = new logs.LogGroup(this, "ErrorLogGroup", {
removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
});
const errorLogStream = new logs.LogStream(this, "ErrorLogStream", {
logGroup: errorLogGroup,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
/** Lambda 変換関数(CWL gzip 解凍 + ラッパー除去) */
const { function: transformFunction } = new TransformLambdaConstruct(
this,
"TransformLambda",
);
/** Firehose 用サービスロール */
const role = new iam.Role(this, "Role", {
assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});
/**
* Glue Federation 経由で S3 Tables に書き込むための許可。
* @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-integrating-firehose.html#firehose-role-s3tables
*
* 以下の試行錯誤の結果、上記ドキュメントのサンプルよりも権限を広く取る必要があった:
* - glue:GetTable / glue:UpdateTable を個別指定 → Firehose ストリーム作成時の検証で AccessDenied
* - glue:* + lakeformation:* → Firehose ストリーム作成には成功するが、s3tables:* を省くと
* データ配信が失敗するケースがあった
* - glue:* + lakeformation:* + s3tables:* → 作成・配信ともに成功(現在の設定)
* - アクションを絞っても、リソースをバケット・テーブル ARN に限定しても AccessDenied になるため、
* いずれも resources: ["*"] が必要
*/
role.addToPolicy(
new iam.PolicyStatement({
actions: ["glue:*", "lakeformation:*", "s3tables:*"],
resources: ["*"],
}),
);
backupBucket.grantReadWrite(role);
errorLogGroup.grantWrite(role);
transformFunction.grantInvoke(role);
this.deliveryStream = new firehose.CfnDeliveryStream(this, "Default", {
deliveryStreamType: "DirectPut",
icebergDestinationConfiguration: {
roleArn: role.roleArn,
catalogConfiguration: { catalogArn: tableBucketCatalogArn },
s3Configuration: {
bucketArn: backupBucket.bucketArn,
roleArn: role.roleArn,
errorOutputPrefix: "errors/",
},
// ログは追記のみのため INSERT に限定してスループットを最適化する。UPSERT が必要な場合は false に変更する
appendOnly: true,
// バッファ間隔が長いほど1レコードに含まれる logEvents が増えログサイズが大きくなるため、
// ログ量に応じて intervalInSeconds / sizeInMBs を調整すること
bufferingHints: {
intervalInSeconds: 60, // 複数レコードをまとめて Iceberg テーブルに書き込むためのバッファ間隔
sizeInMBs: 1, // 最小バッファサイズ
},
destinationTableConfigurationList: [
{
destinationDatabaseName: namespace.namespaceName,
destinationTableName: table.tableName,
},
],
/**
* Iceberg destination ではネイティブ Decompression / CloudWatchLogProcessing が
* サポートされないため、Lambda 変換で gzip 解凍と Lambda JSON ログの抽出を
* 一括で行う。
*/
processingConfiguration: {
enabled: true,
processors: [
{
type: "Lambda",
parameters: [
{
parameterName: "LambdaArn",
parameterValue: transformFunction.functionArn,
},
],
},
],
},
cloudWatchLoggingOptions: {
enabled: true,
logGroupName: errorLogGroup.logGroupName,
logStreamName: errorLogStream.logStreamName,
},
},
});
/** Iceberg テーブル作成完了後に配信ストリームを作成 */
this.deliveryStream.node.addDependency(table);
/**
* Firehose は作成時に roleArn を引数に glue:GetTable を呼ぶため、ロール本体だけでなく
* インラインポリシー(DefaultPolicy)の作成完了も待つ必要がある。CDK は Ref で参照される
* Role に対する依存関係しか自動付与しないため、ここで明示的に追加する。
*/
const defaultPolicy = role.node
.findChild("DefaultPolicy")
.node.defaultChild as cdk.CfnResource;
this.deliveryStream.node.addDependency(defaultPolicy);
}
}
8. TransformLambdaConstruct
関連するハマったポイント:その5(プロセッサ非対応)、その2(1レコードに JSON オブジェクトは1つのみという制約)
Firehose の Lambda 変換用関数を作成するコンストラクトです。
import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";
import { Construct } from "constructs";
/**
* Firehose の Lambda 変換用の関数。
*
* - CWL gzip ペイロードを解凍
* - プラットフォームログを除外し、アプリケーションログのみを抽出
* - logEvents 配列全体を JSON 文字列として log_events カラムに格納
*/
export class TransformLambdaConstruct extends Construct {
readonly function: lambda.IFunction;
constructor(scope: Construct, id: string) {
super(scope, id);
this.function = new nodejs.NodejsFunction(this, "Default", {
entry: "../server/src/lambda/handlers/firehose-transform.ts",
runtime: lambda.Runtime.NODEJS_24_X,
architecture: lambda.Architecture.ARM_64,
logGroup: new cdk.aws_logs.LogGroup(this, "LogGroup", {
removalPolicy: cdk.RemovalPolicy.DESTROY, // 検証用スタックのため。本番では RETAIN を推奨
}),
});
}
}
ハンドラー本体は下記です。CWL から流れてくる gzip 圧縮ペイロードを解凍し、logEvents 配列全体を JSON 文字列として1カラムに格納します。
import { gunzipSync } from "zlib";
/**
* CloudWatch Logs サブスクリプションフィルターから Firehose 経由で届く gzip 圧縮された
* CWL ペイロードを解凍し、1レコードとして Iceberg テーブルに書き込む JSON を返す。
*
* Iceberg destination は AWS マネージドの Decompression プロセッサに非対応のため、Lambda 変換でログ行を取り出す。
*/
/**
* CWL サブスクリプションフィルター経由で Firehose に届く gzip 解凍後の JSON ペイロード形式。
* @see https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
*/
interface CwlPayload {
messageType: string;
logEvents: { id: string; timestamp: number; message: string }[];
}
interface FirehoseRecord {
recordId: string;
data: string; // base64-encoded gzip
}
interface FirehoseEvent {
records: FirehoseRecord[];
}
export const handler = async (
event: FirehoseEvent,
): Promise<{
/** @see https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation-status-model.html */
records: { recordId: string; result: "Ok" | "Dropped"; data: string }[];
}> => {
const records = event.records.map((record) => {
const decompressed = gunzipSync(
new Uint8Array(Buffer.from(record.data, "base64")),
).toString("utf-8");
const payload: CwlPayload = JSON.parse(decompressed);
/**
* 制御メッセージ(CONTROL_MESSAGE)など DATA_MESSAGE 以外はスキップ。
* @see https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
*/
if (payload.messageType !== "DATA_MESSAGE") {
return {
recordId: record.recordId,
result: "Dropped" as const,
data: "",
};
}
/**
* アプリケーションログのみを残し、message を Lambda JSON エンベロープの内側の値に差し替える。
*
* loggingFormat: JSON を設定すると Lambda は各ログ行を
* { timestamp, level, requestId, message } のエンベロープで包む。
* 発生源 Lambda は message 部に { level, message } 形式の JSON を出力することを前提とする。
* プラットフォームログ(platform.initStart 等)は level フィールドを持たないため除外される。
*/
const appLogs = payload.logEvents.flatMap((e) => {
try {
const envelope = JSON.parse(e.message) as Record<string, unknown>;
if (typeof envelope.level !== "string") return [];
// CWL の id / timestamp ラッパーを除去し、console.log の出力内容だけを返す
return [
JSON.parse(envelope.message as string) as Record<string, unknown>,
];
} catch {
return [];
}
});
if (appLogs.length === 0) {
return {
recordId: record.recordId,
result: "Dropped" as const,
data: "",
};
}
/**
* Iceberg destination は1レコードにつき JSON オブジェクトを1つしか受け付けないため、
* logEvents 配列全体を JSON 文字列として1カラムに格納する。
* これにより CWL ペイロード内の全 logEvent をロストせず保持できる。
*
* クエリ時は Athena の json_parse / json_extract を使って logEvents を展開する。
*
* @see https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-considerations.html
*/
const output = JSON.stringify({
log_events: JSON.stringify(appLogs),
});
return {
recordId: record.recordId,
result: "Ok" as const,
data: Buffer.from(output).toString("base64"),
};
});
return { records };
};
9. SubscriptionFilterConstruct
関連するハマったポイント:その6(DefaultPolicy の依存関係)
CloudWatch Logs から Firehose へログを送るサブスクリプションフィルターです。Firehose に PutRecord する際に CloudWatch Logs が assume するロールも合わせて作成します。
import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as logs from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";
interface SubscriptionFilterConstructProps {
logGroup: logs.ILogGroup;
deliveryStream: firehose.CfnDeliveryStream;
}
/**
* CloudWatch Logs から Kinesis Data Firehose へログを送るサブスクリプションフィルター。
*
* - CloudWatch Logs が Firehose に PutRecord する際に assume するロールを作成
* - 全ログを Firehose に送るために filterPattern は空文字を指定
*
* @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample
*/
export class SubscriptionFilterConstruct extends Construct {
constructor(
scope: Construct,
id: string,
props: SubscriptionFilterConstructProps,
) {
super(scope, id);
const { logGroup, deliveryStream } = props;
const stack = cdk.Stack.of(this);
/** CWL → Firehose の PutRecord に使われるロール */
const role = new iam.Role(this, "Role", {
assumedBy: new iam.ServicePrincipal(`logs.${stack.region}.amazonaws.com`),
});
role.addToPolicy(
new iam.PolicyStatement({
actions: ["firehose:PutRecord", "firehose:PutRecordBatch"],
resources: [deliveryStream.attrArn],
}),
);
const subscriptionFilter = new logs.CfnSubscriptionFilter(this, "Default", {
logGroupName: logGroup.logGroupName,
destinationArn: deliveryStream.attrArn,
roleArn: role.roleArn,
filterPattern: "", // すべてのログを送信
});
/**
* サブスクリプションフィルター作成時に Firehose へテストメッセージを PUT して権限検証するため、
* インラインポリシーの作成完了を待つ。
*/
const defaultPolicy = role.node.findChild("DefaultPolicy").node
.defaultChild as cdk.CfnResource;
subscriptionFilter.node.addDependency(defaultPolicy);
}
}
10. AthenaWorkGroupConstruct
Athena のワークグループとクエリ結果バケットを作成します。前回の記事と構造は同じで、ワークグループ名のみ変えています。
import * as athena from "aws-cdk-lib/aws-athena";
import { Construct } from "constructs";
import { ResultBucketConstruct } from "./result-bucket";
/**
* Athena ワークグループとクエリ結果バケットを管理するコンストラクト。
*/
export class AthenaWorkGroupConstruct extends Construct {
constructor(scope: Construct, id: string) {
super(scope, id);
/** クエリ結果を保存する S3 バケット */
const { bucket } = new ResultBucketConstruct(this, "ResultBucket");
/** Athena ワークグループ */
new athena.CfnWorkGroup(this, "Default", {
name: "lambda-firehose-s3tables",
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${bucket.bucketName}/results/`,
},
},
recursiveDeleteOption: true, // スタック削除時にクエリ結果バケットも削除する(検証用スタックのため。本番では false を推奨)
});
}
}
11. 動作確認
デプロイ
npx cdk deploy
Lambda 関数を呼び出してログを発生させる
Lambda のイベントに count を渡すと、その数だけログを出力します(デフォルトは 3 件)。
aws lambda invoke \
--function-name <関数名> \
--payload '{"count": 3}' \
--cli-binary-format raw-in-base64-out \
/dev/null
バッファ間隔(60 秒)経過後、Athena でクエリするとデータが書き込まれていることを確認できます。log_events カラムには CWL のバッファリング単位で logEvents 配列が JSON 文字列として格納されます。
count の値を変えながら実行間隔を 90 秒以上空けると、1レコードに含まれるログ件数が異なる複数のレコードを作成できます。以下は count=3・count=1・count=2 の順に呼び出した結果です。
SELECT
json_array_length(json_parse(log_events)) AS log_count,
log_events
FROM "s3tablescatalog/<テーブルバケット名>"."lambda_logs"."<テーブル名>"
ORDER BY json_extract_scalar(json_parse(log_events), '$[0].timestamp');
log_count | log_events
3 | [{"timestamp":"2026-05-11T09:37:04.918+09:00","level":"INFO","message":"start"},{"timestamp":"2026-05-11T09:37:04.950+09:00","level":"INFO","message":"invoked"},{"timestamp":"2026-05-11T09:37:04.950+09:00","level":"INFO","message":"end"}]
1 | [{"timestamp":"2026-05-11T09:38:35.694+09:00","level":"INFO","message":"start"}]
2 | [{"timestamp":"2026-05-11T09:40:06.302+09:00","level":"INFO","message":"start"},{"timestamp":"2026-05-11T09:40:06.302+09:00","level":"INFO","message":"invoked"}]
log_events の中身を展開したい場合は CROSS JOIN UNNEST を使います。
SELECT
json_extract_scalar(log_event, '$.timestamp') AS timestamp,
json_extract_scalar(log_event, '$.level') AS level,
json_extract_scalar(log_event, '$.message') AS message
FROM "s3tablescatalog/<テーブルバケット名>"."lambda_logs"."<テーブル名>"
CROSS JOIN UNNEST(CAST(json_parse(log_events) AS array(json))) AS t(log_event)
ORDER BY timestamp;
timestamp | level | message
2026-05-11T09:37:04.918+09:00 | INFO | start
2026-05-11T09:37:04.950+09:00 | INFO | invoked
2026-05-11T09:37:04.950+09:00 | INFO | end
2026-05-11T09:38:35.694+09:00 | INFO | start
2026-05-11T09:40:06.302+09:00 | INFO | start
2026-05-11T09:40:06.302+09:00 | INFO | invoked
1レコードに複数のログが含まれていても CROSS JOIN UNNEST で1行1ログに展開できます。timestamp・level・message がすべて console.log で出力した値そのままで格納されていることが確認できます。
ハマったポイント
その1:S3 Tables のテーブルバケットは削除直後に同名で再作成できない
removalPolicy: DESTROY のテーブルバケットを削除すると、AWS 内部では非同期的に削除処理が進みます。同じ名前でデプロイし直そうとすると下記のエラーになります。
Resource handler returned message: "The bucket is in a transitional state because of
a previous deletion attempt. Try again later." HandlerErrorCode: AlreadyExists
削除が完了するまで待機するか、コンストラクト ID を変えて別名で再作成するようにしましょう。この挙動により検証時に結構時間を取られました。
その2:Iceberg destination の1レコードには JSON オブジェクトを1つしか含められない
Iceberg destination には「1レコードにつき JSON オブジェクトは1つだけ」という制約があります。
CWL サブスクリプションフィルターが Firehose に送る1ペイロードには複数の logEvents が含まれる場合があります。そのため、最初は各 logEvent を個別の JSON に変換して改行区切り(NDJSON)で結合する実装を試みましたが、制約違反でエラーになりました。
// NG: 複数の logEvent を1レコードに詰めるとエラーになる
return {
recordId: record.recordId,
result: "Ok" as const,
data: Buffer.from(lines.join("\n")).toString("base64"), // 複数 JSON → 制約違反
};
また、最初の logEvent だけを返す方法もありますが、この場合 2 件目以降の logEvent がロストします。
// NG: データロストが発生する
return {
recordId: record.recordId,
result: "Ok" as const,
data: Buffer.from(lines[0]).toString("base64"), // 2件目以降がロスト
};
解決策として、logEvents 配列全体を JSON 文字列として1カラムに格納する方法を採用しました。1レコード = 1 CWL ペイロード となるため制約に違反せず、全 logEvent を保持できます。クエリ時は Athena の json_parse / json_extract で logEvents を展開します。
// OK: logEvents 配列全体を1カラムに格納する
const output = JSON.stringify({
log_events: JSON.stringify(appLogs),
});
テーブルスキーマも合わせて1カラムのみ定義しています。
schemaFieldList: [
{ name: "log_events", type: "string" }, // logEvents 配列を JSON 文字列として格納
],
その3:Glue カタログの削除には Lake Formation の DROP 権限が必要
スタック削除時に Glue カタログの削除が AccessDeniedException になる場合は、Lake Formation の DROP 権限が不足しています。CDK の CloudFormation 実行ロールに権限を付与してください。
# CDK 実行ロール(cdk-hnb659fds-cfn-exec-role)に DROP を付与
aws lakeformation grant-permissions \
--principal '{"DataLakePrincipalIdentifier":"arn:aws:iam::<ACCOUNT_ID>:role/cdk-hnb659fds-cfn-exec-role-<ACCOUNT_ID>-ap-northeast-1"}' \
--permissions DROP \
--resource '{"Catalog":{"Id":"<ACCOUNT_ID>:s3tablescatalog"}}' \
--region ap-northeast-1
なお、この grant-permissions コマンド自体も Lake Formation 管理者権限が必要です。CLI で AccessDeniedException になる場合は AWS コンソールの Lake Formation → Data permissions から付与してください。
その4:federatedCatalog.identifier は bucket/* 固定で変更不可
s3tablescatalog という名前の Glue フェデレーテッドカタログは、federatedCatalog.identifier が arn:aws:s3tables:{region}:{account}:bucket/* でなければ作成できません。特定バケットの ARN を指定すると次のエラーになります。
s3tablescatalog is a reserved name for federated Amazon S3 Tables catalogs
with the federation identifier arn:aws:s3tables:...:bucket/*
and aws:s3tables connection name.
また、作成後に変更することもできません。Lake Formation の Alter on Catalog 権限を付与した上でデプロイしても、Glue API 自体が更新を拒否します。
Update on FederateCatalog is not allowed.
コード中の resourceName: "*" はワイルドカードですが、特定バケットに変更することはできない必須値です。
その5:Iceberg destination は Decompression / CloudWatchLogProcessing プロセッサ非対応
CWL から Firehose に流れてくるレコードは gzip 圧縮された JSON で、logEvents 配列の中にログ行が入れ子になっています。これを S3 destination で配信する場合は AWS マネージドな Decompression + CloudWatchLogProcessing プロセッサを組み合わせれば Lambda 変換不要で平坦化できます(参考)。
しかし、Iceberg destination ではこれらのプロセッサに非対応です。デプロイ時に下記のエラーになります。
Resource handler returned message: "DecompressionProcessor is currently only supported for destination type S3, Splunk and Snowflake"
そのため、CWL → Firehose → Iceberg ルートでは gzip 解凍と JSON 抽出を Lambda 変換で行う必要があります。
その6:Firehose の依存関係に DefaultPolicy を含める必要がある
Firehose は作成時にロールの glue:GetTable を呼んでテーブルの存在と権限を検証します。このとき CDK の自動依存関係は ロール本体 までしか辿らないため、ロールにアタッチされる DefaultPolicy(AWS::IAM::Policy)の作成と Firehose の作成が並行で走ってしまい、検証時にポリシーが未付与で Firehose 作成が失敗することがあります。
Resource handler returned message: "Role ... is not authorized to perform: glue:GetTable
for the given table or the table does not exist."
そのため、明示的に DefaultPolicy を依存関係に追加して、ポリシーアタッチ完了後に Firehose を作成させる必要があります。
const defaultPolicy = role.node
.findChild("DefaultPolicy")
.node.defaultChild as cdk.CfnResource;
this.deliveryStream.node.addDependency(defaultPolicy);
これは SubscriptionFilterConstruct 側の Firehose PutRecord 検証でも同様に必要です。
その7:Iceberg destination のバッファ間隔に 0 を指定してもデプロイ・配信ともに成功する
bufferingHints.intervalInSeconds に 0 を指定した場合、デプロイも配信も問題なく成功しました。内部的に最小値(60 秒)として扱われているものと思われます。本構成では意図を明示するために 60 を指定しています。
bufferingHints: {
intervalInSeconds: 60, // 複数レコードをまとめて Iceberg テーブルに書き込むためのバッファ間隔
sizeInMBs: 1,
},
その8:Firehose ロールの IAM 権限はドキュメントのサンプルより広く取る必要がある
AWS ドキュメントのサンプルでは glue:GetTable / glue:UpdateTable などを個別に指定する例が示されていますが、実際には以下の問題が発生しました。
- アクションを個別指定 → Firehose ストリーム作成時に内部で行われる
glue:GetTableの権限検証でAccessDenied glue:*+lakeformation:*のみ(s3tables:*省略)→ ストリーム作成は成功するが、データ配信が失敗するケースがあった- アクションを広げても、リソースをバケット・テーブル ARN に限定 →
AccessDenied
結果として、glue:* + lakeformation:* + s3tables:* を resources: ["*"] で付与する必要がありました。
なお、lakeformation:* は Lake Formation の API を呼び出すための IAM レベルの権限であり、Lake Formation コンソールや CLI で付与するデータ権限(テーブルへの SELECT 等)とは別物です。S3TablesCatalogConstruct で createTableDefaultPermissions: IAM_ALLOWED_PRINCIPALS:ALL を設定しているため、Lake Formation のデータ権限は付与不要で IAM 権限だけでアクセス制御できます。
課題と今後の改善案
今回の構成を試すにあたり、次のような知見とフィードバックを得ることができました。
課題
- Lambda ログと S3 Tables の相性は良くない — Lambda ログはそもそも非構造化ログであり、S3 Tables が得意とする構造化データとは性質が異なる。加えて CWL が複数の
logEventsをまとめて Firehose に送るため Iceberg destination の「1レコード1 JSON オブジェクト」制約と相性が悪く、Firehose の Iceberg destination が gzip 解凍プロセッサに非対応なため変換 Lambda も必須となり、コストと管理コストが増える - クエリに UNNEST が必要でやや煩雑 — 1レコードに JSON 文字列として
logEvents配列を詰めているため、個々のログ行にアクセスするにはCROSS JOIN UNNEST(CAST(json_parse(log_events) AS array(json)))が必要。SELECT *ではそのままクエリできない - Lake Formation による列レベルのアクセス制御ができない — 全データを
log_eventsの1カラムに詰めているため、特定のフィールドだけを隠すといった列レベルのセキュリティを適用できない
今後の改善案
- S3 Tables は統計分析・他データソースとの突き合わせ用途に絞る — そもそも、単に何が起きたかを調べたい場合は CloudWatch Logs Insights の方が手軽。S3 Tables は集計・分析・列レベルセキュリティが求められるユースケースに向いている
- Kinesis Data Streams への切り替えで1イベント1レコード化できる可能性がある — AWS ドキュメントによると、KPL でレコードを集約し Amazon Kinesis Data Streams 経由で Firehose に取り込む場合は自動的に集約解除されて1レコード1 JSON オブジェクトとして扱われる。ソースを CWL サブスクリプションフィルターから Kinesis Data Streams に切り替えることで、各ログイベントを個別レコードとして書き込める可能性がある
- CWL 経由ではなく Firehose へダイレクトプットする — CWL を経由せず、ログ発生源から直接 Firehose へプットする構成にすれば、送信時点で適切な JSON 形式に整形できる。CWL サブスクリプションフィルターによる gzip 圧縮がなくなるため変換 Lambda での解凍処理も不要になる。またバッファ間隔を 0 秒に設定することで複数データが1レコードにまとめられない可能性もある
おわりに
Lambda 関数のログを CloudWatch Logs サブスクリプションフィルター経由で Amazon Data Firehose に流し、S3 Tables(Iceberg)に書き込む構成を CDK で実装してみました。
S3 テーブル統合機能を使った構成と比べると、
- テーブルスキーマを自前で決められる(格納するフィールドや型を用途に合わせて定義できる)
- データ保持期間を CWL の保持ポリシーから切り離せる(Iceberg 側で別途管理)
- JST 表記でタイムスタンプを保持できる(クエリ時のタイムゾーン変換が不要)
といった自由度があります。一方で Lambda 変換の実装と運用が増える・クエリの煩雑さ・列レベルアクセス制御の制限 といったデメリットが引き続きあります。次回以降で改善案を試してみたいと思います。
参考
以上







