Amazon MWAA ServerlessではAirflow Variableが使えないので移行方法を考える

Amazon MWAA ServerlessではAirflow Variableが使えないので移行方法を考える

2026.03.11

こんにちは。サービス開発室の武田です。

Apache AirflowではVariable.get()やJinjaテンプレートの{{ var.value.xxx }}で設定値を管理するのが一般的です。今回は Airflow変数(Variable) がMWAA Serverlessでどう扱われるかを検証してみました。

MWAA Serverlessではこのしくみが根本的に変わっています。しかも現時点ではAWSからVariableの公式な移行ガイドも出ていません。検証結果と移行時の考え方を整理してみます。

Airflow VariableとParamsの違い

Apache Airflowにはワークフローにパラメーターを渡すしくみが複数あります。今回の検証に関係する VariableParams の違いを先に整理しておきます。

Variable

DAGの外部から管理するグローバルなkey-valueストアです。Jinjaテンプレートで参照するのが推奨パターンです。

task = S3ListOperator(
    task_id="list_files",
    bucket="{{ var.value.data_bucket }}",
    prefix="{{ var.json.pipeline_config.prefix }}",
)
  • メタデータベース(variableテーブル)に格納される
  • Airflow UI、CLI、REST APIから登録・変更可能
  • DAGのコードを変更せずに値を変えられる
  • DAG間で共有できる
  • Jinjaテンプレートでは{{ var.value.xxx }}{{ var.json.xxx }}で参照
  • Pythonコード内ではVariable.get()でも取得可能だが、DAGのトップレベルでの使用はスケジューラのパース毎にDB問い合わせが発生するため非推奨

Params

DAG定義内に記述するパラメーターです。

@dag(params={"bucket": "default-bucket", "prefix": "data/"})
def my_pipeline():
    task = BashOperator(
        task_id="process",
        bash_command="aws s3 ls s3://{{ params.bucket }}/{{ params.prefix }}",
    )
  • DAG定義のコード内に記述する
  • DAGスコープ(他のDAGとは共有しない)
  • trigger_dag -cやAPI経由で実行時に上書き可能
  • Jinjaテンプレートでは{{ params.xxx }}で参照

両者の使い分け

特性 Variable Params
管理場所 メタデータDB(UIやCLIで管理) DAG定義内(コードに記述)
スコープ グローバル(DAG間共有) DAGスコープ
値の変更 コード変更不要 コード変更が必要(※実行時上書きは可能)
機密情報 Secrets Managerバックエンド対応 コードに含まれるため不向き

従来のMWAAでは両方が使えますが、MWAA Serverlessではどうなるのかを検証しました。

MWAA Serverlessで使えるテンプレート変数を検証してみた

MWAA ServerlessのYAMLワークフロー定義で、各種テンプレート変数が実際に展開されるかを検証しました。

検証方法

SqsPublishOperatorのmessage_contentにテンプレート変数を埋め込み、送信されたSQSメッセージの内容から展開結果を確認しました。

# テスト用ワークフロー定義
test_var:
  dag_id: test_var
  schedule: null
  default_args:
    owner: airflow
    start_date: "2024-01-01"
  tasks:
    publish_test:
      operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
      task_id: publish_test
      sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
      message_content: '{"value":"{{ テスト対象のテンプレート変数 }}"}'

ワークフローを実行し、SQSキューに送信されたメッセージの中身を確認します。変数が正しく展開されていれば値が入り、サポートされていなければエラーでワークフロー自体が失敗します。

検証結果

{{ params }} → 使える

YAML定義内のparamsで定義した値は、{{ params.xxx }}で正常に展開されます。

test_var_params:
  dag_id: test_var_params
  schedule: null
  default_args:
    owner: airflow
    start_date: "2024-01-01"
  params:
    greeting: "hello-from-params"
    target_bucket: "my-params-bucket"
  tasks:
    publish_params_test:
      operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
      task_id: publish_params_test
      sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
      message_content: '{"greeting":"{{ params.greeting }}","bucket":"{{ params.target_bucket }}","ds":"{{ ds }}"}'

送信されたSQSメッセージはこちらです。

{
  "greeting": "hello-from-params",
  "bucket": "my-params-bucket",
  "ds": "2026-03-04"
}

{{ params.greeting }}hello-from-paramsに、{{ params.target_bucket }}my-params-bucketに正しく展開されています。{{ ds }}(DAG実行日)も問題なく展開されました。

{{ var.value.xxx }} → 使えない

