Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する

Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する

Clock Icon2025.06.09

はじめに

こんにちは!エノカワです。

Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、Storage Transfer Service (STS) を活用してクラウドストレージ間のデータ転送を行うことが多くあります。
また、転送完了後に後続処理を実行するには、転送オペレーションの完了を待機する必要があります。

本記事では、Storage Transfer Service のオペレーション完了を待機するためのカスタムセンサーの実装と使用方法について紹介します。

前回のセンサー実装との違い

以前、「Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい」という記事で、Airflow標準のCloudDataTransferServiceJobStatusSensorを使用した監視方法を紹介しました。

https://dev.classmethod.jp/articles/cloud-composer-storage-transfer-service-status-sensor/

しかし、標準のCloudDataTransferServiceJobStatusSensor「ジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機する」 センサーであり、「実行中のジョブの完了を待機する」 ものではありませんでした。

当初、私自身もこの点を誤解しており、実際のワークフローではジョブが完全に終了する前に後続タスクが実行されてしまうケースが発生していました。

そこで今回は、この課題を解決するために「転送オペレーション自体の完了を確実に待機する」カスタムセンサーを自前で実装しました。

このカスタムセンサーでは、個別の転送オペレーション(transferOperations)を直接監視し、そのステータスを追跡します。

Cloud Composer 環境の準備

検証環境として Cloud Composer 環境を準備します。
前回の記事「Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい」と同様の手順で環境を構築します。

cloud-composer-custom-sensor-storage-transfer-service_01

amazon パッケージをインストール

Google Cloud の Storage Transfer Service を使用するには、apache-airflow-providers-amazonパッケージが必要です。

これは少し意外かもしれませんが、カスタムセンサーで使用するCloudDataTransferServiceHookクラスが内部的にAmazon AWS関連の機能に依存しているためです。
Google Cloud の機能だけを使用する場合でも、内部的な依存関係により、amazon パッケージがないとエラーが発生します。

cloud-composer-custom-sensor-storage-transfer-service_02

エラーメッセージ

Broken DAG: [/home/airflow/gcs/dags/storage_transfer_monitor.py]
Traceback (most recent call last):
  File "/home/airflow/gcs/dags/storage_transfer_monitor.py", line 6, in <module>
    from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceRunJobOperator
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py", line 29, in <module>
    from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
ModuleNotFoundError: No module named 'airflow.providers.amazon'

Cloud Composer環境では、管理コンソールの[PyPIパッケージ]メニューでこのパッケージをインストールするか、gcloud composer environments updateコマンドを使用してインストールできます。

今回は管理コンソールからインストールしました。

Google Cloud Consoleから該当のComposer環境を開き、[環境の構成] > [PyPIパッケージ] タブに移動します。
パッケージ名にapache-airflow-providers-amazonを入力し、「保存」ボタンをクリックするだけでインストールできます。

cloud-composer-custom-sensor-storage-transfer-service_03

カスタムセンサーを実装する

以下は、カスタムセンサーの実装例です。

custom_sts_sensor.py
from typing import Dict, Sequence

from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException


class CustomStorageTransferOperationSensor(BaseSensorOperator):
    """
    Storage Transfer Service のオペレーション完了を待機するシンプルなセンサー
    :param operation_name: 監視するオペレーションの完全な名前
    :param project_id: Google Cloud プロジェクトID
    :param gcp_conn_id: Google Cloud 接続ID
    """

    template_fields: Sequence[str] = ('operation_name',)

    def __init__(
        self,
        *,
        operation_name: str,
        project_id: str,
        gcp_conn_id: str = 'google_cloud_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.operation_name = operation_name
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id

    def poke(self, context: Dict) -> bool:
        hook = CloudDataTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
        )

        self.log.info("オペレーション %s の状態を確認中...", self.operation_name)

        operation = hook.get_transfer_operation(operation_name=self.operation_name)
        self.log.info("オペレーションの応答: %s", operation)

        done = operation.get('done', False)

        # メタデータからステータスを取得
        metadata = operation.get('metadata', {})
        status = metadata.get('status')
        self.log.info("オペレーションのステータス: %s", status)

        if done:
            # エラー情報がある場合はエラーとして扱う
            if 'error' in operation:
                error = operation['error']
                err_msg = error.get('message', 'Unknown error')
                raise AirflowException(
                    f"オペレーション {self.operation_name} がエラーで終了しました: {err_msg}"
                )

            # ステータスがSUCCESSでない場合もエラーとして扱う
            if status != 'SUCCESS':
                raise AirflowException(
                    f"オペレーション {self.operation_name} が正常に完了しませんでした。ステータス: {status}"
                )

            # 明示的なエラーがなく、ステータスもSUCCESSの場合は成功として扱う
            self.log.info("オペレーション %s が正常に完了しました", self.operation_name)
            return True

        self.log.info("オペレーション %s はまだ完了していません。現在のステータス: %s", self.operation_name, status)
        return False

