S3 Concatを使ってS3上にある大量の小さいオブジェクトを圧縮してファイルサイズを最適化しよう

2023.08.31

データアナリティクス事業本部インテグレーション部コンサルティングチーム・新納(にいの)です。

日々Amazon Athenaを使われている方にとって、Amazon S3へのスロットリングの軽減はつきものですよね。Please reduce your request rate.エラーに枕を濡らす方も多いのではないでしょうか。私です。

多数の小さいオブジェクトをAthenaでテーブル化すべく、今回はS3上のオブジェクトをコンパクションしてくれるPythonパッケージのS3-concatを使ってみました。

S3のオブジェクトのファイル数とサイズを最適化してうれしいこと

「すでにS3にオブジェクトがあるんだから、Athenaで普通にCREATE TABLEすれば普通にテーブルが作れるやん」と思われる方もいらっしゃるかもしれませんが、オブジェクトを圧縮すると以下のようなメリットがあります。

S3へのスロットリングを軽減

Amazon S3には1秒あたり5500件のリクエスト制限があり、S3上のファイルをクエリするAthenaにもこの制限があります。多数の小さいオブジェクトに対してクエリを発行すると、Please reduce your request rate.エラーが発生してしまうケースがあります。

Amazon S3 スロットリングの防止 - Amazon Athena

Athenaのクエリパフォーマンス向上

Athenaでクエリする場合、対象となるオブジェクトのファイルサイズは100MB以上1GB未満であることが推奨されています。AWS公式ドキュメントにも以下の通りの記載があります。

ただしファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。

Amazon Athena のパフォーマンスチューニング Tips トップ 10 | Amazon Web Services ブログ

リクエスト料金を軽減

Athena自身はクエリのデータスキャン量で課金され非常に安価に使えるサービスではありますが、S3上のオブジェクトを参照するため、S3へのリクエスト料金が別途発生します。

といっても、PUT・COPY・POST・LIST リクエストであれば1,000 リクエストあたり0.0047USDであり、ほとんどのケースでは数ドル以下に収まるくらいの料金です。ですが、大量にあるファイルに対して何度も何度もクエリしてしまうと、思いもよらぬ課金が発生してしまう恐れがあります。

S3 Concatとは

S3バケット内の大量にある小さいオブジェクトを連結し、より少ない大きなファイルに加工してくれるPythonパッケージです。マルチパートアップロード機能を利用しています。

基本的な使い方やサンプルコードは以下をご参照ください。

実際のコードは後述しますが、ポイントとなるのは以下です。

  • 最小ファイルサイズが指定でき、サイズを超えた分は分割されてファイル出力されます。
    • 例:file-1.jsonfile-2.json...
  • 実行スレッド数が指定可能
    • 連結対象のファイルが5MB未満の場合、s3 multipart_upload APIの制限があるため、一度ファイルをローカルにダウンロードし、連結してから再度アップロードされます。スレッドカウントを設定すると、連結処理をより高速に行うために並行してファイルをダウンロードします。

実際に使ってみた

上述したURLからサンプルコードを動かせば簡単に動作は確認可能です。せっかくなので本エントリでは実際の利用シーンをイメージして実行してみました。

想定シーン

毎日、日付でパーティションされたファイルが細かいサイズでたくさん連携されます。これを適切なファイルサイズになるよう結合し、結合後は元ファイルを削除します。

コードを実行

今回は以下のようなコードを実行してみました。

from s3_concat import S3Concat
import boto3
from datetime import datetime as dt
from datetime import timedelta

# 日付条件の設定
strdt = dt.strptime("2023-08-15", '%Y-%m-%d')  # 開始日
enddt = dt.strptime("2023-08-19", '%Y-%m-%d')  # 終了日

# 日数を算出
days_num = (enddt - strdt).days + 1  

# 日付をリスト化
dt_list = []
for i in range(days_num):
    dt_list.append(strdt + timedelta(days=i))


BUCKET = '<S3バケット名>'
PROFILE_NAME = '<AWS認証情報のプロファイル名>' # Optional

my_session = boto3.Session()
if PROFILE_NAME in boto3.Session().available_profiles:
    my_session = boto3.Session(profile_name=PROFILE_NAME)

s3 = my_session.resource('s3')
s3_client = my_session.client('s3')

# ファイルをコンパクションする
def concat_objects(bucket,folder_date):
    concatenated_file = f'{folder_date}-temp/{folder_date}-concat.txt'
    origin_key = folder_date + '/'
    to_key = folder_date + '/' + folder_date + '-concat.txt'
    job = S3Concat(bucket, concatenated_file, min_file_size,
                content_type='text/plain',
                session=my_session,  # For custom aws session
                min_file_size = '128MB'  # ex: FILE_TO_SAVE_TO-1.json, FILE_TO_SAVE_TO-2.json, ...
                )

    job.add_files(origin_key)
    job.concat(small_parts_threads=4, main_threads=2)
    concatenated_file_1 = folder_date + '-temp/' + folder_date + '-concat-1.txt'

    # コンパクション後、元ファイルをフォルダごと削除   
    bucket_object = s3.Bucket(bucket)
    bucket_object.objects.filter(Prefix=origin_key).delete()

    try:
        s3_client.copy_object(Bucket=bucket, Key=to_key, CopySource={'Bucket': bucket, 'Key': concatenated_file_1})
        print(f'moved s3://{bucket}/{folder_date}/{concatenated_file_1} to s3://{bucket}/{folder_date}')

        # 一時フォルダを削除
        s3_client.delete_object(Bucket=bucket, Key=concatenated_file_1)
        print(f'Deleted s3://{bucket}/{concatenated_file_1}')

    except Exception as e:
        print(e)
            
def main():
    print('boto3 version is {0}'.format(boto3.__version__))

    for folder_date in dt_list:
        try:
           folder_date = folder_date.strftime('%Y-%m-%d')
           concat_objects(BUCKET, folder_date)
        except KeyError:
           print('no ' + folder_date + ' data')

if __name__ == "__main__":
    main()

全てのファイルを合わせても128MBを超えないことから末尾が'-concat-1.txt'のファイルのみを処理対象としています。(乱暴な作りですが…)

実行するとファイルがまとまって出力されました。

実行後のファイルの中身を確認したところ、一番最後の行に空行が入っていましたので、この結果を使ってテーブルを作成するなど、後続の処理では留意した方がよさそうです。

定期的にコンパクション処理を実行するのであれば、LambdaやEventBridgeと組み合わせれば実現可能です。LambdaでS3 Concatを使うにあたっては、S3 ConcatをインストールしたLambdaレイヤーの追加が必要です。以下のコマンドで事前にレイヤーを作成しておきます。この時、Lambda関数のPythonバージョンと合わせておきましょう。

$ mkdir python
$ pip install -t python s3-concat
$ zip -r9 layer.zip python

最後に

S3 Concatを使って、S3上のファイルを指定したサイズでまとめてみました。一番最後の行に空行が入るところは注意が必要ですが、今までは自前でこのコンパクション処理を実装したりGlueを使って出力しなおしたりといった作業をしていたのが、かなり楽に処理できるようになったと感じます。この記事が特にS3のスロットリングでお悩みの方に役立てば幸いです。