初めて触るAirflow

最近、業務でAirflowを初めて触りました。調査したこと、試しに動かしてみたことなどまとめてみます。

Airflowとは

Apache Airflowはいわゆるワークフローエンジンと言われるツールの一種で、 複数のタスクの実行順序を定義するワークフローの作成、実行のスケジューリング、監視などを行うことができます。AirbnbのMaxime Beauchemin氏によって2014年10月に開発され、2016年にはApache Incubatorプロジェクトになっています(参考)。

Airflowでは、DAG(有向非巡回グラフ)でワークフローを表現します。 その基本的なコンセプトについては下記の記事で解説されていますので、ご参照ください。

Airflowのコンセプトと仕組みを理解する

とりあえず試してみる

まずは手元のPC (Mac) で試してみます。 簡単に動かせるDockerイメージとDocker Composeの設定ファイルが https://github.com/puckel/docker-airflow にありますので、これを利用します。

実行前準備

GitHubからファイルを取得します。

$ git clone https://github.com/puckel/docker-airflow.git
$ cd docker-airflow

このまま実行することもできますが、サンプルで入っているDAG (dags/tuto.py) を少しだけ修正することをお勧めします。dags/tuto.py の start_date を適当に最近の日付に修正しましょう。

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),  # ここを適当に最近の日付に修正
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

実行

docker-composeコマンドで起動します。

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
Creating docker-airflow_postgres_1 ... done
Creating docker-airflow_redis_1    ... done
Creating docker-airflow_flower_1    ... done
Creating docker-airflow_webserver_1 ... done
Creating docker-airflow_scheduler_1 ... done
Creating docker-airflow_worker_1    ... done

起動したらブラウザから http://localhost:8080 にアクセスすると、以下のような画面になります。

左の"Off"と書いてあるスイッチのようなものが実行のスイッチです。 これをクリックして"On"にするとジョブが実行できるようになり 、start_dateから今日までの分のジョブが順次実行されます。 *1

Trigger Dagボタンをクリックすると、手動で実行することもできます。

"DAG Runs"からは、Dagの実行結果一覧を見ることができます。

また、"Tree View"ボタンをクリックすると、Dagの各タスクの状態をツリー形式で見ることができます。

図の四角いマークが1つ1つのタスクです。これを選択して個別に再実行することもできます。 *2

この他にもUIには様々な機能があります。色々触ってみると面白いです。

Amazon ECSで動かす

次にこれをAmazon ECSで動かしてみます。ECS CLIを使うとDocker Compose設定ファイルを使えるので便利そう、と思って使ってみました。

ref) ECS CLIを使ってDocker Composeのファイルを使ってECSにデプロイする

一応動かせたのですが、ECS CLIはDocker Composeの全てをサポートしてはおらず、色々修正が必要で結構ハマりました。以下に実行したファイルを置いてあります。

https://github.com/ktsmy/airflow-on-ecs-sample

以下のような感じで動かすことができます。

# GitHubからファイル取得
$ git clone https://github.com/ktsmy/airflow-on-ecs-sample.git
$ cd airflow-on-ecs-sample

# ECS clusterの作成
# EC2インスタンス、VPC、セキュリティグループなどが作成されます
# (キーペアをあらかじめ作成しておく必要があります)
$ ecs-cli up --keypair keypair_name --capability-iam --size 1 --instance-type r5.large --cluster-config ec2-tutorial

# DAGのファイルを置けるように、セキュリティグループの許可設定にSSH(ポート22)を追加
$ aws ec2 authorize-security-group-ingress --group-id created_security_group_id --protocol tcp --port 22 --cidr your_cidr

# DAGファイルを配置
$ cd ansible
$ vi hosts  # 作成されたEC2インスタンスのIPアドレスに修正
$ cp /path/to/keypair ./keypair.pem
$ ansible-playbook -i hosts site.yml
$ cd ..

# コンテナを起動
$ ecs-cli compose up --create-log-groups --cluster-config ec2-tutorial

参考までに、修正した点を記載しておきます。

  • depends_onをlinksに修正
    • ECS CLIはdepends_onをサポートしていないため、代わりにlinksを使う必要がありました。
    • linksにするとリンク環境変数が設定され、これがたまたまentrypointのスクリプトが使用する変数REDIS_PORT, POSTGRES_PORTと競合して起動処理が失敗していたため、適切な値にするよう設定が必要でした(これでかなりハマってしまいました)
  • DAGファイルの配布
    • DAGファイルをホスト側(EC2インスタンス)に配置してvolumeでマウントする仕組みのため、別途Ansibleで配布するようにしました *3
  • その他諸々
    • restartはサポートされていないためコメントアウトしました
    • デバッグのためにログをClowdWatch Logsで見られるように、logging設定を追加しました
    • 最初t2.microなどの小さなインスタンスで試していたのですがすぐにメモリ不足で落ちるため、メモリ大きめのインスタンスタイプがよさそうです。自分はr5.largeにしました。

最後に

Airflowを触ってみました。簡単なサンプルのDAGを動かしただけですが、かなり高機能で色々使えそうな印象でした。AWSのサービスの連携も可能なようなので、引き続き色々触ってみたいと思います。

参考

脚注

  1. デフォルトでは1日1回00:00に実行するようスケジューリングされます。実行開始するとstart_dateから現在までの分が実行され始めるため、start_dateが古いと大量のDAGが実行されてしまいます。なのでstart_dateは最近の日付にしておくと良さげです。
  2. デフォルトのExecutor(SequentialExecutor)ではできません。CeleryExecutorであれば可能です。
  3. だったら最初から全部Ansibleでやる方が良かったかもしれません・・・