Amazon MWAA Serverless用のDAGコンバーターを使ってみた

Amazon MWAA Serverless用のDAGコンバーターを使ってみた

2025.12.28

こんにちは。サービス開発室の武田です。

先日、Amazon MWAA Serverlessがリリースされました。MWAA ServerlessはAirflow 3(Python 3.12)ベースで動作し、ワークフローをYAML形式で定義します。

既存のPython DAGをMWAA Serverlessへ移行するツールとして、AWSから公式の変換ツール「Python to YAML DAG Converter」が提供されています。今回はこのツールを試してみました。

MWAA ServerlessのYAML形式ワークフロー

MWAA Serverlessでは、dag-factory形式と互換性のあるYAMLでワークフローを定義します。

dag_id: my_data_pipeline
description: "Process daily data from S3 to Redshift"
schedule: "0 2 * * *"  # Daily at 2 AM
start_date: "2024-01-01"

tasks:
  extract_data:
    operator: S3ListOperator
    bucket: amzn-s3-demo-source-bucket
    prefix: daily/

  transform_data:
    operator: GlueJobOperator
    job_name: example-transform-job
    depends_on: [extract_data]

  load_data:
    operator: RedshiftSQLOperator
    sql: "COPY target_table FROM 's3://amzn-s3-demo-output-bucket/'"
    depends_on: [transform_data]

これまではPythonコードでDAGを定義してましたが、YAML形式に変わるため慣れは必要でしょうか。メリットとしてはGitでの差分管理や、実行前のYAML定義検証などはありそうです。

変換ツールをやってみる

Python to YAML DAG Converter for MWAA Serverlessは、既存のPython DAGをYAML形式に変換するツールです。

https://github.com/awslabs/python-to-yaml-dag-converter-mwaa-serverless

環境準備

Python 3.11以上が必要です。pipでインストールする場合はvenvで仮想環境を作っておくのがお勧めです。

python3 -m venv venv
source venv/bin/activate
pip install python-to-yaml-dag-converter-mwaa-serverless

uvxを使えばインストール不要で直接実行できます。

uvx --from python-to-yaml-dag-converter-mwaa-serverless dag-converter convert <DAGファイル>

サンプルDAGを用意

変換対象として、S3からファイル一覧を取得してLambdaで処理するシンプルなDAGを用意しました。

sample_dag.py
from datetime import datetime, timezone
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator


@dag(
    dag_id="s3_lambda_pipeline",
    description="S3ファイル一覧を取得してLambdaで処理するパイプライン",
    schedule="@daily",
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    catchup=False,
    tags=["example", "s3", "lambda"],
)
def s3_lambda_pipeline():
    list_files = S3ListOperator(
        task_id="list_s3_files",
        bucket="amzn-s3-demo-source-bucket",
        prefix="input/",
    )

    process_files = LambdaInvokeFunctionOperator(
        task_id="process_with_lambda",
        function_name="example-process-function",
        payload='{"source": "airflow"}',
    )

    list_files >> process_files


s3_lambda_pipeline()

変換を実行

--validateオプションを付けると、dag-factoryで変換後のYAMLを検証してくれます。

dag-converter convert sample_dag.py --validate
$ dag-converter convert sample_dag.py --validate
[2025-12-01T00:00:00.000+0900] {dagbag.py:585} INFO - Filling up the DagBag from sample_dag.py
Validating YAML...
[2025-12-01T00:00:00.500+0900] {dagbag.py:585} INFO - Filling up the DagBag from /tmp/dags
Loading DAG from: /tmp/dags/configs/temp_dag.yaml
[2025-12-01T00:00:01.000+0900] {dagfactory.py:303} INFO - Loading DAGs from /home/user/airflow/dags
Successfully loaded DAGs: []

