7 日よりも経過時間が短い未完了のマルチパートアップロードを洗い出す

7 日よりも経過時間が短い未完了のマルチパートアップロードを洗い出す

Clock Icon2025.02.03

こんにちは!クラウド事業本部コンサルティング部のたかくに(@takakuni_)です。

みなさん、Amazon S3 のマルチパートアップロード使っていますでしょうか。

100 MB を超えるファイルの場合、マルチパートアップロードを利用することで、スループットの向上が見込めます。

https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/mpuoverview.html

AWS CLI では自動でマルチパートアップロードへ切り替える

実は AWS CLI では、デフォルトで 8 MB を超えるオブジェクトをやり取りする場合は、自動でマルチパートアップロードへ切り替えてアップロードを行ってくれます。そのため、aws s3 syncaws cp など普段利用している S3 コマンドでも、マルチパートアップロードが利用される可能性があります。

multipart_chunksize¶

Default - 8MB

Minimum For Uploads - 5MB

Once the S3 commands have decided to use multipart operations, the file is divided into chunks. This configuration option specifies what the chunk size (also referred to as the part size) should be. This value can specified using the same semantics as multipart_threshold, that is either as the number of bytes as an integer, or using a size suffix. If the specified chunk size does not fit within the established limits for S3 multipart uploads, the chunk size will be automatically adjusted to a valid value.

https://awscli.amazonaws.com/v2/documentation/api/latest/topic/s3-config.html#multipart-chunksize

マルチパートアップロードが完了すれば、スループット向上してハッピーなのですが、途中で中断してしまったなどの不完全なマルチパートアップロードは、コスト増に繋がるためライフルサイクルルールを利用して定期的に削除いただくのがオススメです。

https://dev.classmethod.jp/articles/recommend-incomplete-mpu-lifecycle-used-aws-cli/

今回は S3 バケットの一覧を取得し、各バケットで一定期間経過したマルチパートアップロードを洗い出してみようと思います。

本エントリを利用する前に AWS では Storage Lens という無料で利用可能な素晴らしい機能があります。Storage Lens を利用することで、7 日を超えるマルチパートアップロードは算出できますので、7 日間を超えるマルチパートアップロードを見たいケースでは、こちらを利用してください。

2025-02-03 at 19.05.11-default-account-dashboard - S3 Storage Lens ダッシュボード  S3  us-east-1.png

今回は 7 日よりも短い期間で確認したいケースとします。

Python で書いてみた

今回は Lambda で実装してみました。全文はこちらになります。

lambda_function.oy
import os
import json
import boto3
import datetime
import logging
from botocore.exceptions import BotoCoreError, ClientError

logger = logging.getLogger()
logger.setLevel("INFO")

STALE_THRESHOLD_DAYS = int(os.environ.get('STALE_THRESHOLD_DAYS', 1))

def check_stale_multipart_uploads(bucket_name, s3_client):
    """
    指定されたバケット内でマルチパートアップロードの存在をチェックし、現在の時刻の日付からの差分を計算する。
    """
    try:
        logger.debug(f"Processing bucket: {bucket_name}")
        response = s3_client.list_multipart_uploads(Bucket=bucket_name)
        uploads = response.get('Uploads', [])
        stale_uploads = []

        for upload in uploads:
            upload_date = upload['Initiated']
            if isinstance(upload_date, datetime.datetime):
                elapsed_days = (datetime.datetime.now(datetime.timezone.utc) - upload_date).days
                if elapsed_days > STALE_THRESHOLD_DAYS:
                    stale_uploads.append({
                        'Key': upload['Key'],
                        'Initiated': upload_date.isoformat(),
                        'InitiatorOwner': upload['Initiator']['DisplayName'],
                        'ElapsedDays': elapsed_days
                    })

        # バケットに未完了のアップロードが存在する場合のみ結果を返す
        if stale_uploads:
            return {
                'bucket_name': bucket_name,
                'stale_uploads': stale_uploads
            }
        return None

    except (BotoCoreError, ClientError) as e:
        logger.error(f"AWS API error for bucket {bucket_name}: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error for bucket {bucket_name}: {str(e)}")
        return None

def lambda_handler(event, context):
    stale_uploads_results = []
    try:
        client = boto3.client('s3')
        buckets = client.list_buckets().get('Buckets', [])
        logger.info(f"Processing {len(buckets)} buckets")

        for bucket in buckets:
            bucket_name = bucket['Name']

            logger.info(f"Processing bucket: {bucket_name}")
            # マルチパートアップロードのチェック
            logger.debug('Check stale uploads')
            stale_uploads_result = check_stale_multipart_uploads(bucket_name, client)
            if stale_uploads_result:  # None でない場合のみ追加
                logger.info(f"Bucket {bucket_name} has stale uploads")
                stale_uploads_results.append(stale_uploads_result)

        return {
            'stale_uploads': stale_uploads_results
        }
    except (BotoCoreError, ClientError) as e:
        logger.error(f"AWS API error: {str(e)}")
        return {'message': 'AWS API error occurred'}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {'message': 'Internal server error'}

マルチパートアップロードの洗い出し

