Cloud Storage上の大量のオブジェクトを並行処理でコピーしてみた

Cloud Storage上の大量のオブジェクトを並行処理でコピーしてみた

Clock Icon2025.01.28

概要

最近Cloud Storage上の大量のオブジェクトを別のバケットに移動する必要がありました。オブジェクトの数は数万件に及び、1ファイルずつ移動する処理では時間がかかりすぎCloud Run Functionsがタイムアウトすることが予想されました。
そこでCloud Storage上の大量のオブジェクトをCloud Run Functionsで効率的に移動(コピー)する方法について、並行処理を活用して効率的に行うことができないかと思い、実際に実装して検証を行いました。今回の記事では検証の結果や実装、考察をまとめています。

やりたいこと

達成したいこと + 前提条件は以下です。

  • Cloud Run FunctionsでCloud Storage上のオブジェクトを数分程度でバケット間コピーしたい
  • コピー対象のファイルは1万ファイル
  • Cloud Run Functionsで実装。HTTPトリガーで起動する
  • pythonで実装

実装内容紹介

スクリプト全体

とりあえず実装したスクリプト全文です。

from google.cloud import storage
from flask import jsonify, Request
import functions_framework
import logging
from concurrent.futures import ThreadPoolExecutor
import time

def list_files_in_bucket(bucket_name):
    """
    指定されたバケット内のすべてのファイルをリスト取得
    """
    try:
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blobs = bucket.list_blobs()
        return [blob.name for blob in blobs]
    except Exception as e:
        logging.error(f"Failed to list files in bucket {bucket_name}: {e}")
        raise

def copy_blob(source_bucket_name, destination_bucket_name, source_blob_name, destination_blob_name):
    """
    バケット間でオブジェクトをコピーします。
    """
    try:
        client = storage.Client()
        source_bucket = client.bucket(source_bucket_name)
        destination_bucket = client.bucket(destination_bucket_name)
        source_blob = source_bucket.blob(source_blob_name)

        # オブジェクトをコピー
        source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        logging.info(f"Copied {source_blob_name} to {destination_blob_name} in bucket {destination_bucket_name}")
    except Exception as e:
        logging.error(f"Failed to copy {source_blob_name} to {destination_blob_name}: {e}")
        raise

def copy_files_in_parallel(source_bucket_name, destination_bucket_name, files, max_workers=1):
    """
    ファイルを並行してコピー
    """
    success_count = 0
    failed_files = []

    def safe_copy(file_name):
        nonlocal success_count
        try:
            destination_blob_name = file_name
            copy_blob(source_bucket_name, destination_bucket_name, file_name, destination_blob_name)
            success_count += 1
        except Exception:
            logging.error(f"Error occurred while processing file: {file_name}")
            failed_files.append(file_name)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 並行してファイルをコピー
        executor.map(safe_copy, files)
    return success_count, failed_files

