S3上のファイルの文字列置換を部分的にメモリに読み出しながら実施する方法

2024.02.02

データアナリティクス事業本部のueharaです。

今回は、S3上のファイル内の文字列の置換を部分的にメモリに読み出しながら実施する方法を紹介してみたいと思います。

なぜ部分的に読み込みながら処理したいのか?

例えば、Glue (Python Shell)でのデータETL処理の過程でS3上にあるファイル内の文字列の置換を実施したいケースを考えます。

以下の記事でご紹介しているように、Glue (Python Shell)で自由に使える一時ディレクトリの容量は約14GB程度となります。

したがって、例えば対象ファイルが20GBであれば「Glueの一時ディレクトリにS3からファイルをダウンロードし、置換処理を実施した後、処理したファイルをS3にアップロードする」という処理が不可能になります。

ファイルが10GB(<14GB)と過程した場合でも、仮に利用しているGlueのDPUが1/16DPUであればメモリに乗るデータ量はせいぜい1GB以下なので(※Glueがシステム的に利用している分もあるので、実際に利用できる分はもっと少ない)、どの道部分的に置換していく処理は必要になります。

対応方針

結論から言うと、S3のマルチパートアップロード機能を利用して処理を行うことができます。

公式のドキュメントを確認すると、マルチパートアップロードについて以下のように説明がされています。

マルチパートアップロードを使用すると、単一のオブジェクトをパートのセットとしてアップロードすることができます。

要は、part1, part2, ...といったように複数のパートに分けてファイルのアップロードを行い、最後に1つのオブジェクトとしてまとめることができる機能になります。

これを利用すれば、例えばS3上にある20GBのfoo.csvに対し

  1. foo.csvの最初の50MBメモリに読み込んで置換処理を行い、part1としてアップロード
  2. foo.csvの次の50MBを読み込んで置換処理を行い、part2としてアップロード
  3. (20GB分処理するまで繰り返し)
  4. 最後に1つのファイルにまとめる

といったようにメモリに読み出す量を絞りながら、かつ一時ディレクトリを利用せずに置換処理を行いアップロードを実行することができます。

注意点

ただし、マルチパートアップロードは5MB未満のファイルには対応しておりません。

したがって、置換処理の対象が5MB未満のファイルであった場合、マルチパートアップロードを利用しないような設計にする必要があります。

Pythonのコード

上記で説明した対応方針に沿って処理を行うPythonのコードは以下の通りです。

test.py

import os

import boto3

s3_client = None


def _replace_and_upload(
    bucket_name, object_key, new_object_key, str_to_find, str_to_replace
):
    global s3_client

    # download file
    print("download file")
    data = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    data = data["Body"].read()

    # replace data
    data = data.replace(str_to_find.encode(), str_to_replace.encode())

    # upload file
    print("upload file")
    s3_client.put_object(Body=data, Bucket=bucket_name, Key=new_object_key)

    return


def _replace_and_upload_chunks(
    upload_id,
    buffer_size,
    file_size,
    bucket_name,
    object_key,
    new_object_key,
    str_to_find,
    str_to_replace,
):
    global s3_client

    part_info = {"Parts": []}
    part_number = 1
    offset = 0
    overlap_size = len(str_to_find)
    carry_over_data = b''

    while offset < file_size:
        print(f"part_number = {part_number}")

        # calculate the last byte of the current chunk with overlap
        if part_number == 1:
            download_end = buffer_size + overlap_size
        else:
            download_end = min(offset + buffer_size, file_size)

        # download data from S3 by specifying byte range
        range_header = f"bytes={offset}-{download_end - 1}"

        print(f"download part_number = {part_number}, {range_header}")
        data = s3_client.get_object(
            Bucket=bucket_name, Key=object_key, Range=range_header
        )
        chunk = data["Body"].read()

        if part_number > 1:
            # add carry_over_data from previous chunk
            chunk = carry_over_data + chunk
        
        carry_over_data = chunk[-overlap_size:]

        # replace data in chunk
        chunk = chunk.replace(str_to_find.encode(), str_to_replace.encode())

        if part_number == 1:
            chunk = chunk[:-overlap_size]
        else:
            chunk = chunk[overlap_size:]

        # write processed chunks to new file stream
        print(f"upload part_number = {part_number}")
        response = s3_client.upload_part(
            Body=chunk,
            Bucket=bucket_name,
            Key=new_object_key,
            PartNumber=part_number,
            UploadId=upload_id,
        )

        # record information of uploaded parts
        part_info["Parts"].append({"PartNumber": part_number, "ETag": response["ETag"]})

        offset += buffer_size
        part_number += 1

        del chunk

    return part_info


