この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、みかみです。
Python で実装されている Job 管理ツール Apache Airflow。 WebUI のJob 管理画面で直感的に分かりやすい操作が可能で、エラー発生時などの通知機能もあり、スケールアウトにも対応していて複数サーバで分散実行もできます。
Python でバッチ Job 開発経験のある方には多分おなじみの Airflow、私も存在は知っていましたが、実際使ったことはありませんでした。
やりたいこと
- Airflow の構成(アーキテクチャ)を知りたい
- Airflow の使い方(Job 作成&実行方法)を知りたい
Airflow のアーキテクチャ
Airflowは、 管理画面表示部の Webserver と、Job実行のスケジュール管理部の Scheduler 、Job実行部の Worker(Executer) から成り立っているようです。
各モジュールは管理 DB を介して Job の実行スケジュールや依存関係、実行結果を共有します。
Webserver は Python の Web アプリケーションフレームワーク Flask で実装されています。
また Worker(Executer) は、例えば Python の 分散タスクキューフレームワーク Celery を使用した分散並列処理などにカスタマイズすることもできます。
Job Queue を使わない構成や管理 DB のエンジンも自由に選択可能なのであくまでイメージですが、ざっくり以下のような構成になっていると理解しました。
Airflow の Job 構成
Airflow では、Job の 実行順や依存関係などのワークフローは DAG で定義するとのこと。
Job本体(実際の処理)は Operator に記述されており、Python で処理を実行するための PythonOperator や、Bash 実行のための BashOperator の他、各種 RDBMS や Hive、AWS や GCP など 様々なサービスに対する処理を実行する Operator も用意されています。
さらに、 BaseOperator クラスを継承して、必要な処理を実行する Operator を自由に追加することもできます(カスタマイズ性高くて良いですねv
DAG を追加して動かしてみる
と、学んだところで、実際に Airflow で Job を作成して動かしてみようと思います。
まずは Airflow をインストール。 Python 3.7 実行環境構築済みの EC2(Amazon Linux)に、下記クイックスタートに記載のコマンドでインストール&起動しました。
管理画面にアクセスしてみると、サンプル DAG のリスト表示が確認できました。
続いて、チュートリアルとサンプルの example_python_operator.py を参考に、DAG を追加してみます。
$AIRFLOW_HOME 配下に dags ディレクトリを作成して、test_mikami.py ファイルを追加しました。
(test_airflow) [ec2-user@ip-10-0-43-239 airflow]$ pwd
/home/ec2-user/airflow
(test_airflow) [ec2-user@ip-10-0-43-239 airflow]$ mkdir dags
(test_airflow) [ec2-user@ip-10-0-43-239 airflow]$ cd dags
(test_airflow) [ec2-user@ip-10-0-43-239 dags]$ vi test_mikami.py
BashOperator で date コマンドを実行してから5秒間 sleep し、PythonOperator で「Hello! Airflow!!」と出力するだけの単純な処理です。
test_load.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from pprint import pprint
from airflow.operators.python_operator import PythonOperator
# Parameters
default_args = {
'owner': 'ec2-user',
'depends_on_past': False,
'start_date': days_ago(2),
}
dag = DAG(
'test_mikami',
default_args=default_args,
description='For TEST execute DAG',
schedule_interval=timedelta(days=1),
)
# BashOperator:date + sleep 5
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
# PythonOperator:print
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Hello! Airflow!!'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
t1 >> t2 >> run_this
airflow webserver を再起動してみると、管理画面のDAG リストにも、test_mikami が追加表示されました。
管理画面から手動実行してしばらく待ってから実行結果を確認してみると、どうやら正常終了できたようです。
表示されているタスクをクリックしてログを確認してみると、Python での「Hello! Airflow!!」出力まで、ちゃんと実行されていることが確認できました。
手始めに、とりあえず各パラメータの意味もよく分からないまま既存のサンプルをコピペして動かしてみましたが、オペレータにパラメータを指定して、いろいろな実行制御ができるようです。
タスクの依存関係の書き方も簡単で、慣れればとても使いやすそうです。
まとめ(所感)
機能もドキュメントも充実で、「これが OSS でいいの?」というのが正直な感想でした。
config で必要に応じた構成に変更することもでき、各種サービスに対応した Operator(バッチ処理実行部)も用意されているので、自由度も高そう。
- Configuration Reference | Apache Airflow Documentation
- Github > Apache Airflow > default_airflow.cfg
- airflow.operators | Apache Airflow Documentation
- airflow.contrib.operators | Apache Airflow Documentation
リッチな分ソースコードも多く、コンセプトの理解も必要なので慣れるまでは少し難しく感じそうですが、OSS でカスタマイズも可能なので、使いやすくカスタマイズしていくのも良さそうです。