どーもsutoです。
今回はAmazon MWAAのv2.2.2の環境を構築して、AWSプロバイダーパッケージのRedshiftモジュールを利用したDAGを作成して、Redshift Serverlessにクエリを実行するジョブを動かしてみようと思います。
下記リンクのとおり、Amazon MWAAで構築可能なApache Airflow v2.2.2には apache-airflow-providers-amazon
がデフォルトでインストールされているので、その中にある RedshiftSQLOperator
、 RedshiftToS3Operator
といったRedshift操作のモジュールを使える、と言うことなので実際にDAGを作って検証してみました。
環境構築
まずはAWSコンソールのウィザードに従ってAmazon MWAAの環境構築をササッとこんな感じで作りました。
VPC、セキュリティグループ、IAMロールを全て新規作成する場合は何も考えなくても環境構築が完了しますが、既存VPCに作成する場合はMWAAのネットワーク要件に漏れがないように注意しましょう。また、既存のIAMロールを使う場合も対象のS3バケットへのアクセス権限がちゃんと満たされているか注意しましょう。
次に今回の接続先としてRedshift Serverlessを作成します。手順に関しては以下のリンクがわかりやすいのでこちらを参考にServerlessクラスターを作成します。
また、Redshift Serverlessのセキュリティグループのインバウンドには、MWAA環境にアタッチしているセキュリティグループを許可する設定を忘れずに。
DAG作成
Connectionの設定
まず事前作業として、AirflowのUIにアクセスしてメニューの「Admin」→「Connection」を選択してRedshiftクラスターの接続情報やAWSの認証情報を設定しておきましょう。
- 今回のAWS認証の設定は、Defaultプロファイルに設定したアカウントから、今回の検証アカウントにIAMロールでAssumeRoleしてOperatorを実行する、という方式で認証情報を設定していきます。
-
aws_defaultという設定名を選択して、編集画面で
Login
にIAMユーザーのアクセスキー、Password
にシークレットキーを、Extra
にリージョンを設定します。
- 次に+マークをクリックして、aws_iam_roleという名前でConnectionを新規作成します。編集画面の
Extra
にリージョンとRedshift Serverlessに設定したIAMロールのARNを設定します。
{
"role_arn": "<Redshift Serverlessの実行ロールのArn>",
"region_name": "<region>"
}
- 次にRedshift Serverlessクラスターの接続情報を設定します。
-
redshift_defaultという設定名を選択して(一覧にない場合は+マークをクリックして新規作成)、編集画面で以下のように接続情報を設定します。
RedshiftSQLOperator
Redshiftに対してクエリを実行できるモジュール RedshiftSQLOperator
を使って以下のDAGを作成してS3バケットにアップロードしてみます。
クエリ内容としては新規でtestという名前のテーブルを作成するクエリとなります。
# file name = redshift_dag.py
from datetime import datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
args = {
"owner": "airflow",
"start_date": days_ago(1),
"provide_context": True,
}
with DAG(
dag_id="example_query_redshift",
default_args=args,
schedule_interval=None,
catchup=False,
tags=['example']
) as dag:
t = RedshiftSQLOperator(
task_id='example',
sql='CREATE TABLE IF NOT EXISTS test ( \
test_id INTEGER, \
name VARCHAR NOT NULL, \
color VARCHAR NOT NULL \
);',
)
Airflow UIにアクセスしてアップロードしたDAGが表示されていることを確認します。(キャプチャ画面は試しに何回がジョブ実行した後に撮りました)
では実際にジョブを実行した結果は以下となります。
ジョブ実行成功の場合のLog。
Redshift Query Editor v2で実際にテーブルが出来ていることを確認できました。
RedshiftToS3Operator
RedshiftのテーブルをS3にUNLOADを行う RedshiftToS3Operator
モジュールを使ってみましょう。
対象テーブルは以下のflightdataというテーブルをS3に出力してみます。
以下のようにDAGを作成してS3バケットにアップロードし、実行してみます。(コード内のS3バケット名は適宜修正してご利用ください)
from datetime import datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
args = {
"owner": "airflow",
"start_date": days_ago(1),
"provide_context": True,
}
with DAG(
dag_id="example_unload_redshift",
default_args=args,
schedule_interval=None,
catchup=False,
tags=['example']
) as dag:
t = RedshiftToS3Operator(
task_id='unload',
s3_bucket='<your s3 bucket name>',
s3_key='airflow/output/public/flightdata/{{ ds }}_',
schema='public',
table='flightdata',
redshift_conn_id='redshift_default',
aws_conn_id='aws_iam_role',
table_as_file_name=True,
unload_options=[
"DELIMITER AS ','",
"FORMAT AS CSV",
"ALLOWOVERWRITE",
"PARALLEL OFF",
"HEADER"
]
)
ジョブ実行成功の場合のLogは以下です。
指定したS3バケットに問題なく出力されていました。
まとめ
AirflowのAWS Redshiftモジュールを使用したRedshiftクラスターの操作を行うDAGをご紹介させていただきました。
MWAAのAirflowがv2.2に対応したことでAWSプロバイダーがデフォルトで使え、AWSリソースへの操作がよりやりやすくなったのはありがたいです。
本記事では書いていませんが、S3からRedshiftへCOPYできる S3ToRedshiftOperator
モジュールも当然利用可能です。
最後にAmazon MWAAサービスに対する私の願望を言うと、「もう少し利用料金が安くなってくれるとスモールスタートとして採用しやすくなるんだけどなー」と思っています。