def replace_character_in_s3(s3_path, str_to_find, str_to_replace):
    global s3_client
    s3_client = boto3.client("s3")

    bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1)
    splited_path = os.path.splitext(object_key)
    new_object_key = splited_path[0] + "_replaced" + splited_path[1]

    # set the buffer size handled in memory
    buffer_size = 50 * 1024 * 1024  # 50MB

    # get file size
    response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
    file_size = response["ContentLength"]

    # single upload
    if file_size < buffer_size:
        _replace_and_upload(
            bucket_name, object_key, new_object_key, str_to_find, str_to_replace
        )
    # multipart upload
    else:
        try:
            multipart_upload = s3_client.create_multipart_upload(
                Bucket=bucket_name, Key=new_object_key
            )
            upload_id = multipart_upload["UploadId"]

            part_info = _replace_and_upload_chunks(
                upload_id,
                buffer_size,
                file_size,
                bucket_name,
                object_key,
                new_object_key,
                str_to_find,
                str_to_replace,
            )

            # notify completion when all parts are uploaded
            s3_client.complete_multipart_upload(
                Bucket=bucket_name,
                Key=new_object_key,
                UploadId=upload_id,
                MultipartUpload=part_info,
            )

        except Exception as e:
            # cancel multipart upload if error occurs
            s3_client.abort_multipart_upload(
                Bucket=bucket_name, Key=new_object_key, UploadId=upload_id
            )
            raise e

    return bucket_name, new_object_key


if __name__ == "__main__":
    replace_character_in_s3(
        "s3://cm-da-uehara/foo/test.csv",
        str_to_find=r"\"",
        str_to_replace=r'""',
    )

上記コードについて、メモリに読み出すbuffer_size50MBに設定しています。

ファイルが50MB未満であればマルチパートアップロードを行わず、一回で処理を行うようにしています。(これで、5MB未満のファイルであっても問題ありません)

50MB以上であれば、50MBずつファイルを読み込んで置換処理を行い、マルチパートアップロードにより部分的にアップロードを行うようにしています。

また、boto3のS3 Clientのget_object()関数は引数にRangeを取ることができ、GetObject操作でのバイト範囲を指定することができますので、部分的な読み出しについてはそちらを利用しています。

なお、chunk間でstr_to_findの文字列が跨がっている際に、それを見つけて置き換えるためoverlap_sizecarry_over_dataを利用しています。具体的には、単純に50MB読み込むだけではstr_to_findがチャンクの境目に位置している場合は正しく置き換えができないので、chunkを少し重ねて読み込み、次のchunkと組み合わせて処理できるようにしています。

ただし、上記は置換前と置換後の文字列の長さが同じことを前提としていることにご注意下さい。置換前後で文字列の長さが異なる場合は、別途それを考慮した処理にする必要があります。

肝心の置換の内容は\"という文字列を""にするという処理にしています。("に対するエスケープ文字を\ではなく"にする置換処理)

その他の細かい処理についてはコメントに記載の通りです。

動作テスト

S3バケットに、軽めのcsvファイルであるtest_small.csvと、比較的重めのファイルであるtest_medium.csvファイルを用意します。

まず、test_small.csvを対象としてプログラムを実行します。

$ python test.py
download file
upload file

S3を確認すると、正しく置換が実行されたtest_small_replaced.csvが作成されていることが確認できました。

次に、test_medium.csvを対象としてプログラムを実行します。

$ python test.py
part_number = 1
download part_number = 1, bytes=0-52428801
upload part_number = 1
part_number = 2
download part_number = 2, bytes=52428800-104857599
upload part_number = 2
part_number = 3
download part_number = 3, bytes=104857600-157286399
upload part_number = 3
part_number = 4
download part_number = 4, bytes=157286400-209715199
upload part_number = 4
part_number = 5
download part_number = 5, bytes=209715200-229528106
upload part_number = 5

部分的に処理が実行され、test_medium_replaced.csvが作成されていることを確認できました。

ファイルの内容についても、正しく置換処理が実施されていることを確認しました。

最後に

今回は、S3上のファイル内の文字列の置換を部分的にメモリに読み出しながら実施する方法をご紹介しました。

参考になりましたら幸いです。

参考文献