従来のAirflowと同じように{{ var.value.xxx }}が使えるか検証しました。

test_var_value:
  dag_id: test_var_value
  schedule: null
  default_args:
    owner: airflow
    start_date: "2024-01-01"
  tasks:
    publish_var_value_test:
      operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
      task_id: publish_var_value_test
      sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
      message_content: '{"value":"{{ var.value.test_var }}"}'

結果はFAILEDでした。

jinja2.exceptions.UndefinedError: 'var' is undefined

タスクは実行されず、テンプレート展開の段階でエラーになりました。Jinjaのテンプレートコンテキストにvarオブジェクトが存在しないためです。

{{ var.json.xxx }} → 使えない

message_content: '{"bucket":"{{ var.json.test_json_var.bucket_name }}"}'

こちらも同じく'var' is undefinedエラーでFAILEDです。

varオブジェクト自体がテンプレートコンテキストにないため、var.valuevar.jsonはどちらも使えません。

{{ ti }} → 一部使える(SafeTaskInstance)

{{ ti }}は使えますが、標準のAirflow TaskInstanceではなくSafeTaskInstanceという制限付きのラッパオブジェクトです。

message_content: '{"task_id":"{{ ti.task_id }}","dag_id":"{{ ti.dag_id }}"}'

送信されたSQSメッセージはこちらです。

{
  "task_id": "publish_ti_test",
  "dag_id": "ex_3e5d20aa-d1fd-4c45-b636-b327eb411736",
  "ds": "2026-03-04"
}

ti.task_idは期待したとおりですが、ti.dag_idは定義したtest_var_tiではなくMWAA Serverless内部のID(ex_3e5d20aa-...)が返りました。

一方、ti.run_idti.try_numberを参照するとエラーになります。

jinja2.exceptions.UndefinedError: 'yaml_dag_handler.driver.jinja_context.SafeTaskInstance object' has no attribute 'run_id'

エラーメッセージから、MWAA ServerlessではTaskInstanceSafeTaskInstanceでラップして公開する属性を制限していることがわかります。

ti.xcom_pull() → 使える

タスク間のデータ連携に使うxcom_pull()は動作しました。S3ListOperatorの結果をxcom_pull経由で次のタスクから参照する2タスク構成で確認しています。

tasks:
  list_s3:
    operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
    task_id: list_s3
    bucket: my-bucket
    prefix: "workflows/"
  publish_result:
    operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
    task_id: publish_result
    sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
    message_content: '{"xcom_result":"{{ ti.xcom_pull(task_ids="list_s3") }}"}'
    dependencies:
      - list_s3
{
  "xcom_result": "['workflows/sqs-sensor-basic.yaml', 'workflows/test-var-params.yaml', ...]"
}

S3ListOperatorの戻り値がXCom経由で正しく取得できています。

SafeTaskInstance の属性まとめ

属性/メソッド 結果 備考
ti.task_id 使える タスクIDが返る
ti.dag_id 使える ただし内部生成のIDが返る(ユーザー定義のdag_idではない)
ti.xcom_pull() 使える タスク間データ連携が可能
ti.run_id 使えない SafeTaskInstanceに属性なし
ti.try_number 使えない 同上
ti.execution_date 使えない 同上

{{ macros }} → 使える(ただしサンドボックス制限あり)

macrosパッケージの関数をいくつか検証しました。

# ds_add: 日付加減算
message_content: '{"yesterday":"{{ macros.ds_add(ds, -1) }}","tomorrow":"{{ macros.ds_add(ds, 1) }}"}'
{"yesterday": "2026-03-03", "tomorrow": "2026-03-05"}
# ds_format: フォーマット変換、random: ランダム値
message_content: >-
  {"ds_format":"{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}","random":"{{ macros.random() }}"}
{"ds_format": "2026/03/04", "random": "0.7386899833460479"}
# datetime.now(), timedelta
message_content: >-
  {"datetime_now":"{{ macros.datetime.now() }}","timedelta":"{{ macros.timedelta(days=7) }}"}
{"datetime_now": "2026-03-04 12:31:34.399187", "timedelta": "7 days, 0:00:00"}

注意点として、macros.datetimeは標準Airflowのdatetimeモジュールではなくdatetime.datetimeクラスそのものです。また、返されたオブジェクトの.strftime()メソッドはJinjaサンドボックスでブロックされます。

jinja2.exceptions.SecurityError: access to attribute 'strftime' of 'datetime' object is unsafe.