実装のポイント

  1. ステータスの取得

    • オペレーションの応答からmetadata.statusを取得して処理
    • 応答の構造はself.log.infoでログ出力して確認
  2. エラー処理

    • オペレーションが完了(done=True)しても、以下の場合はエラーとして扱う
      • errorキーが存在する場合(エラーメッセージを表示)
      • ステータスがSUCCESSでない場合(PAUSEDFAILEDなど)
  3. 待機ロジック

    • オペレーションが完了するまでFalseを返し、センサーはポーリングを継続
    • 成功時のみTrueを返して後続タスクに進む

DAGを作成する

以下は、実装したDAGの例です。

storage_transfer_monitor.py
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceRunJobOperator
from airflow.utils.dates import days_ago

from custom_sts_sensor import CustomStorageTransferOperationSensor

# 設定値(注:実際の値に置き換える)
PROJECT_ID = '{プロジェクトID}'
TRANSFER_JOB_ID = 'transferJobs/{転送ジョブID}'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # リトライなし
}

# DAG定義
with DAG(
    dag_id='storage_transfer_monitor',
    default_args=default_args,
    schedule_interval=None,  # 手動トリガー
    catchup=False,
) as dag:

    # 転送ジョブを実行 (標準オペレーターを使用)
    run_transfer = CloudDataTransferServiceRunJobOperator(
        task_id='run_transfer',
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID,
    )

    # 転送オペレーションの完了を待機
    wait_for_transfer = CustomStorageTransferOperationSensor(
        task_id='wait_for_transfer',
        operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}",
        project_id=PROJECT_ID,
        poke_interval=60,  # 60秒ごとに確認
        timeout=60 * 10,     # 最大10分間待機
        mode='reschedule'  # ポーリング中はワーカースロットを解放
    )

    # タスク間の依存関係を設定
    run_transfer >> wait_for_transfer

DAGコードの解説

このDAGは2つの主要なタスクで構成されています。

  1. run_transfer: Storage Transfer Service の既存ジョブを実行します

    • CloudDataTransferServiceRunJobOperatorを使用(標準オペレーター)
    • ジョブIDを指定して実行
    • 実行結果(オペレーション名)をXComsに保存
  2. wait_for_transfer: 転送オペレーションの完了を待機します

    • CustomStorageTransferOperationSensorを使用(カスタムセンサー)
    • XComsから取得したオペレーション名を監視
    • mode='reschedule'により、ポーリング中のワーカーリソース使用を最適化

センサーの動作を制御する重要なパラメータとして、以下の3つがあります。

  • poke_interval: ステータス確認の間隔(秒)
  • timeout: タイムアウト時間(秒)
  • mode: センサーの動作モード('poke' または 'reschedule'

XComsの活用

Airflowでは、タスク間でデータを受け渡す仕組みとして XComs(Cross-Communications)を使用します。

本実装では、この機能を以下のように活用しています。

  1. CloudDataTransferServiceRunJobOperatorは実行結果としてオペレーション名をXComsに自動的に格納
  2. CustomStorageTransferOperationSensorはテンプレート機能を使って、このオペレーション名を取得
operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}"

XComsに格納された値は、Airflow Web UIの [XCom] タブから確認できます。
これにより、以下のことが可能になります。

  • 実行されたオペレーションの詳細を後から確認
  • デバッグ時に実際のオペレーション名を取得
  • 他のタスクでも同じオペレーション情報を利用

見切れてしまっていますが、下記画面の赤枠で囲った箇所が XComs に格納されている値です。

cloud-composer-custom-sensor-storage-transfer-service_04

転送ジョブコンソール画面へのリンク

ちなみに、CloudDataTransferServiceRunJobOperatorのタスクの詳細画面を開くと、[Cloud Storage Transfer Job] ボタンが表示されています。
クリックすると対象の Storage Transfer Service 転送ジョブのコンソール画面に移動できます。