@functions_framework.http
def copy_and_batch_delete_all_files(request: Request):
    try:
        request_json = request.get_json()
        if not request_json:
            return jsonify({"error": "Invalid request, JSON body is required."}), 400

        source_bucket = request_json.get("source_bucket")
        destination_bucket = request_json.get("destination_bucket")
        max_workers = request_json.get("max_workers", 10)

        if not source_bucket or not destination_bucket:
            return jsonify({"error": "Missing required parameters: source_bucket, destination_bucket"}), 400

        files = list_files_in_bucket(source_bucket)
        if not files:
            return jsonify({"message": f"No files found in bucket {source_bucket}."}), 200

        # コピー処理の時間を計測
        start_copy_time = time.time()
        success_count, failed_files = copy_files_in_parallel(source_bucket, destination_bucket, files, max_workers)
        end_copy_time = time.time()
        copy_duration = end_copy_time - start_copy_time

        # レスポンスメッセージを変更
        if failed_files:
            return jsonify({
                "message": "Some files failed to copy",
                "copy_duration_seconds": copy_duration,
                "success_count": success_count,
                "failed_count": len(failed_files)
            }), 200
        else:
            return jsonify({
                "message": "All files copied successfully",
                "copy_duration_seconds": copy_duration,
                "success_count": success_count,
                "failed_count": 0
            }), 200

    except Exception as e:
        logging.error(f"Error in Cloud Function: {e}")
        return jsonify({"error": str(e)}), 500```

実装の要点は以下です。

  1. バケット内のファイルのリスト取得: 指定されたバケット内のすべてのファイルをlistします
  2. 並行コピー処理: ThreadPoolExecutorを使用して、複数のファイルを並行してコピーします
  3. 成功・失敗の記録: コピーに成功したファイル数と失敗したファイル数を記録、レスポンスに含めます
    ※処理時間の計測処理も実装しています。

並行処理について

大量のオブジェクトを効率的に処理するために、ThreadPoolExecutor を使用しています。
これにより、複数のオブジェクトを並行してコピー処理することが可能になります。
以下の関数が該当の箇所です。

def copy_files_in_parallel(source_bucket_name, destination_bucket_name, files, max_workers=10):
    """
    ファイルを並行してコピー
    """
    success_count = 0
    failed_files = []

    def safe_copy(file_name):
        nonlocal success_count
        try:
            destination_blob_name = file_name
            copy_blob(source_bucket_name, destination_bucket_name, file_name, destination_blob_name)
            success_count += 1
        except Exception:
            logging.error(f"Error occurred while processing file: {file_name}")
            failed_files.append(file_name)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 並行してファイルをコピー
        executor.map(safe_copy, files)
    return success_count, failed_files

少し実装に触れます。

  1. スレッドプールの作成:
    • ThreadPoolExecutor(max_workers=N) を使用して、最大N個のスレッドを持つスレッドプールを作成します
  2. タスクの実行:
    • executor.map()関数を使用して、第二引数のファイルリストの各要素に対してコピー処理を適用する並行処理を行います
  3. スレッドプールの終了:
    • with句を使用すると、処理完了後スレッドプールが自動的に終了します

実際に検証してみる

上記の実装は、Cloud Run Functionsにデプロイした場合は以下のコマンドで実行することができます。

curl -X POST Cloud FunctionsのURL \
-H "Authorization: bearer $(gcloud auth print-identity-token)" \
-H "Content-Type: application/json" \
-d '{"source_bucket": "コピー元バケット名",  "destination_bucket": "コピー先バケット名",  "max_workers": ワーカー数 }'

検証の前提

関数を作成したら実行して検証してみます。
今回は以下の条件で検証しています。

項目
オブジェクト数 10,000件
オブジェクトサイズ 1MB
関数のメモリ 512MB ~ 1GiB
CPU 583 millis ~ 2CPU

検証結果

複数回、さまざまな条件で実行した結果を表にまとめました。

コピー処理時間 (秒) ワーカー(worker)数 コピー失敗数 割り当てメモリ CPU
1231.40 1 0 512 MB 2 CPU
129.42 10 0 512 MB 2 CPU
116.15 100 0 512 MB 2 CPU
109.13 50 0 512 MB 2 CPU
200.46 50 0 512 MB 1 CPU
189.45 100 0 1 GiB 1 CPU
210.50 10 0 1 GiB 1 CPU
380.84 1000 14 1 GiB 1 CPU
569.03 500 35 1 GiB 583 millis
384.22 100 0 1 GiB 583 millis
357.23 50 0 1 GiB 583 millis

考察

順次処理 vs 並行処理

  • max_workers=1 の場合、順次処理となり1ファイルずつコピー処理します。for文で順番にコピーした場合などはこのケースです。この場合処理時間がファイル数に比例して増加します。今回のケースでは20分(1231.40秒)かかっています。
  • 一方、並行処理(max_workers=10max_workers=50)の場合、処理時間が大幅に短縮されます。最短で1分強(109秒)で処理が完了しています。

ワーカー数と処理時間の関係

  • 今回のケースでは適正ワーカー数は10 - 50の間に見受けられました。50 - 100では処理時間の短縮はあまりみられませんでした。
  • ワーカー数が多すぎると、逆に処理時間が増加する傾向がみられました。max_workers=1000の場合、200秒近く遅くなっていました。またコピー処理に失敗するようになりました※。

メモリとCPUの関係
メモリを増加させても処理時間は短縮していないことが表よりわかります。これは以下の理由が原因と考えられます。

  1. メモリの影響が小さい理由
  • 今回の実装では、メモリは主にファイルリストや一時的なデータ(例: 失敗したファイルのリスト)の保持に使用されます。これらのデータは比較的小さいため、512MBや1GiBのメモリで十分に処理可能です。
  • ファイル内容をメモリに保持したりすることがないためメモリがボトルネックになることはなく、Cloud Storage APIでコピー処理をしているのでメモリを増やしても処理時間の短縮にはほとんど寄与しません。
  1. CPUの増加が処理時間短縮に寄与する理由
  • 今回の実装ではCloud Storage APIのコピー処理(リクエストの生成、レスポンスの処理)やエラーハンドリングなど、CPU依存のタスクが多く含まれます。
  • 並行処理(ThreadPoolExecutor)を使用している場合、スレッドごとにCPUリソースが必要です。CPUの性能が高いほど、複数のスレッドが効率的に動作し、全体の処理時間が短縮されます。
  1. I/O待ちの影響とCPUの役割
  • Cloud Storage APIのレスポンス待ちの間はI/O待ち状態になります。この間、CPUは使用されず、ネットワークの速度やサーバーの応答時間が処理時間に影響します。
  • ただし、I/O待ち中にGIL(Global Interpreter Lock)が解放されるため、他のスレッドが動作を継続できます。CPU性能が高いほど、I/O待ち以外のタスク(リクエスト生成やレスポンス処理など)を効率的に処理できるため、全体のスループットが向上します。
  • つまりは1つのファイルをコピー処理のリクエスト送信して、コピー処理完了までのレスポンスが返却されるまでの間(I/O待ち中)に別のコピー処理のリクエストを送信できるという寸法です。

上記より以下が言えます。

  • CPUの増加は、並行処理の効率を向上させ、I/O待ち以外のタスクを高速化するため、処理時間短縮につながります。
  • メモリの増加は、今回のスクリプトではほとんど影響を与えません。512MBや1GiBのメモリで十分に処理可能です。
  • I/O待ちが発生する処理(今回ならCloud Storage APIのコピー処理)では、CPU性能を高めることで、I/O待ち以外のタスクを効率的に処理し、全体のパフォーマンスを向上させることができます。

補足:worker数を増やしすぎることはリスクがある

worker数を500、1000にしたところ以下のエラーが発生しました。

Failed to copy xxxx.txt to xxxx.txt: HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /storage/v1/b/バケット名/o/xxxx.txt/copyTo/b/バケット名/o/xxxx.txt?prettyPrint=false (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2417)')))

→Cloud Storage API を使用してオブジェクトをコピーする際に、SSL通信のエラーが発生したことを示しています。SSL接続が途中で切断された(EOF occurred in violation of protocol)ため、リクエストが失敗しています。

DEFAULT 2025-01-27T14:49:35.674853Z Failed to copy xxxx.txt to file_9213.txt: HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /storage/v1/b/バケット名/o/file_9213.txt/copyTo/b/バケット名/o/xxxx.txt?prettyPrint=false (Caused by SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1000)')))

→Cloud Storage API を使用してオブジェクトをコピーする際に、SSL通信のエラーが発生したことを示しています。具体的には、SSL接続が途中で切断され、プロトコル違反(EOF occurred in violation of protocol)が発生したため、リクエストが失敗しています。

推測になってしまいますが、エラーの理由としては以下が考えられると思います。

worker数を500や1000に設定すると、ThreadPoolExecutorによって500~1000個のスレッドが同時に動作します。
各スレッドが Google Cloud Storage API にリクエストを送信するため、短時間で非常に多くのリクエストがサーバーに到達します。SSL接続は、リクエストごとに確立されるため、短時間で大量のSSL接続が発生します。サーバーまたはクライアントがこれらの接続を処理しきれない場合、接続が切断され、SSL Errorが発生します。
また、短時間で大量のリクエストが送信されると、ネットワーク帯域が圧迫され、パケットロスやタイムアウトが発生する可能性があります。これにより、SSL接続が途中で切断されることがあります。
上記以外のエラーも発生する可能性があると考えます。worker数はレートリミット超過、相手先サーバへの意図せぬ負荷を与えるため十分検討が必要と考えます

また、Cloud StorageなどGoogle CloudのAPIを実行する際には、レートリミットを超過しないように事前に調査検討することが重要です。たとえば、Cloud StorageAPIでは、操作によっては1秒あたりのリクエスト数に制限があります(詳細は公式ドキュメントを参照)。これらの制限を考慮し、適切なworker数を設定することが重要です。
https://cloud.google.com/storage/quotas?hl=ja

まとめ

ThreadPoolExecutorを活用した並行処理は、I/O待ちが発生する実装において非常に効果的であることがよくわかりました。特に、Cloud Storage APIのようなネットワークI/Oが多い処理では、適切なリソース設定とworker数の調整により、処理時間を大幅に短縮できることがわかりました。ただし一方でworker数の設定によっては意図せぬ障害を引き起こすため、十分な検討と検証が必要だと考えます。

それではまた。ナマステー

参考

https://docs.python.org/ja/3.13/library/concurrent.futures.html

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.