日付フォーマットの変換にはmacros.ds_format()を使う必要があります。

macrosの対応まとめ

関数/モジュール 結果 備考
macros.ds_add() 使える 日付の加減算
macros.ds_format() 使える 日付フォーマット変換
macros.random() 使える ランダム値生成
macros.datetime.now() 使える ただし.strftime()はSecurityErrorでブロック
macros.timedelta() 使える タイムデルタ生成

以上の検証から、{{ params }}{{ ti }}(SafeTaskInstanceの範囲内)、{{ macros }}は使えることがわかりました。一方で{{ var.value }}{{ var.json }}は使えません。

なぜVariableが使えないのか

MWAA Serverlessは従来のMWAAとアーキテクチャが大きく異なります。

項目 従来のMWAA MWAA Serverless
メタデータDB Aurora PostgreSQL 管理不要(抽象化)
Airflow UI あり なし
DAG定義 Python YAMLのみ
スケジューラ Airflowスケジューラ(常時稼働) EventBridge Scheduler
Airflow設定 airflow.cfg相当を設定可能 設定項目が限定的
aws_conn_id ユーザーが管理 サービスが制御(IAMロールベース)

Airflowのvarテンプレート変数は、デフォルトではメタデータベース内のvariableテーブルから値を取得します。MWAA Serverlessではメタデータベースの管理が完全に抽象化されており、Variableの登録手段(Airflow UI、CLI、REST API)がありません。そのためvarオブジェクト自体がテンプレートコンテキストに注入されていないと考えられます。

従来のMWAAではsecrets.backendを設定してSecrets Managerなどの外部バックエンドからVariableを取得できました。しかしMWAA Serverlessではそのような設定手段も提供されていません。

公式ドキュメントのサポートされるJinjaテンプレート変数にもvarは記載されていません。ただしこのページには「Apache Airflowに付属するJinjaテンプレートをサポート」という記述があります。一見するとすべてのAirflow標準テンプレート変数が使えるように読めますが、実際には限定されたサブセットのみのサポートです。

Connectionの扱いも変わっている

なお、Variableだけでなく Connectionの扱いも変わっています公式ドキュメントによるとaws_conn_idは「Controlled by service」で、AWSサービスへのアクセスはワークフローに紐づくIAM実行ロールで制御されます。ユーザーがConnectionを登録するしくみは提供されていません。非AWSサービスへの接続が必要な場合はLambdaやECS経由の間接実行になります。

公式な移行ガイドはまだない

2026年3月時点で、AWSはVariableの移行に関する公式ガイドを提供していません。

確認した情報源は次のとおりです。

変換ツールはVariable.get()を含むDAGを変換できない

公式の変換ツール(python-to-yaml-dag-converter-mwaa-serverless)を使って、Variable.get()を含むDAGを実際に変換してみました。次の例はDAGのトップレベルでVariable.get()を呼んでいるケースです。Airflowでは非推奨のパターンですが、既存のDAGでは見かけることがあります。

# DAGトップレベルでVariable.get()を呼ぶ非推奨パターン
from airflow.models import Variable

bucket = Variable.get("data_bucket")
config = Variable.get("pipeline_config", deserialize_json=True)

with DAG(dag_id="dag_with_variable", schedule="@daily", ...):
    list_files = S3ListOperator(task_id="list_files", bucket=bucket, prefix=config["prefix"])
$ dag-converter convert dag_with_variable.py

結果は 変換失敗 です。

KeyError: 'Variable data_bucket does not exist'
Exception: Failed to generate dag object for file dag_with_variable.py

変換ツールは内部でDagBagを使ってDAGファイルをPythonとして実行します。上記の例ではVariable.get("data_bucket")がモジュールのトップレベルで呼ばれているため、ファイルの読み込み時にそのまま実行されます。Variableクラス自体はインポートできますが、キーdata_bucketに対応する値がメタデータDBに登録されていないためKeyErrorで失敗します。

仮にローカルのAirflow環境に変数を登録しておけば変換自体は成功しますが、その場合は変数の値がYAMLにハードコードされるだけです。paramsへの変換や{{ var.value }}の検出・警告といった処理は行われません。いずれにしても Variable.get()を使っているDAGの変換は、手動での事前対応が必要 です。

Jinjaテンプレートの {{ var.value }} は警告なくスルーされる

一方、Jinjaテンプレート内で{{ var.value.xxx }}を使っているケースも検証しました。