転送の詳細な進捗状況やエラーが発生した場合の詳細情報など、Airflow UIでは表示されない情報も確認できます。便利!!

cloud-composer-custom-sensor-storage-transfer-service_05

動作確認

実際に様々なケースでDAGを実行し、センサーの挙動を確認しました。
ここでは、実際のユースケースに応じた5つの代表的なシナリオを紹介します。

1. 転送ジョブ正常終了の場合

転送ジョブが正常に完了した場合、センサーは完了を検知してタスクを成功させます。

まず、DAGを手動で実行します。画面右上の「右矢印」ボタン(上の画像の赤枠で囲まれた部分)をクリックして、DAGを実行開始します。

cloud-composer-custom-sensor-storage-transfer-service_06

DAGが実行されると、run_transferタスクが完了した後、wait_for_transferタスクがup_for_rescheduleステータスになります。

cloud-composer-custom-sensor-storage-transfer-service_07

これはセンサーが転送オペレーションの完了を待機している状態です。
センサーはmode='reschedule'パラメータにより、ポーリング間隔の間はワーカースロットを解放しているため、リソースを効率的に使用できます。

転送ジョブが完了すると、センサーはそのステータスを検知し、wait_for_transferタスクはsuccessステータスになります。
これにより、DAG全体も正常に完了します。

cloud-composer-custom-sensor-storage-transfer-service_08

タスクのログを確認すると、センサーが定期的にオペレーションのステータスをチェックしていることがわかります。

cloud-composer-custom-sensor-storage-transfer-service_09

初期状態ではIN_PROGRESSステータスを検知し、最終的にSUCCESSステータスを検知して完了しています。

ログ抜粋:

