Amazon MWAAでOperatorがAssumeRoleする際にSTSリージョナルエンドポイントを使用する
こんにちは。サービス開発室の武田です。
Amazon MWAA(Managed Workflows for Apache Airflow)というかApache Airflowでは、実行するタスクはOperatorという単位で定義します。Operatorはファイルのコピーや削除、Web APIの呼び出しなど多岐に渡り、また自作も可能です。
AirflowではビルトインのOperatorもありますが、プロバイダーという形でSaaSなどと連携できるライブラリも多く提供されています。もちろんAWSもあります。
AWSにアクセスする際にはリソースを操作するための権限が必要となりますが、その設定としてIAMロールを使用するのがベストプラクティスです。AWSではそのIAMロールの認証情報(一時クレデンシャル)を払い出すことをAssumeRoleと呼びます。AirflowではConnectionsで設定をするだけで簡単にAssumeRoleしてOperatorを実行できます。
さてここからが本題です。AirflowはPythonで書かれたシステムです。AWSの操作はBoto3を使用しています。AssumeRoleの際にはSTS(Security Token Service)というサービスを使用するのですが、先日これに関して障害がありました。それを受けてSTSの利用についてベストプラクティスがまとめられていました。
またそれらをもとにBoto3のSTSエンドポイントの利用について確認しました。
前置きが長くなりましたが、結論は「AirflowはSTSグローバルエンドポイントを使用している」です。ベストプラクティスに沿っていないのでSTSリージョナルエンドポイントを使用するようにしよう、というのがこのエントリの主題となります。
MWAAの環境で設定してみた
まずは本当にデフォルトがSTSグローバルエンドポイントを使用しているのか、を確認しました。MWAAではログレベルを指定できるようになっているのですが、INFOは指定できてもDEBUGは指定できません。そのためDAG定義で無理やりDEBUGレベルを指定して出力してみました。なお、Connectionsの設定は前述のエントリと同じですので省略します。
次のコードをDAG定義ファイルに追加してやります。
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.info("SET DEBUG LEVEL")
この設定を追加しDAGを実行してみると、次のようなログが確認できました。
[2024-09-01, 09:00:00 UTC] {{regions.py:498}} DEBUG - Calling endpoint provider with parameters: {'Region': 'ap-northeast-1', 'UseDualStack': False, 'UseFIPS': False, 'UseGlobalEndpoint': True}
[2024-09-01, 09:00:00 UTC] {{regions.py:513}} DEBUG - Endpoint provider result: https://sts.amazonaws.com
このhttps://sts.amazonaws.com
というのがグローバルエンドポイントです。つまりデフォルトではSTSグローバルエンドポイントを使用していることが確認できました。
それではSTSリージョナルエンドポイントを使用するよう設定を追加していきましょう。設定としてはAWS_STS_REGIONAL_ENDPOINTS=regional
の環境変数を追加するだけです。DAGファイルで定義する方法もありそうですが、あまりスマートではありませんね。
AWSのドキュメントでは、Airflow全体に環境変数を追加する方法としてカスタムプラグインが紹介されています。今回はこの方法を採用しました。
まず作業用のディレクトリを作成します。
$ mkdir plugin && cd $_
$ vim env_var_plugin.py
このディレクトリの中で新しくenv_var_plugin.py
を作成します。ファイルの内容は次のとおりです。
import os
from airflow.plugins_manager import AirflowPlugin
os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
class EnvVarPlugin(AirflowPlugin):
name = 'env_var_plugin'
コードを保存できたらplugins.zip
を作成します。
$ zip -r ../plugins.zip ./
zipファイルが作成できたら、MWAAからアクセスできるS3バケットのアップロードします。
アップロードしたzipファイルをMWAAのプラグインファイルとして指定します。今回は既存のMWAA環境に設定しました。
設定を保存したらクラスターの更新が終わるまで待ちましょう。
更新が終わったら、先ほど実行したDAGを再実行してやります。次のとおりログが記録されていました。
[2024-09-01, 09:00:00 UTC] {{regions.py:498}} DEBUG - Calling endpoint provider with parameters: {'Region': 'ap-northeast-1', 'UseDualStack': False, 'UseFIPS': False, 'UseGlobalEndpoint': False}
[2024-09-01, 09:00:00 UTC] {{regions.py:513}} DEBUG - Endpoint provider result: https://sts.ap-northeast-1.amazonaws.com
先ほどとはエンドポイントが異なっておりhttps://sts.ap-northeast-1.amazonaws.com
になっています。このMWAA環境は東京リージョンで動かしていましたので、想定通りの挙動を確認できました。
まとめ
STSについて調査してまとめている中で、「そういえばAirflowもPython製なんだからもしかして」と思って確認したら案の定でした。今回は確認から設定までひととおり確認して見ました。どなたかの参考になりましたら幸いです。