Cloud Composer で Slack 通知が失敗した場合のフォールバックとして Amazon SES でメール通知する
はじめに
こんにちは!エノカワです。
Cloud Composer (Apache Airflow) を使用したデータパイプラインでは、DAG の実行状況を Slack に通知することが一般的です。
しかし、Slack 自体の障害やネットワークの問題により通知が失敗するケースも考えられます。
本記事では、Slack 通知が失敗した場合のフォールバックとして、Amazon SES を使ったメール通知を実装する方法をご紹介します。
AWS へのアクセスには、セキュリティを高めるため Workload Identity 連携を使用し、アクセスキーを使わずに実現します。
前回の Slack 通知実装のおさらい
以前、「Cloud Composer でDAGの実行状況をSlackに通知したい」という記事で、コールバック関数を使用した Slack 通知の実装方法を紹介しました。
今回は、この Slack 通知をベースに、失敗時のフォールバック機能を追加します。
今回やること
解決したい課題
Slack 通知に依存したワークフローでは、以下のリスクがあります。
- Slack サービスの一時的な障害
- ネットワーク接続の問題
- Webhook URL の設定ミスや期限切れ
これらの問題が発生した場合、重要なアラートを見逃してしまう可能性があります。
フォールバック設計
本記事では、Airflow の on_failure_callback を活用して、Slack 通知タスクが失敗した場合に Amazon SES 経由でメール通知を送信する仕組みを実装します。
AWS への認証には、アクセスキーを使わずに Workload Identity 連携を使用します。
これにより、セキュリティを高めつつ、キーのローテーションなどの運用負荷を削減できます。
環境作成
DAG を動かすために Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、環境名を設定して東京リージョンで作成しました。
設定はデフォルトのままとしていますが、サービスアカウントやネットワーク設定は必要に応じて調整してください。

また、Slack 通知と AWS SES 連携のために、以下の PyPI パッケージをインストールします。
apache-airflow-providers-slack: Slack 通知用boto3: AWS SES へのメール送信用

AWS 認証の設定(Workload Identity 連携)
Cloud Composer から AWS SES にアクセスするため、Workload Identity 連携 を設定します。
Workload Identity 連携は、Google Cloud のサービスアカウントが発行する ID トークンを使って、AWS の IAM Role を一時的に引き受ける仕組みです。アクセスキーを使わずにセキュアに AWS サービスにアクセスできます。
詳細な仕組みやメリットについては、下記の記事で詳しく解説されていますので、こちらもご参照ください。
Google Cloud で行う設定
1. サービスアカウントの数値 ID を取得
Cloud Composer が使用しているサービスアカウントの数値 ID を取得します。
# 数値 ID を取得
gcloud iam service-accounts describe SERVICE_ACCOUNT_EMAIL \
--format="value(uniqueId)"
この数値 ID は後で AWS IAM Role の信頼ポリシーで使用します。
AWS で行う設定
2. AWS IAM Role の作成
2.1. ロールの作成開始
AWS IAM コンソールで「ロールを作成」をクリックします。

2.2. 信頼されたエンティティタイプの選択
「ウェブアイデンティティ」を選択し、アイデンティティプロバイダーのドロップダウンで「Google」を選択します。Audience に作成するロールの ARN を入力し、次へ をクリックします。

Audience の値(例):
arn:aws:iam::AWS_ACCOUNT_ID:role/cloud-composer-ses-role
2.3. 許可の追加
「許可を追加」の画面では、ポリシーは選択せず、そのまま 次へ をクリックして次に進みます。

2.4. ロール名の設定
「ロール名」に任意の名前(例: cloud-composer-ses-role)を入力し、ロールを作成 をクリックしてロールを作成します。

2.5. ロールにインラインポリシーを追加
後述の DAG で boto3 の send_email() を実行する際に必要となるため、ses:SendEmail と ses:SendRawEmail の 2 アクションを許可するポリシーを指定します。
作成したロールを開き、SES 送信権限のインラインポリシーを追加します。
- ロールの「許可」タブで「許可を追加」→「インラインポリシーの作成」をクリック
- ポリシーエディタで「JSON」タブを選択し、以下を貼り付ける
- 「ポリシーの確認」でポリシー名(例:
cloud-composer-ses-send-policy)を入力して作成
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ses:SendEmail",
"ses:SendRawEmail"
],
"Resource": "*"
}
]
}

