AWS CDK CloudWatchLogsからS3へAthenaでクエリできる形でログデータを出力する

AWS CDK CloudWatchLogsからS3へAthenaでクエリできる形でログデータを出力する

Clock Icon2023.02.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

CX事業本部Delivery部のアベシです。

この記事ではCloudWatchLogsのサブスクリプションフィルターから、Firehoseを通してログをS3に流す構成について、CDKで構築する方法を紹介します。また、ログは流す途中でAthenaからクエリできる形に変換します。 CDKで主に開発している方には参考になるかと思います
また、この構成ではCloudWatchLogsにログが生成されると都度Kinesisに渡されますので、リアルタイム性が求められるログデータの収集や解析に活用できそうです

Lambda Processorを使ったログデータの変換について

実際のコードを紹介する前にAthenaでクエリできようにするためのデータ変換について説明します。
クエリできる形のログデータ形式にするために以下の変換を行います

  • ログがgzip形式で圧縮されているので解凍する
  • その後ログをJSON Linesの形式に変換

サブスクリプションフィルターからFirehoseへ渡るログはgzip形式で圧縮されているので解凍が必要です。

また、FirehoseからS3にはバッファ時間内にサブスクリプションフィルターから受けた複数のログデータがまとめて流されます。
それら複数のログデータはJSONが1列に連結したような形となるため、そのままではAthenaでクエリができません。
FirehoseからS3に渡る連結されたログデータは以下のような形式です

{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241}{"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245}{"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256}{"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302}{"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312}{"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322}{"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332}{"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}

それぞれのログデータの末尾に改行コードを追加してJSON Linesの形式にします。これでAthenaからクエリできます。

{"temperature": 21.84, "pressure": 1018.09, "humidity": 55.04, "date": 20230212170241}
{"temperature": 21.85, "pressure": 1018.07, "humidity": 54.73, "date": 20230212170245}
{"temperature": 21.84, "pressure": 1018.06, "humidity": 55.04, "date": 20230212170256}
{"temperature": 21.84, "pressure": 1018.04, "humidity": 54.75, "date": 20230212170302}
{"temperature": 21.84, "pressure": 1018.09, "humidity": 54.91, "date": 20230212170312}
{"temperature": 21.85, "pressure": 1018.11, "humidity": 54.91, "date": 20230212170322}
{"temperature": 21.84, "pressure": 1018.14, "humidity": 54.85, "date": 20230212170332}
{"temperature": 21.82, "pressure": 1018.11, "humidity": 54.98, "date": 20230212170342}

これらの処理をFirehoseのLambda Processorを使って処理します。

構成

作成するアーキテクチャの構成イメージは以下です。

実行環境

項目名 バージョン
mac OS Ventura 13.2
npm 9.4.2
AWS CDK 2.64.0

CDKコード

構築に使用したAWS CDKのコードは以下です。

import {
  Stack,
  StackProps,
  aws_logs,
  RemovalPolicy,
  aws_s3,
  aws_kinesis,
  aws_logs_destinations,
  Duration,
  Size,
  aws_iam,
  aws_lambda_nodejs,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';
import { Construct } from 'constructs';
import { Runtime } from 'aws-cdk-lib/aws-lambda';

interface TemperatureMonitorAppLogGroupStackProps extends StackProps {
  projectName: string;
  environmentDataBucket: aws_s3.Bucket;
}

export class TemperatureMonitorAppLogGroupStack extends Stack {
  public readonly tempPressHumDataLogGroup: aws_logs.LogGroup;
  public readonly errorLogGroup: aws_logs.LogGroup;

  constructor(
    scope: Construct,
    id: string,
    props: TemperatureMonitorAppLogGroupStackProps,
  ) {
    super(scope, id, props);

    //ログ変換用Lambda関数の作成
    const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
      this,
      'Processor',
      {
        runtime: Runtime.NODEJS_18_X,
        functionName: `${props.projectName}-log-data-transform-func`,
        entry: 'src/lambda/handlers/log-data-transform-func.ts',
        timeout: Duration.minutes(1),
        logRetention: 30,
      },
    );

    //ログ変換用Lambda関数をLambdaFunctionProcessorに割り当て
    const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor(
      lambdaFunction,
      {
        bufferInterval: Duration.seconds(60),
        bufferSize: Size.mebibytes(1),
        retries: 1,
      },
    );

    // KinesisStreamの作成
    const sourceStream = new aws_kinesis.Stream(this, 'Source Stream');

    // DeliveryStreamの作成
    const DeliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'Delivery Stream',
      {
        sourceStream,
        destinations: [
          new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, {
            processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て
            dataOutputPrefix: 'environment/data/',
            errorOutputPrefix: 'error/',
            bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒
            bufferingSize: Size.mebibytes(3),
          }),
        ],
      },
    );

    //CloudWatchLogsのログ保存元LogGroup作成
    const tempPressHumDataLogGroupName = `${props.projectName}-tempPressHumDataLogGroup`;
    const tempPressHumDataLogGroup = new aws_logs.LogGroup(
      this,
      tempPressHumDataLogGroupName,
      {
        logGroupName: tempPressHumDataLogGroupName,
        removalPolicy: RemovalPolicy.RETAIN,
        retention: 365,
      },
    );

    // CloudWatchLogsからKinesisStreamへPutRecordを許可するためのロールとポリシーの作成
    const allowLogPutDeliveryStreamRole = new aws_iam.Role(
      this,
      'CloudWatchLogsCanPutRecords',
      {
        assumedBy: new aws_iam.ServicePrincipal(
          `logs.ap-northeast-1.amazonaws.com`, // リージョンの記載が必要なため注意!
        ),
      },
    );
    allowLogPutDeliveryStreamRole.addToPolicy(
      new aws_iam.PolicyStatement({
        actions: ['firehose:PutRecord'],
        effect: aws_iam.Effect.ALLOW,
        resources: [sourceStream.streamArn],
      }),
    );

    //SubscriptionFilter(ログデータをCloudWatchLogsからFireHoseへストリームデータで出力)
    const subscriptionFilter = tempPressHumDataLogGroup.addSubscriptionFilter(
      'subscriptionFilter',
      {
        destination: new aws_logs_destinations.KinesisDestination(sourceStream),
        filterPattern: aws_logs.FilterPattern.allEvents(),
      },
    );

    this.tempPressHumDataLogGroup = tempPressHumDataLogGroup;
  }
}

