Digdagのワークフロー定義について理解する #digdag

treasure-data-logo

先日の投稿でDigdagの『アーキテクチャ』に関する部分を読み解いて行きましたが、今回はその続き、『ワークフロー定義』に関する部分を読み進めてみたいと思います。

ワークフローは拡張子『*.dig』のファイルとして作成する

Digdagのワークフローは拡張子*.digを持つファイルとして作成します。ファイルの名前=ワークフローの名前、となります。例えば、hello_worldというワークフローを作るとします。その際のファイル名はhello_world.digとなります。中身はこんな感じの記述内容となります。

timezone: UTC

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  py>: tasks.MyWorkflow.step2
  param1: this is param1

+step3:
  rb>: MyWorkflow.step3
  require: tasks/ruby_sample.rb

timezoneパラメータはワークフローのタイムゾーンを構成する為に使用れ、セッションタイムスタンプ変数やスケジューリングに影響してきます。デフォルトのタイムゾーンはUTCです。その他有効なタイムゾーンのサンプルとしてはAmerica/Los_AngelesEurope/BerlinAsia/Tokyoといった形となります。

"+"はタスク

"+"で始まるキー名称は『タスク』として認識されます。タスクは、上から順番に実行され、そのタスクはネスト構造を取る事(別のタスクを子タスクとして定義)が出来ます。上記例ではメインタスクの子タスクとして、+step2タスクが+step1タスクの後に実行されます。

オペレータ(operators)

type>: commandもしくは_type: NAMEのパラメータで始まるタスクは任意のアクションを実行します。

シェルスクリプトやPythonメソッド、メール配信等の様々な種類のオペレータを選択する事が可能です。ビルトイン(組み込み済)のオペレータ一覧は下記をご参照ください。

注意:
foo>: barのパラメータ設定は_type: foo_command: barのパラメータに相当します。
これは、2つのパラメータ設定のシンタックスシュガーを1行にまとめた形となります。

変数:${variables}の利用

ワークフローでは${...}のシンタックスで変数を利用する事が出来ます。 ビルトインの変数は以下の形で用意されています。

  • timezone
  • session_uuid
  • session_time
  • session_date
  • session_date_compact
  • session_local_time
  • session_tz_offset
  • session_unixtime

schedule:オプションが設定されている場合は以下の要素を活用する事が出来ます。

  • last_session_time
  • last_session_date
  • last_session_date_compact
  • last_session_local_time
  • last_session_tz_offset
  • last_session_unixtime
  • next_session_time
  • next_session_date
  • next_session_date_compact
  • next_session_local_time
  • next_session_tz_offset
  • next_session_unixtime

last_session_timeは直近最後のスケジュールのタイムスタンプです。スケジュールがhourlyで設定されている場合、最後の『時』を指します。スケジュールがdailyで設定されている場合、『昨日』を指します。直近最後のスケジュールが実際に実行されたどうかは問題ではありません。現在のセッション時刻から計算を行って算出された最後のタイムスタンプが単純に設定されます。

変数の計算

Javascriptスクリプトを使って、${...}シンタックスで変数を計算出来ます。

一般的なユースケースとしては異なるフォーマットでタイムスタンプ情報を変換する事等でしょう。Digdagでは時間計算用にMoment.jsをバンドルしています。

timezone: America/Los_Angeles

+format_session_time:
  # "2016-09-24 00:00:00 -0700"
  echo>: ${moment(session_time).format("YYYY-MM-DD HH:mm:ss Z")}

+format_in_utc:
  # "2016-09-24 07:00:00"
  echo>: ${moment(session_time).utc().format("YYYY-MM-DD HH:mm:ss")}

+format_tomorrow:
  # "September 24, 2016 12:00 AM"
  echo>: ${moment(session_time).add(1, 'days').format("LLL")}

+get_execution_time:
  # "2016-09-24 05:24:49 -0700"
  echo>: ${moment().format("YYYY-MM-DD HH:mm:ss Z")}

変数の定義

以下3つの方法で変数を定義する事が出来ます。

  • _exportパラメータをYAMLで定義
  • APIを使い、変数を定義
  • ワークフロー実行時に変数をパラメータとして含める

_export:パラメータを使う

YAMLファイル内で_export:ディレクティブを用いて変数を定義出来ます。データベースのホスト名の様に、静的な設定情報をロードする時などに便利です。

タスクが_exportディレクティブを持っている場合、タスク及びその子タスクはスコープで定義されているその変数を利用する事が出来ます。 以下例では、全てのタスクがfoo=1を利用出来ますが、bar=2は+step1(と+analyze)のみが利用出来る形となります。

_export:
  foo: 1

+prepare:
  py>: tasks.MyWorkflow.prepare

+analyze:
  _export:
    bar: 2

  +step1:
    py>: tasks.MyWorkflow.analyze_step1

+dump:
  py>: tasks.MyWorkflow.dump

APIを使う

language APIを使って変数をプログラミングで定義出来ます。Python APIではdigdag.env.exportdigdag.env.storeが利用出来ます。

import digdag

class MyWorkflow(object):
  def prepare(self):
    digdag.env.store({"my_param": 2})

  def analyze(self, my_var):
    print("my_var should be 2: %d" % my_var)
digdag.env.store(dict):
変数を格納。以降のタスク全てで変数を使えるようになります。
digdag.env.export(dict):
YAMLファイル内での"_export"ディレクトリと同義。子タスクで利用可能。

APIリファレンス詳細については以下を参照ください。

ワークフロー実行時に変数をパラメータとして含める

新しいワークフローセッションを始める際に変数を定義する事が出来ます。-p KEY=VALUEを繋げる形で設定出来ます。

$ digdag run -p my_var1=foo -p my_var2=abc

!includeで他のファイルを取り込む

複雑なワークフローを整理するために、YAMLファイルを小さなファイルに分割する事が可能です。!includeディレクティブを用いる事で、それらのファイルを集約して利用する事が出来ます。

_export:
  mysql:
    !include : 'config/mysql.dig'
  hive:
    !include : 'config/hive.dig'

!include : 'tasks/foo.dig'

パラレル実行

_parallel: trueパラメータをグループに設定する事で、グループの小タスクを並行稼動させる事が出来ます。(孫タスクが影響を受ける事はありません)

+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: tasks/prepare_data1.sh

  +data2:
    sh>: tasks/prepare_data2.sh

  +data3:
    sh>: tasks/prepare_data3.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

_backgrond: trueがタスクまたはグループに設定されている場合、タスクまたはグループは以前のタスクと一緒に並列で実行します。 次のタスクは、バックグラウンド実行しているタスクまたはグループの完了まで待機します。

+prepare:
  +data1:
    sh>: tasks/prepare_data1.sh

  # +data1 and +data2 run in parallel.
  +data2:
    _background: true
    sh>: tasks/prepare_data2.sh

  # +data3 runs after +data1 and +data2.
  +data3:
    sh>: tasks/prepare_data3.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

エラー通知

_error:パラメータが設定されていると、オペレータはワークフローが失敗した際にその処理を行います。

# this task runs when a workflow fails.
_error:
  sh>: tasks/runs_when_workflow_failed.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

メール通知をする場合はmail> operatorを使う事も出来ます。

まとめ

という訳で、Digdagのワークフロー定義に関する内容のご紹介でした。処理制御についてもDiadagは少ない設定・労力で色々出来るのだなぁという事が分かった気がします。この部分については改めて実践で色々試して勘所を掴んでみたいと思います。こちらからは以上です。

参考情報

AWS Cloud Roadshow 2017 福岡