Cloud Composer で GCS に特定のオブジェクトが存在する場合にタスクを実行させたい

2023.05.20

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

やりたいこと

GCS に特定のオブジェクトがアップロードされたときだけ Airflow のタスクを実行したい場合があります。
例えば、毎日特定の時間に生成されるログファイルを GCS にアップロードし、そのファイルがアップロードされたらすぐにそれを処理するといったシナリオが考えられます。

そのような要求を満たすために、Apache Airflowは多くのセンサーを提供しています。
今回はGCSObjectExistenceSensorを使用して、GCS に特定のオブジェクトが存在する場合に Airflow のタスクを実行する方法をご紹介します。

GCSObjectExistenceSensor

GCSObjectExistenceSensorは、GCS に特定のオブジェクトが存在するかをどうかを監視するセンサーです。
Apache Airflow の Google Cloud プロバイダパッケージに含まれており、指定したオブジェクトが存在するかどうかをポーリングによって定期的にチェックします。

パラメータを調整することでセンサーの挙動を細かく制御することが可能です。

  • bucket: 監視対象のGCSバケットの名前を指定します。
  • object: 監視対象のGCSオブジェクトの名前を指定します。
  • mode: ポーリングの動作モードを指定します。デフォルトの'poke'モードでは、定期的にポーリングを行い、指定した条件が満たされているかをチェックします。
  • poke_interval: ポーリングの間隔を秒単位で指定します。
  • timeout: タイムアウト時間を秒単位で指定します。指定した時間が経過しても指定した条件が満たされない場合、センサーは例外をスローします。
  • soft_fail: これが'True'に設定されていると、タイムアウト時にセンサータスクは失敗ではなくスキップ状態となります。デフォルトは'False'です。

例えば、以下のように設定できます。

gcs_sensor = GoogleCloudStorageObjectSensor(
    task_id='gcs_sensor',
    bucket='my-gcs-bucket',
    object='my-file-name',
    mode='poke',
    poke_interval=300,  # 5 minutes
    timeout=3600  # 1 hour
)