# Jinjaテンプレート内で var を使っているDAG
publish = SqsPublishOperator(
    task_id="publish_result",
    sqs_queue="https://sqs...",
    message_content='{"bucket":"{{ var.value.data_bucket }}"}',
)

この場合、変換は成功 します。

# 変換後のYAML — {{ var.value.data_bucket }} がそのまま残る
message_content: '{"bucket":"{{ var.value.data_bucket }}"}'

Jinjaテンプレートは変換時点ではただの文字列ですので、変換ツールはエラーや警告を出せません。問題が発覚するのは MWAA Serverlessでワークフローを実行したとき です。実行時に'var' is undefinedエラーでタスクが失敗してはじめて気付くことになります。

MWAA Serverlessでのパラメーター管理

Variableが使えないMWAA Serverlessでは、パラメーター管理の手段はparamsに限られます。ただしparamsはもともとVariableとは異なる用途の機能です。Variableが担っていた役割をすべてカバーできるわけではありません。

paramsではカバーできないVariableの機能

Variableの機能 paramsでの実現可否
DAGコードを変更せずに値を変更 --override-parametersで部分的に可能。ただしスケジュール実行時はYAML再アップロードが必要
DAG間で設定値を共有 不可能 — DAGごとに個別定義が必要
UIから管理 不可能 — Airflow UIが存在しない
Secrets Managerで機密情報管理 不可能 — YAML/API上に記述されるため不向き
動的な値の取得(実行時にDB参照) 不可能 — params値はDAG定義時に固定

YAML定義内にデフォルト値を設定する

my_pipeline:
  dag_id: my_pipeline
  schedule: "0 2 * * *"
  default_args:
    owner: airflow
    start_date: "2024-01-01"
  params:
    data_bucket: "default-data-bucket"
    output_prefix: "output/daily"
    max_records: "10000"
  tasks:
    list_files:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
      task_id: list_files
      bucket: "{{ params.data_bucket }}"
      prefix: "{{ params.output_prefix }}"

実行時にパラメーターを上書きする

StartWorkflowRun APIの--override-parametersを使うと、YAML定義内のparamsを実行時に上書きできます。

aws mwaa-serverless start-workflow-run \
    --workflow-arn "arn:aws:airflow-serverless:us-east-1:123456789012:workflow/my-pipeline-xxx" \
    --override-parameters '{"data_bucket":"prod-data-bucket","output_prefix":"prod/output"}'

実際に検証してみました。YAML定義ではparams.greeting"hello-from-params"を設定し、--override-parametersで上書きしてみます。

// YAML定義のparamsで実行した場合
{"greeting": "hello-from-params", "bucket": "my-params-bucket"}

// override-parametersで上書きした場合
{"greeting": "overridden-greeting", "bucket": "overridden-bucket"}

ばっちり上書きされていますね。

既存DAGからの移行パターン

公式の移行ガイドがない以上、Variableを使っている既存DAGをMWAA Serverlessに移行する場合は、利用者自身でパターンに応じた書き換えが必要です。典型的なパターンを整理してみました。

なお、すべてのパターンでVariableの利点(DAG間共有、UIからの管理、コード変更なしの値変更)は失われます。移行可否の判断材料としてください。

パターン1: {{ var.value }}をparamsに書き換える

たとえばこんなDAGがあったとします。

task = S3ListOperator(
    task_id="list_files",
    bucket="{{ var.value.data_bucket }}",
    prefix="{{ var.value.output_prefix }}",
)

MWAA Serverlessでは次のようにYAML定義のparamsで書き換えます。

my_pipeline:
  dag_id: my_pipeline
  params:
    data_bucket: "my-data-bucket"
    output_prefix: "output/"
  tasks:
    list_files:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
      task_id: list_files
      bucket: "{{ params.data_bucket }}"
      prefix: "{{ params.output_prefix }}"

パターン2: {{ var.json }}をparamsに展開する

JSON形式のVariableをJinjaテンプレートで参照していた場合は、フラットなparamsに展開します。

task = GlueJobOperator(
    task_id="transform",
    job_name="my-etl",
    script_args={
        "--source_bucket": "{{ var.json.pipeline_config.bucket }}",
        "--output_format": "{{ var.json.pipeline_config.format }}",
    },
)

これをYAMLに変換するとこうなります。

