S3のイベントをトリガーにLambdaを実行して、ログデータの中身からJSONのみを抽出する(Python3.11)

ログデータの中身が途中からJSONになっていて困っている方向け
2023.10.13

こんにちは、洲崎です。
CloudWatch LogsのログデータをS3に転送してAthenaで分析しようとした時に、ログデータが途中からJSONになっている場面に遭遇しました。
ログデータをS3バケットに保存したタイミングでJSONのみを別のS3バケットに抽出してみたのでブログにします。

構成図

赤枠が今回の対象です。
S3バケットにログデータがプットされたタイミングでLambdaを起動し、別のS3バケットにJSONのみ抽出したファイルを格納します。

途中から始まるJSON

今回の対象は下記のような、途中からJSONが始まるログを対象とします。
{"hogehoge":"fugafuga"}の部分のみを抜き出します)

[rails_log] INFO 2023-10-05 00:00:00 +0900: {"hogehoge":"fugafuga"}

Lambda

サンプルコード

S3トリガーを設定しているS3バケットに入るデータが対象です。
Kinesis Data Firehoseは一定の間隔でまとめて複数のログを出力するので、各行のログをPythonで見に行く形にしています。
特定の文字列(今回は[rails_log])が含まれていたら、{"から始まり、最後までの文字列を変数(今回はjson_string)に格納します。(割と強引です)
最後にファイル形式を.jsonにして、output_bucket_nameで指定しているS3バケットに出力します。

import json
import boto3
import os

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # バケット名とオブジェクトキーをイベントデータから取得
    input_bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    # 入力S3オブジェクトを取得
    s3_object = s3.get_object(Bucket=input_bucket_name, Key=object_key)
    file_content = s3_object['Body'].read().decode('utf-8')

    # 出力S3バケットにJSON形式で出力することを指定
    output_bucket_name = 'output_bucket_name'
    file_name, _ = os.path.splitext(object_key)
    output_object_key = f"{file_name}.json"

    # json_stringsを空のリストとして定義
    json_strings = []
    # テキストの各行から、[rails_log]があるか見る
    for line in file_content.split('\n'):
        if '[rails_log]' in line:
            # "[rails_log]"のあとのJSON文字列を抽出する
            loc = line.find('{"')
            if loc != -1:
                json_string = line[loc:]
                try:
                    # JSON文字列の整合性をチェック
                    json.loads(json_string)
                    json_strings.append(json_string)
                except json.JSONDecodeError:
                    print(f"Invalid JSON: {json_string}")

    # json_stringsがある場合、各JSONを改行して指定のS3バケットに格納する
    if json_strings:
        output_data = "\n".join(json_strings)
        print(output_data)
        s3.put_object(Body=output_data, Bucket=output_bucket_name, Key=output_object_key)

あとは、Lambdaの「設定」→「トリガー」でトリガー元となるS3バケットを指定し、Event typesはPUTを指定します。
Recursive invocation(入力と出力の両方で同じS3バケットを使っていないか)の確認にチェックを入れます。

これで設定完了です。

注意点

  • LambdaのIAMロールにS3バケットの読み取りや書き込むポリシーの付与が必要です。
  • このサンプルコードの内容にはエラー処理や例外処理を含めていないので、実際に利用する際はそこを考慮することを推奨します。
  • オブジェクトの量によってタイムアウトやメモリ量等、Lambdaの性能面を考慮する必要があります。

最後に

途中からJSONが始まるログに対して、JSONのみを抽出してみました。
Athena等の分析で利用したいけど困っている等ありましたら参考にしてみてください。

ではまた!コンサルティング部の洲崎でした。