この設定では、センサーは5分ごとに指定のオブジェクト(gs://my-gcs-bucket/my-file-name)が存在するかをチェックし、1時間以内に見つかった場合にはタスクを成功、見つからなかった場合は失敗として扱います。
これは例えば、GCSにファイルが必ず存在するわけではないが、存在すれば何らかの処理を行いたい場合などに便利です。

それでは、GCSObjectExistenceSensorは使ったDAGを作成し、Cloud Composer で動かしてみましょう。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。

test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

DAG を作成

Cloud Composer 環境で実行する DAGを作成します。

このDAGでは、まずGCSObjectExistenceSensorを使用して、指定したオブジェクト(gs://composer-gcs-bucket/upload_file.txt)がGCSバケットに存在するかどうかを確認します。
オブジェクトが存在すれば、センサータスクは成功し、次に定義されているダミータスクが実行されます。

センサータスクは1分ごとにオブジェクトが存在するかをチェックし、タイムアウト時間5分が過ぎると失敗します。

gcs_sensor_dag.py

from airflow import DAG
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

with DAG(
    dag_id="gcs_sensor_dag",
    start_date=datetime.now(),
    schedule_interval=None,
) as dag:

    # GCSバケット内の指定したオブジェクトの存在を確認するセンサー
    gcs_sensor = GCSObjectExistenceSensor(
        task_id="gcs_sensor",
        bucket="composer-gcs-bucket",
        object="upload_file.txt",
        mode="poke",
        poke_interval=60,  # 1 minute
        timeout=60 * 5  # 5 minutes
    )

    # ダミータスク
    dummy_task = DummyOperator(
        task_id="dummy_task"
    )

    # センサータスク成功後にダミータスクを実行
    gcs_sensor >> dummy_task

test-composer[DAG]リンクからDAGフォルダに移動します。

DAGフォルダにファイルをアップロードしてDAGをデプロイします。

DAG を実行

デプロイしたDAGを実行してみましょう。

手動でDAGをトリガーします。
[DAGs]ページのgcs_sensor_dagをクリックしてDAGの詳細画面に移動します。

画面右上の[再生マーク]ボタンからTrigger DAGをクリックします。

DAG が起動し、センサータスクが実行されました。

GCSにオブジェクトが存在する場合

センサータスクが実行中に、GCSバケットgs://composer-gcs-bucket/upload_file.txtをアップロードします。

しばらくすると、センサータスクが成功し、後続のダミータスクも成功しました。

GCSObjectExistenceSensorが先ほどアップロードした監視対象のオブジェクトgs://composer-gcs-bucket/upload_file.txtを検知できたようです。

GCSにオブジェクトが存在しない場合

次に、GCSにオブジェクトがない場合の挙動を確認してみましょう。

GCSバケットgs://composer-gcs-bucket/からupload_file.txtを削除しておきます。

先ほどと同じようにDAG を起動します。

5分経過すると、センサータスクが失敗しました。

GCSObjectExistenceSensorが監視対象のオブジェクトgs://composer-gcs-bucket/upload_file.txtを検知できないままタイムアウト時間5分が過ぎたので失敗し、後続のダミータスクは上流タスク失敗により実行されなかったようです。

定期的にオブジェクトの存在チェックをしたい

schedule_intervalパラメータを使用して、DAGの定期実行を設定することができます。

例えば、schedule_interval@hourlyを指定すると、DAGは毎時0分実行されます。
3時間に1回、指定のオブジェクトがGCSにアップロードされる場合、DAGの実行結果は以下のようになります。

3時間に1回(3回に1回)、センサータスクがオブジェクトを検知して成功しています。

センサータスクを失敗させたくない

GCSObjectExistenceSensorは指定したGCSバケットに特定のオブジェクトが存在しない場合、タスクは失敗します。
しかし、特定のオブジェクトが存在しない場合でもDAG全体を失敗させたくない場合があります。

そのような場合には、soft_failパラメータをTrueに設定します。
これにより、特定のオブジェクトが存在しない場合、センサータスクはスキップされ、DAG全体の実行が続行されます。

先ほど動かしたDAGのsoft_failパラメータをTrueに設定し、GCSにオブジェクトが存在しない状態でDAGを実行すると以下のようになります。

センサータスクがスキップ扱いとなり、後続のダミータスクも実行されていません。

まとめ

以上、Cloud Composer で GCS に特定のオブジェクトが存在する場合にタスクを実行する方法をご紹介しました。

GCSObjectExistenceSensorを使用することで、指定したオブジェクトが GCS にアップロードされたこと検知し、後続のタスクを動かすことができました。

アップロードしてからセンサータスクが成功するまで若干のタイムラグがありましたが、これはアップロードするタイミングやpoke_intervalの値によって変わりそうです。
GCSObjectExistenceSensorセンサータスクなので、オブジェクトのアップロードを検知するためにはDAGが動いている必要があります。
アップロードをリアルタイムに検知したい場合は、Cloud Storage トリガー の Cloud Functions を使用して Cloud Composer のDAGを起動するという方法が最適かもしれません。(今度試してみたいと思います!)

オブジェクト名にワイルドカード指定はできない

GCSObjectExistenceSensorは、具体的なオブジェクト名を指定し、その存在をチェックするセンサーです。
そのため、ワイルドカードを指定してオブジェクト名を指定することはできません。

GCSObjectExistenceSensorに似たセンサーにGCSObjectsWithPrefixExistenceSensorがあります。
GCSObjectsWithPrefixExistenceSensorは、特定のプレフィックスを持つ1つ以上のオブジェクトがGCSバケット内に存在するかどうかをチェックします。
ワイルドカード指定ができるわけではないですが、特定のプレフィックスを持つすべてのオブジェクトをチェックするので、複数のオブジェクトを一度にチェックする必要がある場合に便利そうです。

参考