[アップデート] Amazon MWAA ServerlessがEventBridgeへイベントを配信するようになったので動作確認してみた
こんにちは。サービス開発室の武田です。
Amazon MWAA Serverlessのワークフローとタスクの状態変化が、Amazon EventBridgeにイベントとして配信されるようになりました。
これまでMWAA Serverlessの実行状況を追うには、GetWorkflowRunをポーリングするかCloudWatch Logsを見るしかありませんでした。イベントが流れてくるなら、失敗時のアラートや後続パイプラインの起動をイベント駆動で組めます。
ただ、公式ドキュメントにはイベントの種類(detail-typeの一覧)しか書かれておらず、ペイロードの中身やイベントがいつ届くのかは記載がありません。そこで、実際にワークフローを動かして配信のされ方を観察してみました。本記事のペイロード・タイムスタンプはすべてap-northeast-1での実測値です。
先に結論
- イベントは
source: aws.airflow-serverlessで届く。ペイロードのdetailはワークフローARN・実行ID・タスクID・ステータスのみの最小構成で、エラーメッセージや試行回数のフィールドはない - 配信はリアルタイムではない。観測した範囲では、イベントは実行ごとに約2分間隔のタイミングでまとめて生成され、その間に通過した中間状態(Started、Scheduledなど)はイベントとして観測できなかった
- 終了イベント(Succeeded / Failed)は実際の完了時刻から最大2分強遅れて届いた。一方
StopWorkflowRunによるStoppedイベントは、今回の停止シナリオでは約1秒で届いた sourceだけのイベントパターンだと、CloudTrail経由の「AWS API Call via CloudTrail」イベント(CreateWorkflowなど)も同じルールにマッチする。detail-typeのprefixで絞るのが実用的
「失敗したら通知する」「成功したら次を起動する」という典型的な用途には十分使えます。ただし数十秒単位の即応性を期待する用途には向きません。
アップデートの概要
EventBridgeのイベントリファレンスによると、配信されるイベントは次の15種類です。detail-typeは「MWAA Serverless Workflow Run Queued」のように、対象と状態名をつなげた形式になっています。
- ワークフロー実行(7種類): Started/Queued/Running/Succeeded/Failed/Stopped/Timeout
- タスク(8種類): Queued/Scheduled/Upstream Failed/Running/Succeeded/Failed/Up For Retry/Timeout
配信タイプはDurableで、MWAA Serverlessが利用できる全リージョンで有効です。ただしEventBridge全般の話として、重複や順序の入れ替わりは起こり得ます。後続処理はworkflowRunIdやtaskInstanceIdをキーとした冪等な作りにしておくのが無難です。料金面では、EventBridgeへのAWS管理イベントの取り込みは無料です。ターゲット側(Lambda、CloudWatch Logsなど)のコストは別途確認してください。
なお、ドキュメントの特定イベント向けの例では、detail-typeの値が「MWAA Serverless Workflow Run State Change」になっています。これはページ上で赤字斜体に表示される、差し替え用のプレースホルダーです。実在するdetail-typeではないので、上記15種類のうち必要な名前に置き換えて使います。
検証の構成
観察用に、届いたイベントをそのままCloudWatch Logsに書き出すルールを用意しました。
# ロググループとEventBridgeからの書き込みを許可するリソースポリシー
aws logs create-log-group --log-group-name /aws/events/mwaa-serverless-test
aws logs put-resource-policy \
--policy-name mwaa-serverless-events-policy \
--policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": ["events.amazonaws.com", "delivery.logs.amazonaws.com"]},
"Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/events/*:*"
}]
}'
# sourceだけで拾う観察用ルール
aws events put-rule \
--name mwaa-serverless-all-events \
--event-pattern '{"source": ["aws.airflow-serverless"]}' \
--state ENABLED
aws events put-targets \
--rule mwaa-serverless-all-events \
--targets '[{"Id": "cwlogs", "Arn": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/events/mwaa-serverless-test"}]'
ワークフローは3パターン用意しました。いずれもSqsPublishOperator/SqsSensorを使ったYAML定義です。
- 成功パターン。SQS送信タスク2つを直列実行(publish_1 >> publish_2)
- 失敗・リトライパターン。存在しないキューへの送信で失敗するタスク(
retries: 1)と、その下流タスク - 停止パターン。空のキューを待ち続けるセンサーを起動し、
StopWorkflowRunで停止
失敗・リトライ用の定義だけ載せておきます。
eb_test_failure:
dag_id: eb_test_failure
schedule: null
default_args:
owner: airflow
start_date: "2024-01-01"
retries: 1
retry_delay: 30
tasks:
fail_task:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: fail_task
sqs_queue: https://sqs.ap-northeast-1.amazonaws.com/123456789012/nonexistent-queue
message_content: '{"task": "fail_task"}'
downstream_task:
operator: airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator
task_id: downstream_task
sqs_queue: https://sqs.ap-northeast-1.amazonaws.com/123456789012/mwaa-eb-test-queue
message_content: '{"task": "downstream_task"}'
dependencies:
- fail_task
余談ですが、default_argsに書けるキーはバリデーションされており、dag-factoryで使われるretry_delay_secはエラーになりました。retry_delay(秒数の整数)を使います。
イベントのペイロード
まずワークフロー実行のイベントです。成功時のものを載せます。
{
"version": "0",
"id": "b4cc37e6-1fa1-58a6-386f-a2303df03968",
"detail-type": "MWAA Serverless Workflow Run Succeeded",
"source": "aws.airflow-serverless",
"account": "123456789012",
"time": "2026-06-15T12:04:40Z",
"region": "ap-northeast-1",
"resources": [
"arn:aws:airflow-serverless:ap-northeast-1:123456789012:workflow/eb_test_success-8l0DVgxQQB"
],
"detail": {
"accountId": "123456789012",
"workflowRunId": "cGaCaYWbVldeYyD",
"workflowArn": "arn:aws:airflow-serverless:ap-northeast-1:123456789012:workflow/eb_test_success-8l0DVgxQQB",
"status": "SUCCESS",
"workflowId": "eb_test_success-8l0DVgxQQB"
}
}
タスクのイベントにはtaskIdとtaskInstanceIdが加わります。
{
"detail-type": "MWAA Serverless Task Failed",
"source": "aws.airflow-serverless",
"time": "2026-06-15T12:04:25Z",
"resources": [
"arn:aws:airflow-serverless:ap-northeast-1:123456789012:workflow/eb_test_failure-iCzT4Wlk9A"
],
"detail": {
"accountId": "123456789012",
"workflowRunId": "lDJPUFqzILifKNx",
"workflowArn": "arn:aws:airflow-serverless:ap-northeast-1:123456789012:workflow/eb_test_failure-iCzT4Wlk9A",
"status": "FAILED",
"workflowId": "eb_test_failure-iCzT4Wlk9A",
"taskInstanceId": "ex_651ce903-bb4c-4831-8d41-dfad00b91859_fail_task_2",
"taskId": "fail_task"
}
}
detailに入るのは次のフィールドだけです。
| フィールド | 内容 |
|---|---|
| accountId | アカウントID |
| workflowArn / workflowId | ワークフローのARNとID |
| workflowRunId | 実行ID |
| status | そのイベント時点のステータス |
| taskId | タスクID(タスク系イベントのみ) |
| taskInstanceId | タスクインスタンスID(タスク系イベントのみ) |
注目したいのは 入っていない 情報のほうです。エラーメッセージ、試行回数、実行時間などはありません。今回観測したID形式では、taskInstanceIdの末尾接尾辞(fail_task_1、fail_task_2)から試行回数を推測できました。とはいえID形式が仕様として保証されているわけではないので、ロジックで依存するのは避けたほうがよさそうです。また失敗の原因究明には結局タスクログが必要です。実際、GetWorkflowRunで取れるErrorMessageも汎用文言でした。「Workflow execution failed, please check the task execution logs for more details」とだけ返ってきます。
イベントはいつ届くのか
ここが今回もっとも興味深かったところです。3つのワークフローを同時に実行して、届いたイベントを時刻順に並べました。
時刻はイベント本文のtimeフィールドです。なお、各イベントのtimeとCloudWatch Logsへの到達時刻(ingestionTime)の差は全イベントで1秒以内だったため、この表は実質的に「ターゲットに届いた時刻」として読めます。つまり以降で見る遅れは、EventBridgeからターゲットへの配送遅延ではなく、イベントが生成されるタイミングそのものに由来します。
成功パターン(publish_1 >> publish_2)
| 時刻(UTC) | イベント |
|---|---|
| 12:00:39 | Workflow Run Queued |
| 12:02:39 | Workflow Run Running |
| 12:02:40 | Task Succeeded(publish_1)、Task Queued(publish_2) |
| 12:04:40 | Task Succeeded(publish_2)、Workflow Run Succeeded |
失敗・リトライパターン
| 時刻(UTC) | イベント |
|---|---|
| 12:00:25 | Workflow Run Queued |
| 12:02:25 | Workflow Run Running、Task Up For Retry(fail_task試行1)、Task Queued(fail_task試行2) |
| 12:04:25 | Task Failed(fail_task試行2)、Task Upstream Failed(downstream_task)、Workflow Run Failed |
停止パターン
| 時刻(UTC) | イベント |
|---|---|
| 12:00:33 | Workflow Run Queued |
| 12:02:33 | Workflow Run Running、Task Running(wait_forever) |
| 12:02:37 | Workflow Run Stopped(StopWorkflowRun呼び出しの約1秒後) |
| 12:04:33 | Task Succeeded(wait_forever) |
timeの値がワークフロー実行ごとに「初回 + 2分 + 4分」ときれいにそろっています。Stoppedを除き、イベントは約2分周期のタイミングでしか届いていません。再現性を確認するため成功パターンをもう一度実行しましたが、結果は同じでした(12:11:58 → 12:13:58 → 12:15:58)。状態遷移のたびにpush配信されるのではなく、定期的なスナップショットとして配信されているように見えます(あくまで観測からの推測です)。
この挙動が実用面に効いてくるのは次の2点です。
1. 中間状態のイベントは観測できないことがある
2分の間隔内に複数の状態遷移が起こると、今回の検証では各タイミングでの最新状態だけが観測されました。ドキュメントに記載のある15種類のうちWorkflow Run StartedとTask Scheduledは一度も観測できていません(Timeout系はシナリオを用意していないため対象外)。publish_1にいたってはQueued / Running すら出ず、最初のイベントがTask Succeededでした。
2. 終了イベントは実際の完了から遅れる
GetWorkflowRunが返す実際の時刻と、イベントの時刻を比べるとこうなりました。
| 項目 | 実際の時刻 | イベント時刻 | 遅延 |
|---|---|---|---|
| 成功ランの完了(CompletedOn) | 12:02:26 | 12:04:40 | 約2分14秒 |
| 失敗ランの完了(CompletedOn) | 12:02:50 | 12:04:25 | 約1分35秒 |
| 再実行した成功ランの完了(CompletedOn) | 12:13:45 | 12:15:58 | 約2分13秒 |
| 停止(StopWorkflowRun呼び出し) | 12:02:36 | 12:02:37 | 約1秒 |
成功ランは105秒で完了している(CompletedOn 12:02:26)のに、Succeededイベントが届いたのはその約2分14秒後でした。一方でStoppedだけは即時に届いており、生成のされ方が違うように見えます(停止シナリオは1回だけの観測なので、参考程度に受け取ってください)。
なお、StartWorkflowRunを呼んでから最初のQueuedイベントまでは7〜16秒でした。入口は速く、その後が2分刻みになるイメージです。
細かい観測メモ
検証中に気付いた点をいくつか挙げておきます。
1つ目は、停止したセンサータスクがTask Succeededになることです。停止パターンでは、StopWorkflowRunの約2分後に、停止したはずのセンサータスクのTask Succeededイベントが届きました。Workflow Run Stoppedの後にタスクの成功イベントが来るので、タスク単位で成否を判定するロジックだと、停止したのに成功とみなしてしまう恐れがあります。
2つ目は、CloudTrailイベントが混ざることです。{"source": ["aws.airflow-serverless"]}だけのパターンには、CloudTrail経由のAPI呼び出しイベントもマッチします。detail-typeが「AWS API Call via CloudTrail」のもので、CreateWorkflowやStartWorkflowRunの呼び出しが流れてきます。状態変化だけ拾いたい場合はdetail-typeも指定します。
最後に小ネタですが、一度も実行されなかったタスクのtaskInstanceIdは末尾が_0でした。Upstream Failedになったdownstream_taskのIDは..._downstream_task_0となっています。
使うときのイベントパターン
観察結果を踏まえると、実用ではdetail-typeをprefixで絞るのが扱いやすいです。ワークフロー実行のイベントだけ拾うならこうなります。
{
"source": ["aws.airflow-serverless"],
"detail-type": [{"prefix": "MWAA Serverless Workflow Run"}]
}
タスクイベントも含めて状態変化をすべて拾う場合は、prefixを並べてORにします。
{
"source": ["aws.airflow-serverless"],
"detail-type": [
{"prefix": "MWAA Serverless Workflow Run"},
{"prefix": "MWAA Serverless Task"}
]
}
失敗系の通知だけ欲しい場合は名前で列挙します。
{
"source": ["aws.airflow-serverless"],
"detail-type": [
"MWAA Serverless Workflow Run Failed",
"MWAA Serverless Workflow Run Timeout",
"MWAA Serverless Workflow Run Stopped"
]
}
ユースケースとしてはWhat's Newにも挙げられているとおり、失敗時のアラート、上流ワークフロー成功時の後続起動、終了状態や失敗履歴の保存あたりが素直です。MWAA Serverlessには他のワークフローの完了を待つ手段が乏しく、センサーで待つとその間のタスク実行時間が課金されます。Succeededイベント→Lambda→StartWorkflowRunという連携が組めるのは個人的にうれしいポイントです。
一方で、配信が約2分刻みである点は用途を選びます。「失敗から数秒以内に検知したい」「タスクの全状態遷移を漏れなく記録したい」という要件には現状のイベント配信は合いません。完了検知が2分遅れても困らない、日次バッチのオーケストレーションのような用途であれば十分実用的です。
まとめ
MWAA ServerlessのイベントがEventBridgeへ届くようになり、ポーリングなしで失敗通知やパイプライン連携を組めるようになりました。実測した範囲では次のとおりです。
- ペイロードは最小構成。エラー詳細はログやAPIで別途取得する
- 観測した範囲では、イベントは約2分刻みで生成され、短時間で通過した中間状態は観測できなかった。終了イベントは実完了から最大2分強遅れた
- 今回の停止シナリオでは、Stoppedイベントだけは即時に届いた
- 実用時は
detail-typeで絞り込む
ドキュメントに書かれていない挙動が多いので、組み込む際は自分のユースケースで配信タイミングを確認することをお勧めします。どなたかの参考になれば幸いです。







