Apache AirflowのSecrets BackendとしてAWS SSMのパラメーターストアを利用してみる

こんにちは。サービスグループの武田です。Apache Airflowの変数や接続情報を保持するバックエンドにAWS SSMパラメーターストアを利用してみました。
2020.09.28

こんにちは。サービスグループの武田です。

Apache AirflowはDAG(有向非巡回グラフ)でワークフローを表現し、ジョブのスケジューリングや監視などを行ってくれるツールです。AirflowではデータベースやAWSなどへ接続するための接続情報や各種パスワード情報などをSecrets Backend(以下、バックエンド)に保持できます。バックエンドはデフォルトではメタストア(PostgreSQLなどのDB)を使用しますが変更もできます。

なおAirflowのバージョンは 1.10.10 以降を対象としています。

優先順位

バックエンドは複数同時に使用できるのですが取得する優先順位があります。これを間違えてしまうと思っていたのと異なる挙動になるため注意が必要です。

  1. 指定したカスタムバックエンド
  2. 環境変数
  3. メタストア

1はこのエントリのメインとなるものです。さまざまなカスタムバックエンドが使用できますが、今回はAWS SSMパラメーターストアを使用してみます。2はAIRFLOW_VAR_<variable_name>という形式の環境変数で指定できます。そして前述したように3がデフォルトです。

また注意点として、これらは 取得を試行する順序 です。変数の保存は常にメタストアが使われます。

バックエンドにAWS SSMパラメーターストアを指定してみる

それではAirflowのバックエンドにAWS SSMパラメーターストアを指定して動作確認をしていきましょう。指定はairflow.cfgで行います(そのため環境変数でもOKです)。

airflow.cfg

[secrets]

backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}

ポイントはbackend_kwargsの方です。それぞれ指定しているパラメーターを確認しておきます。

  • connections_prefix
    • 接続情報の接頭辞です
    • ここでは/airflow/connectionsを指定しています(指定しなかった場合のデフォルト名でもあります)
  • variables_prefix
    • 変数の接頭辞です
    • ここでは/airflow/variablesを指定しています(指定しなかった場合のデフォルト名でもあります)
  • profile_name
    • AWSにアクセスするプロファイル名です
    • ここではdefaultを指定しています

続いてDAGの中で取得する変数をパラメーターストアに設定します。マネジメントコンソールを開いて次のように設定します。

/airflow/variables/blog_nameというキーで設定しているため、Airflow内では(接頭辞を除いた)blog_nameという変数名で取得できます。またタイプとして文字列を指定していますが、安全な文字列も使用できます。パスワードなど機密性の高い文字列を保存する場合はそちらを選択してください。

設定ができたら次は動作確認用のDAGを用意しましょう。今回は次のようなDAGにしてみました。

test_aws_ssm.py

import airflow
from airflow.models import DAG
from airflow.models.variable import Variable
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}


def ssm_write(**context):
    Variable.set("name", "Classmethod")
    Variable.set("blog_name", "DevIO")


def ssm_read(**context):
    print(Variable.get("name"), Variable.get("blog_name"))


with DAG(dag_id="test_aws_ssm", default_args=args, schedule_interval=None,) as dag:
    t1 = PythonOperator(task_id="ssm_write", python_callable=ssm_write)
    t2 = PythonOperator(task_id="ssm_read", python_callable=ssm_read)

    t1 >> t2

これで準備完了です。さっそく実行してみましょう。

実行できたらssm_readのログを確認します。ssm_writeで保存した変数と、パラメーターストアに保存した変数の出力が確認できます。

ちなみにAirflowの管理画面から変数を確認してみるとnameblog_nameのどちらも保存されていることが確認できます。このことからもパラメーターストアを優先して取得していることがわかりますね。

まとめ

AirflowのバックエンドにAWS SSMパラメーターストアを利用してみました。今回は変数のみですが接続情報も保持できます。変数/接続情報取得のたびにAPIコールが発生するため、それを踏まえた上で検討してみてください。

参考リンク