Apache AirflowのSecrets BackendとしてAWS SSMのパラメーターストアを利用してみる
こんにちは。サービスグループの武田です。
Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。AirflowではデータベースやAWSなどへ接続するための接続情報や各種パスワード情報などをSecrets Backend(以下、バックエンド)に保持できます。バックエンドはデフォルトではメタストア(PostgreSQLなどのDB)を使用しますが変更もできます。
なおAirflowのバージョンは 1.10.10 以降を対象としています。
優先順位
バックエンドは複数同時に使用できるのですが取得する優先順位があります。これを間違えてしまうと思っていたのと異なる挙動になるため注意が必要です。
- 指定したカスタムバックエンド
- 環境変数
- メタストア
1はこのエントリのメインとなるものです。さまざまなカスタムバックエンドが使用できますが、今回はAWS SSMパラメーターストアを使用してみます。2はAIRFLOW_VAR_<variable_name>
という形式の環境変数で指定できます。そして前述したように3がデフォルトです。
また注意点として、これらは 取得を試行する順序 です。変数の保存は常にメタストアが使われます。
バックエンドにAWS SSMパラメーターストアを指定してみる
それではAirflowのバックエンドにAWS SSMパラメーターストアを指定して動作確認をしていきましょう。指定はairflow.cfg
で行います(そのため環境変数でもOKです)。
[secrets] backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}
ポイントはbackend_kwargs
の方です。それぞれ指定しているパラメーターを確認しておきます。
- connections_prefix
- 接続情報の接頭辞です
- ここでは
/airflow/connections
を指定しています(指定しなかった場合のデフォルト名でもあります)
- variables_prefix
- 変数の接頭辞です
- ここでは
/airflow/variables
を指定しています(指定しなかった場合のデフォルト名でもあります)
- profile_name
- AWSにアクセスするプロファイル名です
- ここでは
default
を指定しています
続いてDAGの中で取得する変数をパラメーターストアに設定します。マネジメントコンソールを開いて次のように設定します。
/airflow/variables/blog_name
というキーで設定しているため、Airflow内では(接頭辞を除いた)blog_name
という変数名で取得できます。またタイプとして文字列
を指定していますが、安全な文字列
も使用できます。パスワードなど機密性の高い文字列を保存する場合はそちらを選択してください。
設定ができたら次は動作確認用のDAGを用意しましょう。今回は次のようなDAGにしてみました。
import airflow from airflow.models import DAG from airflow.models.variable import Variable from airflow.operators.python_operator import PythonOperator args = { "owner": "airflow", "start_date": airflow.utils.dates.days_ago(2), "provide_context": True, } def ssm_write(**context): Variable.set("name", "Classmethod") Variable.set("blog_name", "DevIO") def ssm_read(**context): print(Variable.get("name"), Variable.get("blog_name")) with DAG(dag_id="test_aws_ssm", default_args=args, schedule_interval=None,) as dag: t1 = PythonOperator(task_id="ssm_write", python_callable=ssm_write) t2 = PythonOperator(task_id="ssm_read", python_callable=ssm_read) t1 >> t2
これで準備完了です。さっそく実行してみましょう。
実行できたらssm_read
のログを確認します。ssm_write
で保存した変数と、パラメーターストアに保存した変数の出力が確認できます。
ちなみにAirflowの管理画面から変数を確認してみるとname
とblog_name
のどちらも保存されていることが確認できます。このことからもパラメーターストアを優先して取得していることがわかりますね。
まとめ
AirflowのバックエンドにAWS SSMパラメーターストアを利用してみました。今回は変数のみですが接続情報も保持できます。変数/接続情報取得のたびにAPIコールが発生するため、それを踏まえた上で検討してみてください。