Cloud Composer で Storage Transfer Service 転送後のGCSファイル検証と自動リトライを実装する

Cloud Composer で Storage Transfer Service 転送後のGCSファイル検証と自動リトライを実装する

2025.10.13

はじめに

こんにちは!エノカワです。

Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、Storage Transfer Service (STS) でのファイル転送後に、転送されたファイルが実際に存在することを確認したいケースがあります。
特に、特定のファイル形式(.parquet.csv など)が確実に転送されているかを検証し、問題があった場合は自動的にリトライする仕組みが必要になることがあります。

本記事では、STS転送完了後にGCSファイルの存在を検証し、ファイルが見つからない場合にDAG全体を自動リトライする実装方法について紹介します。

前回までのおさらい

これまでのCloud Composer × Storage Transfer Serviceシリーズでは、以下のような内容を紹介してきました。

標準センサーによる監視

https://dev.classmethod.jp/articles/cloud-composer-storage-transfer-service-status-sensor/

標準センサー(CloudDataTransferServiceJobStatusSensor)を使ったSTS転送ジョブのステータス監視方法を紹介しました。

カスタムセンサーによる完了待機

https://dev.classmethod.jp/articles/cloud-composer-custom-sensor-storage-transfer-service/

より確実なオペレーション完了待機を実現するカスタムセンサーの実装方法を紹介しました。

動的設定更新ワークフロー

https://dev.classmethod.jp/articles/cloud-composer-storage-transfer-service-dynamic-config-update/

転送履歴を解析し、lastModifiedSince パラメータを動的に更新することで、ファイルの取りこぼしを防ぐ増分転送を実現する方法を紹介しました。

今回は、これらの技術を組み合わせ、さらに 「転送後のファイル検証」「自動リトライ」 という実践的な要件を実装します。

今回やること

解決したい課題

Storage Transfer Service での転送が「成功」と報告されても、実際には特定のファイル形式が転送されていないケースがあります。

例えば:

  • 転送元に期待したファイル形式が存在しなかった
  • フィルタ条件により特定ファイルが除外された
  • ネットワークエラーで一部ファイルの転送が失敗した

このような場合、後続のBigQueryへのロード処理などが失敗してしまいます。

解決アプローチ

以下の3つのアプローチで、この課題を解決します。

  1. GCSファイル検証: 転送完了後、転送先GCSバケットに特定ファイル形式が存在することを確認
  2. DAG全体の自動リトライ: ファイルが見つからない場合、TriggerDagRunOperatorを使用してDAG全体を最初から再実行
  3. リトライ回数管理: confパラメータを使用してDAG実行間でリトライ回数を引き継ぎ

Cloud Composer 環境の準備

検証環境として Cloud Composer 環境を準備します。
以前の記事「Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する」と同様の手順で環境を構築します。

また、Storage Transfer Service のオペレーターやフックを使用するには、apache-airflow-providers-amazonパッケージのインストールが必要です。
こちらの手順についても、前回の記事の「amazon パッケージをインストール」セクションをご参照ください。

cloud-composer-sts-gcs-verification-auto-retry_01.png

DAGを作成する

以下が、実際に動作検証を行ったDAGです。

前回記事で紹介した動的設定更新を含むSTS転送処理に加えて、GCSファイル検証と自動リトライ機能を実装しています。

sts_gcs_verification_with_retry.py
			
			from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
    CloudDataTransferServiceListOperationsOperator
)
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.exceptions import AirflowException

# カスタムセンサーをインポート
from custom_sts_sensor import CustomStorageTransferOperationSensor

# 設定値(注:実際の値に置き換える)
PROJECT_ID = '{プロジェクトID}'
TRANSFER_JOB_ID = 'transferJobs/{転送ジョブID}'
GCS_TARGET_BUCKET = '{GCSバケット名}'
GCS_TARGET_PREFIX = 'data/processed/'
TARGET_FILE_EXTENSIONS = ['.parquet', '.csv']
MAX_RETRY_COUNT = 3

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # リトライなし
}

