S3バケットにあるJSONで1行に繋がった形式のレコードをLambdaのテストイベントでJSON Lines形式に変換して出力する(Python3.9)

S3バケットにある全オブジェクトを対象として、JSONで1行に繋がった形式のレコードをLambda(Python3.9)のテストイベントで、JSON Lines形式に変換してみる
2023.03.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、洲崎です。
S3バケットにあるJSONで1行に繋がった形式のレコードをLambda(Python3.9)のテストイベントで、JSON Lines形式に変換して出力してみたので共有します。

S3バケットにある全オブジェクトを一斉にパースさせたい

前回の記事の続きになりますが、Kinesis Data Firehoseをデフォルトの設定のまま利用すると、複数のJSONが1行にまとまってAthenaやQuickSightで処理できない問題がありました。
Kinesis Data Firehoseの動的パーティショニングの設定を使うことで、Kinesis Data Firehoseからパースできるのですが、すでに既存データがS3に溜まってしまっている場合、別途パース処理が必要です。

今回はLambda(Python3.9)を利用して、S3バケットにある全オブジェクトをパースしてみました。

Lambda

サンプルコード

S3バケットの指定の部分を対象のバケット名に変えるだけで動作するようになっています。
別のS3バケットに出力する場合はsourceとdestをそれぞれ指定します。
※気になる点等あればフィードバックいただけると幸いです。

既存のS3オブジェクトを上書きで保存する場合

import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    #S3バケット名を指定
    bucket_name = 'sample-bucket'

    # S3バケット内の全オブジェクトのキーをリストアップ
    bucket = s3.Bucket(bucket_name)

    # 各オブジェクトを読み込み、改行を追加してJSONL形式に変換して上書きする
    for obj in bucket.objects.all():
        # 元のオブジェクト内容を取得
        response = obj.get()
        contents = response['Body'].read().decode('utf-8')

        # JSON文字列を改行区切りのJSONL形式に変換
        new_data = contents.replace('}{', '}\n{')

        # 最後の改行コードを削除
        if new_data.endswith('\n'):
            new_data = new_data[:-1]

        # 変換したJSONL形式を上書きして保存
        obj.put(Body=new_data)

別のS3バケットに出力する場合

import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    #S3バケット名を指定
    source_bucket_name = 'source_bucket'
    dest_bucket_name = 'dest_bucket'

    # S3バケット内の全オブジェクトのキーをリストアップ
    source_bucket = s3.Bucket(source_bucket_name)

    # 各オブジェクトを読み込み、改行を追加してJSONL形式に変換し、別のバケットへ保存する
    for object in source_bucket.objects.all():
        # 元のオブジェクト内容を取得
        obj = s3.Object(source_bucket_name, object.key)
        response = obj.get()
        contents = response['Body'].read().decode('utf-8')
        
        # JSON文字列を改行区切りのJSONL形式に変換
        new_data = contents.replace('}{', '}\n{')

        # 最後の改行コードを削除
        if new_data.endswith('\n'):
            new_data = new_data[:-1]

        # 変換したJSONL形式を別のバケットに保存
        dest_obj = s3.Object(dest_bucket_name, object.key)
        dest_obj.put(Body=new_data)

注意点

現時点でパッと思いつく注意点を上げます。

  • Lambdaのテストイベントで実行することを想定している為、Firehoseに設定したり、S3のプットイベント等の場合はコードの修正が必要になります。
  • S3のGetやPut料金が別途発生します。
  • オブジェクトの量によってタイムアウトやメモリ量等、Lambdaの性能面を考慮する必要があります。
    • あまりにオブジェクトの量が多い場合は分割してLambda処理を行うことや、Lambda以外の方法を考える必要があります。

最後に

今回はS3バケットにある全オブジェクトを対象として、JSONで1行に繋がった形式のレコードをLambda(Python3.9)のテストイベントで、JSON Lines形式に変換して出力してみました。
もし同様の事象で困っている方がいましたら参考にしてみてください。

ではまた!コンサルティング部の洲崎でした。