Found 4 differences:

  Task: list_s3_files
    - Attributes missing in final task: {'_cached_logger', 'hook', ...}
    - _BaseOperator__init_kwargs: Keys missing in initial dict: {'start_date', 'trigger_rule', ...}

  Task: process_with_lambda
    - Attributes missing in final task: {'_cached_logger', 'hook', ...}
    - _BaseOperator__init_kwargs: Keys missing in initial dict: {'start_date', 'trigger_rule', ...}

YAML validation successful, no errors found
YAML written to output_yaml/s3_lambda_pipeline.yaml

1 Dag Object(s) found.

変換結果

出力されたYAMLを見てみます。デフォルトではoutput_yamlフォルダに保存されます。

$ cat output_yaml/s3_lambda_pipeline.yaml
s3_lambda_pipeline:
  dag_id: s3_lambda_pipeline
  schedule: '@daily'
  catchup: false
  description: "S3ファイル一覧を取得してLambdaで処理するパイプライン"
  start_date:
    __type__: datetime.datetime
    year: 2024
    month: 1
    day: 1
  tags:
  - lambda
  - example
  - s3
  tasks:
    list_s3_files:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
      bucket: amzn-s3-demo-source-bucket
      prefix: input/
      aws_conn_id: aws_default
      task_id: list_s3_files
      dependencies: []
    process_with_lambda:
      operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
      function_name: example-process-function
      payload: '{"source": "airflow"}'
      aws_conn_id: aws_default
      task_id: process_with_lambda
      dependencies:
      - list_s3_files

元のPythonコードに比べるとちょっと冗長ですが、オペレーターの完全修飾名やデフォルト値が明示的に出力されています。dependenciesでタスク間の依存関係も正しく変換されていますね。

その他のオプション

オプション 説明
--output 出力パスを指定
--bucket S3バケットを指定してYAMLを直接アップロード
--validate / --no-validate dag-factoryを使用した検証の有効/無効
--debug / --no-debug 変換前後のログ記録の有効/無効

S3に直接アップロードしたい場合はこんな感じです。

dag-converter convert sample_dag.py --bucket amzn-s3-demo-mwaa-bucket --validate

変換できないDAGも試してみる

続いて、変換できないパターンも試してみましょう。PythonOperatorを含むDAGを用意しました。

unsupported_dag.py
from datetime import datetime, timezone
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator


def process_data(**context):
    """カスタムのPython処理"""
    print("Processing data...")
    return {"status": "completed"}


@dag(
    dag_id="mixed_pipeline",
    description="サポート/非サポートオペレーターを含むDAG",
    schedule="@daily",
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    catchup=False,
)
def mixed_pipeline():
    list_files = S3ListOperator(
        task_id="list_s3_files",
        bucket="amzn-s3-demo-source-bucket",
        prefix="input/",
    )

    custom_process = PythonOperator(
        task_id="custom_process",
        python_callable=process_data,
    )

    list_files >> custom_process


mixed_pipeline()

このDAGを変換してみます。

$ dag-converter convert unsupported_dag.py --validate
Static validation failed
...
InvalidOperatorError: Operator PythonOperator is not supported

静的検証の段階でPythonOperatorがサポートされていないと判定されて、変換処理自体が中断されてしまいました。

サポートされるオペレーター

MWAA ServerlessはAmazon Provider Packageのオペレーターのみサポートしています。80以上のAWSオペレーターが使えます。

  • S3関連: S3ListOperator, S3CopyObjectOperatorなど
  • Glue関連: GlueJobOperator, GlueCrawlerOperatorなど
  • EMR関連: EmrServerlessStartJobOperatorなど
  • Redshift関連: RedshiftSQLOperatorなど
  • Lambda関連: LambdaInvokeFunctionOperatorなど
  • Step Functions関連: StepFunctionStartExecutionOperatorなど

サポートされるオペレーターの一覧は公式ドキュメントで確認できます。

https://docs.aws.amazon.com/mwaa/latest/mwaa-serverless-userguide/operators.html

変換できないもの

