Cloud Composer で Storage Transfer Service の動的設定更新ワークフローを構築する

Cloud Composer で Storage Transfer Service の動的設定更新ワークフローを構築する

2025.09.02

はじめに

こんにちは!エノカワです。

Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、Storage Transfer Service (STS) を活用してクラウドストレージ間のデータ転送を行うことが多くあります。
特に定期的な増分転送においては、転送対象ファイルの範囲を適切に管理することが重要な課題となります。

本記事では、Storage Transfer Service の転送ジョブ設定を動的に更新し、効率的な増分転送ワークフローを構築する方法について紹介します。

前回のおさらい

以前、「Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する」という記事で、転送オペレーションの確実な完了待機を実現するカスタムセンサーの実装方法を紹介しました。

https://dev.classmethod.jp/articles/cloud-composer-custom-sensor-storage-transfer-service/

前回のカスタムセンサー実装をベースに、今回は 転送履歴から最終成功時刻を自動抽出し、転送ジョブの設定を動的に更新する仕組み を実装します。

今回やること

解決したい課題

Storage Transfer Service を Cloud Composer から実行する場合、効率的な増分転送を実現するための課題があります。

外部トリガー実行における転送ファイルの取りこぼしリスク

Storage Transfer Service では、実行方式により異なる特徴があります。

STS組み込みスケジュールでは、Storage Transfer Service 自体のスケジュール機能により、ファイルタイムスタンプを元にしたフィルタ設定で効率的な転送が可能です。転送開始時刻が保証されるため、ファイルの取りこぼしが発生しません。

一方、外部トリガー実行(Cloud Composer等からAPI実行)では、以下の要因により転送ファイルの取りこぼしが発生する可能性があります。

実行タイミングの不確実性
  • DAGのPauseやTaskのキューイングにより、STS の起動時刻が保証されない
  • リソース不足やシステムメンテナンスによる遅延の可能性
  • 想定した時刻に転送が開始されない場合がある
固定的なフィルタ設定の限界

直近1時間に更新されたファイルのような固定的な設定では、実行遅延により取りこぼしが発生する可能性があります。

例:

  • DAGの開始予定時刻:10:00
  • 実際の開始時刻:10:30(30分遅延)
  • 転送設定:「直近1時間(9:00-10:00)のファイルを転送」
  • 結果:10:00-10:30の間に更新されたファイルが転送対象から漏れる

解決アプローチ

転送履歴に基づく動的設定更新

これらの課題を解決するため、転送履歴に基づく動的設定更新アプローチを採用します。

DAG実行時に以下の処理を自動実行します。

  • Storage Transfer Service API を使用して過去のオペレーション履歴を取得
  • 成功したオペレーションから最新の開始時刻を自動抽出
  • この時刻を次回転送の基準時刻(lastModifiedSince)として活用

これにより、DAGの実行遅延に関係なく、常に前回成功分以降のファイルを確実に転送できます。

例:

  • 前回転送開始時刻:09:45
  • 今回のDAG開始時刻:10:30(30分遅延)
  • 転送対象:09:45以降に更新されたファイル
  • 結果:取りこぼし無しで増分転送を実現

Cloud Composer 環境の準備

検証環境として Cloud Composer 環境を準備します。
前回の記事「Cloud Composer でカスタムセンサーを実装して Storage Transfer Service のオペレーション完了を待機する」と同様の手順で環境を構築します。

また、Storage Transfer Service のオペレーターやフックを使用するには、apache-airflow-providers-amazonパッケージのインストールが必要です。
こちらの手順についても、前回の記事の「amazon パッケージをインストール」セクションをご参照ください。

cloud-composer-storage-transfer-service-dynamic-config-update_01

DAGを作成する

以下が、実際に動作検証を行ったDAGです。

sts_dynamic_config_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
    CloudDataTransferServiceListOperationsOperator
)
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import sys
import os

# カスタムセンサーをインポート
from custom_sts_sensor import CustomStorageTransferOperationSensor