my_pipeline:
  dag_id: my_pipeline
  params:
    source_bucket: "xxx"
    output_format: "parquet"
  tasks:
    transform:
      operator: airflow.providers.amazon.aws.operators.glue.GlueJobOperator
      task_id: transform
      job_name: my-etl
      script_args:
        "--source_bucket": "{{ params.source_bucket }}"
        "--output_format": "{{ params.output_format }}"

パターン3: 環境別の設定管理

環境ごとにVariableの値を変えて運用していた場合は、ワークフロー起動側で--override-parametersを使い分けます。

たとえば従来のMWAAではこう書いていたとします。

# Airflow UI や Secrets Manager で環境ごとに Variable を設定
# staging: data_bucket = "staging-bucket"
# production: data_bucket = "prod-bucket"
task = S3ListOperator(
    task_id="list_files",
    bucket="{{ var.value.data_bucket }}",
)

MWAA Serverlessでは、YAML定義にはデフォルト値を設定しておきます。

# YAML定義にはデフォルト値を設定
my_pipeline:
  params:
    data_bucket: "default-bucket"

そして環境別にパラメーターを上書きして実行します。

# 環境別にパラメーターを上書きして実行
# staging
aws mwaa-serverless start-workflow-run \
    --workflow-arn "arn:aws:..." \
    --override-parameters '{"data_bucket":"staging-bucket"}'

# production
aws mwaa-serverless start-workflow-run \
    --workflow-arn "arn:aws:..." \
    --override-parameters '{"data_bucket":"prod-bucket"}'

スケジュール実行する場合は、環境ごとに異なるデフォルト値を設定したワークフローをデプロイするか、EventBridge SchedulerからLambda経由で start-workflow-run を呼び出してパラメーターを渡す構成で対応します。

パターン4: 機密情報の扱い

JinjaテンプレートやVariable.get()で機密情報(APIキー等)を取得していた場合は、Lambda経由の間接実行で対応します。

従来のMWAAでは、Jinjaテンプレートを使って次のように記述していました。

task = SimpleHttpOperator(
    task_id="call_api",
    headers={"Authorization": "Bearer {{ var.value.external_api_key }}"},
)

MWAA Serverlessでは、Lambda関数内でSecrets Managerから直接取得する設計に変更します。

# Lambda関数内でSecrets Managerから直接取得する設計に変更
my_pipeline:
  tasks:
    call_api:
      operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
      task_id: call_api
      function_name: call-external-api
      payload: '{"endpoint": "/data", "method": "GET"}'

Lambda関数内でboto3を使ってSecrets Managerから機密情報を取得すれば、YAML定義やパラメーターに機密情報を含めずに済みます。

パターン5: Lambda経由で外部パラメーターストアを参照する

Variableを「コード変更なしで設定値を変えたい」「DAG間で設定を共有したい」という用途で使っていた場合はどうでしょうか。SSM Parameter Storeなどの外部ストアとLambdaを組み合わせる方法があります。

なお、MWAA Serverlessのサポートオペレータ一覧には、外部ストアから設定値を読み取ってXComに渡すオペレータが存在しません。そのためLambda経由で取得します。

制約: 1つのLambdaから複数の値を個別に取り出せない

「Lambdaで複数パラメータをまとめて取得し、後続タスクに個別に渡す」のが理想です。しかしLambdaInvokeFunctionOperatorの戻り値はJSON文字列としてXComに格納されるため、dictキーアクセスやJSON解析が使えません。

# いずれも動作しない
{{ ti.xcom_pull(task_ids='get_config').data_bucket }}      → YAMLプリプロセッサでブロック
{{ ti.xcom_pull(task_ids='get_config')['data_bucket'] }}   → 実行時エラー(XCom値が文字列のため)
{{ ti.xcom_pull(task_ids='get_config') | fromjson }}       → No filter named 'fromjson'

標準Airflowではrender_template_as_native_objuser_defined_filtersで回避できます。しかしMWAA ServerlessではこれらのDAGレベルパラメーターが非サポートです。| splitフィルターもJinja2に存在せず、.split()メソッドはサンドボックスでブロックされます。

そのため、パラメーターごとにLambdaタスクを分ける 必要があります。

回避策: パラメーターごとにLambdaタスクを分け | replace でクオートを除去する

実際に検証したところ、同一のLambda関数をパラメーターごとに異なるペイロードで呼び出し、JinjaのreplaceフィルターでJSON文字列のクオートを除去するパターンが動作しました。

