Apache AirflowのAWSオペレーターをAssumeRoleして実行する

こんにちは。サービスグループの武田です。AirflowはOperatorがAWSに接続する際の接続情報を複数用意できるしくみがあり、各Operatorにどの接続情報を使用するかの指定ができます。ここではIAMロールにAssumeRoleしてOperatorを実行する方法を紹介します。
2021.04.14

こんにちは。サービスグループの武田です。

サービスGではサービス運用にあたってジョブワークフロー管理システムであるApache Airflowを利用しています。AWSマネージドサービスであるMWAA(Amazon Managed Workflows for Apache Airflow)も利用できるようになりましたね。このAirflowにはOperatorと呼ばれる具体的な処理を実行するコンポーネントが多数用意されており、Operatorを組み合わせることでワークフローを構築します。

さてAirflowのOperatorの中にはAWSのサービスを呼び出せるものもあります。ちなみに使いたいサービスに対応したOperatorがなければ自分で処理を書くこともできます(結局はPythonのコードなので)。AirflowはOperatorがAWSに接続する際の接続情報を複数用意できるしくみがあり、各Operatorにどの接続情報を使用するかの指定ができます。デフォルトではaws_defaultという接続情報、いわゆる「defaultプロファイル」が使用されます。ここではIAMロールにAssumeRoleしてOperatorを実行する方法を紹介します。

前提

前提としてソフトウェアのバージョンを次のものとします。

AWSインテグレーションについて補足します。Airflowはこれまでcontribパッケージとしてインテグレーションコンポーネントを提供していました。

Integration — Airflow Documentation

Airflow 1系ではこれを使用するのが基本となります。一方で2系に向けてこれらはprovidersパッケージに移行され、新機能もそちらで実装されるようになっていました。

airflow.providers.amazon — apache-airflow-providers-amazon Documentation

2系向けに追加されたモジュールを1系のAirflowでも使用できるよう用意されているのが前述のbackport-providersパッケージです。2系への移行が簡単になること、および今回紹介する機能がprovidersパッケージで書き直されていることもあるため、このエントリではbackport-providersを推奨します。

設定方法

それでは具体的な設定の方法です。Airflow管理画面などから接続情報を管理できます。[Admin] > [Connections]と選択します。

接続情報一覧が表示されたら[Create]タグを押すと、新しい接続情報を追加できます。

Conn Idは接続IDで、Operatorを定義する際に使用します。Conn Typeはプルダウンから Amazon Web Services を選択します。重要なのがExtraです。いくつかのパターンに分けて設定方法を紹介します。

ベースセッションにdefaultプロファイルを使用する

ベースセッションとしてdefaultプロファイルを使用し、123456789012アカウントのairflow-test-roleロールにスイッチしたい場合は次のように指定します。

{
  "role_arn": "arn:aws:iam::123456789012:role/airflow-test-role",
  "region_name": "ap-northeast-1"
}

次のように設定しても同じです(中でARNに組み立てられます)。

{
  "aws_account_id": "123456789012",
  "aws_iam_role": "airflow-test-role",
  "region_name": "ap-northeast-1"
}

ベースセッションに任意のプロファイルを使用する

AWS CLIでいうところのsource_profileを指定する場合は、次のようにsession_kwargsを追加します。basic_profileはAssumeRoleを実行するプロファイル名です。

{
  "role_arn": "arn:aws:iam::123456789012:role/airflow-test-role",
  "region_name": "ap-northeast-1",
  "session_kwargs": {
    "profile_name": "basic_profile"
  }
}

ベースセッションにアクセスキーを使用する

ベースセッションとしてプロファイルではなく、アクセスキーを接続情報に直接指定も可能です。

LoginにアクセスキーID、Passwordにシークレットアクセスキーを指定し、Extraにロール情報を指定します。

{
  "role_arn": "arn:aws:iam::123456789012:role/airflow-test-role",
  "region_name": "ap-northeast-1"
}

またExtraに、次のように設定しても同じです。

{
  "aws_access_key_id": "enter IAM User access key id",
  "aws_secret_access_key": "enter IAM User secret access key",
  "role_arn": "arn:aws:iam::123456789012:role/airflow-test-role",
  "region_name": "ap-northeast-1"
}

Operatorでの指定

上記のいずれかで接続情報を定義したら、あとは実行するOperatorで指定するだけです。例としてAWSAthenaOperatorであれば、次のように指定します。

with DAG(dag_id="sample_athena_query", default_args=args, schedule_interval=None) as dag:

    athena = AWSAthenaOperator(
        task_id="athena_query",
        aws_conn_id="aws_test_role",
        output_location="s3://aws-athena-query-results-123456789012-ap-northeast-1",
        database="default",
        query='SELECT * FROM "some_table" limit 10',
    )

5行目で指定しているaws_conn_idの値が定義した接続IDです。たったこれだけで、自動的にAssumeRoleしてAWSサービスへ接続してくれます。便利!

まとめ

AirflowでAWSのクロスアカウントアクセスの方法を調べてみました。簡単な設定で実現でき、すばらしいですね。

参考URL