CDKコード解説

ログデータ変換用Lambda関数に関する部分

NodejsFunctionクラスを使用してLambda関数を作成します。

    const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
      this,
      'Processor',
      {
        runtime: Runtime.NODEJS_18_X,
        functionName: `${props.projectName}-log-data-transform-func`,
        entry: 'src/lambda/handlers/log-data-transform-func.ts',
        timeout: Duration.minutes(1),
        logRetention: 30,
      },
    );

作成したLambda関数をLambadaProcessor用の関数としてLambdaFunctionProcessorクラスの第一引数で割当てます。
bufferIntervalプロパティで指定した秒数の間に受信したデータをまとめて送る使用になっており60秒から900秒の間で設定できます。

    const lambdaProcessor = new firehose_alpha.LambdaFunctionProcessor(
      lambdaFunction,// ← ここでLambda関数を割当
      {
        bufferInterval: Duration.seconds(60),//min:60秒 ~ max:900秒
        bufferSize: Size.mebibytes(1),
        retries: 1,
      },
    );

Kinesisに関する部分

SubscriptionFilterからログデータを受けるKinesis Streamsを作成しています

    const sourceStream = new aws_kinesis.Stream(this, 'Source Stream');

Kinesis Streamsで受けたログデータを読み取りS3へ流すため、FireHoseのDeliveryStreamを作成しています。
先程定義したLambadaProcessorをここで割り当てています。

    const DeliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'Delivery Stream',
      {
        sourceStream,// ← ここでLambadaProcessorを割当
        destinations: [
          new firehose_destination_alpha.S3Bucket(props.environmentDataBucket, {
            processor: lambdaProcessor, //LambdaFunctionProcessorを割り当て
            dataOutputPrefix: 'environment/data/',
            errorOutputPrefix: 'error/',
            bufferingInterval: Duration.seconds(60), // 最低値60秒、最大値900秒
            bufferingSize: Size.mebibytes(3),
          }),
        ],
      },
    );

ポリシーに関して

CloudWatchLogsからKinesisStreamへログデータのPutRecord許可するためのロールとポリシーを作成しています。

