Cloud Storage上の大量のオブジェクトをバッチリクエストで削除してみた
はじめに
Cloud Storageを使ったデータパイプラインやバッチ処理では、大量のオブジェクトを削除する必要がある場面がよくあります。例えば、定期的に生成される一時ファイルや連携済みの古いデータを削除する場合です。
しかし、Cloud Storage上のオブジェクトを削除する際、単純に1つずつ削除していく方法では時間がかかりすぎることがあります。特に、数千〜数万のオブジェクトを削除する場合、効率的な方法を考える必要があります。
そこで今回は、Cloud StorageのBatch API
を活用して、大量のオブジェクトを高速で削除する方法について考え、実際に試してみた結果を共有します。
Batch API の概要
Cloud StoragのBatch API
は、複数のリクエスト(オブジェクトの削除、メタデータの更新などを1つのHTTPリクエストにまとめて送信するための仕組みです。これにより、ネットワークのオーバーヘッドを削減し、大量のオブジェクトを効率的に操作できます。
以下に、Batch API
の概要を示します。
Batch APIのメリット
- 効率的なリクエスト
- 複数の操作を1つのHTTPリクエストにまとめることで、オーバーヘッドが削減
- リクエスト数の削減
- 1つのバッチリクエストに最大 100件 の操作を含めることが可能
- 一括処理
- 複数の操作を一括で処理するため、大量のオブジェクトを操作する場合に高速
サポートされる操作
Batch API は、以下のようなリクエストをサポートしています。
-
削除
blob.delete()
を使用して、複数のオブジェクトを一括で削除可能
-
メタデータの更新
blob.patch()
を使用して、複数のオブジェクトのメタデータを一括で更新可能
-
ACL の変更
blob.acl.save()
を使用して、複数のオブジェクトのアクセス制御リスト (ACL) を一括で変更可能
サポートされない操作
Batch API は、以下のような操作をサポートしていません。
-
アップロード(コピー)
blob.upload_from_file()
やblob.upload_from_string()
のようなアップロード操作は使用不可
-
ダウンロード
blob.download_as_string()
やblob.download_to_filename()
のようなダウンロード操作は使用不可
-
レスポンスを返す操作
- レスポンスのペイロードを必要とする操作(例:
blob.exists()
)は使用不可
- レスポンスのペイロードを必要とする操作(例:
Batch APIを用いた実装イメージ
1. バッチコンテキストの作成
client.batch()
を使用してバッチコンテキストを作成- このコンテキスト内で実行された操作は、すべて1つのバッチリクエストにまとめられます
2. バッチ内での操作
- バッチコンテキスト内で実行された操作(例:
blob.delete()
)は、すぐにはサーバーに送信されません - 代わりに、バッチリクエストに追加されます
3. バッチリクエストの送信
- バッチコンテキストを抜けると、バッチリクエストがサーバーに送信されます
- サーバーはバッチリクエストを受け取り、各操作を個別に処理します
具体的な実装イメージとしては以下となります。
# Batch APIを用いた実装イメージ
with client.batch():
for blob in chunk:
blob.delete()
このAPIを用いてバッチ削除を実装してみました。
やってみる
削除方法として バッチ削除 (Batch) と 順次削除 (Sequential) の2つの方法を持たせたスクリプトを実装しました。
このスクリプトを実行して、Batch APIを使用したバッチ削除と、1件ずつ削除する順次削除を同じファイル件数に対してそれぞれ実行してみて処理時間の比較を行います。
スクリプト全文
from google.cloud import storage
from flask import jsonify, Request
import functions_framework
import time # 時間計測のために追加
@functions_framework.http
def delete_objects_in_gcs(request: Request):
try:
# リクエストからパラメータを取得
request_json = request.get_json()
if not request_json:
return jsonify({"error": "Invalid request, JSON body is required."}), 400
bucket_name = request_json.get("bucket_name") # バケット名をリクエストから取得
if not bucket_name:
return jsonify({"error": "Missing 'bucket_name' in request."}), 400
method = request_json.get("method", "batch") # デフォルトは "batch"
chunk_size = 100 # 固定で100件ずつ削除
# クライアントを初期化
client = storage.Client()
bucket = client.bucket(bucket_name)
# バケット内のすべてのオブジェクトを取得
blobs = list(bucket.list_blobs())
if not blobs:
return jsonify({"message": f"No objects found in bucket '{bucket_name}'."}), 200
# オブジェクトをチャンクに分割
chunks = [blobs[i:i + chunk_size] for i in range(0, len(blobs), chunk_size)]
# 削除処理の開始時刻を記録
start_time = time.time()
# 削除方法に応じて処理を分岐
if method == "batch":
# バッチ削除
for i, chunk in enumerate(chunks):
with client.batch():
for blob in chunk:
blob.delete()
print(f"Batch {i + 1}/{len(chunks)}: Deleted {len(chunk)} objects.")
elif method == "sequential":
# 順次削除
for i, blob in enumerate(blobs):
blob.delete()
if (i + 1) % chunk_size == 0 or (i + 1) == len(blobs):
print(f"Sequential: Deleted {i + 1}/{len(blobs)} objects.")
else:
return jsonify({"error": f"Invalid method '{method}'. Use 'batch' or 'sequential'."}), 400
# 削除処理の終了時刻を記録
end_time = time.time()
# 削除処理にかかった時間を計算
elapsed_time = end_time - start_time
return jsonify({
"message": f"Successfully deleted {len(blobs)} objects from bucket '{bucket_name}' using method '{method}'.",
"elapsed_time_seconds": elapsed_time
}), 200
except Exception as e:
print(f"Error: {e}")
return jsonify({"error": str(e)}), 500
実行時の引数としては以下となります。
引数 | 設定値 |
---|---|
bucket_name | バケット名 |
method | batchまたはsequential。batch:バッチ削除、sequential:順次削除 |
バッチ削除のリクエスト例
curl -X POST Cloud Run関数のURL \
-H "Content-Type: application/json" \
-d '{
"bucket_name": "バケット名",
"method": "batch"
}'
順次削除のリクエスト例
curl -X POST Cloud Run関数のURL \
-H "Content-Type: application/json" \
-d '{
"bucket_name": "バケット名",
"method": "sequential"
}'
スクリプトの解説
ポイントだけ触れます。
オブジェクトをチャンクに分割
chunks = [blobs[i:i + chunk_size] for i in range(0, len(blobs), chunk_size)]
- チャンク分割:
- 削除対象のオブジェクトを
chunk_size
(100件)ごとに分割 - Batch APIの1リクエストの上限が100件なので100件ごとに分割する必要があるため
- 削除対象のオブジェクトを
バッチ削除
if method == "batch":
for i, chunk in enumerate(chunks):
with client.batch():
for blob in chunk:
blob.delete()
print(f"Batch {i + 1}/{len(chunks)}: Deleted {len(chunk)} objects.")
-
client.batch()
:- バッチコンテキストを作成
- このコンテキスト内(With句内)で実行された操作(
blob.delete()
)は、すべて1つのバッチリクエストにまとめられます
-
blob.delete()
:- 指定されたオブジェクトを削除
順次削除
elif method == "sequential":
for i, blob in enumerate(blobs):
blob.delete()
if (i + 1) % chunk_size == 0 or (i + 1) == len(blobs):
print(f"Sequential: Deleted {i + 1}/{len(blobs)} objects.")
- 順次削除:
- 各オブジェクトを1つずつ削除
- 削除対象が多い場合には時間がかかります
検証結果
以下の条件で検証を行いました。
- 削除対象のオブジェクト数: 100個、1000個、10,000個
- 1ファイルあたりのデータ量: 1MB
- 削除方法: バッチ削除、順次削除
- 環境: Cloud Run Functions を使用
それぞれの方法で削除した結果
バッチ削除、順次削除で削除した場合の結果を以下にまとめました。
削除方法 | 削除対象のオブジェクト数 | 処理時間 (秒) |
---|---|---|
Batch | 100 | 0.497 |
Sequential | 100 | 2.934 |
Batch | 1000 | 4.343 |
Sequential | 1000 | 27.002 |
Batch | 10,000 | 57.893 |
Sequential | 10,000 | 251.471 |
わかりやすいよう可視化しました。
100個では数秒程度の差分ですが、10000個だと200秒(3分ちょい)と大きく差が出ています。バッチ削除の方がオーバヘッドが少ないため効率的に削除できているものと考えます。
以下に両者のメリデメをまとめます。
削除方法 | メリット | デメリット |
---|---|---|
バッチ削除 | オーバヘッドが少ないため高速 | 実装が少し複雑化(分割する必要あり) |
順次削除 | 削除の進捗を細かくログにだすことができる、1件ずつ扱える | オーバヘッドが大きいため大量オブジェクト削除時は時間がかかる |
所感
検証を通じて、Cloud Storage 上の大量のオブジェクトを削除する際には削除方法によって処理時間に大きな差が出ることが確認できました。特に、削除対象が数千〜数万件に及ぶ場合、バッチ削除を使用することで、ネットワークのオーバーヘッドを大幅に削減し、効率的に削除を行えることがわかりました。
一方で、順次削除は、削除の進捗を細かく制御したい場合や、削除対象が少ない場合には適していると感じました。ただし、大量のオブジェクトを削除する用途には時間がかかりすぎます。
ただし、Batch APIを使用する際には、1リクエストあたりの上限が100件であるため、削除対象をチャンクに分割する実装が必要です。この点は少し手間がかかってしまいデメリットだなと思います。とはいえ削除速度の向上を考えると十分に価値があると感じました。
削除対象のオブジェクト数が増えるほど、バッチ削除の効率性が際立つ結果です。特に、10,000件の削除では、順次削除に比べて約4倍以上の速度差が出ており、大量のオブジェクトを扱うシステムではバッチ削除が適していると考えます。
それではまた。ナマステー
参考