Amazon MWAA Serverless用のDAGコンバーターを使ってみた
こんにちは。サービス開発室の武田です。
先日、Amazon MWAA Serverlessがリリースされました。MWAA ServerlessはAirflow 3(Python 3.12)ベースで動作し、ワークフローをYAML形式で定義します。
既存のPython DAGをMWAA Serverlessへ移行するツールとして、AWSから公式の変換ツール「Python to YAML DAG Converter」が提供されています。今回はこのツールを試してみました。
MWAA ServerlessのYAML形式ワークフロー
MWAA Serverlessでは、dag-factory形式と互換性のあるYAMLでワークフローを定義します。
dag_id: my_data_pipeline
description: "Process daily data from S3 to Redshift"
schedule: "0 2 * * *" # Daily at 2 AM
start_date: "2024-01-01"
tasks:
extract_data:
operator: S3ListOperator
bucket: amzn-s3-demo-source-bucket
prefix: daily/
transform_data:
operator: GlueJobOperator
job_name: example-transform-job
depends_on: [extract_data]
load_data:
operator: RedshiftSQLOperator
sql: "COPY target_table FROM 's3://amzn-s3-demo-output-bucket/'"
depends_on: [transform_data]
これまではPythonコードでDAGを定義してましたが、YAML形式に変わるため慣れは必要でしょうか。メリットとしてはGitでの差分管理や、実行前のYAML定義検証などはありそうです。
変換ツールをやってみる
Python to YAML DAG Converter for MWAA Serverlessは、既存のPython DAGをYAML形式に変換するツールです。
環境準備
Python 3.11以上が必要です。pipでインストールする場合はvenvで仮想環境を作っておくのがお勧めです。
python3 -m venv venv
source venv/bin/activate
pip install python-to-yaml-dag-converter-mwaa-serverless
uvxを使えばインストール不要で直接実行できます。
uvx --from python-to-yaml-dag-converter-mwaa-serverless dag-converter convert <DAGファイル>
サンプルDAGを用意
変換対象として、S3からファイル一覧を取得してLambdaで処理するシンプルなDAGを用意しました。
from datetime import datetime, timezone
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
@dag(
dag_id="s3_lambda_pipeline",
description="S3ファイル一覧を取得してLambdaで処理するパイプライン",
schedule="@daily",
start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
catchup=False,
tags=["example", "s3", "lambda"],
)
def s3_lambda_pipeline():
list_files = S3ListOperator(
task_id="list_s3_files",
bucket="amzn-s3-demo-source-bucket",
prefix="input/",
)
process_files = LambdaInvokeFunctionOperator(
task_id="process_with_lambda",
function_name="example-process-function",
payload='{"source": "airflow"}',
)
list_files >> process_files
s3_lambda_pipeline()
変換を実行
--validateオプションを付けると、dag-factoryで変換後のYAMLを検証してくれます。
dag-converter convert sample_dag.py --validate
$ dag-converter convert sample_dag.py --validate
[2025-12-01T00:00:00.000+0900] {dagbag.py:585} INFO - Filling up the DagBag from sample_dag.py
Validating YAML...
[2025-12-01T00:00:00.500+0900] {dagbag.py:585} INFO - Filling up the DagBag from /tmp/dags
Loading DAG from: /tmp/dags/configs/temp_dag.yaml
[2025-12-01T00:00:01.000+0900] {dagfactory.py:303} INFO - Loading DAGs from /home/user/airflow/dags
Successfully loaded DAGs: []
Found 4 differences:
Task: list_s3_files
- Attributes missing in final task: {'_cached_logger', 'hook', ...}
- _BaseOperator__init_kwargs: Keys missing in initial dict: {'start_date', 'trigger_rule', ...}
Task: process_with_lambda
- Attributes missing in final task: {'_cached_logger', 'hook', ...}
- _BaseOperator__init_kwargs: Keys missing in initial dict: {'start_date', 'trigger_rule', ...}
YAML validation successful, no errors found
YAML written to output_yaml/s3_lambda_pipeline.yaml
1 Dag Object(s) found.
変換結果
出力されたYAMLを見てみます。デフォルトではoutput_yamlフォルダに保存されます。
$ cat output_yaml/s3_lambda_pipeline.yaml
s3_lambda_pipeline:
dag_id: s3_lambda_pipeline
schedule: '@daily'
catchup: false
description: "S3ファイル一覧を取得してLambdaで処理するパイプライン"
start_date:
__type__: datetime.datetime
year: 2024
month: 1
day: 1
tags:
- lambda
- example
- s3
tasks:
list_s3_files:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
bucket: amzn-s3-demo-source-bucket
prefix: input/
aws_conn_id: aws_default
task_id: list_s3_files
dependencies: []
process_with_lambda:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
function_name: example-process-function
payload: '{"source": "airflow"}'
aws_conn_id: aws_default
task_id: process_with_lambda
dependencies:
- list_s3_files
元のPythonコードに比べるとちょっと冗長ですが、オペレーターの完全修飾名やデフォルト値が明示的に出力されています。dependenciesでタスク間の依存関係も正しく変換されていますね。
その他のオプション
| オプション | 説明 |
|---|---|
--output |
出力パスを指定 |
--bucket |
S3バケットを指定してYAMLを直接アップロード |
--validate / --no-validate |
dag-factoryを使用した検証の有効/無効 |
--debug / --no-debug |
変換前後のログ記録の有効/無効 |
S3に直接アップロードしたい場合はこんな感じです。
dag-converter convert sample_dag.py --bucket amzn-s3-demo-mwaa-bucket --validate
変換できないDAGも試してみる
続いて、変換できないパターンも試してみましょう。PythonOperatorを含むDAGを用意しました。
from datetime import datetime, timezone
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
def process_data(**context):
"""カスタムのPython処理"""
print("Processing data...")
return {"status": "completed"}
@dag(
dag_id="mixed_pipeline",
description="サポート/非サポートオペレーターを含むDAG",
schedule="@daily",
start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
catchup=False,
)
def mixed_pipeline():
list_files = S3ListOperator(
task_id="list_s3_files",
bucket="amzn-s3-demo-source-bucket",
prefix="input/",
)
custom_process = PythonOperator(
task_id="custom_process",
python_callable=process_data,
)
list_files >> custom_process
mixed_pipeline()
このDAGを変換してみます。
$ dag-converter convert unsupported_dag.py --validate
Static validation failed
...
InvalidOperatorError: Operator PythonOperator is not supported
静的検証の段階でPythonOperatorがサポートされていないと判定されて、変換処理自体が中断されてしまいました。
サポートされるオペレーター
MWAA ServerlessはAmazon Provider Packageのオペレーターのみサポートしています。80以上のAWSオペレーターが使えます。
- S3関連:
S3ListOperator,S3CopyObjectOperatorなど - Glue関連:
GlueJobOperator,GlueCrawlerOperatorなど - EMR関連:
EmrServerlessStartJobOperatorなど - Redshift関連:
RedshiftSQLOperatorなど - Lambda関連:
LambdaInvokeFunctionOperatorなど - Step Functions関連:
StepFunctionStartExecutionOperatorなど
サポートされるオペレーターの一覧は公式ドキュメントで確認できます。
変換できないもの
次のようなDAGは変換できないか、変換後に手直しが必要になります。
- サードパーティや自作のカスタムオペレーターは使えない
PythonOperatorで任意のPythonコードを実行するタスクは直接サポートされていない- Airflowの動的タスクマッピング(動的タスク生成)は変換ツールでサポートされていない
- dag-factoryが対応していない機能は変換されない
PythonOperatorの代替パターン
PythonOperatorを使っている場合は、処理をLambdaに移すのが手っ取り早いです。
たとえばこんなPythonOperatorがあったとします。
def my_custom_function():
# カスタム処理
pass
task = PythonOperator(
task_id='custom_task',
python_callable=my_custom_function
)
この処理をLambda関数として実装しておけば、YAMLではこう書けます。
tasks:
custom_task:
operator: LambdaInvokeFunctionOperator
function_name: example-custom-function
payload: '{"key": "value"}'
Airflow 2.xからの移行フロー
既存のAirflow 2.x環境からMWAA Serverlessへ移行する場合の流れをまとめておきます。
1. ruffでAirflow 3互換性をチェック
まず既存DAGがAirflow 3と互換性があるか確認します。ruff linterのAIR30ルールセットで非互換なコードを自動検出できます。
$ ruff check --preview --select AIR30 legacy_dag.py
AIR301 [*] `schedule_interval` is removed in Airflow 3.0
--> legacy_dag.py:9:5
|
7 | dag_id="legacy_pipeline",
8 | description="Airflow 2.x形式のDAG",
9 | schedule_interval="@daily",
| ^^^^^^^^^^^^^^^^^
|
help: Use `schedule` instead
Found 1 error.
[*] 1 fixable with the `--fix` option.
--fixオプションを付けると自動修正もできます。
2. Airflow 2.x → 3.xの主な変更点
ruffで検出される主な非互換項目はこのあたりです。
| 項目 | Airflow 2.x | Airflow 3.x |
|---|---|---|
| スケジュール指定 | schedule_interval |
schedule(旧名は非推奨) |
| 開始日時 | タイムゾーンなしでも可 | タイムゾーン指定を推奨 |
| catchupデフォルト | True |
False |
| SubDAG | サポート | 廃止(TaskGroups/Assetsへ移行) |
| コンテキスト変数 | execution_date等 |
logical_date等に変更 |
3. DAGを修正して変換
非互換な箇所を直したら、変換ツールでYAMLに変換します。
dag-converter convert <修正済みDAG>.py --validate
4. 変換結果を確認してデプロイ
変換されたYAMLを確認し、MWAA Serverlessにデプロイします。
使ってみた所感
良かった点
- AWSオペレーターだけで組んだDAGはすんなり変換できた
--validateオプションで変換後のYAMLを検証できるのは助かる- dag-factoryと互換性があるので、従来版MWAAで事前テストできる
注意が必要な点
- PythonOperatorを多用しているDAGは設計の見直しが必要
- タスク間の依存関係が複雑な場合は、変換結果を要確認
- カスタムオペレーターは事前にLambda等への移行が必要
移行を検討する際のポイント
すべてのワークロードがMWAA Serverlessに向いているわけではないですね。
たとえばカスタムオペレーターを多用していてLambda等への移行コストが高い場合や、Airflow Web UIを活発に使っている場合は、従来版MWAAを継続した方がよさそうです。常時高頻度で実行されるワークロードも同様ですね。
一方で、AWSサービス中心のパイプラインや、実行頻度の低いワークロードであれば、MWAA Serverlessへの移行メリットは大きいです。コスト最適化を重視する開発・テスト環境にも向いています。
まとめ
Python to YAML DAG Converter for MWAA Serverlessを使って、既存のPython DAGをYAML形式に変換してみました。AWSオペレーターだけで組んだDAGであればスムーズに変換できますが、PythonOperatorやカスタムオペレーターを使っている場合は設計の見直しが必要です。
移行を検討する際は、まず既存DAGで使っているオペレーターを洗い出して、サポート状況を確認するところから始めるのがよさそうです。参考になれば幸いです。