3. 信頼ポリシーの修正
作成したロールの信頼ポリシーを編集します。
3.1. 信頼ポリシーを編集
IAM コンソールで作成したロールを開き、信頼関係 タブを選択して、信頼ポリシーを編集 をクリックします。

3.2. 信頼ポリシーの設定
以下の内容に更新します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "accounts.google.com"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"accounts.google.com:oaud": "arn:aws:iam::AWS_ACCOUNT_ID:role/cloud-composer-ses-role",
"accounts.google.com:sub": "SERVICE_ACCOUNT_UNIQUE_ID"
}
}
}
]
}
JSON エディタに上記を貼り付けたら、ポリシーを更新 をクリックして変更を保存します。

設定値の説明:
| キー | 説明 | 値の例 |
|---|---|---|
Principal.Federated |
OIDC プロバイダー | accounts.google.com(固定) |
accounts.google.com:oaud |
ID トークンの audience | arn:aws:iam::AWS_ACCOUNT_ID:role/cloud-composer-ses-role |
accounts.google.com:sub |
サービスアカウントの数値 ID | SERVICE_ACCOUNT_UNIQUE_ID |
信頼ポリシーの各キーについて:
Principal.Federated: Google を IdP として使う場合はaccounts.google.com固定です。AWS の組み込み OIDC プロバイダーとして登録されているため、別途 IAM Identity Provider を作成する必要はありません。accounts.google.com:oaud: DAG が ID トークン取得時に指定する audience(このロールの ARN)と一致する必要があります。accounts.google.com:sub: このロールを引き受けられる Google Cloud サービスアカウントを、数値 ID で指定します。
Amazon SES の設定
Amazon SES でメール送信を行うための設定を行います。
詳細な手順については、以下の記事の Amazon SES セクションを参照してください。
Cloud Composer(Airflow UI)で行う設定
4. Airflow Variables の設定
Cloud Composer の Airflow UI で Variables を設定します。
上部ナビゲーションの Admin をクリックし、ドロップダウンから Variables を選択して Variables 一覧を開きます。

+(Add a new record)をクリックし、Key に Variable 名、Val に値を入力して Save で保存します。
下表の 4 つを登録し、一覧に 4 件表示されることを確認します。

| Variable 名 | 説明 | 値の例 |
|---|---|---|
aws_role_arn |
AWS IAM Role の ARN | arn:aws:iam::AWS_ACCOUNT_ID:role/cloud-composer-ses-role |
aws_role_session_name |
セッション名(任意) | cloud-composer-session |
ses_sender_email |
SES 送信元メールアドレス | sender@example.com |
ses_recipient_email |
通知先メールアドレス | recipient@example.com |
Workload Identity 関連の値について:
aws_role_arn: 前述の手順で作成した AWS IAM ロールの ARN を指定します。信頼ポリシーで Google を指定したロールであり、DAG が ID トークンを提示して引き受ける(AssumeRoleWithWebIdentity)対象のロールです。IAM コンソールのロール詳細画面で確認できます。aws_role_session_name: AssumeRoleWithWebIdentity 実行時のセッション名です。CloudTrail などの監査ログに記録される任意の文字列で、運用上識別しやすい名前(例:cloud-composer-session)を指定します。
メールアドレスについて:
ses_sender_email: Amazon SES で Verified identities として検証済みの送信元メールアドレスを指定します。サンドボックス環境では、検証済みのメールアドレスのみ送信元に指定できます。ses_recipient_email: アラートを受け取るメールアドレスを指定します。サンドボックス環境では受信先も検証済みである必要があります。本番リリース後は未検証のアドレスにも送信できます。
Airflow Connections の設定
Slack Webhook 用の Connection を設定します。
前回記事「Cloud Composer でDAGの実行状況をSlackに通知したい」と同様の設定です。
Admin をクリックし、ドロップダウンから Connections を選択して Connections 一覧を開きます。

+ をクリックして「Add Connection」画面を開き、下記の項目を入力して Save をクリックして保存します。

