Cloud Storage上の大量のオブジェクトをバッチリクエストでコピーしてみた
概要
Cloud Storage上の大量のオブジェクトを高速で削除、コピーすることに向き合っている最近です。以前は並行処理でコピーをしてみましたが、今回はCloud StoageのBatch APIを用いてコピーしてみます。
Batch APIに関してはこちらをご参照ください。
やりたいこと
- Cloud Storage上の大量のオブジェクトをBatch APIを用いてコピーしたい
- コピーしたら元ファイルを削除して、移動処理にしたい
注意点
Batch APIのリファレンスにはアップロード/ダウンロードのバッチ操作はできないと記載があります。
Warning: Cloud Storage does not support batch operations for uploading or downloading. Additionally, the current batch design does not support library methods whose return values depend on the response payload.
警告 クラウドストレージは、アップロードやダウンロードのバッチ操作をサポートしていません。 さらに、現在のバッチ設計では、戻り値がレスポンスのペイロードに依存するライブラリメソッドをサポートしていません。(DeepL翻訳)
Batch APIの動作として、1つのHTTPリクエストにまとめてリクエストをするのでアップロードやダウンロードの場合はHTTPリクエストのボディにオブジェクトのデータが含む形で行われ、リクエストが分割される場合があるのでサポートされないのではないかと考えます。コピーの場合はリクエストボディにオブジェクトのデータは含みません。
やってみる
スクリプト
とりあえずスクリプト全文です。バッチ削除した時のコピー版です。
from google.cloud import storage
from flask import jsonify, Request
import functions_framework
import time # 時間計測のために追加
@functions_framework.http
def copy_objects_in_gcs(request: Request):
try:
# リクエストからパラメータを取得
request_json = request.get_json()
if not request_json:
return jsonify({"error": "Invalid request, JSON body is required."}), 400
source_bucket_name = request_json.get("source_bucket_name") # コピー元バケット名
destination_bucket_name = request_json.get("destination_bucket_name") # コピー先バケット名
if not source_bucket_name or not destination_bucket_name:
return jsonify({"error": "Missing 'source_bucket_name' or 'destination_bucket_name' in request."}), 400
method = request_json.get("method", "batch") # デフォルトは "batch"
chunk_size = 100 # 固定で100件ずつコピー
# クライアントを初期化
client = storage.Client()
source_bucket = client.bucket(source_bucket_name)
destination_bucket = client.bucket(destination_bucket_name)
# コピー元バケット内のすべてのオブジェクトを取得
blobs = list(source_bucket.list_blobs())
if not blobs:
return jsonify({"message": f"No objects found in source bucket '{source_bucket_name}'."}), 200
# オブジェクトをチャンクに分割
chunks = [blobs[i:i + chunk_size] for i in range(0, len(blobs), chunk_size)]
# コピー処理の開始時刻を記録
start_time = time.time()
# コピー方法に応じて処理を分岐
if method == "batch":
# バッチコピー
for i, chunk in enumerate(chunks):
with client.batch():
for blob in chunk:
# コピー元のオブジェクトをコピー先バケットにコピー
source_bucket.copy_blob(blob, destination_bucket)
print(f"Batch {i + 1}/{len(chunks)}: Copied {len(chunk)} objects.")
elif method == "sequential":
# 順次コピー
for i, blob in enumerate(blobs):
# コピー元のオブジェクトをコピー先バケットにコピー
source_bucket.copy_blob(blob, destination_bucket)
if (i + 1) % chunk_size == 0 or (i + 1) == len(blobs):
print(f"Sequential: Copied {i + 1}/{len(blobs)} objects.")
else:
return jsonify({"error": f"Invalid method '{method}'. Use 'batch' or 'sequential'."}), 400
# コピー処理の終了時刻を記録
end_time = time.time()
# コピー処理にかかった時間を計算
elapsed_time = end_time - start_time
return jsonify({
"message": f"Successfully copied {len(blobs)} objects from bucket '{source_bucket_name}' to bucket '{destination_bucket_name}' using method '{method}'.",
"elapsed_time_seconds": elapsed_time
}), 200
except Exception as e:
print(f"Error: {e}")
return jsonify({"error": str(e)}), 500
簡単にポイントだけ解説します。
# オブジェクトをチャンクに分割
chunks = [blobs[i:i + chunk_size] for i in range(0, len(blobs), chunk_size)]
Batch APIでは1回の処理の上限は100件までとなるのでチャンクに分割をします。今回の実装では100件に分割しています。
# バッチコピー
if method == "batch":
# バッチコピー
for i, chunk in enumerate(chunks):
with client.batch():
for blob in chunk:
# コピー元のオブジェクトをコピー先バケットにコピー
source_bucket.copy_blob(blob, destination_bucket)
print(f"Batch {i + 1}/{len(chunks)}: Copied {len(chunk)} objects.")
- 処理の流れ
chunks
に分割されたオブジェクトリストを1つずつ処理with client.batch()
を使用して、バッチ処理のコンテキストを開始- このブロック内で実行されるリクエストは、1つのHTTPリクエストにまとめられます
for blob in chunk
で、チャンク内の各オブジェクトを処理source_bucket.copy_blob(blob, destination_bucket)
を使用して、オブジェクトをコピー
- チャンクごとに進捗をログに出力
このスクリプトを実行すると、100件単位でオブジェクトがコピーされます。
※動作速度比較のため1ファイルずつコピーする、順次コピーの処理も入れています。
また、起動はコピー元・コピー先バケット・処理種別を設定してcurlで実行します。
# 起動方法例
curl -m 3610 -X POST 関数のURL -H "Authorization: bearer $(gcloud auth print-identity-token)" -H "Content-Type: application/json" -d \
'{
"source_bucket_name": "コピー元バケット名",
"destination_bucket_name": "コピー先バケット名",
"method": "sequential"
}'
パフォーマンスを見てみる
1MBのファイルを1万ファイル用意して、バッチコピーと順次コピーでの動作速度を比較してみました。
コピー方法 | 処理時間 (秒) | コピーしたオブジェクト数 |
---|---|---|
batch |
53.32 秒 | 10,000 |
sequential |
368.34 秒 | 10,000 |
上記より、バッチコピー(batch)のほうが順次コピーと比較して約6倍程度高速に動作することがわかります。
実際動かしてみるととても早いです。
移動処理も作ってみる
せっかくなので移動処理(コピー + 元データ削除)も実装してみました。この場合の注意点としては、Batch APIは順序を保証しないので1リクエスト内にコピーと削除を詰めてしまうと、コピーする前に削除される可能性があるという点です。
注意: サーバーは、任意の順序でリクエストを実行する可能性があります。指定順序で実行されるリクエストに依存しないでください。2 つの呼び出しが特定の順序で行われるようにするには、最初の呼び出しを単独で送信し、最初の呼び出しに対するレスポンスを受信してから 2 番目の呼び出しを送信します。
引用:https://cloud.google.com/storage/docs/batch?hl=ja#response_to_a_batch_request
# 良くないコードの例
with client.batch():
for blob in chunk:
try:
# コピー元のオブジェクトをコピー先バケットにコピー
source_bucket.copy_blob(blob, destination_bucket)
except (GoogleAPICallError, NotFound, Forbidden, Conflict) as e:
# コピーエラーを記録
errors.append({
"object_name": blob.name,
"error": f"Copy failed: {str(e)}"
})
else:
try:
# コピーが成功したら、元のオブジェクトを削除
blob.delete()
except (GoogleAPICallError, NotFound, Forbidden, Conflict) as e:
# 削除エラーを記録
errors.append({
"object_name": blob.name,
"error": f"Delete failed: {str(e)}"
})
この場合、コピー処理よりも前に削除される可能性があります。
この点を踏まえ、以下の順序で実装をします。
- バッチコピー
- バッチ削除
以下に実装例を示します。
スクリプト全文
from google.cloud import storage
from flask import jsonify, Request
import functions_framework
import time # 時間計測のために追加
def process_chunk_for_copy(chunk, source_bucket, destination_bucket, errors):
# チャンク内のオブジェクトをコピーする処理
for blob in chunk:
try:
# コピー元のオブジェクトをコピー先バケットにコピー
source_bucket.copy_blob(blob, destination_bucket)
except Exception as e:
# コピーエラーを記録
errors.append({
"object_name": blob.name,
"error": f"Copy failed: {str(e)}"
})
def process_chunk_for_delete(chunk, errors):
# チャンク内のオブジェクトを削除する処理
for blob in chunk:
# コピーエラーが記録されていないオブジェクトのみ削除
if blob.name not in [error["object_name"] for error in errors if "Copy failed" in error["error"]]:
try:
# コピーが成功したオブジェクトを削除
blob.delete()
except Exception as e:
# 削除エラーを記録
errors.append({
"object_name": blob.name,
"error": f"Delete failed: {str(e)}"
})
@functions_framework.http
def move_objects_in_gcs(request: Request):
try:
# リクエストからパラメータを取得
request_json = request.get_json()
if not request_json:
return jsonify({"error": "Invalid request, JSON body is required."}), 400
source_bucket_name = request_json.get("source_bucket_name") # コピー元バケット名
destination_bucket_name = request_json.get("destination_bucket_name") # コピー先バケット名
if not source_bucket_name or not destination_bucket_name:
return jsonify({"error": "Missing 'source_bucket_name' or 'destination_bucket_name' in request."}), 400
method = request_json.get("method", "batch") # デフォルトは "batch"
chunk_size = 100 # 固定で100件ずつ処理
# クライアントを初期化
client = storage.Client()
source_bucket = client.bucket(source_bucket_name)
destination_bucket = client.bucket(destination_bucket_name)
# コピー元バケット内のすべてのオブジェクトを取得
blobs = list(source_bucket.list_blobs())
if not blobs:
return jsonify({"message": f"No objects found in source bucket '{source_bucket_name}'."}), 200
# オブジェクトをチャンクに分割
chunks = [blobs[i:i + chunk_size] for i in range(0, len(blobs), chunk_size)]
# 処理の開始時刻を記録
start_time = time.time()
# エラーを記録するリスト
errors = []
# コピー処理
for i, chunk in enumerate(chunks):
if method == "batch":
with client.batch():
process_chunk_for_copy(chunk, source_bucket, destination_bucket, errors)
elif method == "sequential":
process_chunk_for_copy(chunk, source_bucket, destination_bucket, errors)
else:
return jsonify({"error": f"Invalid method '{method}'. Use 'batch' or 'sequential'."}), 400
print(f"Chunk {i + 1}/{len(chunks)}: Copied {len(chunk)} objects.")
# 削除処理 (コピーが成功したオブジェクトのみ削除)
for i, chunk in enumerate(chunks):
if method == "batch":
with client.batch():
process_chunk_for_delete(chunk, errors)
elif method == "sequential":
process_chunk_for_delete(chunk, errors)
print(f"Chunk {i + 1}/{len(chunks)}: Deleted {len(chunk)} objects.")
# 処理の終了時刻を記録
end_time = time.time()
# 処理にかかった時間を計算
elapsed_time = end_time - start_time
# レスポンスにエラー情報を含める
response = {
"message": f"Successfully moved {len(blobs) - len(errors)} objects from bucket '{source_bucket_name}' to bucket '{destination_bucket_name}' using method '{method}'.",
"elapsed_time_seconds": elapsed_time,
"errors": errors # エラー情報
}
return jsonify(response), 200
except Exception as e:
print(f"Error: {e}")
return jsonify({"error": str(e)}), 500
バッチコピー時に失敗したファイルを記録してそれを除いたファイルをバッチ削除するという実装です。
今回もバッチ処理・順次処理それぞれの動作速度を見てみます。
パフォーマンスを見てみる
1MBのファイルを1万ファイル用意して、バッチ処理と順次処理での動作速度を比較してみました。
移動方法 | 処理時間 (秒) | 移動したオブジェクト数 |
---|---|---|
sequential |
622.50 | 10,000 |
batch |
108.68 | 10,000 |
上記より、移動処理もバッチ処理の方が6倍程度高速になっているのがわかります。この差は大きいですね。
所感
Batch APIを用いると順次処理に比べてかなり高速に処理できることがわかりました。以前、並行処理を用いてコピー処理を高速化してみましたが並行処理の場合はWorker数を調整する必要があったりCPU数も考慮したりで考慮ポイントが多かったのですがこのBatch APIを用いた処理なら基本的に順次処理を置き換えるだけでよいので検討事項も減るかなと思います。
とはいえリトライの考慮は当然必要なので、エラーを記録して再度リトライするというような仕組みは実装する必要があると考えます。
それではまた。ナマステー
参考