Amazon MWAA Serverlessでテンプレート経由でLambdaの複数戻り値を取り出す3つの回避策
こんにちは。サービス開発室の武田です。
前回の記事では、MWAA ServerlessでAirflow Variableが使えない問題と、その移行パターンを紹介しました。その中で「Lambda経由でSSM Parameter Storeから値を取得する」パターンを紹介しましたが、パラメーターごとにLambdaタスクを分ける必要がある という制約がありました。
今回はこの制約をもう少し深掘りして、MWAA ServerlessのJinjaテンプレートでXCom値に対してどんな操作が許可されているのか、ひととおり検証してみました。
XCom値の型はオペレータによって異なる
MWAA ServerlessでのXCom値の型は、オペレータによって異なります。この違いが、テンプレートから値を取り出す際の制約に直結します。
実際にいくつかのオペレータでXCom値の型を検証しました。
| オペレータ | XCom値の型 | is_mapping |
is_string |
備考 |
|---|---|---|---|---|
| S3ListOperator | リスト | False | False | |
| SqsPublishOperator | 辞書 | True | False | |
| LambdaInvokeFunctionOperator | 文字列 | False | True | |
| SqsSensor | リスト(辞書のリスト) | — | — | key='messages'で取得(デフォルトキーではNone) |
S3ListOperatorやSqsPublishOperatorのようにexecute()メソッドがPythonオブジェクトを直接返すオペレータは、XComを経由しても型が保持されます。一方、LambdaInvokeFunctionOperatorはLambdaランタイムからのレスポンスをresponse['Payload'].read().decode()で読み取ります。そのため、Lambdaが辞書を返してもXComには文字列として格納されます。
なお、SqsSensorはデフォルトのreturn_valueキーではなくmessagesキーでXComにpushします。取得するにはxcom_pull(task_ids='...', key='messages')の指定が必要です。
MWAA Serverlessのテンプレートにおけるアクセス制限
標準のJinja2では、ドット記法(obj.key)とブラケット記法(obj['key'])は等価です。Jinja2はobj.keyを評価するとき、まずgetattr(obj, 'key')を試し、失敗するとobj['key']にフォールバックします。そのため、辞書に対して{{ mydict.bucket }}と{{ mydict['bucket'] }}は同じ結果になります。
しかし、MWAA Serverlessではこの等価性が成り立ちません。ドット記法に制限がかかっています。
ドットアクセスは組込み変数以外ブロックされる
params、ti、macros等の組込みテンプレート変数に対するドットアクセスは正常に動作します。
# これらはすべて動作する
message_content: '{"greeting":"{{ params.greeting }}","task":"{{ ti.task_id }}","yesterday":"{{ macros.ds_add(ds, -1) }}"}'
しかし、それ以外の変数に対するドットアクセスはすべてブロックされます。{% set %}で変数に代入しても回避できません。
# NG — xcom_pull() の戻り値に対するドットアクセス
bucket: "{{ ti.xcom_pull(task_ids='get_config').data_bucket }}"
# NG — {% set %} で変数に代入してもドットアクセスはブロック
bucket: >-
{%- set result = ti.xcom_pull(task_ids='send_message') -%}
{{ result.MessageId }}
# NG — メソッド呼び出し構文でも同様
bucket: >-
{%- set raw = ti.xcom_pull(task_ids='get_config') -%}
{{ raw.split(',') }}
いずれも同じエラーになります。
builtins.AttributeError: 'str' object has no attribute 'task_id'
要求した属性名(.data_bucket、.MessageId、.split)ではなくtask_idのエラーが出るのはちょっと不思議です。標準のJinja2であればこのようなエラーにはなりません。MWAA Serverless固有の処理層がテンプレートの展開前に介在していると考えられますが、具体的な実装は公開情報からは不明です。
なお、大文字・小文字は関係なく、.Bodyと.bodyで同じ結果でした。
ブラケットアクセスは制限なく動作する
ブラケット記法はドット記法とは対照的に、制限なく動作します。{% set %}も不要です。
# OK — リスト型XComに対する整数インデックス
first_file: "{{ ti.xcom_pull(task_ids='list_s3')[0] }}"
# OK — 辞書型XComに対する文字列キー
msg_id: "{{ ti.xcom_pull(task_ids='send_message')['MessageId'] }}"
標準のJinja2では{{ result.MessageId }}と{{ result['MessageId'] }}は同じ意味です。しかしMWAA Serverlessでは ブラケット記法のみが使えます。
実際にSqsPublishOperatorのXCom値(辞書型)に対して['MessageId']でアクセスし、値を取り出せることを確認しました。
アクセス制限のまとめ
| アクセス方法 | 動作 | 例 |
|---|---|---|
.xxx(ドットアクセス) |
組込み変数のみ | params.greeting、ti.task_id |
[0](整数インデックス) |
常に動作 | xcom_pull(...)[0] |
['key'](文字列キー) |
常に動作 | xcom_pull(...)['MessageId'] |
[0:100](スライス) |
常に動作 | 文字列の部分取得 |
| filter(フィルター) |
組込みフィルターのみ | | trim、| replace |
辞書型XComならブラケットアクセスで値を取り出せる
ここでのポイントは、XCom値が辞書型であれば['key']で個別の値を取り出せる ということです。
SqsPublishOperatorで実際に検証しました。SqsPublishOperatorのexecute()メソッドはboto3のsend_messageレスポンス(辞書)をそのまま返すため、XComに辞書型が格納されます。
tasks:
send_message:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: send_message
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: '{"hello":"world"}'
check_result:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: check_result
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/test-queue
message_content: >-
{"msg_id":"{{ ti.xcom_pull(task_ids='send_message')['MessageId'] }}"}
dependencies:
- send_message
{"msg_id": "59bd8a37-812d-43b7-84d8-b79cba2f0028"}
['MessageId']で辞書から値を取り出せています。{% set %}を使えば複数のキーにもアクセスできます。
message_content: >-
{%- set result = ti.xcom_pull(task_ids='send_message') -%}
{"msg_id":"{{ result['MessageId'] }}","md5":"{{ result['MD5OfMessageBody'] }}"}
{"msg_id": "a033bdc1-0fc0-420f-9d9b-4bc9de9afe84", "md5": "fbc24bcc7a1794758fc1327fcfebdaf6"}
LambdaInvokeFunctionOperatorの壁
では、Lambdaが返す複数の値を['key']で取り出せないのかというと、残念ながら できません。
Lambdaが辞書{"data_bucket": "my-bucket", "output_prefix": "data/"}を返したとします。しかしLambdaInvokeFunctionOperatorがresponse['Payload'].read().decode()で文字列に変換してしまいます。
XComに格納されるのは文字列'{"data_bucket": "my-bucket", "output_prefix": "data/"}'です。
# NG — XCom値が文字列なので['data_bucket']はTypeError
bucket: "{{ ti.xcom_pull(task_ids='get_config')['data_bucket'] }}"
実機で検証したところ、Lambdaタスク自体はSUCCESSですが、後続タスクで['data_bucket']にアクセスした時点でランタイムエラーになりました。
文字列をテンプレート内で辞書に変換する手段もありません。| fromjsonフィルターは存在せず、.split()もドットアクセスのためブロックされます。
render_template_as_native_obj も効かない
Airflow 2.xでは、DAGレベルで render_template_as_native_obj=True を設定するとテンプレートの戻り値をPythonネイティブ型として扱えます。さらに PR #60619 により、オペレータレベルでも render_template_as_native_obj を設定できるようになりました。
これを使えば文字列がネイティブ型に変換されるのではないか、と期待しましたが、MWAA Serverlessでは 効果がありません でした。
tasks:
get_config:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: get_config
function_name: my-function
payload: '{}'
publish_result:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: publish_result
sqs_queue: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
render_template_as_native_obj: true
message_content: >-
{"bucket":"{{ ti.xcom_pull(task_ids='get_config')['data_bucket'] }}"}
dependencies:
- get_config
render_template_as_native_obj: true を付けても、XCom値は文字列のままで ['data_bucket'] アクセスはランタイムエラーになります。実際、render_template_as_native_obj はMWAA Serverlessのサポートされていないパラメーターとしてドキュメントに記載されています。
また、Airflow本体では以前から利用可能な macros.json.loads() もMWAA Serverlessでは動作しませんでした。user_defined_filters や user_defined_macros もサポート対象外です。テンプレート内でJSON文字列を辞書に変換する手段は、現時点では見つかりませんでした。
LambdaInvokeFunctionOperatorだけが特殊な理由
この問題はLambdaInvokeFunctionOperator固有の挙動です。
- S3ListOperator —
execute()がPythonのリストを直接返す → XComにリスト型として格納 →[0]でアクセス可能 - SqsPublishOperator —
execute()がPythonの辞書を直接返す → XComに辞書型として格納 →['MessageId']でアクセス可能 - LambdaInvokeFunctionOperator —
execute()がPayload.read().decode()で文字列を返す → XComに文字列型で格納 → ブラケットアクセス不可
通常のオペレータはexecute()メソッドがPythonオブジェクトを直接返すため、XComの直列化・復元で型が保持されます。LambdaInvokeFunctionOperatorだけが、Lambdaランタイムとの間でJSON直列化→文字列化が発生するため、型情報が失われます。
回避策
パラメーターごとにLambdaタスクを分ける
前回の記事で紹介したとおり、もっともシンプルな回避策は パラメーターごとにLambdaタスクを分ける 方法です。
tasks:
get_bucket:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: get_bucket
function_name: get-ssm-parameter
payload: '{"key": "/myapp/data_bucket"}'
get_prefix:
operator: airflow.providers.amazon.aws.operators.lambda_function.LambdaInvokeFunctionOperator
task_id: get_prefix
function_name: get-ssm-parameter
payload: '{"key": "/myapp/output_prefix"}'
list_files:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: list_files
bucket: >-
{{ ti.xcom_pull(task_ids='get_bucket') | replace('"', '') }}
prefix: >-
{{ ti.xcom_pull(task_ids='get_prefix') | replace('"', '') }}
dependencies:
- get_bucket
- get_prefix
Lambda関数は1つだけ用意すれば済みます。呼び出し時のペイロードでキーを指定し、対応する値を返します。
import boto3
def handler(event, context):
ssm = boto3.client("ssm")
resp = ssm.get_parameter(Name=event["key"])
return resp["Parameter"]["Value"]
Lambdaが文字列値を返す場合、XComにはJSONクオート付き("my-bucket")で格納されます。| replace('"', '')でクオートを除去することで、後続のオペレータにクリーンな値を渡せます。
コスト面の考慮
MWAA Serverlessではタスクインスタンスごとに最低1分の課金が発生します。パラメーター数が多い場合、この方式ではタスク数に比例してコストが増えます。
| パラメーター数 | タスク数 | 最低課金時間 |
|---|---|---|
| 2個 | 2 | 120秒($0.003) |
| 5個 | 5 | 300秒($0.007) |
| 10個 | 10 | 600秒($0.013) |
パラメーター数が多い場合は、paramsへの直接記述 + --override-parametersでの上書きのほうが適切です。Lambda経由で外部ストアを参照する必要性自体を見直してみてください。
固定幅パディング + スライス
1回のLambda呼び出しで複数の値を取り出す方法として、Lambda側で各値を固定幅にパディングして結合し、テンプレート側でスライスして取り出す方法も検証してみました。
# Lambda: 各値を100文字に固定幅化して結合
return "".join(values[k].ljust(100) for k in keys)
# テンプレート: クオート除去 → スライス → 空白除去
bucket: >-
{%- set raw = ti.xcom_pull(task_ids='get_config') | replace('"', '') -%}
{{ raw[0:100] | trim }}
prefix: >-
{%- set raw = ti.xcom_pull(task_ids='get_config') | replace('"', '') -%}
{{ raw[100:200] | trim }}
LambdaInvokeFunctionOperatorのXCom値はJSONクオートで囲まれています(例: "my-bucket")。先に| replace('"', '')でクオートを除去してからスライスしてください。
かなりの力技ですが、ちゃんと動作します。ただし値に含まれる空白が| trimで除去される、pad_widthの管理が必要、位置ベースのマッピングで保守性が低いなどの制約はあります。パラメーター数が多くタスク課金を抑えたい場合の選択肢として参考にしてください。
Step Functions経由でLambdaを呼び出す
前述のとおり、XCom値がPythonの辞書型として格納されていれば['key']でのキーアクセスは正常に動作します。であれば、XComに辞書型を返してくれるオペレータ経由でLambdaを呼べばいいのでは?ということで、Step Functionsを使う方法を検証してみました。
ポイントはStepFunctionGetExecutionOutputOperatorの実装です。このオペレータは内部でjson.loads()を実行するため、XComに辞書型として格納されます。DescribeExecution APIのoutputフィールドは常にJSON文字列であることが仕様で保証されており、安全にjson.loads()を適用できます。
まず、Lambdaを呼び出すだけのシンプルなStep Functionsステートマシンを作成します。
{
"StartAt": "InvokeLambda",
"States": {
"InvokeLambda": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:get-config",
"End": true
}
}
}
YAMLワークフロー定義では、ステートマシンの起動→完了待ち→出力取得の3ステップで構成します。
tasks:
run_sfn:
operator: airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator
task_id: run_sfn
state_machine_arn: arn:aws:states:us-east-1:123456789012:stateMachine:get-config
wait_sfn:
operator: airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor
task_id: wait_sfn
execution_arn: "{{ ti.xcom_pull(task_ids='run_sfn') }}"
dependencies:
- run_sfn
get_output:
operator: airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator
task_id: get_output
execution_arn: "{{ ti.xcom_pull(task_ids='run_sfn') }}"
dependencies:
- wait_sfn
use_config:
operator: airflow.providers.amazon.aws.operators.s3.S3ListOperator
task_id: use_config
bucket: "{{ ti.xcom_pull(task_ids='get_output')['data_bucket'] }}"
prefix: "{{ ti.xcom_pull(task_ids='get_output')['prefix'] }}"
dependencies:
- get_output
get_outputタスクのXCom値は辞書型になっているので、['data_bucket']や['prefix']で個別の値を取り出せます。
この方法はStep Functionsステートマシンの作成とIAMロールの追加設定が必要です。一方、Lambda関数をパラメーターごとに分割する必要がなく、1回の呼び出しで複数の値を取り出せます。Lambda関数がJSON辞書を返す必要があり、文字列を返す場合はStepFunctionGetExecutionOutputOperatorの戻り値も文字列となるため、キーアクセスはできません。
まとめ
MWAA ServerlessのJinjaテンプレートにおけるXCom値へのアクセスルールは次のとおりです。
- ドットアクセス(
.xxx)は組込み変数(params、ti、macros)以外すべてブロック。{% set %}での回避も不可 - ブラケットアクセス(
[0]、['key'])は制限なく動作。辞書型XComに対する['key']でのキーアクセスも可能 - フィルターは
| trim、| replace等の組込みフィルターのみ動作。| fromjson等は未登録
辞書型XComを返すオペレータ(SqsPublishOperator等)であれば['key']で個別の値を取り出せます。しかしLambdaInvokeFunctionOperatorはオペレータ内部で文字列に変換してしまうため、Lambdaが辞書を返してもテンプレートからはキーアクセスできません。ここがやっかいなポイントです。
この制約はLambdaInvokeFunctionOperatorの実装に起因するもので、テンプレート側では対処できません。回避策としては、パラメーターごとにLambdaタスクを分ける方法、固定幅パディング + スライス、Step Functions経由でLambdaを呼び出す方法があります。要件に応じて検討してみてください。
OSSおよびAWSへのフィードバック
今回の検証で、根本的な制約はLambdaInvokeFunctionOperatorがレスポンスを常に文字列で返す点だとわかりました。MWAA ServerlessのYAML定義ではサブクラス化やPythonOperatorでの後処理ができないので、オペレータ側で対応してもらうのがもっとも自然です。
そこで、Apache Airflowにdeserialize_payloadオプションを追加するPRを提出しました。
deserialize_payload: trueを指定すると、execute()内でjson.loads()が適用され、XComに辞書型として格納されます。後続タスクのテンプレートから['key']で直接値を取り出せるはずです。
PRの結果
しかし、このPRはAirflowメンテナーから won't fix としてクローズされました。
- この問題はMWAA Serverless固有であり、Airflow本体のオペレータコードで解決すべきではない
- オペレータレベルの
render_template_as_native_obj(PR #60619)で対応できるはず
これに対して、MWAA Serverlessでrender_template_as_native_objを実際に検証し、効果がないことを報告しました。しかし最終的に「MWAA Serverless固有の問題であるため、AWS側で対応すべき」という判断でクローズされています。
AWSサポートへのフィードバック
Airflow側での対応が難しいとわかったため、AWSサポートケースを起票しフィードバックしました。伝えた内容はざっくり次のとおりです。
- Airflow本体へのPRが「MWAA Serverless固有の問題」として却下されたこと
- 具体的なユースケース(Lambda経由でSSM Parameter Storeから複数パラメーターを取得する場面)を示し、現状では技術的に実現できないこと
- ワークアラウンド(パラメーターごとにLambdaタスクを分ける、固定幅パディング + スライス)は存在するが、いずれもシンプルとは言えないこと
MWAA Serverlessはまだ新しいサービスですので、今後の改善に期待したいところです。同じ制約で困っている方の参考になれば幸いです。





