Airflowに関する個人的FAQ

Airflowを触っていて個人的にハマったことなどをFAQ形式でまとめてみました。全然Frequentlyじゃない気がするのはきっと気のせいです。
2019.03.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Airflowを触っていて個人的にハマったことなどをFAQ形式でまとめてみました。全然Frequentlyじゃない気がするのはきっと気のせいです。

以下、バージョンは本記事公開時の最新 1.10.2 です。

インストール

インストールが失敗するのですが?

普通に pip install apache-airflow とすると以下のエラーで失敗します。

RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE

Airflowが依存するライブラリの中にライセンスがGPL 2.0であるunidecodeが入ってしまっていることによるものです。 現状ではインストール時に以下のどちらかの環境変数を設定する必要があります。

  • SLUGIFY_USES_TEXT_UNIDECODE=yes
  • unidecodeの代わりに非GPLである

text-unidecodeを使います。 - AIRFLOW_GPL_UNIDECODE=1.0.23 - GPLが含まれることを承知の上で、unidecodeをインストールします。

なおこの問題はすでに修正されています(pull request)。次期バージョンでは発生しなくなると思われます。

参考

AmazonLinuxだとインストールに失敗するのですが?

AmazonLinux2でインストールすると以下のエラーで失敗します。

Cannot uninstall 'enum34'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.

pip以外でenumが先にインストールされているのが原因のようです。pipで強制的にenum34を上書きインストールすることで解消します。

sudo pip install --ignnore-installed enum34

参考

それでもやっぱりAmazonLinuxでインストールに失敗するのですが?

gccが無いとか言われたらgccをインストールしてください

sudo yum install gcc

設定

環境によって設定を変えられますか?

環境変数で設定を上書きすることができます。セクション名とキーで AIRFLOW__{SECTION}__{KEY} のような変数名で指定します。(見にくいですが間のアンダースコアは2つです。)

例えばwebserverのURLは設定ファイルで以下のように指定しますが

[webserver]
base_url = http://airflow-hostname:8080

これを環境変数で AIRFLOW__WEBSERVER__BASE_URL=http://dev.airflow-hostname:8080 のように設定することで上書きできます。

参考

workerのログをS3に出力できますか?

設定のcoreセクションで以下のように設定します

[core]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://bucket-name/path/to/log

remote_log_conn_id = aws_defaultはデフォルトの接続設定で、Airflowを実行しているIAMロールにS3に書き込む権限があればOKです。

タスクが完了すると、UIからはS3に出力されたログを見ることができます。

参考

DAG関連

DAGにパラメータを渡せますか?

UIからはできないようですが、CLIのtrigger_dagコマンドで実行すれば可能です。 オプション--confにJSON形式で指定します。

$ airflow trigger_dag <dag_id> --conf '{"key": "value"}'

confで指定した値をOperatorで利用するには以下の2つの方法があります。

template文字列内で受け取る

BashOperatorの引数bash_commandなどJinja templateが使用できるところでは、{{ dag_run.conf['key'] }}としてconfの値を受け取れます。

BashOperator(
    bash_command="echo {{ dag_run.conf['key'] }}",
)

Jinja templateを使える引数は、Operatorのtemplate_fieldsという変数で指定されているものです()。 APIリファレンスでは"templated"と書かれています。

context経由で受け取る

Operator内部のcontextという変数にdag_runが入っています。 例えば

PythonOperatorの場合は、実行する関数python_callableのキーワード引数としてcontextが渡ってきますので、以下のように使うことができます。

def print_value(**context):
    print(context['dag_run'].conf['key'])

PythonOperator(
    python_callable=print_value,
)

参考

BashOperatorが"TemplateNotFound"とかいうエラーで失敗するのですが?

コマンド文字列の末尾にスペースを入れると正常に実行できる場合があります。

参考

Operatorの中で設定値を使うことはできますか?

例えばあるDAGが失敗した時にSlackに通知したいが、URLを設定値から取得したい、というマニアックな場合を考えます。この場合、template内で conf.get('section', 'key') のようにして取得することができます。

text = ''':no_entry: {{ dag.dag_id }} {{ run_id }} failed.
{{ conf.get("webserver", "base_url") }}/admin/airflow/graph?dag_id={{ dag.dag_id }}&run_id={{ run_id }}
'''
fail = SlackAPIPostOperator(
    task_id='notify_failuer',
    channel='#notify-channel',
    token=os.getenv('SLACK_API_TOKEN'),
    text=text,
    username='Airflow',
    default_args=args,
    dag=dag,
)

このほか、templateで使えるものはドキュメントのココにまとまっています。

他のDAGの終了を待ち合わせることはできますか?

ExternalTaskSensorにそのような機能があります。

前のタスクのステータスによってフローを変えることはできますか?

前のタスクのステータスによって、というものは用意されていないようですが、 BranchPythonOperatorというものがあり、 pythonの関数の実行結果によってフローを変えることは可能です。

参考

スケジュール実行したくないのですが?

DAGのパラメータschedule_intervalNoneにすれば可能です。 なお、default_argsのdictの中で指定しても効果はありませんので注意してください。

default_args = {
    'owner': 'airflow',
    'start_date': dates.days_ago(1),
    #'schedule_interval': None,  <- コレは効かない
}

dag = DAG(
    dag_id='foo',
    default_args=default_args,
    schedule_interval=None,  # コレ
)

参考

失敗したタスクから先を実行し直したいのですが?

可能です。

  • UIでは、やり直したいDagRunをGraph Viewで表示 -> やり直したいタスクをクリック -> "Clear"をクリック
  • CLIではairflow clear -s -e -t task_a

ちなみに、DAGにconfを設定して実行していた場合も大丈夫です。最初に設定したのと同じconfがやり直す際にも設定されます。

参考

まとめ

以上、ほとんど個人的な備忘録ですが、自分がハマったり調査したことのメモとして残しておきます。 今後も何かあれば随時追記される、かもしれません。