次のようなDAGは変換できないか、変換後に手直しが必要になります。

  • サードパーティや自作のカスタムオペレーターは使えない
  • PythonOperatorで任意のPythonコードを実行するタスクは直接サポートされていない
  • Airflowの動的タスクマッピング(動的タスク生成)は変換ツールでサポートされていない
  • dag-factoryが対応していない機能は変換されない

PythonOperatorの代替パターン

PythonOperatorを使っている場合は、処理をLambdaに移すのが手っ取り早いです。

たとえばこんなPythonOperatorがあったとします。

def my_custom_function():
    # カスタム処理
    pass

task = PythonOperator(
    task_id='custom_task',
    python_callable=my_custom_function
)

この処理をLambda関数として実装しておけば、YAMLではこう書けます。

tasks:
  custom_task:
    operator: LambdaInvokeFunctionOperator
    function_name: example-custom-function
    payload: '{"key": "value"}'

Airflow 2.xからの移行フロー

既存のAirflow 2.x環境からMWAA Serverlessへ移行する場合の流れをまとめておきます。

1. ruffでAirflow 3互換性をチェック

まず既存DAGがAirflow 3と互換性があるか確認します。ruff linterのAIR30ルールセットで非互換なコードを自動検出できます。

$ ruff check --preview --select AIR30 legacy_dag.py
AIR301 [*] `schedule_interval` is removed in Airflow 3.0
  --> legacy_dag.py:9:5
   |
 7 |     dag_id="legacy_pipeline",
 8 |     description="Airflow 2.x形式のDAG",
 9 |     schedule_interval="@daily",
   |     ^^^^^^^^^^^^^^^^^
   |
help: Use `schedule` instead

Found 1 error.
[*] 1 fixable with the `--fix` option.

--fixオプションを付けると自動修正もできます。

2. Airflow 2.x → 3.xの主な変更点

ruffで検出される主な非互換項目はこのあたりです。

項目 Airflow 2.x Airflow 3.x
スケジュール指定 schedule_interval schedule(旧名は非推奨)
開始日時 タイムゾーンなしでも可 タイムゾーン指定を推奨
catchupデフォルト True False
SubDAG サポート 廃止(TaskGroups/Assetsへ移行)
コンテキスト変数 execution_date logical_date等に変更

3. DAGを修正して変換

非互換な箇所を直したら、変換ツールでYAMLに変換します。

dag-converter convert <修正済みDAG>.py --validate

4. 変換結果を確認してデプロイ

変換されたYAMLを確認し、MWAA Serverlessにデプロイします。

使ってみた所感

良かった点

  • AWSオペレーターだけで組んだDAGはすんなり変換できた
  • --validateオプションで変換後のYAMLを検証できるのは助かる
  • dag-factoryと互換性があるので、従来版MWAAで事前テストできる

注意が必要な点

  • PythonOperatorを多用しているDAGは設計の見直しが必要
  • タスク間の依存関係が複雑な場合は、変換結果を要確認
  • カスタムオペレーターは事前にLambda等への移行が必要

移行を検討する際のポイント

すべてのワークロードがMWAA Serverlessに向いているわけではないですね。

たとえばカスタムオペレーターを多用していてLambda等への移行コストが高い場合や、Airflow Web UIを活発に使っている場合は、従来版MWAAを継続した方がよさそうです。常時高頻度で実行されるワークロードも同様ですね。

一方で、AWSサービス中心のパイプラインや、実行頻度の低いワークロードであれば、MWAA Serverlessへの移行メリットは大きいです。コスト最適化を重視する開発・テスト環境にも向いています。

まとめ

Python to YAML DAG Converter for MWAA Serverlessを使って、既存のPython DAGをYAML形式に変換してみました。AWSオペレーターだけで組んだDAGであればスムーズに変換できますが、PythonOperatorやカスタムオペレーターを使っている場合は設計の見直しが必要です。

移行を検討する際は、まず既存DAGで使っているオペレーターを洗い出して、サポート状況を確認するところから始めるのがよさそうです。参考になれば幸いです。

この記事をシェアする

FacebookHatena blogX

関連記事