# 設定値(注:実際の値に置き換える)
PROJECT_ID = '{プロジェクトID}'
TRANSFER_JOB_ID = 'transferJobs/{転送ジョブID}'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # リトライなし
}

def prepare_update_body(**context):
    """転送履歴から最終成功開始時刻を抽出しジョブ更新用設定を生成"""
    from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
        CloudDataTransferServiceHook
    )

    # 前タスクから転送履歴を取得
    operations = context['task_instance'].xcom_pull(task_ids='list_operations')

    # フォールバック時間(履歴が無い場合は24時間前を使用)
    fallback_time = (datetime.utcnow() - timedelta(hours=24)).isoformat() + 'Z'

    # 最終成功時刻を抽出
    if not operations:
        last_success_time = fallback_time
    else:
        # 成功したオペレーションのみを抽出
        success_operations = [
            op for op in operations 
            if op.get('metadata', {}).get('status') == 'SUCCESS'
        ]

        if not success_operations:
            last_success_time = fallback_time
        else:
            # startTimeでソートして最新の成功開始時刻を取得
            success_operations.sort(
                key=lambda x: x.get('metadata', {}).get('startTime', ''),
                reverse=True
            )
            last_success_time = success_operations[0].get('metadata', {}).get('startTime')

    # 既存の転送ジョブ設定を取得
    hook = CloudDataTransferServiceHook(gcp_conn_id='google_cloud_default')
    transfer_job = hook.get_transfer_job(
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID
    )

    # 更新用転送ジョブ設定を作成(変更するフィールドのみ)
    updated_transfer_job = {
        'transferSpec': {
            **transfer_job.get('transferSpec', {}),
            'objectConditions': {
                **transfer_job.get('transferSpec', {}).get('objectConditions', {}),
                'lastModifiedSince': last_success_time
            }
        }
    }

    # 更新用リクエストボディを作成
    update_body = {
        "projectId": PROJECT_ID,
        "transferJob": updated_transfer_job
    }

    return update_body

# DAG定義
with DAG(
    dag_id='sts_dynamic_config_dag',
    default_args=default_args,
    schedule_interval=None,  # 手動トリガー
    catchup=False,
    render_template_as_native_obj=True,  # XComデータを適切に処理
) as dag:

    # 転送履歴の取得
    list_operations = CloudDataTransferServiceListOperationsOperator(
        task_id='list_operations',
        project_id=PROJECT_ID,
        request_filter={
            'job_names': [TRANSFER_JOB_ID]
        }
    )

    # 更新設定の準備
    prepare_update = PythonOperator(
        task_id='prepare_update_body',
        python_callable=prepare_update_body,
        provide_context=True
    )

    # 転送ジョブ設定の更新
    update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
        task_id='update_transfer_job',
        job_name=TRANSFER_JOB_ID,
        body="{{ task_instance.xcom_pull(task_ids='prepare_update_body') }}",
        project_id=PROJECT_ID
    )

    # 転送ジョブの実行
    run_transfer = CloudDataTransferServiceRunJobOperator(
        task_id='run_transfer',
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID
    )

    # 転送完了待機
    wait_for_transfer = CustomStorageTransferOperationSensor(
        task_id='wait_for_transfer',
        operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}",
        project_id=PROJECT_ID,
        poke_interval=60,        # 60秒ごとにステータスを確認
        timeout=60 * 10,         # 最大10分間待機
        mode='reschedule'        # リソース効率のためrescheduleモードを使用
    )

    # タスク依存関係の定義
    (
        list_operations 
        >> prepare_update 
        >> update_transfer_job 
        >> run_transfer 
        >> wait_for_transfer
    )

DAGコードの解説