def prepare_update_body(**context):
    """転送履歴から最終成功開始時刻を抽出しジョブ更新用設定を生成"""
    from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
        CloudDataTransferServiceHook
    )

    # 前タスクから転送履歴を取得
    operations = context['task_instance'].xcom_pull(task_ids='sts_transfer_group.list_operations')

    # フォールバック時間(履歴が無い場合は24時間前を使用)
    fallback_time = (datetime.utcnow() - timedelta(hours=24)).isoformat() + 'Z'

    # 最終成功時刻を抽出
    if not operations:
        last_success_time = fallback_time
    else:
        # 成功したオペレーションのみを抽出
        success_operations = [
            op for op in operations
            if op.get('metadata', {}).get('status') == 'SUCCESS'
        ]

        if not success_operations:
            last_success_time = fallback_time
        else:
            # startTimeでソートして最新の成功開始時刻を取得
            success_operations.sort(
                key=lambda x: x.get('metadata', {}).get('startTime', ''),
                reverse=True
            )
            last_success_time = success_operations[0].get('metadata', {}).get('startTime')

    # 既存の転送ジョブ設定を取得
    hook = CloudDataTransferServiceHook(gcp_conn_id='google_cloud_default')
    transfer_job = hook.get_transfer_job(
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID
    )

    # 更新用転送ジョブ設定を作成(変更するフィールドのみ)
    updated_transfer_job = {
        'transferSpec': {
            **transfer_job.get('transferSpec', {}),
            'objectConditions': {
                **transfer_job.get('transferSpec', {}).get('objectConditions', {}),
                'lastModifiedSince': last_success_time
            }
        }
    }

    # 更新用リクエストボディを作成
    update_body = {
        "projectId": PROJECT_ID,
        "transferJob": updated_transfer_job
    }

    return update_body

def check_gcs_files_and_branch(**context):
    """GCSファイル存在チェック + リトライ分岐判定"""

    # GCSファイル一覧を取得
    file_list = context['task_instance'].xcom_pull(
        task_ids='gcs_verification_group.list_gcs_files'
    ) or []

    # 特定拡張子のファイルをフィルタリング
    target_files = [
        f for f in file_list
        if any(f.endswith(ext) for ext in TARGET_FILE_EXTENSIONS)
    ]

    # DAG実行時のconfからリトライ回数を取得
    dag_run_conf = context['dag_run'].conf or {}
    retry_count = dag_run_conf.get('retry_count', 0)

    if target_files:
        # ファイル検証成功
        return 'gcs_verification_group.success_end'
    else:
        # ファイルが見つからない場合
        if retry_count >= MAX_RETRY_COUNT:
            raise AirflowException(
                f"GCSファイル検証失敗: 対象ファイルが見つかりません (リトライ上限到達: {retry_count}/{MAX_RETRY_COUNT})"
            )
        else:
            # リトライ回数を保存
            context['task_instance'].xcom_push(key='retry_count', value=retry_count + 1)
            return 'gcs_verification_group.prepare_retry_conf'

def prepare_retry_conf(**context):
    """DAG再実行時の設定を準備"""
    import time

    # 前タスクからリトライ回数を取得
    retry_count = context['task_instance'].xcom_pull(
        task_ids='gcs_verification_group.check_files_and_branch',
        key='retry_count'
    )

    # リトライ回数に応じた待機時間(1回目: 60秒、2回目: 120秒、3回目: 180秒)
    wait_seconds = retry_count * 60
    time.sleep(wait_seconds)

    # confとして渡す設定
    conf = {
        'retry_count': retry_count,
        'previous_run_id': context['dag_run'].run_id,
        'triggered_at': datetime.utcnow().isoformat()
    }

    return conf

