7 日よりも経過時間が短い未完了のマルチパートアップロードを洗い出す
こんにちは!クラウド事業本部コンサルティング部のたかくに(@takakuni_)です。
みなさん、Amazon S3 のマルチパートアップロード使っていますでしょうか。
100 MB を超えるファイルの場合、マルチパートアップロードを利用することで、スループットの向上が見込めます。
AWS CLI では自動でマルチパートアップロードへ切り替える
実は AWS CLI では、デフォルトで 8 MB を超えるオブジェクトをやり取りする場合は、自動でマルチパートアップロードへ切り替えてアップロードを行ってくれます。そのため、aws s3 sync
や aws cp
など普段利用している S3 コマンドでも、マルチパートアップロードが利用される可能性があります。
multipart_chunksize¶
Default - 8MB
Minimum For Uploads - 5MB
Once the S3 commands have decided to use multipart operations, the file is divided into chunks. This configuration option specifies what the chunk size (also referred to as the part size) should be. This value can specified using the same semantics as multipart_threshold, that is either as the number of bytes as an integer, or using a size suffix. If the specified chunk size does not fit within the established limits for S3 multipart uploads, the chunk size will be automatically adjusted to a valid value.
マルチパートアップロードが完了すれば、スループット向上してハッピーなのですが、途中で中断してしまったなどの不完全なマルチパートアップロードは、コスト増に繋がるためライフルサイクルルールを利用して定期的に削除いただくのがオススメです。
今回は S3 バケットの一覧を取得し、各バケットで一定期間経過したマルチパートアップロードを洗い出してみようと思います。
本エントリを利用する前に AWS では Storage Lens という無料で利用可能な素晴らしい機能があります。Storage Lens を利用することで、7 日を超えるマルチパートアップロードは算出できますので、7 日間を超えるマルチパートアップロードを見たいケースでは、こちらを利用してください。
今回は 7 日よりも短い期間で確認したいケースとします。
Python で書いてみた
今回は Lambda で実装してみました。全文はこちらになります。
import os
import json
import boto3
import datetime
import logging
from botocore.exceptions import BotoCoreError, ClientError
logger = logging.getLogger()
logger.setLevel("INFO")
STALE_THRESHOLD_DAYS = int(os.environ.get('STALE_THRESHOLD_DAYS', 1))
def check_stale_multipart_uploads(bucket_name, s3_client):
"""
指定されたバケット内でマルチパートアップロードの存在をチェックし、現在の時刻の日付からの差分を計算する。
"""
try:
logger.debug(f"Processing bucket: {bucket_name}")
response = s3_client.list_multipart_uploads(Bucket=bucket_name)
uploads = response.get('Uploads', [])
stale_uploads = []
for upload in uploads:
upload_date = upload['Initiated']
if isinstance(upload_date, datetime.datetime):
elapsed_days = (datetime.datetime.now(datetime.timezone.utc) - upload_date).days
if elapsed_days > STALE_THRESHOLD_DAYS:
stale_uploads.append({
'Key': upload['Key'],
'Initiated': upload_date.isoformat(),
'InitiatorOwner': upload['Initiator']['DisplayName'],
'ElapsedDays': elapsed_days
})
# バケットに未完了のアップロードが存在する場合のみ結果を返す
if stale_uploads:
return {
'bucket_name': bucket_name,
'stale_uploads': stale_uploads
}
return None
except (BotoCoreError, ClientError) as e:
logger.error(f"AWS API error for bucket {bucket_name}: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error for bucket {bucket_name}: {str(e)}")
return None
def lambda_handler(event, context):
stale_uploads_results = []
try:
client = boto3.client('s3')
buckets = client.list_buckets().get('Buckets', [])
logger.info(f"Processing {len(buckets)} buckets")
for bucket in buckets:
bucket_name = bucket['Name']
logger.info(f"Processing bucket: {bucket_name}")
# マルチパートアップロードのチェック
logger.debug('Check stale uploads')
stale_uploads_result = check_stale_multipart_uploads(bucket_name, client)
if stale_uploads_result: # None でない場合のみ追加
logger.info(f"Bucket {bucket_name} has stale uploads")
stale_uploads_results.append(stale_uploads_result)
return {
'stale_uploads': stale_uploads_results
}
except (BotoCoreError, ClientError) as e:
logger.error(f"AWS API error: {str(e)}")
return {'message': 'AWS API error occurred'}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {'message': 'Internal server error'}
マルチパートアップロードの洗い出し
list_multipart_uploads は S3 バケットに対して作成されたマルチパートアップロードが存在しているかをチェックします。
チェックしたマルチパートアップロードに対して、現在時刻との差分を求め日付を計算してみました。
計算された日付が STALE_THRESHOLD_DAYS を越す場合に不完全なマルチパートアップロードとみなし、洗い出しを行っています。
def check_stale_multipart_uploads(bucket_name, s3_client):
"""
指定されたバケット内でマルチパートアップロードの存在をチェックし、現在の時刻の日付からの差分を計算する。
"""
try:
logger.debug(f"Processing bucket: {bucket_name}")
+ response = s3_client.list_multipart_uploads(Bucket=bucket_name)
uploads = response.get('Uploads', [])
stale_uploads = []
+ for upload in uploads:
+ upload_date = upload['Initiated']
+ if isinstance(upload_date, datetime.datetime):
+ elapsed_days = (datetime.datetime.now(datetime.timezone.utc) - upload_date).days
+ if elapsed_days > STALE_THRESHOLD_DAYS:
+ stale_uploads.append({
+ 'Key': upload['Key'],
+ 'Initiated': upload_date.isoformat(),
+ 'InitiatorOwner': upload['Initiator']['DisplayName'],
+ 'ElapsedDays': elapsed_days
+ })
# バケットに未完了のアップロードが存在する場合のみ結果を返す
if stale_uploads:
return {
'bucket_name': bucket_name,
'stale_uploads': stale_uploads
}
return None
except (BotoCoreError, ClientError) as e:
logger.error(f"AWS API error for bucket {bucket_name}: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error for bucket {bucket_name}: {str(e)}")
return None
実行結果
実行結果は以下になります。Storage Lens では洗い出せなかった、4 日経過したマルチパートアップロードが出てきましたね。
{
"stale_uploads": [
{
"bucket_name": "custom-model-import-123456789012",
"stale_uploads": [
{
"Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/4f/57/4f57a2538539d61f9f22785d3c48f089227829948ee66572808dd13d5b08d63e",
"Initiated": "2025-01-30T10:36:35+00:00",
"InitiatorOwner": "hoge-s3-user",
"ElapsedDays": 3
},
{
"Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/57/2e/572ed974eb237aacf3de9ddfeaa373c62d39038a96ffd7f2beec6ea8f8a7b16e",
"Initiated": "2025-01-30T10:36:35+00:00",
"InitiatorOwner": "hoge-s3-user",
"ElapsedDays": 3
},
{
"Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/60/5b/605b29e86ec80aaa3a38bb3aab4bc859e4df26928fe49bbca00f127f55897e8d",
"Initiated": "2025-01-30T10:36:46+00:00",
"InitiatorOwner": "hoge-s3-user",
"ElapsedDays": 3
},
{
"Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/62/a8/62a83ba7af95d532065e46322ae2c17ee020c3c8e92b7285420c09ca0d6f68f0",
"Initiated": "2025-01-30T10:36:48+00:00",
"InitiatorOwner": "hoge-s3-user",
"ElapsedDays": 3
},
{
"Key": "DeepSeek-R1-Distill-Llama-70B/.git/lfs/objects/81/89/8189753d14d5f9b3c31466d1f5a185bb7ffc7fc346cbc06a5453e48e07c2b97b",
"Initiated": "2025-01-30T10:37:18+00:00",
"InitiatorOwner": "hoge-s3-user",
"ElapsedDays": 3
},
{
"Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/.git/lfs/objects/09/ae/09ae5c566b1ec8d3ff6fb609b22ea89f1613111ef7bed68ddb6aa36929398b97",
"Initiated": "2025-01-29T09:55:33+00:00",
"InitiatorOwner": "takakuni/botocore-session-1738144525",
"ElapsedDays": 5
},
{
"Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00011-of-00014.safetensors",
"Initiated": "2025-01-29T12:27:17+00:00",
"InitiatorOwner": "takakuni/botocore-session-1738151165",
"ElapsedDays": 4
},
{
"Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00012-of-00014.safetensors",
"Initiated": "2025-01-29T12:27:18+00:00",
"InitiatorOwner": "takakuni/botocore-session-1738151165",
"ElapsedDays": 4
},
{
"Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00013-of-00014.safetensors",
"Initiated": "2025-01-29T12:27:27+00:00",
"InitiatorOwner": "takakuni/botocore-session-1738151165",
"ElapsedDays": 4
},
{
"Key": "DeepSeek-R1-Distill-Qwen-32B-Japanese/model-00014-of-00014.safetensors",
"Initiated": "2025-01-29T12:27:39+00:00",
"InitiatorOwner": "takakuni/botocore-session-1738151165",
"ElapsedDays": 4
}
]
}
]
}
まとめ
以上、「7 日よりも経過時間が短い未完了のマルチパートアップロードを洗い出す」でした。
ほとんどのユースケースでは Storage Lens + ライフサイクルルールの設定をいただくのが良いかと思いますが、誰かに刺されば嬉しいです。
クラウド事業本部コンサルティング部のたかくに(@takakuni_)でした!