Building a Dynamic Configuration Update Workflow for Storage Transfer Service in Cloud Composer
## Introduction
Hello! This is Enokawa.
In data pipelines built on Cloud Composer (Apache Airflow), it's common to use Storage Transfer Service (STS) to transfer data between cloud storage services.
In recurring incremental transfers in particular, properly managing the range of files to transfer is an important challenge.
In this article, I'll introduce how to dynamically update Storage Transfer Service job settings to build an efficient incremental transfer workflow.
## Recap of the Previous Article
Previously, in an article titled "Implementing a Custom Sensor in Cloud Composer to Wait for Storage Transfer Service Operation Completion", I introduced how to implement a custom sensor that ensures waiting for transfer operation completion.
Building on the previous custom sensor implementation, this time we will implement a mechanism that automatically extracts the last successful time from the transfer history and dynamically updates the transfer job settings.
## What We'll Do Today
### Challenges to Solve
When executing Storage Transfer Service from Cloud Composer, there are challenges in implementing efficient incremental transfers.
#### Risk of Missing Files in External Trigger Execution
Storage Transfer Service has different characteristics depending on the execution method.
With STS built-in scheduling, Storage Transfer Service's own scheduler starts the job, so filters based on file timestamps work reliably: because the transfer start time is guaranteed, no files are missed.
With external trigger execution (e.g., API calls from Cloud Composer), on the other hand, transfer files can be missed due to the following factors:
**Uncertainty of Execution Timing**
- STS start time is not guaranteed due to DAG pauses or task queuing
- Possibility of delays due to resource shortages or system maintenance
**Limitations of Fixed Filter Settings**
Fixed settings like "files updated in the last hour" can cause files to be missed due to execution delays.
Example:
- Scheduled DAG start time: 10:00
- Actual start time: 10:30 (30-minute delay)
- Transfer setting: "Transfer files updated in the last hour (9:30-10:30)"
- Result: Files updated between 9:00 and 9:30 are excluded from the transfer
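To make the failure mode concrete, here is a minimal sketch of what a fixed one-hour window amounts to when the filter is computed at the actual (delayed) start time; the times mirror the example above and are purely illustrative:

```python
from datetime import datetime, timedelta, timezone

# Scheduled start was 10:00 UTC, but the DAG actually started at 10:30 UTC.
actual_start = datetime(2025, 9, 1, 10, 30, tzinfo=timezone.utc)

# A fixed "files updated in the last hour" filter only looks back to 09:30.
last_modified_since = actual_start - timedelta(hours=1)
print(last_modified_since.isoformat())  # 2025-09-01T09:30:00+00:00

# Anything updated between 09:00 (the end of the previous window) and 09:30
# falls outside this window and is silently skipped.
```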
### Solution Approach
#### Dynamic Setting Updates Based on Transfer History
To solve these challenges, we will adopt a dynamic setting update approach based on transfer history.
The following processes will be automatically executed when the DAG runs:
- Retrieve past operation history using the Storage Transfer Service API
- Automatically extract the latest start time from successful operations
- Use this time as the reference time (`lastModifiedSince`) for the next transfer
This ensures that files modified since the previous successful transfer are always transferred reliably, regardless of DAG execution delays.
Example:
- Previous transfer start time: 09:00
- Current DAG start time: 10:30 (30-minute delay)
- Transfer target: Files updated after 09:00
- Result: Incremental transfer achieved without missing files
## Preparing the Cloud Composer Environment

We will prepare a Cloud Composer environment as a validation environment.
We will build the environment using the same procedure as in the previous article "Implementing a Custom Sensor in Cloud Composer to Wait for Storage Transfer Service Operation Completion".
Additionally, to use the Storage Transfer Service operators and hooks, you need to install the `apache-airflow-providers-amazon` package.
For this procedure as well, please refer to the "Installing the amazon package" section in the previous article.
## Create a DAG
Below is the DAG that I actually used for operational verification.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceRunJobOperator,
    CloudDataTransferServiceUpdateJobOperator,
    CloudDataTransferServiceListOperationsOperator,
)
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import sys
import os

# Import custom sensor
from custom_sts_sensor import CustomStorageTransferOperationSensor

# Configuration values (Note: Replace with actual values)
PROJECT_ID = '{project ID}'
TRANSFER_JOB_ID = 'transferJobs/{transfer job ID}'

# Basic DAG settings
default_args = {
    'start_date': days_ago(1),
    'retries': 0,  # No retries
}


def prepare_update_body(**context):
    """Extract the last successful start time from transfer history and generate job update configuration"""
    from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
        CloudDataTransferServiceHook
    )

    # Get transfer history from previous task
    operations = context['task_instance'].xcom_pull(task_ids='list_operations')

    # Fallback time (use 24 hours ago if no history exists)
    fallback_time = (datetime.utcnow() - timedelta(hours=24)).isoformat() + 'Z'

    # Extract the last successful time
    if not operations:
        last_success_time = fallback_time
    else:
        # Extract only successful operations
        success_operations = [
            op for op in operations
            if op.get('metadata', {}).get('status') == 'SUCCESS'
        ]
        if not success_operations:
            last_success_time = fallback_time
        else:
            # Sort by startTime and get the most recent successful start time
            success_operations.sort(
                key=lambda x: x.get('metadata', {}).get('startTime', ''),
                reverse=True
            )
            last_success_time = success_operations[0].get('metadata', {}).get('startTime')

    # Get existing transfer job configuration
    hook = CloudDataTransferServiceHook(gcp_conn_id='google_cloud_default')
    transfer_job = hook.get_transfer_job(
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID
    )

    # Create updated transfer job configuration (only fields to change)
    updated_transfer_job = {
        'transferSpec': {
            **transfer_job.get('transferSpec', {}),
            'objectConditions': {
                **transfer_job.get('transferSpec', {}).get('objectConditions', {}),
                'lastModifiedSince': last_success_time
            }
        }
    }

    # Create update request body
    update_body = {
        "projectId": PROJECT_ID,
        "transferJob": updated_transfer_job
    }

    return update_body


# DAG definition
with DAG(
    dag_id='sts_dynamic_config_dag',
    default_args=default_args,
    schedule_interval=None,  # Manual trigger
    catchup=False,
    render_template_as_native_obj=True,  # Properly process XCom data
) as dag:

    # Get transfer history
    list_operations = CloudDataTransferServiceListOperationsOperator(
        task_id='list_operations',
        project_id=PROJECT_ID,
        request_filter={
            'job_names': [TRANSFER_JOB_ID]
        }
    )

    # Prepare update configuration
    prepare_update = PythonOperator(
        task_id='prepare_update_body',
        python_callable=prepare_update_body,
        provide_context=True
    )

    # Update transfer job configuration
    update_transfer_job = CloudDataTransferServiceUpdateJobOperator(
        task_id='update_transfer_job',
        job_name=TRANSFER_JOB_ID,
        body="{{ task_instance.xcom_pull(task_ids='prepare_update_body') }}",
        project_id=PROJECT_ID
    )

    # Execute transfer job
    run_transfer = CloudDataTransferServiceRunJobOperator(
        task_id='run_transfer',
        job_name=TRANSFER_JOB_ID,
        project_id=PROJECT_ID
    )

    # Wait for transfer completion
    wait_for_transfer = CustomStorageTransferOperationSensor(
        task_id='wait_for_transfer',
        operation_name="{{ task_instance.xcom_pull(task_ids='run_transfer')['name'] }}",
        project_id=PROJECT_ID,
        poke_interval=60,  # Check status every 60 seconds
        timeout=60 * 10,  # Wait up to 10 minutes
        mode='reschedule'  # Use reschedule mode for resource efficiency
    )

    # Define task dependencies
    (
        list_operations
        >> prepare_update
        >> update_transfer_job
        >> run_transfer
        >> wait_for_transfer
    )
```

### Explanation of DAG Code
This DAG consists of five tasks that automate the whole flow, from retrieving the STS operation history to waiting for the transfer to complete.
1. **list_operations**: Retrieves operation history from the Storage Transfer Service
- Uses `CloudDataTransferServiceListOperationsOperator`
- Narrows down to specific transfer jobs with `request_filter`
   - The retrieved history is automatically stored in XComs (its payload shape is sketched after this list)
2. **prepare_update_body**: Extracts the last successful start time from the history and prepares a request body for job updates
- Uses Python function to get `startTime` from successful operations
- Specifies only the changed fields for partial updates
3. **update_transfer_job**: Updates the transfer job configuration
- Uses `CloudDataTransferServiceUpdateJobOperator`
- Dynamically retrieves the body from XComs and automatically updates the `lastModifiedSince` parameter
4. **run_transfer**: Runs the transfer job with the updated configuration
- Uses `CloudDataTransferServiceRunJobOperator`
- Stores the execution result (operation name) in XComs
5. **wait_for_transfer**: Waits for the transfer operation to complete
- Uses a custom sensor to wait for completion
- Checks status at 60-second intervals with `poke_interval=60`
   - Optimizes resource efficiency with `mode='reschedule'`
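For reference, the list pushed to XComs by `list_operations` contains transfer operation resources roughly shaped like the sketch below. The values are hypothetical, and only the fields that `prepare_update_body` actually reads are shown.

```python
# Abridged, hypothetical example of one element of the list_operations XCom value.
operation = {
    "name": "transferOperations/transferJobs-example-1234567890",
    "done": True,
    "metadata": {
        "transferJobName": "transferJobs/{transfer job ID}",
        "status": "SUCCESS",  # filtered on by prepare_update_body
        "startTime": "2025-09-01T05:57:18.622936746Z",  # becomes lastModifiedSince
    },
}
```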
#### Using Custom Sensors

We will reuse the custom sensor (`CustomStorageTransferOperationSensor`) implemented in the previous article.
:::details Custom Sensor (`CustomStorageTransferOperationSensor`)
```python:custom_sts_sensor.py
"""
Simple Storage Transfer Service sensor
Retrieves the status of a specified transfer job and waits for completion if it's running.
"""
from typing import Dict, Sequence
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
CloudDataTransferServiceHook,
)
from airflow.exceptions import AirflowException
class CustomStorageTransferOperationSensor(BaseSensorOperator):
"""
A simple sensor that waits for a Storage Transfer Service operation to complete
:param operation_name: The full name of the operation to monitor
:param project_id: Google Cloud project ID
:param gcp_conn_id: Google Cloud connection ID
"""
template_fields: Sequence[str] = ('operation_name',)
def __init__(
self,
*,
operation_name: str,
project_id: str,
gcp_conn_id: str = 'google_cloud_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.operation_name = operation_name
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
def poke(self, context: Dict) -> bool:
hook = CloudDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
)
self.log.info("Checking status of operation %s...", self.operation_name)
operation = hook.get_transfer_operation(operation_name=self.operation_name)
self.log.info("Operation response: %s", operation)
done = operation.get('done', False)
# Get status from metadata
metadata = operation.get('metadata', {})
status = metadata.get('status')
self.log.info("Operation status: %s", status)
if done:
# If there is error information, treat it as an error
if 'error' in operation:
error = operation['error']
err_msg = error.get('message', 'Unknown error')
raise AirflowException(
f"Operation {self.operation_name} ended with an error: {err_msg}"
)
# If status is not SUCCESS, also treat as an error
if status != 'SUCCESS':
raise AirflowException(
f"Operation {self.operation_name} did not complete successfully. Status: {status}"
)
# If there is no explicit error and status is SUCCESS, treat as success
self.log.info("Operation %s completed successfully", self.operation_name)
return True
self.log.info("Operation %s is not yet complete. Current status: %s", self.operation_name, status)
return False
```:::
## Operational Verification
I actually executed the DAG and verified the operation of the dynamic configuration update workflow.
### Transfer Job Configuration Check Before Execution
First, I checked the transfer job configuration before executing the DAG.

In the transfer job settings, the "Include by last modified date" filter is not set (blank).
In this state, all files would be subject to transfer.
This can also be confirmed from the command line.
```bash
$ gcloud transfer jobs describe transferJobs/{transfer job ID} | jq '.transferSpec.objectConditions.lastModifiedSince'
null
```

### DAG Execution
I manually executed the DAG from the Cloud Composer Web UI.
Click the play button in the upper right corner to start execution.
On the DAG details screen, you can see the five tasks listed on the left in execution order.
The DAG is now in the running state.
You can see `Status: running` and that each task is executed in sequence.
All tasks completed successfully, and the overall DAG status is also `success`.
### Checking the Dynamic Configuration Content
Let's check the dynamically generated configuration from the `prepare_update_body` task via XComs.
In the XComs tab, you can verify the dynamically generated update request body.
In `objectConditions`, `lastModifiedSince` is set to `"2025-09-01T05:57:18.622936746Z"`, which shows that the start time of the previous successful transfer was automatically extracted.
This timestamp was generated by taking the latest successful operation from the transfer history and using its `startTime`.
The generated request body looks like this:
```json
{
  "projectId": "{projectID}",
  "transferJob": {
    "transferSpec": {
      "gcsDataSink": {
        "bucketName": "cm_enokawa_work"
      },
      "gcsDataSource": {
        "bucketName": "cm_enokawa_work_source"
      },
      "objectConditions": {
        "lastModifiedSince": "2025-09-01T05:57:18.622936746Z"
      },
      "transferOptions": {
        "metadataOptions": {
          "acl": "ACL_DESTINATION_BUCKET_DEFAULT",
          "kmsKey": "KMS_KEY_DESTINATION_BUCKET_DEFAULT",
          "storageClass": "STORAGE_CLASS_DESTINATION_BUCKET_DEFAULT",
          "temporaryHold": "TEMPORARY_HOLD_PRESERVE",
          "timeCreated": "TIME_CREATED_SKIP"
        },
        "overwriteWhen": "DIFFERENT"
      }
    }
  }
}
```

### Checking Transfer Job Settings After Execution
Let's check the transfer job settings after DAG execution.

The transfer job's "Include by last modified date" filter has been automatically updated to "After 2025/09/01".
While the Cloud Console UI displays the date in day units, it's actually managed with precision down to the nanosecond level.
You can check the detailed configuration values from the command line.
```bash
$ gcloud transfer jobs describe transferJobs/{transfer job ID} | jq '.transferSpec.objectConditions.lastModifiedSince'
"2025-09-01T05:57:18.622936746Z"
During the next execution, only files updated after this timestamp will be targeted for transfer.
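As a quick spot check (not part of the DAG), you can list which objects in the source bucket would fall inside the new window. This is a minimal sketch using the `google-cloud-storage` client, with the bucket name and cutoff taken from the example above; it compares against each object's last update time, which is roughly what `lastModifiedSince` filters on.

```python
from datetime import datetime, timezone

from google.cloud import storage  # assumes google-cloud-storage is installed

# Cutoff taken from the updated lastModifiedSince (truncated to seconds here).
cutoff = datetime(2025, 9, 1, 5, 57, 18, tzinfo=timezone.utc)

client = storage.Client(project='{project ID}')
for blob in client.list_blobs('cm_enokawa_work_source'):
    # Print objects that would be picked up by the next transfer run.
    if blob.updated and blob.updated > cutoff:
        print(blob.name, blob.updated.isoformat())
```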
We've successfully implemented an incremental transfer workflow that reliably transfers only files modified since the previous successful execution, regardless of when the DAG runs.
## Summary
In this article, we introduced how to build a dynamic configuration update workflow for Storage Transfer Service using Cloud Composer.
This dynamic configuration update solves the problem of "missing files due to DAG execution delays" that was an issue with traditional external trigger execution.
The implementation involves several technical points, including proper analysis of transfer history, implementation of fallback functionality, and task coordination using XComs.
While we used the previous successful transfer's start time directly as `lastModifiedSince`, you could also add a safety buffer by subtracting a few minutes from it, so that files right at the timing boundary are not missed. A minimal sketch of that idea follows.
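The helper below is hypothetical and not part of the DAG above; it subtracts a configurable buffer from the extracted `startTime` before it is written to `lastModifiedSince`. Note that STS timestamps carry nanosecond precision, so the fractional part is trimmed to the microseconds that Python's `datetime` supports.

```python
import re
from datetime import datetime, timedelta, timezone

def with_buffer(start_time: str, minutes: int = 5) -> str:
    """Return an RFC 3339 timestamp `minutes` earlier than `start_time`."""
    # Trim nanoseconds (e.g. .622936746) down to microseconds for datetime.
    trimmed = re.sub(r'(\.\d{1,6})\d*Z$', r'\1Z', start_time)
    fmt = '%Y-%m-%dT%H:%M:%S.%fZ' if '.' in trimmed else '%Y-%m-%dT%H:%M:%SZ'
    parsed = datetime.strptime(trimmed, fmt).replace(tzinfo=timezone.utc)
    return (parsed - timedelta(minutes=minutes)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

# Inside prepare_update_body you could then use:
#   last_success_time = with_buffer(last_success_time, minutes=5)
print(with_buffer("2025-09-01T05:57:18.622936746Z"))  # 2025-09-01T05:52:18.622936Z
```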
In the future, we'd like to explore resource optimization through combination with Deferrable Operators and retry control for transfer failures.