該当するアクションはfirehose:PutRecord です

  • 注意点
    プリンシパルの指定はlogs.の後にリージョンを明記してください。書かない場合デプロイ中にSubscriptionFilterからテストメッセージが送れずに以下の内容のエラーが発生します。エラー内容からは分かりづらいですがリージョンを記載すると発生しなくなります。

    //エラーメッセージ
    "Could not deliver test message to specified Kinesis stream. Check if the given kinesis stream is in ACTIVE state"
    
        const allowLogPutDeliveryStreamRole = new aws_iam.Role(
          this,
          'CloudWatchLogsCanPutRecords',
          {
            assumedBy: new aws_iam.ServicePrincipal(
              `logs.ap-northeast-1.amazonaws.com`, // logs.の後にリージョンの記載が必要
            ),
          },
        );
        allowLogPutDeliveryStreamRole.addToPolicy(
          new aws_iam.PolicyStatement({
            actions: ['firehose:PutRecord'],
            effect: aws_iam.Effect.ALLOW,
            resources: [sourceStream.streamArn],
          }),
        );
    

ログデータ変換用Lambda関数のコード

import { Buffer } from 'buffer';
import * as zlib from 'zlib';
import * as lambda from 'aws-lambda';

interface Event {
  records: [];
}
interface Record {
  data: string;
  recordId: string;
}
exports.handler = (
  event: Event,
  context: lambda.Context,
  callback: lambda.Callback,
) => {
  console.log('get event', event);
  const output = event.records.map((record: Record) => {
    // FireHoseから渡されたevent内のdataをデコード
    const decodeData = Buffer.from(record.data, 'base64');
    // デコードしたgzipデータを解凍(解凍されたデータはJSON)
    const logData = zlib.gunzipSync(decodeData).toString('utf-8');
    const logDataObject = JSON.parse(logData);

    //logデータの末尾に改行コード(\n)を連結する
    const logDataObjectAddIndention =
      logDataObject.logEvents[0].message + '\n';

    //改行コードを追加したlogデータをBase64エンコード
    const returnData = Buffer.from(
      logDataObjectAddIndention,
      'utf8',
    ).toString('base64');

    return {
      recordId: record.recordId,
      result: 'Ok',
      data: returnData,
    };
  });
  console.log('After processed logs are', JSON.stringify({ records: output }));
  callback(null, { records: output });
};

Lambdaコード解説

ログデータのデコード

FirehoseからLambda関数に渡されたeventの複数のdataにログが格納されてます。これらはBase64エンコードされてますのでまずデコードします。
この段階ではまだデータがgzipで圧縮されています

const decodeData = Buffer.from(record.data, 'base64');

デコードしたデータを解凍

前述したとおり、CloudWatchLogsからKinesisに渡された時点でgzip圧縮されてますのでデータを解凍します。

const logData = zlib.gunzipSync(decodeData).toString('utf-8');

解凍したデータは以下のような構成のオブジェクトになっていて、logEvents内のmessageの値が実際のCloudWatchLogsのログとなります

//実際の解凍したデータ
{
messageType: 'DATA_MESSAGE',
  owner: '************',
  logGroup: 'environment-data-monitor-app-tempPressHumDataLogGroup',
  logStream: 'environment_data_to_Cloud_Watch--311787949',
  subscriptionFilters: [
    'TemperatureMonitorAppLogGroupStack-environmentdatamonitorapptempPressHumDataLogGroupsubscriptionFilter2793BF49-5Sot9gYWy7tW'
  ],
  logEvents: [
    {
      id: '37380785236138197061213798472809550571675194767369306112',
      timestamp: 1676212382293,
      message: '{"temperature": 21.51, "pressure": 1018.13, "humidity": 42.67, "date": 20230212233302}'
    }
  ]
}

改行コード追加

ここでAthenaでも処理できるようにログに改行コードの\nを追加します.

const logDataObjectAddIndention = logDataObject.logEvents[0].message + '\n';

エンコード

最後に処理が完了したログデータをBase64エンコードし直します。

const returnData = Buffer.from(logDataObjectAddIndention,'utf8',).toString('base64');

変換したログデータをFirehoseに返す

処理が完了した後Firehoseに返すデータは下のリンク先の公式ドキュメントに説明がある通り、recordId、result、data、となります。

dataに関してはBase64エンコードした形で返す必要があります

return {
  recordId: record.recordId,
  result: 'Ok',
  data: returnData,
};

動作確認

ログ形式の確認

S3に渡ってきたログが以下のような形式にとなっており、期待したJSON Linsの形式になっています。

Athenaからのクエリ確認

期待通りの形式なっているため以下の通りAthenaでのクエリが成功しました。

参考

今回紹介した方法の他にもFirehoseの動的パーティショニングを使用してログデータに改行コードを入れる方法を弊社ブログで紹介しております。

その他の参考記事
- CloudWatch LogsのログデータをKinesis Data Firehose経由でS3に出力する
- [備忘録] Kinesis FirehoseのLambdaによるデータ変換について

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.