# DAG定義
with DAG(
    dag_id='sts_gcs_verification_with_retry',
    default_args=default_args,
    schedule_interval=None,  # 手動トリガー
    catchup=False,
    render_template_as_native_obj=True,  # XComデータを適切に処理
) as dag:

    # STS転送グループ
    with TaskGroup('sts_transfer_group') as sts_group:

        # 転送履歴の取得
        list_operations = CloudDataTransferServiceListOperationsOperator(
            task_id='list_operations',
            project_id=PROJECT_ID,
            request_filter={'job_names': [TRANSFER_JOB_ID]}
        )

        # 更新設定の準備
        prepare_update = PythonOperator(
            task_id='prepare_update_body',
            python_callable=prepare_update_body,
            provide_context=True
        )

        # 転送ジョブ設定の更新
        update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
            task_id='update_transfer_job',
            job_name=TRANSFER_JOB_ID,
            body="{{ task_instance.xcom_pull(task_ids='sts_transfer_group.prepare_update_body') }}",
            project_id=PROJECT_ID
        )

        # 転送ジョブの実行
        run_transfer = CloudDataTransferServiceRunJobOperator(
            task_id='run_transfer',
            job_name=TRANSFER_JOB_ID,
            project_id=PROJECT_ID
        )

        # 転送完了待機
        wait_for_transfer = CustomStorageTransferOperationSensor(
            task_id='wait_for_transfer',
            operation_name="{{ task_instance.xcom_pull(task_ids='sts_transfer_group.run_transfer')['name'] }}",
            project_id=PROJECT_ID,
            poke_interval=60,        # 60秒ごとにステータスを確認
            timeout=60 * 10,         # 最大10分間待機
            mode='reschedule'        # リソース効率のためrescheduleモードを使用
        )

        # タスク依存関係
        list_operations >> prepare_update >> update_transfer_job >> run_transfer >> wait_for_transfer

    # GCS検証グループ
    with TaskGroup('gcs_verification_group') as verify_group:

        # GCSファイル一覧を取得
        list_gcs_files = GCSListObjectsOperator(
            task_id='list_gcs_files',
            bucket=GCS_TARGET_BUCKET,
            prefix=GCS_TARGET_PREFIX
        )

        # ファイル存在チェック + 分岐判定
        check_files = BranchPythonOperator(
            task_id='check_files_and_branch',
            python_callable=check_gcs_files_and_branch
        )

        # 成功終了
        success_end = DummyOperator(
            task_id='success_end',
            trigger_rule='none_failed_min_one_success'
        )

        # リトライ設定準備
        prepare_retry = PythonOperator(
            task_id='prepare_retry_conf',
            python_callable=prepare_retry_conf,
            trigger_rule='none_failed_min_one_success'
        )

        # DAG再実行
        retry_transfer = TriggerDagRunOperator(
            task_id='retry_transfer',
            trigger_dag_id=dag.dag_id,
            conf="{{ task_instance.xcom_pull(task_ids='gcs_verification_group.prepare_retry_conf') }}",
            wait_for_completion=False,
            reset_dag_run=True,
            trigger_rule='none_failed_min_one_success'
        )

        # タスク依存関係
        list_gcs_files >> check_files >> [success_end, prepare_retry]
        prepare_retry >> retry_transfer

    # DAG全体のフロー
    sts_group >> verify_group

		

DAGコードの解説

このDAGは2つのTaskGroupで構成され、STS転送からGCSファイル検証まで一連の処理を自動化します。

