AWSのマネージドAirflow、Amazon Managed Workflow for Apache Airflow(MWAA)が登場!

こんにちは。サービスグループの武田です。ETL処理などのワークフローを実行するツールとして人気のあるApache AirflowがAWSのマネージドサービスとして登場しました!
2020.11.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。サービスグループの武田です。

本日朝イチで飛び込んできたニュースをお届けします。ETL処理などのワークフローを実行するツールとして人気のあるApache AirflowがAWSのマネージドサービスとして登場しました!GCPにはCloud Composerというマネージドサービスがあったのですが、AWSには存在しませんでした。re:Invent 2019で発表されないかな、と昨年も期待していたのですが、なんと今年のre:Invent 2020を目前にしてリリースされました。歓喜!

Amazon MWAAの特徴

ドキュメントなどを読んでいくつかピックアップしてみました。詳細はぜひドキュメントを確認してください。

  • IAMと統合されたWebサーバーのユーザー管理
  • CloudWatchと統合されたモニタリング
  • DAGファイルやプラグインはS3にアップロードするだけ
  • Workerのオートスケーリング
    • 最大Worker数は25(上限緩和可能)
  • 自動マイナーバージョンアップ

料金体系

MWAAは次の3つについて使用した分だけの料金がかかります。

  • 環境クラス
  • 追加のWorkerインスタンス
  • メタデータベース

1つ目は環境クラスで、Small、Medium、Largeの3種類が提供されており、各ノードのvCPUが異なります。2つ目がWorkerインスタンスの追加分で、オートスケーリングでノードが増減するため、2ノード目以降の追加されたインスタンスがカウントされます。3つ目がメタデータベースの保存量です。環境クラスと追加のWorkerインスタンスは秒単位で課金されます。

この3つの合計がMWAAの利用料となります。またこのほかに、ネットワーキング(NATゲートウェイなど)やS3およびCloudWatchの保存料金、GlueやLambdaなどを使用する場合はそれらの料金も予算に入れておきましょう。

MWAAを起動してみた

それでは実際にAirflowの環境を立ち上げて、簡単なDAGを実行してみましょう。先立ってMWAA用のS3バケットを作成します。

S3バケットはairflow-から始まる必要があります。またS3オブジェクトのバージョニングを有効にすることで、プラグインやrequirements.txtのバージョンを指定できます。今回はairflow-firstairflow-XXXXという名前でバケットを作成し、DAG配置用にdagsというフォルダーを作成しました。

次にAirflowの環境を作成します。マネジメントコンソールでMWAAにアクセスし、[環境を作成]ボタンをクリックします。

ウィザードページに遷移します。

環境の名前を入力します。今回はFirstAirflowとしました。バージョンのプルダウンがありますが、執筆時点では最新の1.10.12のみ選択できました。今後マイナーバージョンアップや2系が登場してくると増えるのでしょう。

DAGファイルなどをアップロードするS3バケットの指定をします。事前に作成しておいたairflow-firstairflow-XXXXおよびairflow-firstairflow-XXXX/dagsを指定しています。プラグインや追加のモジュールが必要な場合は、それもここで指定します。入力できたら[次へ]をクリック。

次の画面ではAriflowを起動するVPCを指定します。今回はCreate MWAA VPCのリンクからAirflow用のVPCを作成します。ここで作成されるVPCの仕様はドキュメントに書かれていますので、こちらも参照してください。

先ほどのリンクをクリックするとCloudFormationスタックのクイック作成画面になります。スタック名は任意の名前を入力しましょう。

環境名やCIDRのレンジなどを指定できます。基本的にはデフォルトの設定で問題ありません。

作成ボタンを押したら、スタックが完了するまで少し待ちましょう。

スタックが完了したら、MWAAのウィザードに戻ります。更新するとネットワークのプルダウンに作成したVPCが選択できるはずです。それぞれ選択します。

Webサーバーのアクセス方法を選択します。Airflowの管理画面にどこからアクセスするかが基準となります。今回は外部からアクセスしたいので公開ネットワークを選択します。またセキュリティグループでアクセス制御されるため、環境に合わせて設定する必要があります。

環境クラスとWorkerの上限を指定します。DAGキャパシティーは最初、その数に制限されるのかとも思ったのですが、どうやら推奨値のようです。つまりDAGの数で環境クラスをあげてくださいね、ということのようです。WMAAはタスクキューの数によって自動的にWorkerがスケーリングします。その際のWorkerの上限が指定できます。大きめの値を指定すればスケールしますが、そのぶんのコストもかかります。時間とコストの兼ね合いで上限値を決めるとよいでしょう。

各ログの出力設定ができます。またログレベルも指定できるため、本番環境ではモニタリングのためにも出力設定をするとよいのではないでしょうか。

Airflow設定オプションではsmtp_hostdag_concurrencyなどの設定を上書きできます。詳細はドキュメントを確認してください。

最後にMWAAが使用するIAMロールを指定して[環境を作成]をクリックしましょう。

ウィザードが終了し、環境の作成が始まります。20分〜30分ほどかかると書いてあります。

15分くらいで利用可能になりました。右の方にある「Airflow UI を開く」リンクをクリックするとAirflowの管理画面に簡単にアクセスできます。

DAGをアップロードして実行してみた

Airflowの環境ができたので、DAGをアップロードして実行していきましょう。今回は次のような適当な秒数スリープするタスクで構成されたDAGを実行してみます。

random_sleep.py

import random
import time

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}


def task_proc(**context):
    s = random.random() * 10 + 10
    print(f"sleep {s} times.")
    time.sleep(s)


with DAG(
    dag_id="random_sleep",
    default_args=args,
    schedule_interval=None,
) as dag:
    t1 = PythonOperator(task_id="random_sleep1", python_callable=task_proc)
    t2 = PythonOperator(task_id="random_sleep2", python_callable=task_proc)
    t3 = PythonOperator(task_id="random_sleep3", python_callable=task_proc)
    t4 = PythonOperator(task_id="random_sleep4", python_callable=task_proc)
    t5 = PythonOperator(task_id="random_sleep5", python_callable=task_proc)

    t1 >> [t2, t3, t4] >> t5

このファイルを最初に作成したairflow-firstairflow-XXXXバケットのdagsフォルダーにアップロードします。

おおよそ20秒ほどでAirflowの管理画面にDAGが表示されました。思っていたよりも早いです。

手動でトリガーすると問題なくDAGが実行できました。

まとめ

AWSのマネージドAirflowを待っていた方も多いのではないでしょうか?マネージド最高!DAGファイルの更新はS3バケットに置くだけです。GitHubなどでバージョン管理している場合は、マージしたらS3にコピーするパイプラインを用意すればDAGのデプロイまで自動化ができますね。

それではよいAirflowライフを!

参考