[2025-06-06 11:19:48.200259+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 の状態を確認中...
[2025-06-06 11:19:49.489181+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:19:49.489267+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:20:52.148838+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 の状態を確認中...
[2025-06-06 11:20:53.280713+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: SUCCESS
[2025-06-06 11:20:53.281244+00:00] {custom_sts_sensor.py:72} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 が正常に完了しました

DAGのガントチャート画面でも、wait_for_transferタスクが転送ジョブの完了まで待機していた時間を視覚的に確認できます。

cloud-composer-custom-sensor-storage-transfer-service_10

これにより、ジョブの実行時間や各タスクの所要時間を把握することができ、パフォーマンス分析に役立ちます。

2. 転送ジョブ異常終了の場合

エラーが発生した場合、センサーはエラー情報を取得し、例外を発生させてタスクを失敗させます。

今回は、一時的に転送先のバケットから Storage Transfer Service サービスアカウントの権限を剥奪することで意図的に転送ジョブを失敗させました。

cloud-composer-custom-sensor-storage-transfer-service_11

この画面では、wait_for_transferタスクがfailedステータスになっており、DAG全体も失敗していることがわかります。

ログを確認すると、センサーがオペレーションのFAILEDステータスを検知し、例外をスローしていることがわかります。

[2025-06-06 11:27:08.957058+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11892156487865896245 の状態を確認中...
[2025-06-06 11:27:10.032880+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: FAILED
[2025-06-06 11:27:10.280123+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 309, in execute
    raise e
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 289, in execute
    poke_return = self.poke(context)
                  ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/gcs/dags/custom_sts_sensor.py", line 67, in poke
    raise AirflowException(
airflow.exceptions.AirflowException: オペレーション transferOperations/transferJobs-10749929329704042585-11892156487865896245 が正常に完了しませんでした。ステータス: FAILED

3. 転送ジョブ一時停止の場合

転送ジョブが一時停止された場合の挙動も確認しました。

初めに、wait_for_transferタスクがup_for_rescheduleステータスで待機している状態が示されています。
この時点では、転送ジョブは進行中です。

cloud-composer-custom-sensor-storage-transfer-service_12

次に、Storage Transfer Serviceのコンソール画面で転送ジョブを一時停止しました。
画面上では、転送ジョブのステータスが一時停止になっています。

cloud-composer-custom-sensor-storage-transfer-service_13

しばらくして、[実行を再開] ボタンをクリックして転送を再開しました。
この操作により、一時停止状態だった転送ジョブが処理を再開します。

転送ジョブが再開されると、センサーはそのステータスを検知し、wait_for_transferタスクはsuccessステータスになります。
これにより、DAG全体も正常に完了します。

cloud-composer-custom-sensor-storage-transfer-service_14

ログを確認すると、センサーが転送ジョブの再開を検知し、wait_for_transferタスクがsuccessステータスになっていることがわかります。

[2025-06-06 11:35:55.370715+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:35:56.636724+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:35:56.637014+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:36:58.896430+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:37:00.062668+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:37:00.063206+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:38:02.372056+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:38:03.393090+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:38:03.393131+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:39:07.003992+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:39:08.231766+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:39:08.231830+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:40:10.492929+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:40:11.506412+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: SUCCESS
[2025-06-06 11:40:11.506716+00:00] {custom_sts_sensor.py:72} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 が正常に完了しました

4. 転送ジョブ中止の場合

ジョブが中止された場合も、センサーは適切にステータスを検知します。

こちらは Storage Transfer Service のコンソール画面で、転送ジョブのステータスが中止になっています。
転送を一時停止中に [実行をキャンセル] ボタンをクリックして意図的に中止した状態です。

cloud-composer-custom-sensor-storage-transfer-service_15

中止されたジョブは、wait_for_transferタスクがfailedステータスとなり、DAG全体も失敗しています。
センサーは中止(ABORTED)ステータスを適切に検知し、エラーとして処理しています。

cloud-composer-custom-sensor-storage-transfer-service_16

ログを見ると、センサーがABORTEDステータスを検知し、例外をスローしていることがわかります。

[2025-06-06 11:42:38.602058+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 の状態を確認中...
[2025-06-06 11:42:39.824382+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:42:39.824483+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:43:41.929437+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 の状態を確認中...
[2025-06-06 11:43:42.513279+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: ABORTED
[2025-06-06 11:43:42.677410+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 309, in execute
    raise e
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 289, in execute
    poke_return = self.poke(context)
                  ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/gcs/dags/custom_sts_sensor.py", line 67, in poke
    raise AirflowException(
airflow.exceptions.AirflowException: オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 が正常に完了しませんでした。ステータス: ABORTED

5. タイムアウトの場合

最後に、タイムアウトの動作も確認しました。
センサーのタイムアウト値(timeout=60*10)を10分に設定しており、転送ジョブがこの時間内に完了しない場合の挙動を検証しています。

cloud-composer-custom-sensor-storage-transfer-service_17

この画面では、wait_for_transferタスクがfailedステータスになっており、DAG全体も失敗しています。
実行時間が約10分に達していることがわかります。

ログを見ると、センサーが定期的にステータスチェックを行い、最終的にタイムアウトに達したことがわかります。

[2025-06-06 11:46:03.641631+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 の状態を確認中...
[2025-06-06 11:46:04.866877+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:46:04.867465+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 はまだ完了していません。現在のステータス: IN_PROGRESS
...(中略)...
[2025-06-06 11:56:39.676275+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 の状態を確認中...
[2025-06-06 11:56:40.849634+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:56:40.849657+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:56:40.852550+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 326, in execute
    raise AirflowSensorTimeout(message)
airflow.exceptions.AirflowSensorTimeout: Sensor has timed out; run duration of 637.910198 seconds exceeds the specified timeout of 600.0.

まとめ

本記事では、Cloud Composerで Storage Transfer Serviceの転送オペレーション完了を待機するカスタムセンサーの実装方法 をご紹介しました。
このカスタムセンサーを導入することで、Airflow標準のセンサーでは難しかった「実行中の転送ジョブの確実な完了待機」が可能になり、データパイプラインの信頼性と安定性を高めることができます。

特に、次のようなユースケースでカスタムセンサーが活用できそうです。

  • STSでのデータ転送後に後続処理を実行したい
  • 転送ステータス(成功、失敗、一時停止、中止など)に応じて柔軟なワークフローを構築したい
  • 長時間の転送ジョブも効率的かつ確実に監視したい

実装にあたっては、apache-airflow-providers-amazonパッケージをインストールする必要がある点や、適切なタイムアウト設定、エラーハンドリングなど、いくつかの留意点があります。
しかし、これらの点に留意すれば、Cloud Composer で効率的かつ堅牢な Storage Transfer Service 監視ワークフローを構築することが可能です。

今後は、Airflow 2.2以降で導入された「Deferrable オペレーター」を使用して、より効率的なセンサーの実装も検討してみたいです。
これにより、さらにワーカーリソースを効率的に使用することが可能になります。この点については、機会があれば別の記事でご紹介したいと思います。

参考リンク

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.