| 項目 | 値 |
|---|---|
| Connection Id | slack_webhook_conn |
| Connection Type | HTTP |
| Host | https://hooks.slack.com/services/ |
| Password | Webhook URL のトークン部分 |
Connections の各項目について:
- Connection Id: DAG 内の
slack_webhook_conn_idで参照する ID です。前回記事と揃えてslack_webhook_connにします。 - Connection Type: Slack Webhook では HTTP を選択します(Slack Webhook ではなく HTTP を使用)。
- Host: Slack Incoming Webhooks のベース URL(
https://hooks.slack.com/services/)を指定します。 - Password: Slack の Incoming Webhook を有効にしたときに発行される Webhook URL のうち、Host より後ろのトークン部分(
T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXXのような文字列)を指定します。
保存後、Connections 一覧に slack_webhook_conn が表示されることを確認します。

DAGを作成する
以下が、実際に動作検証を行ったDAGです。
前回記事「Cloud Composer でDAGの実行状況をSlackに通知したい」で紹介した Slack 通知をベースに、失敗時のフォールバックとして Amazon SES でメール通知を送信する機能を実装しています。
"""
Slack通知失敗時にAWS SESでバックアップメール通知を送るDAG
Workload Identity 連携で AWS 認証
"""
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
# AWS SES設定
AWS_REGION = "ap-northeast-1"
def get_aws_credentials_via_workload_identity() -> dict:
"""Workload Identity 連携で AWS 一時認証情報を取得"""
import boto3
from google.auth.transport.requests import Request
from google.oauth2 import id_token
# 実行時に Variable を取得
aws_role_arn = Variable.get("aws_role_arn")
aws_role_session_name = Variable.get(
"aws_role_session_name", default_var="cloud-composer-session"
)
# ID トークンを取得(audience = AWS Role ARN)
request = Request()
id_token_value = id_token.fetch_id_token(request, audience=aws_role_arn)
# AWS STS で一時認証情報を取得
sts_client = boto3.client("sts", region_name=AWS_REGION)
response = sts_client.assume_role_with_web_identity(
RoleArn=aws_role_arn,
RoleSessionName=aws_role_session_name,
WebIdentityToken=id_token_value,
)
credentials_dict = {
"AccessKeyId": response["Credentials"]["AccessKeyId"],
"SecretAccessKey": response["Credentials"]["SecretAccessKey"],
"SessionToken": response["Credentials"]["SessionToken"],
}
return credentials_dict
def send_email_via_ses(subject: str, body: str) -> bool:
"""AWS SES でメール送信(Workload Identity 連携使用)"""
import boto3
from botocore.exceptions import ClientError
# 実行時に Variable を取得
sender_email = Variable.get("ses_sender_email")
recipient_email = Variable.get("ses_recipient_email")
try:
# 一時認証情報を取得
aws_credentials = get_aws_credentials_via_workload_identity()
# SES クライアントを作成
ses_client = boto3.client(
"ses",
region_name=AWS_REGION,
aws_access_key_id=aws_credentials["AccessKeyId"],
aws_secret_access_key=aws_credentials["SecretAccessKey"],
aws_session_token=aws_credentials["SessionToken"],
)
# メール送信
ses_client.send_email(
Source=sender_email,
Destination={"ToAddresses": [recipient_email]},
Message={
"Subject": {"Data": subject, "Charset": "UTF-8"},
"Body": {"Text": {"Data": body, "Charset": "UTF-8"}},
},
)
return True
except ClientError:
return False
def on_failure_callback(context):
"""タスク失敗時のコールバック: メール通知を送信"""
task_instance = context["task_instance"]
dag_id = context["dag"].dag_id
task_id = task_instance.task_id
execution_date = context["execution_date"]
exception = context.get("exception", "Unknown error")
subject = f"[Airflow Alert] Task Failed: {dag_id}.{task_id}"
body = f"""
Airflow Task Failure Alert
DAG: {dag_id}
Task: {task_id}
Execution Date: {execution_date}
Error: {exception}
This email was sent because Slack notification failed.
"""
send_email_via_ses(subject, body)
default_args = {
"start_date": datetime(2026, 1, 1),
"retries": 0,
}
with DAG(
dag_id="slack_with_email_backup",
default_args=default_args,
description="Slack通知失敗時にSESでバックアップメール送信",
schedule_interval=None,
catchup=False,
) as dag:
task_sample = DummyOperator(task_id="sample_task")
task_slack_notify = SlackWebhookOperator(
task_id="slack_notify",
slack_webhook_conn_id="slack_webhook_conn",
message=":white_check_mark: DAG `slack_with_email_backup` completed successfully!",
on_failure_callback=on_failure_callback,
)
task_sample >> task_slack_notify
DAGコードの解説
このDAGは、Slack 通知タスクに on_failure_callback を設定し、失敗時に Amazon SES でメール通知を送信する構成です。
Workload Identity による認証(get_aws_credentials_via_workload_identity)
-
ID トークンの取得
google.oauth2.id_token.fetch_id_token()を使用audienceには Airflow Variables で設定した AWS Role ARN を指定- Cloud Composer 環境では
google.auth.default()の呼び出しは不要(fetch_id_tokenがデフォルト認証情報を使用)
-
一時認証情報の取得
- AWS STS の
assume_role_with_web_identity()で ID トークンを提示し、一時認証情報(AccessKeyId, SecretAccessKey, SessionToken)を取得 - 取得した認証情報は SES クライアント作成時に使用
- AWS STS の
メール送信(send_email_via_ses)
-
SES クライアントの作成
get_aws_credentials_via_workload_identity()で取得した一時認証情報で boto3 の SES クライアントを作成- SessionToken を忘れずに指定する(一時認証情報では必須)
-
メール送信
- 送信元・送信先は Airflow Variables(
ses_sender_email,ses_recipient_email)から取得 ses_client.send_email()でメールを送信
- 送信元・送信先は Airflow Variables(
失敗時コールバック(on_failure_callback)
- コールバックの役割
- Slack 通知タスク(
slack_notify)が失敗した場合にのみ呼び出される contextから DAG ID・タスク ID・実行日時・例外内容を取得し、メール本文に含めて送信send_email_via_ses()を呼び出して Amazon SES 経由でアラートメールを送信
- Slack 通知タスク(
タスク構成
-
sample_task
DummyOperatorを使用したプレースホルダータスク- 正常に完了すると次タスクへ進む
-
slack_notify
SlackWebhookOperatorを使用- 前回記事と同様に
slack_webhook_connの Connection を参照 on_failure_callback=on_failure_callbackを指定し、失敗時のみメール通知が送信される
-
タスク依存関係
task_sample >> task_slack_notifyで、sample_task 成功後に slack_notify が実行される
動作確認
1. 正常系:Slack 通知が成功する場合
Airflow UI で slack_with_email_backup DAG の詳細画面を開き、右上の 再生ボタン(▶) をクリックして DAG を手動実行します。

実行後、該当する DAG Run をクリックして詳細を開き、slack_notify タスクを選択して Logs タブを開きます。
ログに Retrieving connection 'slack_webhook_conn' と出ていれば、Connection が正しく参照されています。

Slack 側で、DAG 完了を伝える通知メッセージが届いていることが確認できました。

2. 異常系:Slack 通知が失敗する場合
Slack 通知を意図的に失敗させるため、Admin → Connections を開き、slack_webhook_conn の行にある ゴミ箱アイコン をクリックして Connection を削除します。
一覧から slack_webhook_conn が消えていることを確認します。

再度 DAG を手動実行すると、slack_notify タスクが失敗します。
該当タスクの Logs タブを開くと、The conn_id 'slack_webhook_conn' isn't defined というエラーや、その後に on_failure_callback が実行されていることがログから確認できます。

設定した通知先メールアドレスに、タスク失敗のアラートメールが届いていることが確認できました!

まとめ
本記事では、Cloud Composer で Slack 通知失敗時のフォールバックとして Amazon SES メール通知を実装する方法 をご紹介しました。
on_failure_callback を活用することで、特定のタスクが失敗した場合にのみ代替通知を送信する仕組みを簡潔に実装できます。
今回は Amazon SES を使用しましたが、同様のアプローチで SendGrid やその他のメールサービスにも応用可能です。
重要なワークフローの通知を冗長化することで、アラートの見逃しを防ぎ、運用の信頼性を高めることができます。
アクセスキー方式(aws_access_key_id/aws_secret_access_key を直接設定)でも AWS 連携は可能ですが、本記事ではよりセキュアな Workload Identity 連携を採用しました。
アクセスキーの管理が不要で、長期的な認証情報が残らないため安全性が向上します。
本記事が、運用現場でフォールバック通知を導入する際の参考になれば幸いです。









