AWS CDK CloudWatchLogsからS3へAthenaでクエリできる形でログデータを出力する
CX事業本部Delivery部のアベシです。
この記事ではCloudWatchLogsのサブスクリプションフィルターから、Firehoseを通してログをS3に流す構成について、CDKで構築する方法を紹介します。また、ログは流す途中でAthenaからクエリできる形に変換します。
CDKで主に開発している方には参考になるかと思います
また、この構成ではCloudWatchLogsにログが生成されると都度Kinesisに渡されますので、リアルタイム性が求められるログデータの収集や解析に活用できそうです
Lambda Processorを使ったログデータの変換について
実際のコードを紹介する前にAthenaでクエリできようにするためのデータ変換について説明します。
クエリできる形のログデータ形式にするために以下の変換を行います
- ログがgzip形式で圧縮されているので解凍する
- その後ログをJSON Linesの形式に変換
サブスクリプションフィルターからFirehoseへ渡るログはgzip形式で圧縮されているので解凍が必要です。
また、FirehoseからS3にはバッファ時間内にサブスクリプションフィルターから受けた複数のログデータがまとめて流されます。
それら複数のログデータはJSONが1列に連結したような形となるため、そのままではAthenaでクエリができません。
FirehoseからS3に渡る連結されたログデータは以下のような形式です
{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241}{"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245}{"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256}{"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302}{"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312}{"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322}{"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332}{"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}
それぞれのログデータの末尾に改行コードを追加してJSON Linesの形式にします。これでAthenaからクエリできます。
{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241} {"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245} {"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256} {"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302} {"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312} {"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322} {"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332} {"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}
これらの処理をFirehoseのLambda Processorを使って処理します。
構成
作成するアーキテクチャの構成イメージは以下です。
実行環境
項目名 | バージョン |
---|---|
mac OS | Ventura 13.2 |
npm | 9.4.2 |
AWS CDK | 2.64.0 |
CDKコード
構築に使用したAWS CDKのコードは以下です。
import { Stack, StackProps, aws_logs, RemovalPolicy, aws_s3, aws_kinesis, aws_logs_destinations, Duration, Size, aws_iam, aws_lambda_nodejs, } from 'aws-cdk-lib'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; import * as firehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'; import { Construct } from 'constructs'; import { Runtime } from 'aws-cdk-lib/aws-lambda'; interface TemperatureMonitorAppLogGroupStackProps extends StackProps { projectName: string; environmentDataBucket: aws_s3.Bucket; } export class TemperatureMonitorAppLogGroupStack extends Stack { public readonly tempPressHumDataLogGroup: aws_logs.LogGroup; public readonly errorLogGroup: aws_logs.LogGroup; constructor( scope: Construct, id: string, props: TemperatureMonitorAppLogGroupStackProps, ) { super(scope, id, props); //ログ変換用Lambda関数の作成 const lambdaFunction = new aws_lambda_nodejs.NodejsFunction( this, 'Processor', { runtime: Runtime.NODEJS_18_X, functionName: `${props.projectName}-log-data-transform-func`, entry: 'src/lambda/handlers/log-data-transform-func.ts', timeout: Duration.minutes(1), logRetention: 30, }, ); //ログ変換用Lambda関数をLambdaFunctionProcessorに割り当て const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor( lambdaFunction, { bufferInterval: Duration.seconds(60), bufferSize: Size.mebibytes(1), retries: 1, }, ); // KinesisStreamの作成 const sourceStream = new aws_kinesis.Stream(this, 'Source Stream'); // DeliveryStreamの作成 const DeliveryStream = new firehose_alpha.DeliveryStream( this, 'Delivery Stream', { sourceStream, destinations: [ new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, { processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て dataOutputPrefix: 'environment/data/', errorOutputPrefix: 'error/', bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒 bufferingSize: Size.mebibytes(3), }), ], }, ); //CloudWatchLogsのログ保存元LogGroup作成 const tempPressHumDataLogGroupName = `${props.projectName}-tempPressHumDataLogGroup`; const tempPressHumDataLogGroup = new aws_logs.LogGroup( this, tempPressHumDataLogGroupName, { logGroupName: tempPressHumDataLogGroupName, removalPolicy: RemovalPolicy.RETAIN, retention: 365, }, ); // CloudWatchLogsからKinesisStreamへPutRecordを許可するためのロールとポリシーの作成 const allowLogPutDeliveryStreamRole = new aws_iam.Role( this, 'CloudWatchLogsCanPutRecords', { assumedBy: new aws_iam.ServicePrincipal( `logs.ap-northeast-1.amazonaws.com`, // リージョンの記載が必要なため注意! ), }, ); allowLogPutDeliveryStreamRole.addToPolicy( new aws_iam.PolicyStatement({ actions: ['firehose:PutRecord'], effect: aws_iam.Effect.ALLOW, resources: [sourceStream.streamArn], }), ); //SubscriptionFilter(ログデータをCloudWatchLogsからFireHoseへストリームデータで出力) const subscriptionFilter = tempPressHumDataLogGroup.addSubscriptionFilter( 'subscriptionFilter', { destination: new aws_logs_destinations.KinesisDestination(sourceStream), filterPattern: aws_logs.FilterPattern.allEvents(), }, ); this.tempPressHumDataLogGroup = tempPressHumDataLogGroup; } }
CDKコード解説
ログデータ変換用Lambda関数に関する部分
NodejsFunctionクラスを使用してLambda関数を作成します。
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction( this, 'Processor', { runtime: Runtime.NODEJS_18_X, functionName: `${props.projectName}-log-data-transform-func`, entry: 'src/lambda/handlers/log-data-transform-func.ts', timeout: Duration.minutes(1), logRetention: 30, }, );
作成したLambda関数をLambadaProcessor用の関数としてLambdaFunctionProcessorクラスの第一引数で割当てます。
bufferIntervalプロパティで指定した秒数の間に受信したデータをまとめて送る使用になっており60秒から900秒の間で設定できます。
const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor( lambdaFunction,// ← ここでLambda関数を割当 { bufferInterval: Duration.seconds(60),//min:60秒 ~ max:900秒 bufferSize: Size.mebibytes(1), retries: 1, }, );
Kinesisに関する部分
SubscriptionFilterからログデータを受けるKinesis Streamsを作成しています
const sourceStream = new aws_kinesis.Stream(this, 'Source Stream');
Kinesis Streamsで受けたログデータを読み取りS3へ流すため、FireHoseのDeliveryStreamを作成しています。
先程定義したLambadaProcessorをここで割り当てています。
const DeliveryStream = new firehose_alpha.DeliveryStream( this, 'Delivery Stream', { sourceStream,// ← ここでLambadaProcessorを割当 destinations: [ new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, { processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て dataOutputPrefix: 'environment/data/', errorOutputPrefix: 'error/', bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒 bufferingSize: Size.mebibytes(3), }), ], }, );
ポリシーに関して
CloudWatchLogsからKinesisStreamへログデータのPutRecord許可するためのロールとポリシーを作成しています。
該当するアクションはfirehose:PutRecord
です
- 注意点
プリンシパルの指定はlogs.の後にリージョンを明記してください。書かない場合デプロイ中にSubscriptionFilterからテストメッセージが送れずに以下の内容のエラーが発生します。エラー内容からは分かりづらいですがリージョンを記載すると発生しなくなります。//エラーメッセージ "Could not deliver test message to specified Kinesis stream. Check if the given kinesis stream is in ACTIVE state"
const allowLogPutDeliveryStreamRole = new aws_iam.Role( this, 'CloudWatchLogsCanPutRecords', { assumedBy: new aws_iam.ServicePrincipal( `logs.ap-northeast-1.amazonaws.com`, // logs.の後にリージョンの記載が必要 ), }, ); allowLogPutDeliveryStreamRole.addToPolicy( new aws_iam.PolicyStatement({ actions: ['firehose:PutRecord'], effect: aws_iam.Effect.ALLOW, resources: [sourceStream.streamArn], }), );
ログデータ変換用Lambda関数のコード
import { Buffer } from 'buffer'; import * as zlib from 'zlib'; import * as lambda from 'aws-lambda'; interface Event { records: []; } interface Record { data: string; recordId: string; } exports.handler = ( event: Event, context: lambda.Context, callback: lambda.Callback, ) => { console.log('get event', event); const output = event.records.map((record: Record) => { // FireHoseから渡されたevent内のdataをデコード const decodeData = Buffer.from(record.data, 'base64'); // デコードしたgzipデータを解凍(解凍されたデータはJSON) const logData = zlib.gunzipSync(decodeData).toString('utf-8'); const logDataObject = JSON.parse(logData); //logデータの末尾に改行コード(\n)を連結する const logDataObjectAddIndention = logDataObject.logEvents[0].message + '\n'; //改行コードを追加したlogデータをBase64エンコード const returnData = Buffer.from( logDataObjectAddIndention, 'utf8', ).toString('base64'); return { recordId: record.recordId, result: 'Ok', data: returnData, }; }); console.log('After processed logs are', JSON.stringify({ records: output })); callback(null, { records: output }); };
Lambdaコード解説
ログデータのデコード
FirehoseからLambda関数に渡されたeventの複数のdataにログが格納されてます。これらはBase64エンコードされてますのでまずデコードします。
この段階ではまだデータがgzipで圧縮されています
const decodeData = Buffer.from(record.data, 'base64');
デコードしたデータを解凍
前述したとおり、CloudWatchLogsからKinesisに渡された時点でgzip圧縮されてますのでデータを解凍します。
const logData = zlib.gunzipSync(decodeData).toString('utf-8');
解凍したデータは以下のような構成のオブジェクトになっていて、logEvents内のmessageの値が実際のCloudWatchLogsのログとなります
//実際の解凍したデータ { messageType: 'DATA_MESSAGE', owner: '************', logGroup: 'environment-data-monitor-app-tempPressHumDataLogGroup', logStream: 'environment_data_to_Cloud_Watch--311787949', subscriptionFilters: [ 'TemperatureMonitorAppLogGroupStack-environmentdatamonitorapptempPressHumDataLogGroupsubscriptionFilter2793BF49-5Sot9gYWy7tW' ], logEvents: [ { id: '37380785236138197061213798472809550571675194767369306112', timestamp: 1676212382293, message: '{"temperature": 21.51, "pressure": 1018.13, "humidity": 42.67, "date": 20230212233302}' } ] }
改行コード追加
ここでAthenaでも処理できるようにログに改行コードの\nを追加します.
const logDataObjectAddIndention = logDataObject.logEvents[0].message + '\n';
エンコード
最後に処理が完了したログデータをBase64エンコードし直します。
const returnData = Buffer.from(logDataObjectAddIndention,'utf8',).toString('base64');
変換したログデータをFirehoseに返す
処理が完了した後Firehoseに返すデータは下のリンク先の公式ドキュメントに説明がある通り、recordId、result、data、となります。
dataに関してはBase64エンコードした形で返す必要があります
return { recordId: record.recordId, result: 'Ok', data: returnData, };
動作確認
ログ形式の確認
S3に渡ってきたログが以下のような形式にとなっており、期待したJSON Linsの形式になっています。
Athenaからのクエリ確認
期待通りの形式なっているため以下の通りAthenaでのクエリが成功しました。
参考
今回紹介した方法の他にもFirehoseの動的パーティショニングを使用してログデータに改行コードを入れる方法を弊社ブログで紹介しております。
その他の参考記事
- CloudWatch LogsのログデータをKinesis Data Firehose経由でS3に出力する
- [備忘録] Kinesis FirehoseのLambdaによるデータ変換について
以上