このDAGは5つのタスクで構成され、STSの履歴取得から転送完了まで一連の処理を自動化します。

  1. list_operations: Storage Transfer Service のオペレーション履歴を取得します

    • CloudDataTransferServiceListOperationsOperatorを使用
    • request_filterで特定の転送ジョブに絞り込み
    • 取得した履歴はXComsに自動格納
  2. prepare_update_body: 履歴から最終成功開始時刻を抽出し、ジョブ更新用リクエストボディを準備します

    • Python関数を使用して成功オペレーションからstartTimeを取得
    • 変更フィールドのみを指定して部分更新
  3. update_transfer_job: 転送ジョブの設定を更新します

    • CloudDataTransferServiceUpdateJobOperatorを使用
    • XComsから動的にボディを取得し、lastModifiedSinceパラメータを自動更新
  4. run_transfer: 更新された設定で転送ジョブを実行します

    • CloudDataTransferServiceRunJobOperatorを使用
    • 実行結果(オペレーション名)をXComsに格納
  5. wait_for_transfer: 転送オペレーションの完了を待機します

    • カスタムセンサーを使用して完了待機
    • poke_interval=60で60秒間隔でステータス確認
    • mode='reschedule'でリソース効率を最適化

カスタムセンサーの活用

前回記事で実装したカスタムセンサー (CustomStorageTransferOperationSensor) を再利用します。

カスタムセンサー (`CustomStorageTransferOperationSensor`)
custom_sts_sensor.py
"""
シンプルなStorage Transfer Service センサー

指定された転送ジョブのステータスを取得し、実行中であれば完了まで待機します。
"""

from typing import Dict, Sequence

from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
    CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException

class CustomStorageTransferOperationSensor(BaseSensorOperator):
    """
    Storage Transfer Service のオペレーション完了を待機するシンプルなセンサー
    :param operation_name: 監視するオペレーションの完全な名前
    :param project_id: Google Cloud プロジェクトID
    :param gcp_conn_id: Google Cloud 接続ID
    """

    template_fields: Sequence[str] = ('operation_name',)

    def __init__(
        self,
        *,
        operation_name: str,
        project_id: str,
        gcp_conn_id: str = 'google_cloud_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.operation_name = operation_name
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id

    def poke(self, context: Dict) -> bool:
        hook = CloudDataTransferServiceHook(
            gcp_conn_id=self.gcp_conn_id,
        )

        self.log.info("オペレーション %s の状態を確認中...", self.operation_name)

        operation = hook.get_transfer_operation(operation_name=self.operation_name)
        self.log.info("オペレーションの応答: %s", operation)

        done = operation.get('done', False)

        # メタデータからステータスを取得
        metadata = operation.get('metadata', {})
        status = metadata.get('status')
        self.log.info("オペレーションのステータス: %s", status)

        if done:
            # エラー情報がある場合はエラーとして扱う
            if 'error' in operation:
                error = operation['error']
                err_msg = error.get('message', 'Unknown error')
                raise AirflowException(
                    f"オペレーション {self.operation_name} がエラーで終了しました: {err_msg}"
                )

            # ステータスがSUCCESSでない場合もエラーとして扱う
            if status != 'SUCCESS':
                raise AirflowException(
                    f"オペレーション {self.operation_name} が正常に完了しませんでした。ステータス: {status}"
                )

            # 明示的なエラーがなく、ステータスもSUCCESSの場合は成功として扱う
            self.log.info("オペレーション %s が正常に完了しました", self.operation_name)
            return True

        self.log.info("オペレーション %s はまだ完了していません。現在のステータス: %s", self.operation_name, status)
        return False

動作確認

実際にDAGを実行し、動的設定更新ワークフローの動作を確認してみました。

実行前の転送ジョブ設定確認

まず、DAG実行前の転送ジョブ設定を確認します。

cloud-composer-storage-transfer-service-dynamic-config-update_02

転送ジョブの設定で「最終更新日時で含める」フィルタが未設定(空欄)の状態です。
この状態では、すべてのファイルが転送対象になってしまいます。

コマンドラインからも確認できます。

$ gcloud transfer jobs describe transferJobs/{転送ジョブID} | jq '.transferSpec.objectConditions.lastModifiedSince'
null

DAGの実行

Cloud Composer の Web UI からDAGを手動で実行します。
画面右上の再生ボタンをクリックして実行を開始します。

cloud-composer-storage-transfer-service-dynamic-config-update_03

DAG の詳細画面では、5つのタスクが確認できます。
左側のタスクリストに各タスクが表示されており、実行順序通りに並んでいます。

cloud-composer-storage-transfer-service-dynamic-config-update_04

DAGが実行中の状態です。
Status: running となっており、各タスクが順次実行されていることが分かります。

cloud-composer-storage-transfer-service-dynamic-config-update_05

全てのタスクが正常に完了し、DAG全体のステータスも success になっています。

動的設定内容の確認

prepare_update_body タスクで生成された動的設定を XComs で確認してみます。

cloud-composer-storage-transfer-service-dynamic-config-update_07

XComsタブで、動的に生成された更新リクエストボディを確認できます。

objectConditionslastModifiedSince"2025-09-01T05:57:18.622936746Z" が設定されており、前回成功転送の開始時刻が自動抽出されていることが分かります。

この時刻は、転送履歴から最新の成功オペレーションを抽出し、その startTime を使用して生成されたものです。

生成されたリクエストボディは以下のようになります。

{
    "projectId": "{プロジェクトID}",
    "transferJob": {
        "transferSpec": {
            "gcsDataSink": {
                "bucketName": "cm_enokawa_work"
            },
            "gcsDataSource": {
                "bucketName": "cm_enokawa_work_source"
            },
            "objectConditions": {
                "lastModifiedSince": "2025-09-01T05:57:18.622936746Z"
            },
            "transferOptions": {
                "metadataOptions": {
                    "acl": "ACL_DESTINATION_BUCKET_DEFAULT",
                    "kmsKey": "KMS_KEY_DESTINATION_BUCKET_DEFAULT",
                    "storageClass": "STORAGE_CLASS_DESTINATION_BUCKET_DEFAULT",
                    "temporaryHold": "TEMPORARY_HOLD_PRESERVE",
                    "timeCreated": "TIME_CREATED_SKIP"
                },
                "overwriteWhen": "DIFFERENT"
            }
        }
    }
}

実行後の転送ジョブ設定確認

DAG実行後の転送ジョブ設定を確認します。

cloud-composer-storage-transfer-service-dynamic-config-update_06_v2

転送ジョブの「最終更新日時で含める」フィルタが「2025/09/01 以降」に自動更新されています。

Cloud Console UIでは日付単位で表示されていますが、実際にはナノ秒単位まで含む精密な時刻で管理されています。

コマンドラインから詳細な設定値を確認できます。

$ gcloud transfer jobs describe transferJobs/{転送ジョブID} | jq '.transferSpec.objectConditions.lastModifiedSince'
"2025-09-01T05:57:18.622936746Z"

次回実行時には、この時刻以降に更新されたファイルのみが転送対象となります。

DAGの実行タイミングに関係なく、常に前回成功分以降のファイルを確実に転送できる増分転送ワークフローが実現できました。

まとめ

本記事では、Cloud Composer で Storage Transfer Service の 動的設定更新ワークフロー を構築する方法をご紹介しました。

この動的設定更新により、従来の外部トリガー実行で課題となっていた「DAGの実行遅延による転送ファイルの取りこぼし」を解決できました。

実装にあたっては、転送履歴の適切な解析や、フォールバック機能の実装、XComsを活用したタスク間連携など、いくつかの技術的なポイントがあります。
今回は lastModifiedSince に前回の成功時刻をそのまま指定しましたが、前回成功時刻よりも数分早い時刻をバッファとして設定することで、タイミングの境界にあるファイルも漏れなく転送できるようにする方法もあります。

今後は、Deferrable Operators との組み合わせによるリソース効率化や、転送失敗時のリトライ制御などについても検討してみたいと思います。

参考リンク

この記事をシェアする

facebookのロゴhatenaのロゴtwitterのロゴ

© Classmethod, Inc. All rights reserved.