Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する
はじめに
こんにちは!エノカワです。
Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、Storage Transfer Service (STS) を活用してクラウドストレージ間のデータ転送を行うことが多くあります。
また、転送完了後に後続処理を実行するには、転送オペレーションの完了を待機する必要があります。
本記事では、Storage Transfer Service のオペレーション完了を待機するためのカスタムセンサーの実装と使用方法について紹介します。
前回のセンサー実装との違い
以前、「Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい」という記事で、Airflow標準のCloudDataTransferServiceJobStatusSensor
を使用した監視方法を紹介しました。
しかし、標準のCloudDataTransferServiceJobStatusSensor
は 「ジョブに属する少なくとも1つの操作が期待どおりのステータスになるまで待機する」 センサーであり、「実行中のジョブの完了を待機する」 ものではありませんでした。
当初、私自身もこの点を誤解しており、実際のワークフローではジョブが完全に終了する前に後続タスクが実行されてしまうケースが発生していました。
そこで今回は、この課題を解決するために「転送オペレーション自体の完了を確実に待機する」カスタムセンサーを自前で実装しました。
このカスタムセンサーでは、個別の転送オペレーション(transferOperations)を直接監視し、そのステータスを追跡します。
Cloud Composer 環境の準備
検証環境として Cloud Composer 環境を準備します。
前回の記事「Cloud Composer で Storage Transfer Service の転送ジョブのステータスを監視したい」と同様の手順で環境を構築します。
amazon パッケージをインストール
Google Cloud の Storage Transfer Service を使用するには、apache-airflow-providers-amazon
パッケージが必要です。
これは少し意外かもしれませんが、カスタムセンサーで使用するCloudDataTransferServiceHook
クラスが内部的にAmazon AWS関連の機能に依存しているためです。
Google Cloud の機能だけを使用する場合でも、内部的な依存関係により、amazon パッケージがないとエラーが発生します。
エラーメッセージ
Broken DAG: [/home/airflow/gcs/dags/storage_transfer_monitor.py]
Traceback (most recent call last):
File "/home/airflow/gcs/dags/storage_transfer_monitor.py", line 6, in <module>
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceRunJobOperator
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py", line 29, in <module>
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
ModuleNotFoundError: No module named 'airflow.providers.amazon'
Cloud Composer環境では、管理コンソールの[PyPIパッケージ]メニューでこのパッケージをインストールするか、gcloud composer environments update
コマンドを使用してインストールできます。
今回は管理コンソールからインストールしました。
Google Cloud Consoleから該当のComposer環境を開き、[環境の構成] > [PyPIパッケージ] タブに移動します。
パッケージ名にapache-airflow-providers-amazon
を入力し、「保存」ボタンをクリックするだけでインストールできます。
カスタムセンサーを実装する
以下は、カスタムセンサーの実装例です。
from typing import Dict, Sequence
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException
class CustomStorageTransferOperationSensor(BaseSensorOperator):
"""
Storage Transfer Service のオペレーション完了を待機するシンプルなセンサー
:param operation_name: 監視するオペレーションの完全な名前
:param project_id: Google Cloud プロジェクトID
:param gcp_conn_id: Google Cloud 接続ID
"""
template_fields: Sequence[str] = ('operation_name',)
def __init__(
self,
*,
operation_name: str,
project_id: str,
gcp_conn_id: str = 'google_cloud_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.operation_name = operation_name
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
def poke(self, context: Dict) -> bool:
hook = CloudDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
)
self.log.info("オペレーション %s の状態を確認中...", self.operation_name)
operation = hook.get_transfer_operation(operation_name=self.operation_name)
self.log.info("オペレーションの応答: %s", operation)
done = operation.get('done', False)
# メタデータからステータスを取得
metadata = operation.get('metadata', {})
status = metadata.get('status')
self.log.info("オペレーションのステータス: %s", status)
if done:
# エラー情報がある場合はエラーとして扱う
if 'error' in operation:
error = operation['error']
err_msg = error.get('message', 'Unknown error')
raise AirflowException(
f"オペレーション {self.operation_name} がエラーで終了しました: {err_msg}"
)
# ステータスがSUCCESSでない場合もエラーとして扱う
if status != 'SUCCESS':
raise AirflowException(
f"オペレーション {self.operation_name} が正常に完了しませんでした。ステータス: {status}"
)
# 明示的なエラーがなく、ステータスもSUCCESSの場合は成功として扱う
self.log.info("オペレーション %s が正常に完了しました", self.operation_name)
return True
self.log.info("オペレーション %s はまだ完了していません。現在のステータス: %s", self.operation_name, status)
return False
実装のポイント
-
ステータスの取得
- オペレーションの応答から
metadata.status
を取得して処理 - 応答の構造は
self.log.info
でログ出力して確認
- オペレーションの応答から
-
エラー処理
- オペレーションが完了(
done=True
)しても、以下の場合はエラーとして扱うerror
キーが存在する場合(エラーメッセージを表示)- ステータスが
SUCCESS
でない場合(PAUSED
やFAILED
など)
- オペレーションが完了(
-
待機ロジック
- オペレーションが完了するまで
False
を返し、センサーはポーリングを継続 - 成功時のみ
True
を返して後続タスクに進む
- オペレーションが完了するまで
DAGを作成する
以下は、実装したDAGの例です。
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import CloudDataTransferServiceRunJobOperator
from airflow.utils.dates import days_ago
from custom_sts_sensor import CustomStorageTransferOperationSensor
# 設定値(注:実際の値に置き換える)
PROJECT_ID = '{プロジェクトID}'
TRANSFER_JOB_ID = 'transferJobs/{転送ジョブID}'
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
'retries': 0, # リトライなし
}
# DAG定義
with DAG(
dag_id='storage_transfer_monitor',
default_args=default_args,
schedule_interval=None, # 手動トリガー
catchup=False,
) as dag:
# 転送ジョブを実行 (標準オペレーターを使用)
run_transfer = CloudDataTransferServiceRunJobOperator(
task_id='run_transfer',
job_name=TRANSFER_JOB_ID,
project_id=PROJECT_ID,
)
# 転送オペレーションの完了を待機
wait_for_transfer = CustomStorageTransferOperationSensor(
task_id='wait_for_transfer',
operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}",
project_id=PROJECT_ID,
poke_interval=60, # 60秒ごとに確認
timeout=60 * 10, # 最大10分間待機
mode='reschedule' # ポーリング中はワーカースロットを解放
)
# タスク間の依存関係を設定
run_transfer >> wait_for_transfer
DAGコードの解説
このDAGは2つの主要なタスクで構成されています。
-
run_transfer: Storage Transfer Service の既存ジョブを実行します
CloudDataTransferServiceRunJobOperator
を使用(標準オペレーター)- ジョブIDを指定して実行
- 実行結果(オペレーション名)をXComsに保存
-
wait_for_transfer: 転送オペレーションの完了を待機します
CustomStorageTransferOperationSensor
を使用(カスタムセンサー)- XComsから取得したオペレーション名を監視
mode='reschedule'
により、ポーリング中のワーカーリソース使用を最適化
センサーの動作を制御する重要なパラメータとして、以下の3つがあります。
poke_interval
: ステータス確認の間隔(秒)timeout
: タイムアウト時間(秒)mode
: センサーの動作モード('poke' または 'reschedule')
XComsの活用
Airflowでは、タスク間でデータを受け渡す仕組みとして XComs(Cross-Communications)を使用します。
本実装では、この機能を以下のように活用しています。
CloudDataTransferServiceRunJobOperator
は実行結果としてオペレーション名をXComsに自動的に格納CustomStorageTransferOperationSensor
はテンプレート機能を使って、このオペレーション名を取得
operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}"
XComsに格納された値は、Airflow Web UIの [XCom] タブから確認できます。
これにより、以下のことが可能になります。
- 実行されたオペレーションの詳細を後から確認
- デバッグ時に実際のオペレーション名を取得
- 他のタスクでも同じオペレーション情報を利用
見切れてしまっていますが、下記画面の赤枠で囲った箇所が XComs に格納されている値です。
転送ジョブコンソール画面へのリンク
ちなみに、CloudDataTransferServiceRunJobOperator
のタスクの詳細画面を開くと、[Cloud Storage Transfer Job] ボタンが表示されています。
クリックすると対象の Storage Transfer Service 転送ジョブのコンソール画面に移動できます。
転送の詳細な進捗状況やエラーが発生した場合の詳細情報など、Airflow UIでは表示されない情報も確認できます。便利!!
動作確認
実際に様々なケースでDAGを実行し、センサーの挙動を確認しました。
ここでは、実際のユースケースに応じた5つの代表的なシナリオを紹介します。
1. 転送ジョブ正常終了の場合
転送ジョブが正常に完了した場合、センサーは完了を検知してタスクを成功させます。
まず、DAGを手動で実行します。画面右上の「右矢印」ボタン(上の画像の赤枠で囲まれた部分)をクリックして、DAGを実行開始します。
DAGが実行されると、run_transfer
タスクが完了した後、wait_for_transfer
タスクがup_for_reschedule
ステータスになります。
これはセンサーが転送オペレーションの完了を待機している状態です。
センサーはmode='reschedule'
パラメータにより、ポーリング間隔の間はワーカースロットを解放しているため、リソースを効率的に使用できます。
転送ジョブが完了すると、センサーはそのステータスを検知し、wait_for_transfer
タスクはsuccess
ステータスになります。
これにより、DAG全体も正常に完了します。
タスクのログを確認すると、センサーが定期的にオペレーションのステータスをチェックしていることがわかります。
初期状態ではIN_PROGRESS
ステータスを検知し、最終的にSUCCESS
ステータスを検知して完了しています。
ログ抜粋:
[2025-06-06 11:19:48.200259+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 の状態を確認中...
[2025-06-06 11:19:49.489181+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:19:49.489267+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:20:52.148838+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 の状態を確認中...
[2025-06-06 11:20:53.280713+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: SUCCESS
[2025-06-06 11:20:53.281244+00:00] {custom_sts_sensor.py:72} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11436594877008165785 が正常に完了しました
DAGのガントチャート画面でも、wait_for_transfer
タスクが転送ジョブの完了まで待機していた時間を視覚的に確認できます。
これにより、ジョブの実行時間や各タスクの所要時間を把握することができ、パフォーマンス分析に役立ちます。
2. 転送ジョブ異常終了の場合
エラーが発生した場合、センサーはエラー情報を取得し、例外を発生させてタスクを失敗させます。
今回は、一時的に転送先のバケットから Storage Transfer Service サービスアカウントの権限を剥奪することで意図的に転送ジョブを失敗させました。
この画面では、wait_for_transfer
タスクがfailed
ステータスになっており、DAG全体も失敗していることがわかります。
ログを確認すると、センサーがオペレーションのFAILED
ステータスを検知し、例外をスローしていることがわかります。
[2025-06-06 11:27:08.957058+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-11892156487865896245 の状態を確認中...
[2025-06-06 11:27:10.032880+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: FAILED
[2025-06-06 11:27:10.280123+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 309, in execute
raise e
File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 289, in execute
poke_return = self.poke(context)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/gcs/dags/custom_sts_sensor.py", line 67, in poke
raise AirflowException(
airflow.exceptions.AirflowException: オペレーション transferOperations/transferJobs-10749929329704042585-11892156487865896245 が正常に完了しませんでした。ステータス: FAILED
3. 転送ジョブ一時停止の場合
転送ジョブが一時停止された場合の挙動も確認しました。
初めに、wait_for_transfer
タスクがup_for_reschedule
ステータスで待機している状態が示されています。
この時点では、転送ジョブは進行中です。
次に、Storage Transfer Serviceのコンソール画面で転送ジョブを一時停止しました。
画面上では、転送ジョブのステータスが一時停止になっています。
しばらくして、[実行を再開] ボタンをクリックして転送を再開しました。
この操作により、一時停止状態だった転送ジョブが処理を再開します。
転送ジョブが再開されると、センサーはそのステータスを検知し、wait_for_transfer
タスクはsuccess
ステータスになります。
これにより、DAG全体も正常に完了します。
ログを確認すると、センサーが転送ジョブの再開を検知し、wait_for_transfer
タスクがsuccess
ステータスになっていることがわかります。
[2025-06-06 11:35:55.370715+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:35:56.636724+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:35:56.637014+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:36:58.896430+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:37:00.062668+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:37:00.063206+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:38:02.372056+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:38:03.393090+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:38:03.393131+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:39:07.003992+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:39:08.231766+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:39:08.231830+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:40:10.492929+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 の状態を確認中...
[2025-06-06 11:40:11.506412+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: SUCCESS
[2025-06-06 11:40:11.506716+00:00] {custom_sts_sensor.py:72} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-10886834376507253781 が正常に完了しました
4. 転送ジョブ中止の場合
ジョブが中止された場合も、センサーは適切にステータスを検知します。
こちらは Storage Transfer Service のコンソール画面で、転送ジョブのステータスが中止になっています。
転送を一時停止中に [実行をキャンセル] ボタンをクリックして意図的に中止した状態です。
中止されたジョブは、wait_for_transfer
タスクがfailed
ステータスとなり、DAG全体も失敗しています。
センサーは中止(ABORTED
)ステータスを適切に検知し、エラーとして処理しています。
ログを見ると、センサーがABORTED
ステータスを検知し、例外をスローしていることがわかります。
[2025-06-06 11:42:38.602058+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 の状態を確認中...
[2025-06-06 11:42:39.824382+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:42:39.824483+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 はまだ完了していません。現在のステータス: IN_PROGRESS
[2025-06-06 11:43:41.929437+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 の状態を確認中...
[2025-06-06 11:43:42.513279+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: ABORTED
[2025-06-06 11:43:42.677410+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 309, in execute
raise e
File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 289, in execute
poke_return = self.poke(context)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/gcs/dags/custom_sts_sensor.py", line 67, in poke
raise AirflowException(
airflow.exceptions.AirflowException: オペレーション transferOperations/transferJobs-10749929329704042585-8886100068150818521 が正常に完了しませんでした。ステータス: ABORTED
5. タイムアウトの場合
最後に、タイムアウトの動作も確認しました。
センサーのタイムアウト値(timeout=60*10
)を10分に設定しており、転送ジョブがこの時間内に完了しない場合の挙動を検証しています。
この画面では、wait_for_transfer
タスクがfailed
ステータスになっており、DAG全体も失敗しています。
実行時間が約10分に達していることがわかります。
ログを見ると、センサーが定期的にステータスチェックを行い、最終的にタイムアウトに達したことがわかります。
[2025-06-06 11:46:03.641631+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 の状態を確認中...
[2025-06-06 11:46:04.866877+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: IN_PROGRESS
[2025-06-06 11:46:04.867465+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 はまだ完了していません。現在のステータス: IN_PROGRESS
...(中略)...
[2025-06-06 11:56:39.676275+00:00] {custom_sts_sensor.py:44} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 の状態を確認中...
[2025-06-06 11:56:40.849634+00:00] {custom_sts_sensor.py:54} INFO - オペレーションのステータス: PAUSED
[2025-06-06 11:56:40.849657+00:00] {custom_sts_sensor.py:75} INFO - オペレーション transferOperations/transferJobs-10749929329704042585-17816045825804983766 はまだ完了していません。現在のステータス: PAUSED
[2025-06-06 11:56:40.852550+00:00] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/sensors/base.py", line 326, in execute
raise AirflowSensorTimeout(message)
airflow.exceptions.AirflowSensorTimeout: Sensor has timed out; run duration of 637.910198 seconds exceeds the specified timeout of 600.0.
まとめ
本記事では、Cloud Composerで Storage Transfer Serviceの転送オペレーション完了を待機するカスタムセンサーの実装方法 をご紹介しました。
このカスタムセンサーを導入することで、Airflow標準のセンサーでは難しかった「実行中の転送ジョブの確実な完了待機」が可能になり、データパイプラインの信頼性と安定性を高めることができます。
特に、次のようなユースケースでカスタムセンサーが活用できそうです。
- STSでのデータ転送後に後続処理を実行したい
- 転送ステータス(成功、失敗、一時停止、中止など)に応じて柔軟なワークフローを構築したい
- 長時間の転送ジョブも効率的かつ確実に監視したい
実装にあたっては、apache-airflow-providers-amazon
パッケージをインストールする必要がある点や、適切なタイムアウト設定、エラーハンドリングなど、いくつかの留意点があります。
しかし、これらの点に留意すれば、Cloud Composer で効率的かつ堅牢な Storage Transfer Service 監視ワークフローを構築することが可能です。
今後は、Airflow 2.2以降で導入された「Deferrable オペレーター」を使用して、より効率的なセンサーの実装も検討してみたいです。
これにより、さらにワーカーリソースを効率的に使用することが可能になります。この点については、機会があれば別の記事でご紹介したいと思います。