Amazon MWAA ServerlessではAirflow Variableが使えないので移行方法を考える
こんにちは。サービス開発室の武田です。
Apache AirflowではVariable.get()やJinjaテンプレートの{{ var.value.xxx }}で設定値を管理するのが一般的です。今回は Airflow変数(Variable) がMWAA Serverlessでどう扱われるかを検証してみました。
MWAA Serverlessではこのしくみが根本的に変わっています。しかも現時点ではAWSからVariableの公式な移行ガイドも出ていません。検証結果と移行時の考え方を整理してみます。
Airflow VariableとParamsの違い
Apache Airflowにはワークフローにパラメーターを渡すしくみが複数あります。今回の検証に関係する Variable と Params の違いを先に整理しておきます。
Variable
DAGの外部から管理するグローバルなkey-valueストアです。Jinjaテンプレートで参照するのが推奨パターンです。
task = S3ListOperator(
task_id="list_files",
bucket="{{ var.value.data_bucket }}",
prefix="{{ var.json.pipeline_config.prefix }}",
)
- メタデータベース(
variableテーブル)に格納される - Airflow UI、CLI、REST APIから登録・変更可能
- DAGのコードを変更せずに値を変えられる
- DAG間で共有できる
- Jinjaテンプレートでは
{{ var.value.xxx }}、{{ var.json.xxx }}で参照 - Pythonコード内では
Variable.get()でも取得可能だが、DAGのトップレベルでの使用はスケジューラのパース毎にDB問い合わせが発生するため非推奨
Params
DAG定義内に記述するパラメーターです。
@dag(params={"bucket": "default-bucket", "prefix": "data/"})
def my_pipeline():
task = BashOperator(
task_id="process",
bash_command="aws s3 ls s3://{{ params.bucket }}/{{ params.prefix }}",
)
- DAG定義のコード内に記述する
- DAGスコープ(他のDAGとは共有しない)
trigger_dag -cやAPI経由で実行時に上書き可能- Jinjaテンプレートでは
{{ params.xxx }}で参照
両者の使い分け
| 特性 | Variable | Params |
|---|---|---|
| 管理場所 | メタデータDB(UIやCLIで管理) | DAG定義内(コードに記述) |
| スコープ | グローバル(DAG間共有) | DAGスコープ |
| 値の変更 | コード変更不要 | コード変更が必要(※実行時上書きは可能) |
| 機密情報 | Secrets Managerバックエンド対応 | コードに含まれるため不向き |
従来のMWAAでは両方が使えますが、MWAA Serverlessではどうなるのかを検証しました。
MWAA Serverlessで使えるテンプレート変数を検証してみた
MWAA ServerlessのYAMLワークフロー定義で、各種テンプレート変数が実際に展開されるかを検証しました。
検証方法
SqsPublishOperatorのmessage_contentにテンプレート変数を埋め込み、送信されたSQSメッセージの内容から展開結果を確認しました。
# テスト用ワークフロー定義
test_var:
dag_id: test_var
schedule: null
default_args:
owner: airflow
start_date: "2024-01-01"
tasks:
publish_test:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: publish_test
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: '{"value":"{{ テスト対象のテンプレート変数 }}"}'
ワークフローを実行し、SQSキューに送信されたメッセージの中身を確認します。変数が正しく展開されていれば値が入り、サポートされていなければエラーでワークフロー自体が失敗します。
検証結果
{{ params }} → 使える
YAML定義内のparamsで定義した値は、{{ params.xxx }}で正常に展開されます。
test_var_params:
dag_id: test_var_params
schedule: null
default_args:
owner: airflow
start_date: "2024-01-01"
params:
greeting: "hello-from-params"
target_bucket: "my-params-bucket"
tasks:
publish_params_test:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: publish_params_test
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: '{"greeting":"{{ params.greeting }}","bucket":"{{ params.target_bucket }}","ds":"{{ ds }}"}'
送信されたSQSメッセージはこちらです。
{
"greeting": "hello-from-params",
"bucket": "my-params-bucket",
"ds": "2026-03-04"
}
{{ params.greeting }}がhello-from-paramsに、{{ params.target_bucket }}がmy-params-bucketに正しく展開されています。{{ ds }}(DAG実行日)も問題なく展開されました。
{{ var.value.xxx }} → 使えない
従来のAirflowと同じように{{ var.value.xxx }}が使えるか検証しました。
test_var_value:
dag_id: test_var_value
schedule: null
default_args:
owner: airflow
start_date: "2024-01-01"
tasks:
publish_var_value_test:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: publish_var_value_test
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: '{"value":"{{ var.value.test_var }}"}'
結果はFAILEDでした。
jinja2.exceptions.UndefinedError: 'var' is undefined
タスクは実行されず、テンプレート展開の段階でエラーになりました。Jinjaのテンプレートコンテキストにvarオブジェクトが存在しないためです。
{{ var.json.xxx }} → 使えない
message_content: '{"bucket":"{{ var.json.test_json_var.bucket_name }}"}'
こちらも同じく'var' is undefinedエラーでFAILEDです。
varオブジェクト自体がテンプレートコンテキストにないため、var.valueとvar.jsonはどちらも使えません。
{{ ti }} → 一部使える(SafeTaskInstance)
{{ ti }}は使えますが、標準のAirflow TaskInstanceではなくSafeTaskInstanceという制限付きのラッパオブジェクトです。
message_content: '{"task_id":"{{ ti.task_id }}","dag_id":"{{ ti.dag_id }}"}'
送信されたSQSメッセージはこちらです。
{
"task_id": "publish_ti_test",
"dag_id": "ex_3e5d20aa-d1fd-4c45-b636-b327eb411736",
"ds": "2026-03-04"
}
ti.task_idは期待したとおりですが、ti.dag_idは定義したtest_var_tiではなくMWAA Serverless内部のID(ex_3e5d20aa-...)が返りました。
一方、ti.run_idやti.try_numberを参照するとエラーになります。
jinja2.exceptions.UndefinedError: 'yaml_dag_handler.driver.jinja_context.SafeTaskInstance object' has no attribute 'run_id'
エラーメッセージから、MWAA ServerlessではTaskInstanceをSafeTaskInstanceでラップして公開する属性を制限していることがわかります。
ti.xcom_pull() → 使える
タスク間のデータ連携に使うxcom_pull()は動作しました。S3ListOperatorの結果をxcom_pull経由で次のタスクから参照する2タスク構成で確認しています。
tasks:
list_s3:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: list_s3
bucket: my-bucket
prefix: "workflows/"
publish_result:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: publish_result
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: '{"xcom_result":"{{ ti.xcom_pull(task_ids="list_s3") }}"}'
dependencies:
- list_s3
{
"xcom_result": "['workflows/sqs-sensor-basic.yaml', 'workflows/test-var-params.yaml', ...]"
}
S3ListOperatorの戻り値がXCom経由で正しく取得できています。
SafeTaskInstance の属性まとめ
| 属性/メソッド | 結果 | 備考 |
|---|---|---|
ti.task_id |
使える | タスクIDが返る |
ti.dag_id |
使える | ただし内部生成のIDが返る(ユーザー定義のdag_idではない) |
ti.xcom_pull() |
使える | タスク間データ連携が可能 |
ti.run_id |
使えない | SafeTaskInstanceに属性なし |
ti.try_number |
使えない | 同上 |
ti.execution_date |
使えない | 同上 |
{{ macros }} → 使える(ただしサンドボックス制限あり)
macrosパッケージの関数をいくつか検証しました。
# ds_add: 日付加減算
message_content: '{"yesterday":"{{ macros.ds_add(ds, -1) }}","tomorrow":"{{ macros.ds_add(ds, 1) }}"}'
{"yesterday": "2026-03-03", "tomorrow": "2026-03-05"}
# ds_format: フォーマット変換、random: ランダム値
message_content: >-
{"ds_format":"{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}","random":"{{ macros.random() }}"}
{"ds_format": "2026/03/04", "random": "0.7386899833460479"}
# datetime.now(), timedelta
message_content: >-
{"datetime_now":"{{ macros.datetime.now() }}","timedelta":"{{ macros.timedelta(days=7) }}"}
{"datetime_now": "2026-03-04 12:31:34.399187", "timedelta": "7 days, 0:00:00"}
注意点として、macros.datetimeは標準Airflowのdatetimeモジュールではなくdatetime.datetimeクラスそのものです。また、返されたオブジェクトの.strftime()メソッドはJinjaサンドボックスでブロックされます。
jinja2.exceptions.SecurityError: access to attribute 'strftime' of 'datetime' object is unsafe.
日付フォーマットの変換にはmacros.ds_format()を使う必要があります。
macrosの対応まとめ
| 関数/モジュール | 結果 | 備考 |
|---|---|---|
macros.ds_add() |
使える | 日付の加減算 |
macros.ds_format() |
使える | 日付フォーマット変換 |
macros.random() |
使える | ランダム値生成 |
macros.datetime.now() |
使える | ただし.strftime()はSecurityErrorでブロック |
macros.timedelta() |
使える | タイムデルタ生成 |
以上の検証から、{{ params }}、{{ ti }}(SafeTaskInstanceの範囲内)、{{ macros }}は使えることがわかりました。一方で{{ var.value }}と{{ var.json }}は使えません。
なぜVariableが使えないのか
MWAA Serverlessは従来のMWAAとアーキテクチャが大きく異なります。
| 項目 | 従来のMWAA | MWAA Serverless |
|---|---|---|
| メタデータDB | Aurora PostgreSQL | 管理不要(抽象化) |
| Airflow UI | あり | なし |
| DAG定義 | Python | YAMLのみ |
| スケジューラ | Airflowスケジューラ(常時稼働) | EventBridge Scheduler |
| Airflow設定 | airflow.cfg相当を設定可能 | 設定項目が限定的 |
aws_conn_id |
ユーザーが管理 | サービスが制御(IAMロールベース) |
Airflowのvarテンプレート変数は、デフォルトではメタデータベース内のvariableテーブルから値を取得します。MWAA Serverlessではメタデータベースの管理が完全に抽象化されており、Variableの登録手段(Airflow UI、CLI、REST API)がありません。そのためvarオブジェクト自体がテンプレートコンテキストに注入されていないと考えられます。
従来のMWAAではsecrets.backendを設定してSecrets Managerなどの外部バックエンドからVariableを取得できました。しかしMWAA Serverlessではそのような設定手段も提供されていません。
公式ドキュメントのサポートされるJinjaテンプレート変数にもvarは記載されていません。ただしこのページには「Apache Airflowに付属するJinjaテンプレートをサポート」という記述があります。一見するとすべてのAirflow標準テンプレート変数が使えるように読めますが、実際には限定されたサブセットのみのサポートです。
Connectionの扱いも変わっている
なお、Variableだけでなく Connectionの扱いも変わっています。公式ドキュメントによるとaws_conn_idは「Controlled by service」で、AWSサービスへのアクセスはワークフローに紐づくIAM実行ロールで制御されます。ユーザーがConnectionを登録するしくみは提供されていません。非AWSサービスへの接続が必要な場合はLambdaやECS経由の間接実行になります。
公式な移行ガイドはまだない
2026年3月時点で、AWSはVariableの移行に関する公式ガイドを提供していません。
確認した情報源は次のとおりです。
- Convert Python DAG to YAML definition — Python DAG→YAML変換の手順のみ。
Variable.get()の変換については記載なし - python-to-yaml-dag-converter-mwaa-serverless — AWS Labsの公式変換ツール。Variable.get()の処理について明記されていない
- MWAA Serverless ユーザーガイド — Variableの移行セクションなし
- MWAA Migration Guide — 従来のMWAAへの移行やバージョンアップグレードに関するガイドであり、MWAA Serverlessへの移行は対象外
変換ツールはVariable.get()を含むDAGを変換できない
公式の変換ツール(python-to-yaml-dag-converter-mwaa-serverless)を使って、Variable.get()を含むDAGを実際に変換してみました。次の例はDAGのトップレベルでVariable.get()を呼んでいるケースです。Airflowでは非推奨のパターンですが、既存のDAGでは見かけることがあります。
# DAGトップレベルでVariable.get()を呼ぶ非推奨パターン
from airflow.models import Variable
bucket = Variable.get("data_bucket")
config = Variable.get("pipeline_config", deserialize_json=True)
with DAG(dag_id="dag_with_variable", schedule="@daily", ...):
list_files = S3ListOperator(task_id="list_files", bucket=bucket, prefix=config["prefix"])
$ dag-converter convert dag_with_variable.py
結果は 変換失敗 です。
KeyError: 'Variable data_bucket does not exist'
Exception: Failed to generate dag object for file dag_with_variable.py
変換ツールは内部でDagBagを使ってDAGファイルをPythonとして実行します。上記の例ではVariable.get("data_bucket")がモジュールのトップレベルで呼ばれているため、ファイルの読み込み時にそのまま実行されます。Variableクラス自体はインポートできますが、キーdata_bucketに対応する値がメタデータDBに登録されていないためKeyErrorで失敗します。
仮にローカルのAirflow環境に変数を登録しておけば変換自体は成功しますが、その場合は変数の値がYAMLにハードコードされるだけです。paramsへの変換や{{ var.value }}の検出・警告といった処理は行われません。いずれにしても Variable.get()を使っているDAGの変換は、手動での事前対応が必要 です。
Jinjaテンプレートの {{ var.value }} は警告なくスルーされる
一方、Jinjaテンプレート内で{{ var.value.xxx }}を使っているケースも検証しました。
# Jinjaテンプレート内で var を使っているDAG
publish = SqsPublishOperator(
task_id="publish_result",
sqs_queue="https://sqs...",
message_content='{"bucket":"{{ var.value.data_bucket }}"}',
)
この場合、変換は成功 します。
# 変換後のYAML — {{ var.value.data_bucket }} がそのまま残る
message_content: '{"bucket":"{{ var.value.data_bucket }}"}'
Jinjaテンプレートは変換時点ではただの文字列ですので、変換ツールはエラーや警告を出せません。問題が発覚するのは MWAA Serverlessでワークフローを実行したとき です。実行時に'var' is undefinedエラーでタスクが失敗してはじめて気付くことになります。
MWAA Serverlessでのパラメーター管理
Variableが使えないMWAA Serverlessでは、パラメーター管理の手段はparamsに限られます。ただしparamsはもともとVariableとは異なる用途の機能です。Variableが担っていた役割をすべてカバーできるわけではありません。
paramsではカバーできないVariableの機能
| Variableの機能 | paramsでの実現可否 |
|---|---|
| DAGコードを変更せずに値を変更 | --override-parametersで部分的に可能。ただしスケジュール実行時はYAML再アップロードが必要 |
| DAG間で設定値を共有 | 不可能 — DAGごとに個別定義が必要 |
| UIから管理 | 不可能 — Airflow UIが存在しない |
| Secrets Managerで機密情報管理 | 不可能 — YAML/API上に記述されるため不向き |
| 動的な値の取得(実行時にDB参照) | 不可能 — params値はDAG定義時に固定 |
YAML定義内にデフォルト値を設定する
my_pipeline:
dag_id: my_pipeline
schedule: "0 2 * * *"
default_args:
owner: airflow
start_date: "2024-01-01"
params:
data_bucket: "default-data-bucket"
output_prefix: "output/daily"
max_records: "10000"
tasks:
list_files:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: list_files
bucket: "{{ params.data_bucket }}"
prefix: "{{ params.output_prefix }}"
実行時にパラメーターを上書きする
StartWorkflowRun APIの--override-parametersを使うと、YAML定義内のparamsを実行時に上書きできます。
aws mwaa-serverless start-workflow-run \
--workflow-arn "arn:aws:airflow-serverless:us-east-1:123456789012:workflow/my-pipeline-xxx" \
--override-parameters '{"data_bucket":"prod-data-bucket","output_prefix":"prod/output"}'
実際に検証してみました。YAML定義ではparams.greetingに"hello-from-params"を設定し、--override-parametersで上書きしてみます。
// YAML定義のparamsで実行した場合
{"greeting": "hello-from-params", "bucket": "my-params-bucket"}
// override-parametersで上書きした場合
{"greeting": "overridden-greeting", "bucket": "overridden-bucket"}
ばっちり上書きされていますね。
既存DAGからの移行パターン
公式の移行ガイドがない以上、Variableを使っている既存DAGをMWAA Serverlessに移行する場合は、利用者自身でパターンに応じた書き換えが必要です。典型的なパターンを整理してみました。
なお、すべてのパターンでVariableの利点(DAG間共有、UIからの管理、コード変更なしの値変更)は失われます。移行可否の判断材料としてください。
パターン1: {{ var.value }}をparamsに書き換える
たとえばこんなDAGがあったとします。
task = S3ListOperator(
task_id="list_files",
bucket="{{ var.value.data_bucket }}",
prefix="{{ var.value.output_prefix }}",
)
MWAA Serverlessでは次のようにYAML定義のparamsで書き換えます。
my_pipeline:
dag_id: my_pipeline
params:
data_bucket: "my-data-bucket"
output_prefix: "output/"
tasks:
list_files:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: list_files
bucket: "{{ params.data_bucket }}"
prefix: "{{ params.output_prefix }}"
パターン2: {{ var.json }}をparamsに展開する
JSON形式のVariableをJinjaテンプレートで参照していた場合は、フラットなparamsに展開します。
task = GlueJobOperator(
task_id="transform",
job_name="my-etl",
script_args={
"--source_bucket": "{{ var.json.pipeline_config.bucket }}",
"--output_format": "{{ var.json.pipeline_config.format }}",
},
)
これをYAMLに変換するとこうなります。
my_pipeline:
dag_id: my_pipeline
params:
source_bucket: "xxx"
output_format: "parquet"
tasks:
transform:
operator: airflow.providers.amazon.aws.operators.glue.GlueJobOperator
task_id: transform
job_name: my-etl
script_args:
"--source_bucket": "{{ params.source_bucket }}"
"--output_format": "{{ params.output_format }}"
パターン3: 環境別の設定管理
環境ごとにVariableの値を変えて運用していた場合は、ワークフロー起動側で--override-parametersを使い分けます。
たとえば従来のMWAAではこう書いていたとします。
# Airflow UI や Secrets Manager で環境ごとに Variable を設定
# staging: data_bucket = "staging-bucket"
# production: data_bucket = "prod-bucket"
task = S3ListOperator(
task_id="list_files",
bucket="{{ var.value.data_bucket }}",
)
MWAA Serverlessでは、YAML定義にはデフォルト値を設定しておきます。
# YAML定義にはデフォルト値を設定
my_pipeline:
params:
data_bucket: "default-bucket"
そして環境別にパラメーターを上書きして実行します。
# 環境別にパラメーターを上書きして実行
# staging
aws mwaa-serverless start-workflow-run \
--workflow-arn "arn:aws:..." \
--override-parameters '{"data_bucket":"staging-bucket"}'
# production
aws mwaa-serverless start-workflow-run \
--workflow-arn "arn:aws:..." \
--override-parameters '{"data_bucket":"prod-bucket"}'
スケジュール実行する場合は、環境ごとに異なるデフォルト値を設定したワークフローをデプロイするか、EventBridge SchedulerからLambda経由で start-workflow-run を呼び出してパラメーターを渡す構成で対応します。
パターン4: 機密情報の扱い
JinjaテンプレートやVariable.get()で機密情報(APIキー等)を取得していた場合は、Lambda経由の間接実行で対応します。
従来のMWAAでは、Jinjaテンプレートを使って次のように記述していました。
task = SimpleHttpOperator(
task_id="call_api",
headers={"Authorization": "Bearer {{ var.value.external_api_key }}"},
)
MWAA Serverlessでは、Lambda関数内でSecrets Managerから直接取得する設計に変更します。
# Lambda関数内でSecrets Managerから直接取得する設計に変更
my_pipeline:
tasks:
call_api:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: call_api
function_name: call-external-api
payload: '{"endpoint": "/data", "method": "GET"}'
Lambda関数内でboto3を使ってSecrets Managerから機密情報を取得すれば、YAML定義やパラメーターに機密情報を含めずに済みます。
パターン5: Lambda経由で外部パラメーターストアを参照する
Variableを「コード変更なしで設定値を変えたい」「DAG間で設定を共有したい」という用途で使っていた場合はどうでしょうか。SSM Parameter Storeなどの外部ストアとLambdaを組み合わせる方法があります。
なお、MWAA Serverlessのサポートオペレータ一覧には、外部ストアから設定値を読み取ってXComに渡すオペレータが存在しません。そのためLambda経由で取得します。
制約: 1つのLambdaから複数の値を個別に取り出せない
「Lambdaで複数パラメータをまとめて取得し、後続タスクに個別に渡す」のが理想です。しかしLambdaInvokeFunctionOperatorの戻り値はJSON文字列としてXComに格納されるため、dictキーアクセスやJSON解析が使えません。
# いずれも動作しない
{{ ti.xcom_pull(task_ids='get_config').data_bucket }} → YAMLプリプロセッサでブロック
{{ ti.xcom_pull(task_ids='get_config')['data_bucket'] }} → 実行時エラー(XCom値が文字列のため)
{{ ti.xcom_pull(task_ids='get_config') | fromjson }} → No filter named 'fromjson'
標準Airflowではrender_template_as_native_objやuser_defined_filtersで回避できます。しかしMWAA ServerlessではこれらのDAGレベルパラメーターが非サポートです。| splitフィルターもJinja2に存在せず、.split()メソッドはサンドボックスでブロックされます。
そのため、パラメーターごとにLambdaタスクを分ける 必要があります。
回避策: パラメーターごとにLambdaタスクを分け | replace でクオートを除去する
実際に検証したところ、同一のLambda関数をパラメーターごとに異なるペイロードで呼び出し、JinjaのreplaceフィルターでJSON文字列のクオートを除去するパターンが動作しました。
my_pipeline:
tasks:
get_bucket:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: get_bucket
function_name: get-ssm-parameter
payload: '{"key": "/myapp/data_bucket"}'
get_prefix:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: get_prefix
function_name: get-ssm-parameter
payload: '{"key": "/myapp/output_prefix"}'
list_files:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: list_files
bucket: >-
{{ ti.xcom_pull(task_ids='get_bucket') | replace('"', '') }}
prefix: >-
{{ ti.xcom_pull(task_ids='get_prefix') | replace('"', '') }}
dependencies:
- get_bucket
- get_prefix
Lambda関数は1つだけ用意すれば済みます。呼び出し時のペイロードでキーを指定し、対応する値を返します。
import boto3
def handler(event, context):
ssm = boto3.client("ssm")
resp = ssm.get_parameter(Name=event["key"])
return resp["Parameter"]["Value"]
LambdaInvokeFunctionOperatorの戻り値はJSON文字列としてXComに格納されるため、文字列値の場合は前後にクオート(")が付きます。| replace('"', '')でこのクオートを除去することで、後続のオペレータにクリーンな値を渡せます。
なお、このクオート問題はLambdaInvokeFunctionOperator固有の挙動です。S3ListOperator等の通常のオペレータではXCom値がPythonオブジェクトのまま保持されます。そのため| replaceは不要で、{{ ti.xcom_pull(task_ids='list_s3')[0] }}のようなブラケットアクセスも正常に動作します。
LambdaInvokeFunctionOperatorだけが特殊なのは、Lambdaランタイムが戻り値をJSON直列化するためです。オペレータがそれをresponse['Payload'].read().decode()で文字列として読み取ります。通常のオペレータはexecute()がPythonオブジェクトを直接返すため、XComの直列化・復元で型が保持されます。
このパターンを使えば、Lambda経由でSSM Parameter Storeから取得した値をAWSオペレータに直接渡せます。実際にS3ListOperatorでバケット内のファイル一覧を取得できることを確認済みです。
paramsと比べたメリットは次のとおりです。
- YAML定義を変更せずにSSM Parameter Store側で値を変更できる
- 複数のワークフロー間で同じパラメーターパスを参照して設定を共有できる
- SecureStringパラメーターを使えば機密情報の管理にも対応可能
制約として、パラメーターごとにLambdaタスクが1つ必要 です。Lambda関数自体は1つで済みますが、ワークフロー定義上のタスク数はパラメーター数に比例して増えます。取得するパラメーターが多い場合は煩雑になるため、paramsへの直接記述と使い分けてください。
まとめ
検証結果
| 項目 | 結果 |
|---|---|
{{ params.xxx }} |
使える |
{{ ds }}, {{ ts }} 等 |
使える |
{{ ti.task_id }}, {{ ti.dag_id }} |
使える(dag_idは内部IDが返る) |
{{ ti.xcom_pull() }} |
使える |
{{ ti.run_id }}, {{ ti.try_number }} 等 |
使えない(SafeTaskInstanceで制限) |
{{ macros.ds_add() }} |
使える |
{{ var.value.xxx }} |
使えない('var' is undefined) |
{{ var.json.xxx }} |
使えない(同上) |
--override-parameters による上書き |
使える |
移行における課題
MWAA Serverlessでは、Airflow Variableの扱いが従来のMWAAとは根本的に異なっています。
- Variableの登録手段がない。メタデータベースは抽象化されており、
secrets.backendでの外部バックエンドへの切り替えもできない - Connectionも同様に変更されており、
aws_conn_idはIAMロールで制御される - Variableの移行方法について、AWSからの公式ガイドは2026年3月時点で提供されていない
- 公式の変換ツールは
Variable.get()を含むDAGを変換できない(DAGインポート時にVariable.get()が実行されエラーになる)
移行可否の判断基準
MWAA Serverlessへの移行を検討する際は、次の観点で既存DAGを評価してみてください。
移行しやすいのは次のようなケースです。
- Variableの利用が限定的で、固定値への書き換えで済む
- AWSオペレータのみを使用している
- DAG間での設定値共有がない
一方で、次のようなケースだと移行がやっかいです。
- 多数のDAG間でVariableを共有しており、一元管理が必要
- Variable.get()を使った動的なDAG生成をしている
- 機密情報をVariable経由で管理し、Secrets Managerバックエンドを活用している
- 非AWSサービスへのConnectionに依存している
移行が困難なケースでは、従来のMWAA(標準版)の継続利用も選択肢に入れたうえで判断してください。
参考リンク
- Amazon MWAA Serverless - サポートされるJinjaテンプレート変数
- Amazon MWAA Serverless - Apache Airflow Parameter Support
- Amazon MWAA Serverless ユーザーガイド
- MWAA Serverless - ワークフロー定義
- MWAA Serverless - Convert Python DAG to YAML definition
- python-to-yaml-dag-converter-mwaa-serverless(AWS Labs)
- 従来のMWAA - Secrets Manager連携






