Amazon MWAAでapache-airflow-providers-amazonを使ってRedshiftと繋いでみた

2022.09.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どーもsutoです。

今回はAmazon MWAAのv2.2.2の環境を構築して、AWSプロバイダーパッケージのRedshiftモジュールを利用したDAGを作成して、Redshift Serverlessにクエリを実行するジョブを動かしてみようと思います。

下記リンクのとおり、Amazon MWAAで構築可能なApache Airflow v2.2.2には apache-airflow-providers-amazon がデフォルトでインストールされているので、その中にある RedshiftSQLOperatorRedshiftToS3Operator といったRedshift操作のモジュールを使える、と言うことなので実際にDAGを作って検証してみました。

環境構築

まずはAWSコンソールのウィザードに従ってAmazon MWAAの環境構築をササッとこんな感じで作りました。

VPC、セキュリティグループ、IAMロールを全て新規作成する場合は何も考えなくても環境構築が完了しますが、既存VPCに作成する場合はMWAAのネットワーク要件に漏れがないように注意しましょう。また、既存のIAMロールを使う場合も対象のS3バケットへのアクセス権限がちゃんと満たされているか注意しましょう。

次に今回の接続先としてRedshift Serverlessを作成します。手順に関しては以下のリンクがわかりやすいのでこちらを参考にServerlessクラスターを作成します。

また、Redshift Serverlessのセキュリティグループのインバウンドには、MWAA環境にアタッチしているセキュリティグループを許可する設定を忘れずに。

DAG作成

Connectionの設定

まず事前作業として、AirflowのUIにアクセスしてメニューの「Admin」→「Connection」を選択してRedshiftクラスターの接続情報やAWSの認証情報を設定しておきましょう。

  • 今回のAWS認証の設定は、Defaultプロファイルに設定したアカウントから、今回の検証アカウントにIAMロールでAssumeRoleしてOperatorを実行する、という方式で認証情報を設定していきます。

  • aws_defaultという設定名を選択して、編集画面で Login にIAMユーザーのアクセスキー、 Password にシークレットキーを、Extra にリージョンを設定します。

  • 次に+マークをクリックして、aws_iam_roleという名前でConnectionを新規作成します。編集画面の Extra にリージョンとRedshift Serverlessに設定したIAMロールのARNを設定します。
{
 "role_arn": "<Redshift Serverlessの実行ロールのArn>",
 "region_name": "<region>"
}

  • 次にRedshift Serverlessクラスターの接続情報を設定します。

  • redshift_defaultという設定名を選択して(一覧にない場合は+マークをクリックして新規作成)、編集画面で以下のように接続情報を設定します。

RedshiftSQLOperator

Redshiftに対してクエリを実行できるモジュール RedshiftSQLOperator を使って以下のDAGを作成してS3バケットにアップロードしてみます。

クエリ内容としては新規でtestという名前のテーブルを作成するクエリとなります。

# file name = redshift_dag.py

from datetime import datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator

args = {
    "owner": "airflow",
    "start_date": days_ago(1),
    "provide_context": True,
}

with DAG(
    dag_id="example_query_redshift",
    default_args=args,
    schedule_interval=None,
    catchup=False,
    tags=['example']
) as dag:

    t = RedshiftSQLOperator(
        task_id='example',
        sql='CREATE TABLE IF NOT EXISTS test ( \
            test_id INTEGER, \
            name VARCHAR NOT NULL, \
            color VARCHAR NOT NULL \
            );',
    )

Airflow UIにアクセスしてアップロードしたDAGが表示されていることを確認します。(キャプチャ画面は試しに何回がジョブ実行した後に撮りました)

では実際にジョブを実行した結果は以下となります。

ジョブ実行成功の場合のLog。

Redshift Query Editor v2で実際にテーブルが出来ていることを確認できました。

RedshiftToS3Operator

RedshiftのテーブルをS3にUNLOADを行う RedshiftToS3Operator モジュールを使ってみましょう。

対象テーブルは以下のflightdataというテーブルをS3に出力してみます。

以下のようにDAGを作成してS3バケットにアップロードし、実行してみます。(コード内のS3バケット名は適宜修正してご利用ください)

from datetime import datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

args = {
    "owner": "airflow",
    "start_date": days_ago(1),
    "provide_context": True,
}

with DAG(
    dag_id="example_unload_redshift",
    default_args=args,
    schedule_interval=None,
    catchup=False,
    tags=['example']
) as dag:

    t = RedshiftToS3Operator(
        task_id='unload',
        s3_bucket='<your s3 bucket name>',
        s3_key='airflow/output/public/flightdata/{{ ds }}_',
        schema='public',
        table='flightdata',
        redshift_conn_id='redshift_default',
        aws_conn_id='aws_iam_role',
        table_as_file_name=True,
        unload_options=[
            "DELIMITER AS ','",
            "FORMAT AS CSV",
            "ALLOWOVERWRITE",
            "PARALLEL OFF",
            "HEADER"
        ]
    )

ジョブ実行成功の場合のLogは以下です。

指定したS3バケットに問題なく出力されていました。

まとめ

AirflowのAWS Redshiftモジュールを使用したRedshiftクラスターの操作を行うDAGをご紹介させていただきました。

MWAAのAirflowがv2.2に対応したことでAWSプロバイダーがデフォルトで使え、AWSリソースへの操作がよりやりやすくなったのはありがたいです。

本記事では書いていませんが、S3からRedshiftへCOPYできる S3ToRedshiftOperator モジュールも当然利用可能です。

最後にAmazon MWAAサービスに対する私の願望を言うと、「もう少し利用料金が安くなってくれるとスモールスタートとして採用しやすくなるんだけどなー」と思っています。

参考リンク