CX事業本部Delivery部のアベシです。
この記事ではCloudWatchLogsのサブスクリプションフィルターから、Firehoseを通してログをS3に流す構成について、CDKで構築する方法を紹介します。また、ログは流す途中でAthenaからクエリできる形に変換します。
CDKで主に開発している方には参考になるかと思います
また、この構成ではCloudWatchLogsにログが生成されると都度Kinesisに渡されますので、リアルタイム性が求められるログデータの収集や解析に活用できそうです
Lambda Processorを使ったログデータの変換について
実際のコードを紹介する前にAthenaでクエリできようにするためのデータ変換について説明します。
クエリできる形のログデータ形式にするために以下の変換を行います
- ログがgzip形式で圧縮されているので解凍する
- その後ログをJSON Linesの形式に変換
サブスクリプションフィルターからFirehoseへ渡るログはgzip形式で圧縮されているので解凍が必要です。
また、FirehoseからS3にはバッファ時間内にサブスクリプションフィルターから受けた複数のログデータがまとめて流されます。
それら複数のログデータはJSONが1列に連結したような形となるため、そのままではAthenaでクエリができません。
FirehoseからS3に渡る連結されたログデータは以下のような形式です
{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241}{"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245}{"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256}{"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302}{"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312}{"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322}{"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332}{"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}
それぞれのログデータの末尾に改行コードを追加してJSON Linesの形式にします。これでAthenaからクエリできます。
{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241}
{"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245}
{"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256}
{"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302}
{"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312}
{"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322}
{"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332}
{"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}
これらの処理をFirehoseのLambda Processorを使って処理します。
構成
作成するアーキテクチャの構成イメージは以下です。
実行環境
項目名 | バージョン |
---|---|
mac OS | Ventura 13.2 |
npm | 9.4.2 |
AWS CDK | 2.64.0 |
CDKコード
構築に使用したAWS CDKのコードは以下です。
lib/temperature-monitor-app-log-group-stack.ts
import {
Stack,
StackProps,
aws_logs,
RemovalPolicy,
aws_s3,
aws_kinesis,
aws_logs_destinations,
Duration,
Size,
aws_iam,
aws_lambda_nodejs,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
import { Construct } from 'constructs';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
interface TemperatureMonitorAppLogGroupStackProps extends StackProps {
projectName: string;
environmentDataBucket: aws_s3.Bucket;
}
export class TemperatureMonitorAppLogGroupStack extends Stack {
public readonly tempPressHumDataLogGroup: aws_logs.LogGroup;
public readonly errorLogGroup: aws_logs.LogGroup;
constructor(
scope: Construct,
id: string,
props: TemperatureMonitorAppLogGroupStackProps,
) {
super(scope, id, props);
//ログ変換用Lambda関数の作成
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
this,
'Processor',
{
runtime: Runtime.NODEJS_18_X,
functionName: `${props.projectName}-log-data-transform-func`,
entry: 'src/lambda/handlers/log-data-transform-func.ts',
timeout: Duration.minutes(1),
logRetention: 30,
},
);
//ログ変換用Lambda関数をLambdaFunctionProcessorに割り当て
const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor(
lambdaFunction,
{
bufferInterval: Duration.seconds(60),
bufferSize: Size.mebibytes(1),
retries: 1,
},
);
// KinesisStreamの作成
const sourceStream = new aws_kinesis.Stream(this, 'Source Stream');
// DeliveryStreamの作成
const DeliveryStream = new firehose_alpha.DeliveryStream(
this,
'Delivery Stream',
{
sourceStream,
destinations: [
new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, {
processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て
dataOutputPrefix: 'environment/data/',
errorOutputPrefix: 'error/',
bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒
bufferingSize: Size.mebibytes(3),
}),
],
},
);
//CloudWatchLogsのログ保存元LogGroup作成
const tempPressHumDataLogGroupName = `${props.projectName}-tempPressHumDataLogGroup`;
const tempPressHumDataLogGroup = new aws_logs.LogGroup(
this,
tempPressHumDataLogGroupName,
{
logGroupName: tempPressHumDataLogGroupName,
removalPolicy: RemovalPolicy.RETAIN,
retention: 365,
},
);
// CloudWatchLogsからKinesisStreamへPutRecordを許可するためのロールとポリシーの作成
const allowLogPutDeliveryStreamRole = new aws_iam.Role(
this,
'CloudWatchLogsCanPutRecords',
{
assumedBy: new aws_iam.ServicePrincipal(
`logs.ap-northeast-1.amazonaws.com`, // リージョンの記載が必要なため注意!
),
},
);
allowLogPutDeliveryStreamRole.addToPolicy(
new aws_iam.PolicyStatement({
actions: ['firehose:PutRecord'],
effect: aws_iam.Effect.ALLOW,
resources: [sourceStream.streamArn],
}),
);
//SubscriptionFilter(ログデータをCloudWatchLogsからFireHoseへストリームデータで出力)
const subscriptionFilter = tempPressHumDataLogGroup.addSubscriptionFilter(
'subscriptionFilter',
{
destination: new aws_logs_destinations.KinesisDestination(sourceStream),
filterPattern: aws_logs.FilterPattern.allEvents(),
},
);
this.tempPressHumDataLogGroup = tempPressHumDataLogGroup;
}
}
CDKコード解説
ログデータ変換用Lambda関数に関する部分
NodejsFunctionクラスを使用してLambda関数を作成します。
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
this,
'Processor',
{
runtime: Runtime.NODEJS_18_X,
functionName: `${props.projectName}-log-data-transform-func`,
entry: 'src/lambda/handlers/log-data-transform-func.ts',
timeout: Duration.minutes(1),
logRetention: 30,
},
);
作成したLambda関数をLambadaProcessor用の関数としてLambdaFunctionProcessorクラスの第一引数で割当てます。
bufferIntervalプロパティで指定した秒数の間に受信したデータをまとめて送る使用になっており60秒から900秒の間で設定できます。
const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor(
lambdaFunction,// ← ここでLambda関数を割当
{
bufferInterval: Duration.seconds(60),//min:60秒 ~ max:900秒
bufferSize: Size.mebibytes(1),
retries: 1,
},
);
Kinesisに関する部分
SubscriptionFilterからログデータを受けるKinesis Streamsを作成しています
const sourceStream = new aws_kinesis.Stream(this, 'Source Stream');
Kinesis Streamsで受けたログデータを読み取りS3へ流すため、FireHoseのDeliveryStreamを作成しています。
先程定義したLambadaProcessorをここで割り当てています。
const DeliveryStream = new firehose_alpha.DeliveryStream(
this,
'Delivery Stream',
{
sourceStream,// ← ここでLambadaProcessorを割当
destinations: [
new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, {
processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て
dataOutputPrefix: 'environment/data/',
errorOutputPrefix: 'error/',
bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒
bufferingSize: Size.mebibytes(3),
}),
],
},
);
ポリシーに関して
CloudWatchLogsからKinesisStreamへログデータのPutRecord許可するためのロールとポリシーを作成しています。
該当するアクションはfirehose:PutRecord
です
- 注意点
プリンシパルの指定はlogs.の後にリージョンを明記してください。書かない場合デプロイ中にSubscriptionFilterからテストメッセージが送れずに以下の内容のエラーが発生します。エラー内容からは分かりづらいですがリージョンを記載すると発生しなくなります。//エラーメッセージ "Could not deliver test message to specified Kinesis stream. Check if the given kinesis stream is in ACTIVE state"
const allowLogPutDeliveryStreamRole = new aws_iam.Role( this, 'CloudWatchLogsCanPutRecords', { assumedBy: new aws_iam.ServicePrincipal( `logs.ap-northeast-1.amazonaws.com`, // logs.の後にリージョンの記載が必要 ), }, ); allowLogPutDeliveryStreamRole.addToPolicy( new aws_iam.PolicyStatement({ actions: ['firehose:PutRecord'], effect: aws_iam.Effect.ALLOW, resources: [sourceStream.streamArn], }), );
ログデータ変換用Lambda関数のコード
src/lambda/handlers/log-data-transform-func.ts
import { Buffer } from 'buffer';
import * as zlib from 'zlib';
import * as lambda from 'aws-lambda';
interface Event {
records: [];
}
interface Record {
data: string;
recordId: string;
}
exports.handler = (
event: Event,
context: lambda.Context,
callback: lambda.Callback,
) => {
console.log('get event', event);
const output = event.records.map((record: Record) => {
// FireHoseから渡されたevent内のdataをデコード
const decodeData = Buffer.from(record.data, 'base64');
// デコードしたgzipデータを解凍(解凍されたデータはJSON)
const logData = zlib.gunzipSync(decodeData).toString('utf-8');
const logDataObject = JSON.parse(logData);
//logデータの末尾に改行コード(\n)を連結する
const logDataObjectAddIndention =
logDataObject.logEvents[0].message + '\n';
//改行コードを追加したlogデータをBase64エンコード
const returnData = Buffer.from(
logDataObjectAddIndention,
'utf8',
).toString('base64');
return {
recordId: record.recordId,
result: 'Ok',
data: returnData,
};
});
console.log('After processed logs are', JSON.stringify({ records: output }));
callback(null, { records: output });
};
Lambdaコード解説
ログデータのデコード
FirehoseからLambda関数に渡されたeventの複数のdataにログが格納されてます。これらはBase64エンコードされてますのでまずデコードします。
この段階ではまだデータがgzipで圧縮されています
const decodeData = Buffer.from(record.data, 'base64');
デコードしたデータを解凍
前述したとおり、CloudWatchLogsからKinesisに渡された時点でgzip圧縮されてますのでデータを解凍します。
const logData = zlib.gunzipSync(decodeData).toString('utf-8');
解凍したデータは以下のような構成のオブジェクトになっていて、logEvents内のmessageの値が実際のCloudWatchLogsのログとなります
//実際の解凍したデータ
{
messageType: 'DATA_MESSAGE',
owner: '************',
logGroup: 'environment-data-monitor-app-tempPressHumDataLogGroup',
logStream: 'environment_data_to_Cloud_Watch--311787949',
subscriptionFilters: [
'TemperatureMonitorAppLogGroupStack-environmentdatamonitorapptempPressHumDataLogGroupsubscriptionFilter2793BF49-5Sot9gYWy7tW'
],
logEvents: [
{
id: '37380785236138197061213798472809550571675194767369306112',
timestamp: 1676212382293,
message: '{"temperature": 21.51, "pressure": 1018.13, "humidity": 42.67, "date": 20230212233302}'
}
]
}
改行コード追加
ここでAthenaでも処理できるようにログに改行コードの\nを追加します.
const logDataObjectAddIndention = logDataObject.logEvents[0].message + '\n';
エンコード
最後に処理が完了したログデータをBase64エンコードし直します。
const returnData = Buffer.from(logDataObjectAddIndention,'utf8',).toString('base64');
変換したログデータをFirehoseに返す
処理が完了した後Firehoseに返すデータは下のリンク先の公式ドキュメントに説明がある通り、recordId、result、data、となります。
dataに関してはBase64エンコードした形で返す必要があります
return {
recordId: record.recordId,
result: 'Ok',
data: returnData,
};
動作確認
ログ形式の確認
S3に渡ってきたログが以下のような形式にとなっており、期待したJSON Linsの形式になっています。
Athenaからのクエリ確認
期待通りの形式なっているため以下の通りAthenaでのクエリが成功しました。
参考
今回紹介した方法の他にもFirehoseの動的パーティショニングを使用してログデータに改行コードを入れる方法を弊社ブログで紹介しております。
その他の参考記事
- CloudWatch LogsのログデータをKinesis Data Firehose経由でS3に出力する
- [備忘録] Kinesis FirehoseのLambdaによるデータ変換について
以上