Cloud Composer で Storage Transfer Service 転送後のGCSファイル検証と自動リトライを実装する
はじめに
こんにちは!エノカワです。
Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、Storage Transfer Service (STS) でのファイル転送後に、転送されたファイルが実際に存在することを確認したいケースがあります。
特に、特定のファイル形式(.parquet
や .csv
など)が確実に転送されているかを検証し、問題があった場合は自動的にリトライする仕組みが必要になることがあります。
本記事では、STS転送完了後にGCSファイルの存在を検証し、ファイルが見つからない場合にDAG全体を自動リトライする実装方法について紹介します。
前回までのおさらい
これまでのCloud Composer × Storage Transfer Serviceシリーズでは、以下のような内容を紹介してきました。
標準センサーによる監視
標準センサー(CloudDataTransferServiceJobStatusSensor
)を使ったSTS転送ジョブのステータス監視方法を紹介しました。
カスタムセンサーによる完了待機
より確実なオペレーション完了待機を実現するカスタムセンサーの実装方法を紹介しました。
動的設定更新ワークフロー
転送履歴を解析し、lastModifiedSince
パラメータを動的に更新することで、ファイルの取りこぼしを防ぐ増分転送を実現する方法を紹介しました。
今回は、これらの技術を組み合わせ、さらに 「転送後のファイル検証」 と 「自動リトライ」 という実践的な要件を実装します。
今回やること
解決したい課題
Storage Transfer Service での転送が「成功」と報告されても、実際には特定のファイル形式が転送されていないケースがあります。
例えば:
- 転送元に期待したファイル形式が存在しなかった
- フィルタ条件により特定ファイルが除外された
- ネットワークエラーで一部ファイルの転送が失敗した
このような場合、後続のBigQueryへのロード処理などが失敗してしまいます。
解決アプローチ
以下の3つのアプローチで、この課題を解決します。
- GCSファイル検証: 転送完了後、転送先GCSバケットに特定ファイル形式が存在することを確認
- DAG全体の自動リトライ: ファイルが見つからない場合、TriggerDagRunOperatorを使用してDAG全体を最初から再実行
- リトライ回数管理: confパラメータを使用してDAG実行間でリトライ回数を引き継ぎ
Cloud Composer 環境の準備
検証環境として Cloud Composer 環境を準備します。
以前の記事「Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する」と同様の手順で環境を構築します。
また、Storage Transfer Service のオペレーターやフックを使用するには、apache-airflow-providers-amazon
パッケージのインストールが必要です。
こちらの手順についても、前回の記事の「amazon パッケージをインストール」セクションをご参照ください。
DAGを作成する
以下が、実際に動作検証を行ったDAGです。
前回記事で紹介した動的設定更新を含むSTS転送処理に加えて、GCSファイル検証と自動リトライ機能を実装しています。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
CloudDataTransferServiceRunJobOperator,
CloudDataTransferServiceUpdateJobOperator,
CloudDataTransferServiceListOperationsOperator
)
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import days_ago
from airflow.exceptions import AirflowException
# カスタムセンサーをインポート
from custom_sts_sensor import CustomStorageTransferOperationSensor
# 設定値(注:実際の値に置き換える)
PROJECT_ID = '{プロジェクトID}'
TRANSFER_JOB_ID = 'transferJobs/{転送ジョブID}'
GCS_TARGET_BUCKET = '{GCSバケット名}'
GCS_TARGET_PREFIX = 'data/processed/'
TARGET_FILE_EXTENSIONS = ['.parquet', '.csv']
MAX_RETRY_COUNT = 3
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
'retries': 0, # リトライなし
}
def prepare_update_body(**context):
"""転送履歴から最終成功開始時刻を抽出しジョブ更新用設定を生成"""
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
CloudDataTransferServiceHook
)
# 前タスクから転送履歴を取得
operations = context['task_instance'].xcom_pull(task_ids='sts_transfer_group.list_operations')
# フォールバック時間(履歴が無い場合は24時間前を使用)
fallback_time = (datetime.utcnow() - timedelta(hours=24)).isoformat() + 'Z'
# 最終成功時刻を抽出
if not operations:
last_success_time = fallback_time
else:
# 成功したオペレーションのみを抽出
success_operations = [
op for op in operations
if op.get('metadata', {}).get('status') == 'SUCCESS'
]
if not success_operations:
last_success_time = fallback_time
else:
# startTimeでソートして最新の成功開始時刻を取得
success_operations.sort(
key=lambda x: x.get('metadata', {}).get('startTime', ''),
reverse=True
)
last_success_time = success_operations[0].get('metadata', {}).get('startTime')
# 既存の転送ジョブ設定を取得
hook = CloudDataTransferServiceHook(gcp_conn_id='google_cloud_default')
transfer_job = hook.get_transfer_job(
job_name=TRANSFER_JOB_ID,
project_id=PROJECT_ID
)
# 更新用転送ジョブ設定を作成(変更するフィールドのみ)
updated_transfer_job = {
'transferSpec': {
**transfer_job.get('transferSpec', {}),
'objectConditions': {
**transfer_job.get('transferSpec', {}).get('objectConditions', {}),
'lastModifiedSince': last_success_time
}
}
}
# 更新用リクエストボディを作成
update_body = {
"projectId": PROJECT_ID,
"transferJob": updated_transfer_job
}
return update_body
def check_gcs_files_and_branch(**context):
"""GCSファイル存在チェック + リトライ分岐判定"""
# GCSファイル一覧を取得
file_list = context['task_instance'].xcom_pull(
task_ids='gcs_verification_group.list_gcs_files'
) or []
# 特定拡張子のファイルをフィルタリング
target_files = [
f for f in file_list
if any(f.endswith(ext) for ext in TARGET_FILE_EXTENSIONS)
]
# DAG実行時のconfからリトライ回数を取得
dag_run_conf = context['dag_run'].conf or {}
retry_count = dag_run_conf.get('retry_count', 0)
if target_files:
# ファイル検証成功
return 'gcs_verification_group.success_end'
else:
# ファイルが見つからない場合
if retry_count >= MAX_RETRY_COUNT:
raise AirflowException(
f"GCSファイル検証失敗: 対象ファイルが見つかりません (リトライ上限到達: {retry_count}/{MAX_RETRY_COUNT})"
)
else:
# リトライ回数を保存
context['task_instance'].xcom_push(key='retry_count', value=retry_count + 1)
return 'gcs_verification_group.prepare_retry_conf'
def prepare_retry_conf(**context):
"""DAG再実行時の設定を準備"""
import time
# 前タスクからリトライ回数を取得
retry_count = context['task_instance'].xcom_pull(
task_ids='gcs_verification_group.check_files_and_branch',
key='retry_count'
)
# リトライ回数に応じた待機時間(1回目: 60秒、2回目: 120秒、3回目: 180秒)
wait_seconds = retry_count * 60
time.sleep(wait_seconds)
# confとして渡す設定
conf = {
'retry_count': retry_count,
'previous_run_id': context['dag_run'].run_id,
'triggered_at': datetime.utcnow().isoformat()
}
return conf
# DAG定義
with DAG(
dag_id='sts_gcs_verification_with_retry',
default_args=default_args,
schedule_interval=None, # 手動トリガー
catchup=False,
render_template_as_native_obj=True, # XComデータを適切に処理
) as dag:
# STS転送グループ
with TaskGroup('sts_transfer_group') as sts_group:
# 転送履歴の取得
list_operations = CloudDataTransferServiceListOperationsOperator(
task_id='list_operations',
project_id=PROJECT_ID,
request_filter={'job_names': [TRANSFER_JOB_ID]}
)
# 更新設定の準備
prepare_update = PythonOperator(
task_id='prepare_update_body',
python_callable=prepare_update_body,
provide_context=True
)
# 転送ジョブ設定の更新
update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
task_id='update_transfer_job',
job_name=TRANSFER_JOB_ID,
body="{{ task_instance.xcom_pull(task_ids='sts_transfer_group.prepare_update_body') }}",
project_id=PROJECT_ID
)
# 転送ジョブの実行
run_transfer = CloudDataTransferServiceRunJobOperator(
task_id='run_transfer',
job_name=TRANSFER_JOB_ID,
project_id=PROJECT_ID
)
# 転送完了待機
wait_for_transfer = CustomStorageTransferOperationSensor(
task_id='wait_for_transfer',
operation_name="{{ task_instance.xcom_pull(task_ids='sts_transfer_group.run_transfer')['name'] }}",
project_id=PROJECT_ID,
poke_interval=60, # 60秒ごとにステータスを確認
timeout=60 * 10, # 最大10分間待機
mode='reschedule' # リソース効率のためrescheduleモードを使用
)
# タスク依存関係
list_operations >> prepare_update >> update_transfer_job >> run_transfer >> wait_for_transfer
# GCS検証グループ
with TaskGroup('gcs_verification_group') as verify_group:
# GCSファイル一覧を取得
list_gcs_files = GCSListObjectsOperator(
task_id='list_gcs_files',
bucket=GCS_TARGET_BUCKET,
prefix=GCS_TARGET_PREFIX
)
# ファイル存在チェック + 分岐判定
check_files = BranchPythonOperator(
task_id='check_files_and_branch',
python_callable=check_gcs_files_and_branch
)
# 成功終了
success_end = DummyOperator(
task_id='success_end',
trigger_rule='none_failed_min_one_success'
)
# リトライ設定準備
prepare_retry = PythonOperator(
task_id='prepare_retry_conf',
python_callable=prepare_retry_conf,
trigger_rule='none_failed_min_one_success'
)
# DAG再実行
retry_transfer = TriggerDagRunOperator(
task_id='retry_transfer',
trigger_dag_id=dag.dag_id,
conf="{{ task_instance.xcom_pull(task_ids='gcs_verification_group.prepare_retry_conf') }}",
wait_for_completion=False,
reset_dag_run=True,
trigger_rule='none_failed_min_one_success'
)
# タスク依存関係
list_gcs_files >> check_files >> [success_end, prepare_retry]
prepare_retry >> retry_transfer
# DAG全体のフロー
sts_group >> verify_group
DAGコードの解説
このDAGは2つのTaskGroupで構成され、STS転送からGCSファイル検証まで一連の処理を自動化します。
sts_transfer_group
)
STS転送グループ(-
list_operations: Storage Transfer Service のオペレーション履歴を取得します
CloudDataTransferServiceListOperationsOperator
を使用request_filter
で特定の転送ジョブに絞り込み- 取得した履歴はXComsに自動格納
-
prepare_update_body: 履歴から最終成功開始時刻を抽出し、ジョブ更新用リクエストボディを準備します
- Python関数を使用して成功オペレーションから
startTime
を取得 - 変更フィールドのみを指定して部分更新(前回記事と同じ実装)
- Python関数を使用して成功オペレーションから
-
update_transfer_job: 転送ジョブの設定を更新します
CloudDataTransferServiceUpdateJobOperator
を使用- XComsから動的にボディを取得し、
lastModifiedSince
パラメータを自動更新
-
run_transfer: 更新された設定で転送ジョブを実行します
CloudDataTransferServiceRunJobOperator
を使用- 実行結果(オペレーション名)をXComsに格納
-
wait_for_transfer: 転送オペレーションの完了を待機します
- カスタムセンサーを使用して完了待機
poke_interval=60
で60秒間隔でステータス確認mode='reschedule'
でリソース効率を最適化
gcs_verification_group
)
GCS検証グループ(-
list_gcs_files: GCSバケットからファイル一覧を取得します
GCSListObjectsOperator
を使用- 指定したバケットとプレフィックスのファイルをリスト化
-
check_files_and_branch: ファイル存在チェックと分岐判定を行います
BranchPythonOperator
を使用- 対象ファイル形式が存在すれば
success_end
へ - 存在しなければ
prepare_retry_conf
へ分岐
-
success_end: 検証成功時の終了タスク
DummyOperator
を使用trigger_rule='none_failed_min_one_success'
でBranchPythonOperator後の実行を保証
-
prepare_retry_conf: リトライ用の設定を準備します
- リトライ回数に応じた待機時間を設定(指数バックオフ)
- confパラメータを生成して次回実行に引き継ぎ
-
retry_transfer: DAG全体を再実行します
TriggerDagRunOperator
を使用- 新しいDAG実行を作成(同一DAGのリトライではない)
- confパラメータでリトライ回数を引き継ぎ
カスタムセンサーの活用
前回記事で実装したカスタムセンサー (CustomStorageTransferOperationSensor
) を再利用します。
カスタムセンサー (`CustomStorageTransferOperationSensor`)
"""
シンプルなStorage Transfer Service センサー
指定された転送ジョブのステータスを取得し、実行中であれば完了まで待機します。
"""
from typing import Dict, Sequence
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException
class CustomStorageTransferOperationSensor(BaseSensorOperator):
"""
Storage Transfer Service のオペレーション完了を待機するシンプルなセンサー
:param operation_name: 監視するオペレーションの完全な名前
:param project_id: Google Cloud プロジェクトID
:param gcp_conn_id: Google Cloud 接続ID
"""
template_fields: Sequence[str] = ('operation_name',)
def __init__(
self,
*,
operation_name: str,
project_id: str,
gcp_conn_id: str = 'google_cloud_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.operation_name = operation_name
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
def poke(self, context: Dict) -> bool:
hook = CloudDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
)
self.log.info("オペレーション %s の状態を確認中...", self.operation_name)
operation = hook.get_transfer_operation(operation_name=self.operation_name)
self.log.info("オペレーションの応答: %s", operation)
done = operation.get('done', False)
# メタデータからステータスを取得
metadata = operation.get('metadata', {})
status = metadata.get('status')
self.log.info("オペレーションのステータス: %s", status)
if done:
# エラー情報がある場合はエラーとして扱う
if 'error' in operation:
error = operation['error']
err_msg = error.get('message', 'Unknown error')
raise AirflowException(
f"オペレーション {self.operation_name} がエラーで終了しました: {err_msg}"
)
# ステータスがSUCCESSでない場合もエラーとして扱う
if status != 'SUCCESS':
raise AirflowException(
f"オペレーション {self.operation_name} が正常に完了しませんでした。ステータス: {status}"
)
# 明示的なエラーがなく、ステータスもSUCCESSの場合は成功として扱う
self.log.info("オペレーション %s が正常に完了しました", self.operation_name)
return True
self.log.info("オペレーション %s はまだ完了していません。現在のステータス: %s", self.operation_name, status)
return False
実装ポイント
ここでは、実装時に重要となるポイントや、ハマりやすい箇所について解説します。
TaskGroup内での分岐処理
BranchPythonOperatorがTaskGroup内のタスクIDを正しく参照するには、完全修飾名が必要です。
# ❌ TaskGroup名を省略した場合、タスクが見つからずエラーになる
return 'success_end'
# ✅ TaskGroup名を含む完全修飾名で指定する必要がある
return 'gcs_verification_group.success_end'
リトライ回数の管理
confパラメータを使用してDAG実行間でリトライ回数を引き継ぎます:
# リトライ回数の取得
dag_run_conf = context['dag_run'].conf or {}
retry_count = dag_run_conf.get('retry_count', 0)
# 次回実行への引き継ぎ
retry_transfer = TriggerDagRunOperator(
trigger_dag_id=dag.dag_id,
conf={'retry_count': retry_count + 1}
)
TriggerDagRunOperatorによる新規実行
TriggerDagRunOperatorは同一DAG実行のリトライではなく、新しいDAG実行を作成します。
これにより:
- 各リトライが独立したDAG実行として記録される
- 上流タスク(STS転送)から再実行できる
- 実行履歴が明確に追跡できる
実行履歴の例:
実行1: manual__2025-10-13T09:21:36+00:00 (retry_count=0)
├── sts_transfer_group ✅
├── gcs_verification_group
│ └── retry_transfer ✅ → 新DAG実行をトリガー
実行2: retry_2025101309xxxx (retry_count=1) ← 新しい実行
├── sts_transfer_group ✅
├── gcs_verification_group
│ └── success_end ✅
BranchPythonOperator後のtrigger_rule設定
BranchPythonOperatorを使用すると、選択されなかった分岐先のタスクは skipped(スキップ) 状態になります。
そのため、分岐後に実行されるタスクには trigger_rule='none_failed_min_one_success'
を設定する必要があります。
# 成功終了
success_end = DummyOperator(
task_id='success_end',
trigger_rule='none_failed_min_one_success' # ← 必須
)
# リトライ設定準備
prepare_retry = PythonOperator(
task_id='prepare_retry_conf',
python_callable=prepare_retry_conf,
trigger_rule='none_failed_min_one_success' # ← 必須
)
なぜ必要か?
- デフォルトの
trigger_rule
はall_success
(すべての上流タスクが成功) - BranchPythonOperatorでは一方が
success
、もう一方がskipped
になる all_success
では skipped を失敗とみなすため、タスクが実行されないnone_failed_min_one_success
は「失敗がなく、最低1つ成功があれば実行」という条件
この設定により、どちらの分岐を選択しても後続タスクが正しく実行されます。
リトライ前の待機時間
STS転送直後はGCSへのファイル反映に時間がかかる場合があるため、リトライ前に待機時間を設けています。
# リトライ回数に応じた待機時間(1回目: 60秒、2回目: 120秒、3回目: 180秒)
wait_seconds = retry_count * 60
time.sleep(wait_seconds)
注意点:
この実装では time.sleep()
を使用するため、Workerスロットを占有します。数分程度であれば問題ありませんが、より長時間の待機が必要な場合や、Workerリソースを効率的に使いたい場合は、TimeDeltaSensor
の使用などの代替案を検討してください。
動作確認
実際にDAGを実行し、動的設定更新とGCSファイル検証、自動リトライ機能の動作を確認してみました。
DAGのデプロイ
Cloud Composer の Web UI から DAG をデプロイします。
Graph Viewでは、2つのTaskGroup(sts_transfer_group
とgcs_verification_group
)が確認できます。
正常系:ファイルが存在する場合
対象ファイルが存在するケースでの動作確認です。
まず、DAGを手動実行します。画面右上の再生ボタンをクリックして実行を開始します。
STS転送が開始され、wait_for_transfer
タスクで転送完了を待機します。
転送が完了すると、GCS検証グループの check_files_and_branch
タスクが実行され、ファイルの存在を確認します。
対象ファイル(.parquet
や .csv
)が存在するため、success_end
タスクへ分岐し、DAG全体が正常に完了します。
Graph Viewでは、success_end
が実行され、prepare_retry_conf
と retry_transfer
がスキップ(ピンク色)されていることが確認できます。
リトライ系:ファイルが見つからない場合
対象ファイルが存在しないケースでの動作確認です。
check_files_and_branch
タスクで対象ファイルが見つからないため、prepare_retry_conf
タスクへ分岐します。
retry_transfer
タスクが実行され、新しいDAG実行がトリガーされます。
この時、confパラメータでリトライ回数(retry_count=1)が次回実行に引き継がれます。
DAG Runsタブを確認すると、新しいDAG実行が作成されていることが分かります。
これは同一DAG実行のリトライではなく、新しいDAG実行です。
リトライを繰り返しても対象ファイルが見つからない場合、最大リトライ回数(3回)に到達すると、DAGは失敗します。
check_files_and_branch
タスクで AirflowException
が発生し、DAGが失敗します。
Storage Transfer Service 転送履歴の確認
Google Cloud Console の Storage Transfer Service ページから転送履歴を確認すると、リトライにより複数回の転送が実行されていることが分かります。
これにより、前回成功時刻以降のファイルを漏れなく転送する増分転送 と ファイル検証に基づく自動リトライ が正しく動作していることが確認できました。
まとめ
本記事では、Cloud Composer で Storage Transfer Service 転送後の GCSファイル検証と自動リトライ を実装する方法を紹介しました。
この実装により、STS転送が成功したと報告されていても実際にはファイルが転送されていない場合に、DAG全体を自動的にリトライすることで、後続処理の失敗を防ぐことができます。
今回はprefixと拡張子でファイルをチェックしましたが、ファイル名にタイムスタンプが含まれる場合は、DAG実行日を動的に組み込んだフィルタリング条件を実装することも可能です。
また、さらなる拡張案としては、カスタムセンサーをDeferrable化してリソース効率を向上させる方法や、リトライ発生時やエラー時にSlack通知を送信してアラートを受け取る方法などがありそうですね。
以上、Cloud Composer でSTS転送後のGCSファイル検証と自動リトライを実装する方法でした。
データパイプラインの信頼性向上にお役立てください!
参考リンク
- Google Cloud Transfer Service Operators — apache-airflow-providers-google Documentation
- Google Cloud Storage Transfer Service API ドキュメント
- Apache Airflow XComs ドキュメント
- Cloud Composer 環境を作成する
- DAG のタスクをグループ化する | Cloud Composer | Google Cloud
- Cloud Composer で TriggerDagRunOperator を使ってDAGから別のDAGを呼び出す | DevelopersIO