my_pipeline:
  tasks:
    get_bucket:
      operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
      task_id: get_bucket
      function_name: get-ssm-parameter
      payload: '{"key": "/myapp/data_bucket"}'
    get_prefix:
      operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
      task_id: get_prefix
      function_name: get-ssm-parameter
      payload: '{"key": "/myapp/output_prefix"}'
    list_files:
      operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
      task_id: list_files
      bucket: >-
        {{ ti.xcom_pull(task_ids='get_bucket') | replace('"', '') }}
      prefix: >-
        {{ ti.xcom_pull(task_ids='get_prefix') | replace('"', '') }}
      dependencies:
        - get_bucket
        - get_prefix

Lambda関数は1つだけ用意すれば済みます。呼び出し時のペイロードでキーを指定し、対応する値を返します。

import boto3

def handler(event, context):
    ssm = boto3.client("ssm")
    resp = ssm.get_parameter(Name=event["key"])
    return resp["Parameter"]["Value"]

LambdaInvokeFunctionOperatorの戻り値はJSON文字列としてXComに格納されるため、文字列値の場合は前後にクオート(")が付きます。| replace('"', '')でこのクオートを除去することで、後続のオペレータにクリーンな値を渡せます。

なお、このクオート問題はLambdaInvokeFunctionOperator固有の挙動です。S3ListOperator等の通常のオペレータではXCom値がPythonオブジェクトのまま保持されます。そのため| replaceは不要で、{{ ti.xcom_pull(task_ids='list_s3')[0] }}のようなブラケットアクセスも正常に動作します。

LambdaInvokeFunctionOperatorだけが特殊なのは、Lambdaランタイムが戻り値をJSON直列化するためです。オペレータがそれをresponse['Payload'].read().decode()で文字列として読み取ります。通常のオペレータはexecute()がPythonオブジェクトを直接返すため、XComの直列化・復元で型が保持されます。

このパターンを使えば、Lambda経由でSSM Parameter Storeから取得した値をAWSオペレータに直接渡せます。実際にS3ListOperatorでバケット内のファイル一覧を取得できることを確認済みです。

paramsと比べたメリットは次のとおりです。

  • YAML定義を変更せずにSSM Parameter Store側で値を変更できる
  • 複数のワークフロー間で同じパラメーターパスを参照して設定を共有できる
  • SecureStringパラメーターを使えば機密情報の管理にも対応可能

制約として、パラメーターごとにLambdaタスクが1つ必要 です。Lambda関数自体は1つで済みますが、ワークフロー定義上のタスク数はパラメーター数に比例して増えます。取得するパラメーターが多い場合は煩雑になるため、paramsへの直接記述と使い分けてください。

まとめ

検証結果

項目 結果
{{ params.xxx }} 使える
{{ ds }}, {{ ts }} 使える
{{ ti.task_id }}, {{ ti.dag_id }} 使える(dag_idは内部IDが返る)
{{ ti.xcom_pull() }} 使える
{{ ti.run_id }}, {{ ti.try_number }} 使えない(SafeTaskInstanceで制限)
{{ macros.ds_add() }} 使える
{{ var.value.xxx }} 使えない'var' is undefined
{{ var.json.xxx }} 使えない(同上)
--override-parameters による上書き 使える

移行における課題

MWAA Serverlessでは、Airflow Variableの扱いが従来のMWAAとは根本的に異なっています。

  • Variableの登録手段がない。メタデータベースは抽象化されており、secrets.backendでの外部バックエンドへの切り替えもできない
  • Connectionも同様に変更されており、aws_conn_idはIAMロールで制御される
  • Variableの移行方法について、AWSからの公式ガイドは2026年3月時点で提供されていない
  • 公式の変換ツールはVariable.get()を含むDAGを変換できない(DAGインポート時にVariable.get()が実行されエラーになる)

移行可否の判断基準

MWAA Serverlessへの移行を検討する際は、次の観点で既存DAGを評価してみてください。

移行しやすいのは次のようなケースです。

  • Variableの利用が限定的で、固定値への書き換えで済む
  • AWSオペレータのみを使用している
  • DAG間での設定値共有がない

一方で、次のようなケースだと移行がやっかいです。

  • 多数のDAG間でVariableを共有しており、一元管理が必要
  • Variable.get()を使った動的なDAG生成をしている
  • 機密情報をVariable経由で管理し、Secrets Managerバックエンドを活用している
  • 非AWSサービスへのConnectionに依存している

移行が困難なケースでは、従来のMWAA(標準版)の継続利用も選択肢に入れたうえで判断してください。

参考リンク

この記事をシェアする

FacebookHatena blogX

関連記事