Cloud Composer で GCS に特定のオブジェクトが存在する場合にタスクを実行させたい
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
やりたいこと
GCS に特定のオブジェクトがアップロードされたときだけ Airflow のタスクを実行したい場合があります。
例えば、毎日特定の時間に生成されるログファイルを GCS にアップロードし、そのファイルがアップロードされたらすぐにそれを処理するといったシナリオが考えられます。
そのような要求を満たすために、Apache Airflowは多くのセンサーを提供しています。
今回はGCSObjectExistenceSensor
を使用して、GCS に特定のオブジェクトが存在する場合に Airflow のタスクを実行する方法をご紹介します。
GCSObjectExistenceSensor
GCSObjectExistenceSensor
は、GCS に特定のオブジェクトが存在するかをどうかを監視するセンサーです。
Apache Airflow の Google Cloud プロバイダパッケージに含まれており、指定したオブジェクトが存在するかどうかをポーリングによって定期的にチェックします。
パラメータを調整することでセンサーの挙動を細かく制御することが可能です。
bucket
: 監視対象のGCSバケットの名前を指定します。object
: 監視対象のGCSオブジェクトの名前を指定します。mode
: ポーリングの動作モードを指定します。デフォルトの'poke'モードでは、定期的にポーリングを行い、指定した条件が満たされているかをチェックします。poke_interval
: ポーリングの間隔を秒単位で指定します。timeout
: タイムアウト時間を秒単位で指定します。指定した時間が経過しても指定した条件が満たされない場合、センサーは例外をスローします。soft_fail
: これが'True'に設定されていると、タイムアウト時にセンサータスクは失敗ではなくスキップ状態となります。デフォルトは'False'です。
例えば、以下のように設定できます。
gcs_sensor = GoogleCloudStorageObjectSensor( task_id='gcs_sensor', bucket='my-gcs-bucket', object='my-file-name', mode='poke', poke_interval=300, # 5 minutes timeout=3600 # 1 hour )
この設定では、センサーは5分ごとに指定のオブジェクト(gs://my-gcs-bucket/my-file-name
)が存在するかをチェックし、1時間以内に見つかった場合にはタスクを成功、見つからなかった場合は失敗として扱います。
これは例えば、GCSにファイルが必ず存在するわけではないが、存在すれば何らかの処理を行いたい場合などに便利です。
それでは、GCSObjectExistenceSensor
は使ったDAGを作成し、Cloud Composer で動かしてみましょう。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。
test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
DAG を作成
Cloud Composer 環境で実行する DAGを作成します。
このDAGでは、まずGCSObjectExistenceSensor
を使用して、指定したオブジェクト(gs://composer-gcs-bucket/upload_file.txt)がGCSバケットに存在するかどうかを確認します。
オブジェクトが存在すれば、センサータスクは成功し、次に定義されているダミータスクが実行されます。
センサータスクは1分ごとにオブジェクトが存在するかをチェックし、タイムアウト時間5分が過ぎると失敗します。
from airflow import DAG from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor from airflow.operators.dummy_operator import DummyOperator from datetime import datetime with DAG( dag_id="gcs_sensor_dag", start_date=datetime.now(), schedule_interval=None, ) as dag: # GCSバケット内の指定したオブジェクトの存在を確認するセンサー gcs_sensor = GCSObjectExistenceSensor( task_id="gcs_sensor", bucket="composer-gcs-bucket", object="upload_file.txt", mode="poke", poke_interval=60, # 1 minute timeout=60 * 5 # 5 minutes ) # ダミータスク dummy_task = DummyOperator( task_id="dummy_task" ) # センサータスク成功後にダミータスクを実行 gcs_sensor >> dummy_task
test-composer
の[DAG]リンクからDAGフォルダに移動します。
DAGフォルダにファイルをアップロードしてDAGをデプロイします。
DAG を実行
デプロイしたDAGを実行してみましょう。
手動でDAGをトリガーします。
[DAGs]ページのgcs_sensor_dag
をクリックしてDAGの詳細画面に移動します。
画面右上の[再生マーク]ボタンからTrigger DAG
をクリックします。
DAG が起動し、センサータスクが実行されました。
GCSにオブジェクトが存在する場合
センサータスクが実行中に、GCSバケットgs://composer-gcs-bucket/にupload_file.txtをアップロードします。
しばらくすると、センサータスクが成功し、後続のダミータスクも成功しました。
GCSObjectExistenceSensor
が先ほどアップロードした監視対象のオブジェクトgs://composer-gcs-bucket/upload_file.txtを検知できたようです。
GCSにオブジェクトが存在しない場合
次に、GCSにオブジェクトがない場合の挙動を確認してみましょう。
GCSバケットgs://composer-gcs-bucket/からupload_file.txtを削除しておきます。
先ほどと同じようにDAG を起動します。
5分経過すると、センサータスクが失敗しました。
GCSObjectExistenceSensor
が監視対象のオブジェクトgs://composer-gcs-bucket/upload_file.txtを検知できないままタイムアウト時間5分が過ぎたので失敗し、後続のダミータスクは上流タスク失敗により実行されなかったようです。
定期的にオブジェクトの存在チェックをしたい
schedule_interval
パラメータを使用して、DAGの定期実行を設定することができます。
例えば、schedule_interval
に@hourly
を指定すると、DAGは毎時0分実行されます。
3時間に1回、指定のオブジェクトがGCSにアップロードされる場合、DAGの実行結果は以下のようになります。
3時間に1回(3回に1回)、センサータスクがオブジェクトを検知して成功しています。
センサータスクを失敗させたくない
GCSObjectExistenceSensor
は指定したGCSバケットに特定のオブジェクトが存在しない場合、タスクは失敗します。
しかし、特定のオブジェクトが存在しない場合でもDAG全体を失敗させたくない場合があります。
そのような場合には、soft_fail
パラメータをTrue
に設定します。
これにより、特定のオブジェクトが存在しない場合、センサータスクはスキップされ、DAG全体の実行が続行されます。
先ほど動かしたDAGのsoft_fail
パラメータをTrue
に設定し、GCSにオブジェクトが存在しない状態でDAGを実行すると以下のようになります。
センサータスクがスキップ扱いとなり、後続のダミータスクも実行されていません。
まとめ
以上、Cloud Composer で GCS に特定のオブジェクトが存在する場合にタスクを実行する方法をご紹介しました。
GCSObjectExistenceSensor
を使用することで、指定したオブジェクトが GCS にアップロードされたこと検知し、後続のタスクを動かすことができました。
アップロードしてからセンサータスクが成功するまで若干のタイムラグがありましたが、これはアップロードするタイミングやpoke_interval
の値によって変わりそうです。
GCSObjectExistenceSensor
センサータスクなので、オブジェクトのアップロードを検知するためにはDAGが動いている必要があります。
アップロードをリアルタイムに検知したい場合は、Cloud Storage トリガー の Cloud Functions を使用して Cloud Composer のDAGを起動するという方法が最適かもしれません。(今度試してみたいと思います!)
オブジェクト名にワイルドカード指定はできない
GCSObjectExistenceSensor
は、具体的なオブジェクト名を指定し、その存在をチェックするセンサーです。
そのため、ワイルドカードを指定してオブジェクト名を指定することはできません。
GCSObjectExistenceSensor
に似たセンサーにGCSObjectsWithPrefixExistenceSensor
があります。
GCSObjectsWithPrefixExistenceSensor
は、特定のプレフィックスを持つ1つ以上のオブジェクトがGCSバケット内に存在するかどうかをチェックします。
ワイルドカード指定ができるわけではないですが、特定のプレフィックスを持つすべてのオブジェクトをチェックするので、複数のオブジェクトを一度にチェックする必要がある場合に便利そうです。