list_multipart_uploads は S3 バケットに対して作成されたマルチパートアップロードが存在しているかをチェックします。

チェックしたマルチパートアップロードに対して、現在時刻との差分を求め日付を計算してみました。

計算された日付が STALE_THRESHOLD_DAYS を越す場合に不完全なマルチパートアップロードとみなし、洗い出しを行っています。

def check_stale_multipart_uploads(bucket_name, s3_client):
    """
    指定されたバケット内でマルチパートアップロードの存在をチェックし、現在の時刻の日付からの差分を計算する。
    """
    try:
        logger.debug(f"Processing bucket: {bucket_name}")
+        response = s3_client.list_multipart_uploads(Bucket=bucket_name)
        uploads = response.get('Uploads', [])
        stale_uploads = []

+        for upload in uploads:
+            upload_date = upload['Initiated']
+            if isinstance(upload_date, datetime.datetime):
+                elapsed_days = (datetime.datetime.now(datetime.timezone.utc) - upload_date).days
+                if elapsed_days > STALE_THRESHOLD_DAYS:
+                    stale_uploads.append({
+                        'Key': upload['Key'],
+                        'Initiated': upload_date.isoformat(),
+                        'InitiatorOwner': upload['Initiator']['DisplayName'],
+                        'ElapsedDays': elapsed_days
+                    })

        # バケットに未完了のアップロードが存在する場合のみ結果を返す
        if stale_uploads:
            return {
                'bucket_name': bucket_name,
                'stale_uploads': stale_uploads
            }
        return None

    except (BotoCoreError, ClientError) as e:
        logger.error(f"AWS API error for bucket {bucket_name}: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error for bucket {bucket_name}: {str(e)}")
        return None

実行結果

実行結果は以下になります。Storage Lens では洗い出せなかった、4 日経過したマルチパートアップロードが出てきましたね。

{
  "stale_uploads": [
    {
      "bucket_name": "custom-model-import-123456789012",
      "stale_uploads": [
        {
          "Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/4f/57/4f57a2538539d61f9f22785d3c48f089227829948ee66572808dd13d5b08d63e",
          "Initiated": "2025-01-30T10:36:35+00:00",
          "InitiatorOwner": "hoge-s3-user",
          "ElapsedDays": 3
        },
        {
          "Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/57/2e/572ed974eb237aacf3de9ddfeaa373c62d39038a96ffd7f2beec6ea8f8a7b16e",
          "Initiated": "2025-01-30T10:36:35+00:00",
          "InitiatorOwner": "hoge-s3-user",
          "ElapsedDays": 3
        },
        {
          "Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/60/5b/605b29e86ec80aaa3a38bb3aab4bc859e4df26928fe49bbca00f127f55897e8d",
          "Initiated": "2025-01-30T10:36:46+00:00",
          "InitiatorOwner": "hoge-s3-user",
          "ElapsedDays": 3
        },
        {
          "Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/62/a8/62a83ba7af95d532065e46322ae2c17ee020c3c8e92b7285420c09ca0d6f68f0",
          "Initiated": "2025-01-30T10:36:48+00:00",
          "InitiatorOwner": "hoge-s3-user",
          "ElapsedDays": 3
        },
        {
          "Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/81/89/8189753d14d5f9b3c31466d1f5a185bb7ffc7fc346cbc06a5453e48e07c2b97b",
          "Initiated": "2025-01-30T10:37:18+00:00",
          "InitiatorOwner": "hoge-s3-user",
          "ElapsedDays": 3
        },
        {
          "Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/.git/lfs/objects/09/ae/09ae5c566b1ec8d3ff6fb609b22ea89f1613111ef7bed68ddb6aa36929398b97",
          "Initiated": "2025-01-29T09:55:33+00:00",
          "InitiatorOwner": "takakuni/botocore-session-1738144525",
          "ElapsedDays": 5
        },
        {
          "Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00011-of-00014.safetensors",
          "Initiated": "2025-01-29T12:27:17+00:00",
          "InitiatorOwner": "takakuni/botocore-session-1738151165",
          "ElapsedDays": 4
        },
        {
          "Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00012-of-00014.safetensors",
          "Initiated": "2025-01-29T12:27:18+00:00",
          "InitiatorOwner": "takakuni/botocore-session-1738151165",
          "ElapsedDays": 4
        },
        {
          "Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00013-of-00014.safetensors",
          "Initiated": "2025-01-29T12:27:27+00:00",
          "InitiatorOwner": "takakuni/botocore-session-1738151165",
          "ElapsedDays": 4
        },
        {
          "Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00014-of-00014.safetensors",
          "Initiated": "2025-01-29T12:27:39+00:00",
          "InitiatorOwner": "takakuni/botocore-session-1738151165",
          "ElapsedDays": 4
        }
      ]
    }
  ]
}

まとめ

以上、「7 日よりも経過時間が短い未完了のマルチパートアップロードを洗い出す」でした。

ほとんどのユースケースでは Storage Lens + ライフサイクルルールの設定をいただくのが良いかと思いますが、誰かに刺されば嬉しいです。

クラウド事業本部コンサルティング部のたかくに(@takakuni_)でした!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.