STS転送グループ(sts_transfer_group

  1. list_operations: Storage Transfer Service のオペレーション履歴を取得します

    • CloudDataTransferServiceListOperationsOperatorを使用
    • request_filterで特定の転送ジョブに絞り込み
    • 取得した履歴はXComsに自動格納
  2. prepare_update_body: 履歴から最終成功開始時刻を抽出し、ジョブ更新用リクエストボディを準備します

    • Python関数を使用して成功オペレーションからstartTimeを取得
    • 変更フィールドのみを指定して部分更新(前回記事と同じ実装)
  3. update_transfer_job: 転送ジョブの設定を更新します

    • CloudDataTransferServiceUpdateJobOperatorを使用
    • XComsから動的にボディを取得し、lastModifiedSinceパラメータを自動更新
  4. run_transfer: 更新された設定で転送ジョブを実行します

    • CloudDataTransferServiceRunJobOperatorを使用
    • 実行結果(オペレーション名)をXComsに格納
  5. wait_for_transfer: 転送オペレーションの完了を待機します

    • カスタムセンサーを使用して完了待機
    • poke_interval=60で60秒間隔でステータス確認
    • mode='reschedule'でリソース効率を最適化

GCS検証グループ(gcs_verification_group

  1. list_gcs_files: GCSバケットからファイル一覧を取得します

    • GCSListObjectsOperatorを使用
    • 指定したバケットとプレフィックスのファイルをリスト化
  2. check_files_and_branch: ファイル存在チェックと分岐判定を行います

    • BranchPythonOperatorを使用
    • 対象ファイル形式が存在すればsuccess_end
    • 存在しなければprepare_retry_confへ分岐
  3. success_end: 検証成功時の終了タスク

    • DummyOperatorを使用
    • trigger_rule='none_failed_min_one_success'でBranchPythonOperator後の実行を保証
  4. prepare_retry_conf: リトライ用の設定を準備します

    • リトライ回数に応じた待機時間を設定(指数バックオフ)
    • confパラメータを生成して次回実行に引き継ぎ
  5. retry_transfer: DAG全体を再実行します

    • TriggerDagRunOperatorを使用
    • 新しいDAG実行を作成(同一DAGのリトライではない)
    • confパラメータでリトライ回数を引き継ぎ

カスタムセンサーの活用

前回記事で実装したカスタムセンサー (CustomStorageTransferOperationSensor) を再利用します。

カスタムセンサー (`CustomStorageTransferOperationSensor`)
custom_sts_sensor.py
			
			"""
シンプルなStorage Transfer Service センサー

指定された転送ジョブのステータスを取得し、実行中であれば完了まで待機します。
"""

from typing import Dict, Sequence

from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException

class CustomStorageTransferOperationSensor(BaseSensorOperator):
    """
    Storage Transfer Service のオペレーション完了を待機するシンプルなセンサー
    :param operation_name: 監視するオペレーションの完全な名前
    :param project_id: Google Cloud プロジェクトID
    :param gcp_conn_id: Google Cloud 接続ID
    """

    template_fields: Sequence[str] = ('operation_name',)

    def __init__(
        self,
        *,
        operation_name: str,
        project_id: str,
        gcp_conn_id: str = 'google_cloud_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.operation_name = operation_name
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id

    def poke(self, context: Dict) -> bool:
        hook = CloudDataTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
        )

        self.log.info("オペレーション %s の状態を確認中...", self.operation_name)

        operation = hook.get_transfer_operation(operation_name=self.operation_name)
        self.log.info("オペレーションの応答: %s", operation)

        done = operation.get('done', False)

        # メタデータからステータスを取得
        metadata = operation.get('metadata', {})
        status = metadata.get('status')
        self.log.info("オペレーションのステータス: %s", status)

        if done:
            # エラー情報がある場合はエラーとして扱う
            if 'error' in operation:
                error = operation['error']
                err_msg = error.get('message', 'Unknown error')
                raise AirflowException(
                    f"オペレーション {self.operation_name} がエラーで終了しました: {err_msg}"
                )

            # ステータスがSUCCESSでない場合もエラーとして扱う
            if status != 'SUCCESS':
                raise AirflowException(
                    f"オペレーション {self.operation_name} が正常に完了しませんでした。ステータス: {status}"
                )

            # 明示的なエラーがなく、ステータスもSUCCESSの場合は成功として扱う
            self.log.info("オペレーション %s が正常に完了しました", self.operation_name)
            return True

        self.log.info("オペレーション %s はまだ完了していません。現在のステータス: %s", self.operation_name, status)
        return False

		

実装ポイント

ここでは、実装時に重要となるポイントや、ハマりやすい箇所について解説します。

TaskGroup内での分岐処理

BranchPythonOperatorがTaskGroup内のタスクIDを正しく参照するには、完全修飾名が必要です。

			
			# ❌ TaskGroup名を省略した場合、タスクが見つからずエラーになる
return 'success_end'

# ✅ TaskGroup名を含む完全修飾名で指定する必要がある
return 'gcs_verification_group.success_end'

		

リトライ回数の管理

confパラメータを使用してDAG実行間でリトライ回数を引き継ぎます:

			
			# リトライ回数の取得
dag_run_conf = context['dag_run'].conf or {}
retry_count = dag_run_conf.get('retry_count', 0)

# 次回実行への引き継ぎ
retry_transfer = TriggerDagRunOperator(
    trigger_dag_id=dag.dag_id,
    conf={'retry_count': retry_count + 1}
)

		

TriggerDagRunOperatorによる新規実行

TriggerDagRunOperatorは同一DAG実行のリトライではなく、新しいDAG実行を作成します。

これにより:

  • 各リトライが独立したDAG実行として記録される
  • 上流タスク(STS転送)から再実行できる
  • 実行履歴が明確に追跡できる

実行履歴の例:

			
			実行1: manual__2025-10-13T09:21:36+00:00 (retry_count=0)
├── sts_transfer_group ✅
├── gcs_verification_group
│   └── retry_transfer ✅ → 新DAG実行をトリガー

実行2: retry_2025101309xxxx (retry_count=1) ← 新しい実行
├── sts_transfer_group ✅
├── gcs_verification_group
│   └── success_end ✅

		

BranchPythonOperator後のtrigger_rule設定

BranchPythonOperatorを使用すると、選択されなかった分岐先のタスクは skipped(スキップ) 状態になります。
そのため、分岐後に実行されるタスクには trigger_rule='none_failed_min_one_success' を設定する必要があります。

			
			# 成功終了
success_end = DummyOperator(
    task_id='success_end',
    trigger_rule='none_failed_min_one_success'  # ← 必須
)

# リトライ設定準備
prepare_retry = PythonOperator(
    task_id='prepare_retry_conf',
    python_callable=prepare_retry_conf,
    trigger_rule='none_failed_min_one_success'  # ← 必須
)

		

なぜ必要か?

  • デフォルトの trigger_ruleall_success(すべての上流タスクが成功)
  • BranchPythonOperatorでは一方が success、もう一方が skipped になる
  • all_success では skipped を失敗とみなすため、タスクが実行されない
  • none_failed_min_one_success は「失敗がなく、最低1つ成功があれば実行」という条件

この設定により、どちらの分岐を選択しても後続タスクが正しく実行されます。

リトライ前の待機時間

STS転送直後はGCSへのファイル反映に時間がかかる場合があるため、リトライ前に待機時間を設けています。

			
			# リトライ回数に応じた待機時間(1回目: 60秒、2回目: 120秒、3回目: 180秒)
wait_seconds = retry_count * 60
time.sleep(wait_seconds)

		

注意点:
この実装では time.sleep() を使用するため、Workerスロットを占有します。数分程度であれば問題ありませんが、より長時間の待機が必要な場合や、Workerリソースを効率的に使いたい場合は、TimeDeltaSensor の使用などの代替案を検討してください。

https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/sensors/datetime.html

動作確認

実際にDAGを実行し、動的設定更新とGCSファイル検証、自動リトライ機能の動作を確認してみました。

DAGのデプロイ

Cloud Composer の Web UI から DAG をデプロイします。
Graph Viewでは、2つのTaskGroup(sts_transfer_groupgcs_verification_group)が確認できます。

cloud-composer-sts-gcs-verification-auto-retry_02.png

正常系:ファイルが存在する場合

対象ファイルが存在するケースでの動作確認です。
まず、DAGを手動実行します。画面右上の再生ボタンをクリックして実行を開始します。

cloud-composer-sts-gcs-verification-auto-retry_03.png

STS転送が開始され、wait_for_transferタスクで転送完了を待機します。

cloud-composer-sts-gcs-verification-auto-retry_04.png

転送が完了すると、GCS検証グループの check_files_and_branch タスクが実行され、ファイルの存在を確認します。

対象ファイル(.parquet.csv)が存在するため、success_end タスクへ分岐し、DAG全体が正常に完了します。

cloud-composer-sts-gcs-verification-auto-retry_07.png

Graph Viewでは、success_end が実行され、prepare_retry_confretry_transfer がスキップ(ピンク色)されていることが確認できます。

リトライ系:ファイルが見つからない場合

対象ファイルが存在しないケースでの動作確認です。

check_files_and_branch タスクで対象ファイルが見つからないため、prepare_retry_conf タスクへ分岐します。

cloud-composer-sts-gcs-verification-auto-retry_08.png

retry_transfer タスクが実行され、新しいDAG実行がトリガーされます。
この時、confパラメータでリトライ回数(retry_count=1)が次回実行に引き継がれます。

cloud-composer-sts-gcs-verification-auto-retry_09.png

DAG Runsタブを確認すると、新しいDAG実行が作成されていることが分かります。
これは同一DAG実行のリトライではなく、新しいDAG実行です。

リトライを繰り返しても対象ファイルが見つからない場合、最大リトライ回数(3回)に到達すると、DAGは失敗します。

cloud-composer-sts-gcs-verification-auto-retry_10.png

cloud-composer-sts-gcs-verification-auto-retry_12.png

check_files_and_branch タスクで AirflowException が発生し、DAGが失敗します。

Storage Transfer Service 転送履歴の確認

Google Cloud Console の Storage Transfer Service ページから転送履歴を確認すると、リトライにより複数回の転送が実行されていることが分かります。

cloud-composer-sts-gcs-verification-auto-retry_13.png

これにより、前回成功時刻以降のファイルを漏れなく転送する増分転送ファイル検証に基づく自動リトライ が正しく動作していることが確認できました。

まとめ

本記事では、Cloud Composer で Storage Transfer Service 転送後の GCSファイル検証と自動リトライ を実装する方法を紹介しました。

この実装により、STS転送が成功したと報告されていても実際にはファイルが転送されていない場合に、DAG全体を自動的にリトライすることで、後続処理の失敗を防ぐことができます。

今回はprefixと拡張子でファイルをチェックしましたが、ファイル名にタイムスタンプが含まれる場合は、DAG実行日を動的に組み込んだフィルタリング条件を実装することも可能です。

また、さらなる拡張案としては、カスタムセンサーをDeferrable化してリソース効率を向上させる方法や、リトライ発生時やエラー時にSlack通知を送信してアラートを受け取る方法などがありそうですね。

https://dev.classmethod.jp/articles/cloud-composer-deferrable-operators-bigquery/

https://dev.classmethod.jp/articles/cloud-composer-notify-slack/

以上、Cloud Composer でSTS転送後のGCSファイル検証と自動リトライを実装する方法でした。

データパイプラインの信頼性向上にお役立てください!

参考リンク

この記事をシェアする

FacebookHatena blogX

関連記事

Cloud Composer で Storage Transfer Service 転送後のGCSファイル検証と自動リトライを実装する | DevelopersIO