Digdagのアーキテクチャとコンセプトを理解する #digdag

treasure-data-logo

Treasure Data社によってOSSワークフローエンジン『Digdag』はその発表以後多くの反響を呼び、社内外を含め良く利用されるようになってきていますが個人的には下記の『試してみた』エントリ以降、あまり触って来ていませんでした。ちょっと個人的にも腰を据えて取り掛かってみようかという感じになってきましたので、仕組みや使い方を把握するという意味で一番参考になるであろう公式ドキュメントの一部を読み進めてみた記録をブログエントリとして残しておきたいと思います。

Digdagのアーキテクチャ

Digdagによるワークフローの自動化

ワークフローを使って、手動で行なっているあらゆる操作を自動化出来ます。一連のタスクを『ワークフロー』として定義し、Digdagを使ってそれらを実行します。タスクはオペレータプラグインを使って定義され、ワークフローエンジンを使ってその処理をコントロール出来ます。詳細は以下参照。

プラグインの実行フレームワークとしてDigdagがワークロード自動化周りの面倒を見てくれるので、利用ユーザーは処理の自動化に集中する事が出来るようになります。

タスクが失敗した場合や時間内にワークフローが終了しない場合、Digdagで通知する事が出来ます。Digdagは設定されたスケジュールに基いてワークフローを開始します。

タスクはローカルマシンで実行する事も出来ますし、分散環境やDockerコンテナ上で実行する事も可能です。

グループによるタスクの管理

複雑なワークフローを自動化しようとなると定義はたちまち複雑になります。Digdagを使う事で、タスクをグループとして編成させる事が可能です。定義を確認する際、まずは『鳥の目』で俯瞰し、そして詳細を見ていきましょう。そうする事で開発中のワークフローのデバッグやレビューがやりやすくなります。本番環境では、何が起きているのか、そして問題をどのように解決するべきかを管理者に教えてくれます。

grouping-tasks

依存するタスクが無い、また依存するタスクが全てが完了した場合、タスクは開始します。 グループの親(タスク)が実行されると、その子(タスク)が実行します。それら子タスク全てが成功すると、親タスクもまた成功扱いとなります。しかし、子タスクが失敗した場合、その当該子タスクと親タスクも失敗します。 ルート(親)タスクが成功または失敗となったタイミングでタスクの実行は完了となります。

パラメータ

タスクのグループ化は、タスク間でパラメータをやり取りする為に用いられます。 親タスクは変数を子タスクに対してエクスポート出来ます。(環境変数をセットするUNIXシェルのExportコマンドのようなイメージ)

親タスクは処理実行時に子タスクを生成する事が出来るので、前のタスクの結果に応じて異なるタスクを実行するといった事も可能です。詳細は下記を参照ください。

export-params

ワークフローをコードで記述する

workflow-as-code

Digdagに於いて、ワークフローはコードで定義されます。これによって、ソフトウェア開発のベストプラクティスの恩恵 - バージョン管理、コードレビュー、テスト、プルリクエストを用いた連携等 - に預かる事が可能となります。ワークフローをGitリポジトリにPushする事が出来、誰もがあなたが実践した内容と同じ結果を再現する事が可能となります。

ローカルモードでの実行

Digdagは単一ファイルでの実行が可能です。新しいワークフローの作成と実行はmakefileと同じくらいの容易さです。 拡張子*.digのファイルがワークフローの定義ファイルとして用いられます。digdag run my_workflow.digの様にコマンドを実行する事でワークフローが稼働します。

ローカルマシン上でワークフローを作成し、テストしてしまえば、サーバにそのファイルをPushして、ワークフロー上で定期的に実行してみたくなる事でしょう。

サーバー上での実行

拡張子*.digのファイルを含めた、同じディレクトリ内存在するファイル群は『プロジェクト』と呼ばれます。そのプロジェクト全体をサーバーにpushする事で、Digdagはサーバ上でワークフローを実行します。

Docker上でのタスク実行

Dockerを使う事で、コンテナ上でタスクを実行出来ます。Dockerオプションが設定されると、タスクはDockerコンテナ上で実行を行います。

_export:
  docker:
    image: ubuntu:14.04

+step1:
  py>: tasks.MyWorkflow.step1

Digdagのコンセプト

プロジェクトとリビジョン

Digdagでは、ワークフローはワークフローで使用されるその他ファイル群と併せてパッケージングされます。これらのファイルの中にはSQLスクリプトやPython/Ruby/Shellスクリプトや設定ファイル等も含まれます。これらをひとまとめとして『プロジェクト』と定義しています。

プロジェクトがDigdagサーバーにアップロードされると、Digdagサーバーは新しいバージョンを追加し、古いバージョンのものを保持します。プロジェクトのバージョンは『リビジョン』と呼ばれます。ワークフロー実行時、Digdagは最新バージョンのものをデフォルトで実行します。ですが、以下のような用途で過去のバージョンを指定し実行する事も可能です。

  • 過去のワークフロー実行の定義を確認したい。
  • 以前と同じ結果を再現させるために古いリビジョンのワークフローを使って実行したい。
  • 問題が発生したので、問題発生前の最新稼働リビジョンの状態に戻したい。

プロジェクトには複数のワークフローを含める事が出来ます。 しかし、ワークフローが他のワークフローと関連しているものでなければ、独立した新しいプロジェクトとして作成するべきです。プロジェクト内の全てのワークフローは新しいバージョンのものをアップロードした際に一緒にアップデートされる、というのがその理由です。

セッションとアテンプト(試み)

セッションは、ワークフローを成功裏に実行させるための試みです。アテンプトは、実際のセッションの実行です。 失敗したワークフローを再実行すると、セッションは複数のアテンプトを持つ形になります。

セッションとアテンプトが分離されている理由は、実行が失敗するかもしれない、からです。セッションを一覧化する際、期待されるステータスは全てのセッションがgreen(成功)である状態です。 もし失敗したセッションを見つけたら、そのアテンプトを確認し、ログから問題をデバッグします。

その問題を解決するために新しいリビジョンのものをアップロードをし、新しいアテンプトを開始するかもしれません。セッションを使う事で、あなたは、計画された実行が正常に行われるかどうかを容易に確認する事ができます。

スケジュール実行とセッションタイム

セッションはsession_timeと呼ばれるタイムスタンプを持っています。 この要素は、『ワークフローの実行時間』を意味します。 例えば、ワークフローが日次でスケジューリングされている場合、この時間は通常 "2017-01-01 00:00:00"のように"00時00分00秒"となります。 あるデータが1時間前に準備出来ているようにするために、処理の実行を2時間遅らせたくなるかもしれません。 昨日の結果を埋め戻す(backfill)為に、翌日のワークフローを実行する事が在るかもしれません。この例での時間、2017-01-01 00:00:00は、session_timeと呼ばれます。

session_timeはワークフローの履歴の中で一意です。同じsession_timeを持つ2つのセッションがサブミットされた場合、後者のリクエストは破棄されます。これにより、同時刻に発生したセッションの偶発的な衝突を避ける事が出来ます。同じ時間にワークフローを実行させたい場合は新しいセッションをsubmitするのでは無く、過去のセッションを再試行すべきです。

タスク

セッションの試みが開始されると、ワークフローは一連のタスクに変換されます。 タスクそれぞれ互いに依存関係を持ちます。 例えば、+dump+process1+process2イベントに依存し、 +process1+process2+prepareに依存する、というようなものです。 Digdagはその依存関係を把握し、順序に従ってタスクを実行します。

パラメータのエクスポートと格納

タスクのパラメータには以下3種類があります。

local:
直接タスクに設定するパラメータ
export:
親タスクからエクスポートされたパラメータ
store:
以前のタスクで保存されたパラメータ

タスク実行時、これらのパラメータ群は1つのオブジェクトにマージされます。

localパラメータが最も高い優先度となり、exportとstoreパラメータは互いに上書きを行います。従って、後のタスクで設定されているパラメータの方がより優先度が高い形となります。exportパラメータは子タスクに値を渡す為に、親タスクで使用されます。storeパラメータは子タスクを含む、全ての『次のタスク』に使用されます。

exportパラメータの影響はstoreパラメータの比較に限定されます。これにより"モジュール化"されたワークフローを構築する事が出来ます。例えば、ワークフローがデータを処理するスクリプトを使っている場合、スクリプトの実行を制御する為に幾つかのパラメータを使う事でしょう。その一方で、パラメータによって影響を受けるスクリプトを作成したくないと思う事もあるかと思います。(例:データロードの部分はデータ処理部分の変更による影響を受けたくない)

このケースでは、スクリプトを単一の親タスク配下に置くことが出来、親タスクのパラメータをエクスポートする事が出来ます。

storeパラメータは後続のタスクには見える状態=前のタスクには見えていない状態となっています。 例えば、ワークフローを実行し、再実行したとします。 このケースでは、タスクによって格納されたパラメータは、そのタスクが正常終了していたとしてもその前のタスクでは表示されません。

storeパラメータはグローバル変数ではありません。 2つのタスクが平行稼働している場合、それらタスクは異なるstoreパラメータを利用しています。 これは実際の実行タイミングに関係なく、ワークフローの動作として一致するものです。 例えば、2つの並行タスクに依存する他のタスクが実行される場合、最後のタスクによって格納されるパラメータはタスク送信された順序で使用されます。

オペレータとプラグイン

オペレータ(Operator)は、謂わば『タスクの実行者』です。 オペレータはワークフロー定義の中で『sh>』や『pg>』というような形で定義されます。 タスクが実行されると、Digdagはオペレータを選択し、パラメータ(local,export,store)をマージ、マージされたパラメータをオペレータに渡します。

オペレータは、一般的なワークロードのパッケージとしてみなす事が出来ます。オペレータを使う事によって、少ないスクリプトの記述でより多くの事が出来るようになります。オペレータはプラグインとして設計されています。(完全に、とまではいきませんが)

ワークフローを簡素化するオペレータをインストールし、他のワークフローで再利用出来るように、オペレータを作成します。Digdag自体、多くのオペレータをその環境上で実行するシンプルなプラットフォームとも言えるでしょう。

動的タスク生成と『_check』『_error』タスク

Digdagはワークフローを、依存関係を持つタスクのセットに変換します。 このタスクのグラフは、DAG(Directed Acyclic Graph:有向非循環グラフ)と呼ばれます。 DAGは実行に適しています。最も依存しているタスクから実行します。 しかし、DAGではループを表現する事は出来ません。ifの分岐を表現する事も簡単ではありません。

しかし、ループや分岐の処理は有用なものです。 この問題を解決する為に、Digdagは動的に実行するDAGにタスクを追加します。 下記例ではDigdagはループを表す、^subで始まる3つのタスクを生成します。

  • +example^sub+loop-0
  • +example^sub+loop-1
  • +example^sub+loop-2
+example:
  loop>: 3
  _do:
    echo>: this is ${i}th loop

_check_errorオプションは、動的にタスクを生成する時に使用します。 これらのパラメータは、タスクが成功または失敗した場合にのみ、他のタスクを実行する為にDigdagによって使用されます。

_checkタスクは、タスクが正常に完了した後に生成されます。 これは特に、次のタスクを始める前に、完了したタスクの結果を検証したい時に便利です。

_errorタスクはタスクが異常終了、失敗した後に生成されます。 外部システムにタスクの失敗を通知する時等に有用です。

タスクの命名・タスクの再実行

タスクは、試み(attempt)の一意の名前を有しています。

attemptを再実行する際、この名前は直前の実行されたattemptと一致させる為に使用されます。 子タスクはは親タスクの名前をprefixとして持っています。 ワークフロー名はまた、ルートタスクとしてprefixで始まって(持って)います。

以下例では、タスク名は以下のような形となります。

  • +my_workflow+load+from_mysql+tables
  • +my_workflow+load+from_postgres
  • +my_workflow+dump
# my_workflow.dig
+load:
  +from_mysql:
    +tables:
      ...
  +from_postgres:
    ...
+dump:
  ...

ワークスペース

ワークスペースは、タスクを実行する際に利用するディレクトリです。

Digdagはプロジェクトのアーカイブからこのディレクトリ(=workspace)にファイルを抽出、ディレクトリを変更し、タスクを実行します。(注:ローカルモード実行では、作業ディレクトリ=ワークスペースという概念の為、ワークスペースには何も生成しません。)

プラグインはワークスペースの親ディレクトリへのアクセスを許可しません。 これはDigdagサーバーが共有環境で実行されているためです。 プロジェクトは自己完結型の構成であるべきです。外部環境に依存する必要はありません。

スクリプトの演算子は例外です。(例:sh>オペレータ等) docker:オプションを使って実行する事を推奨します。

まとめ

という訳でDigdagの仕組みを理解するための『アーキテクチャ』と『コンセプト』のご紹介でした。ドキュメントしても非常に読み易く、全体の理解もかなり進んだ感じがします。とは言えもっと良く理解するのは実際に使いこなすのが一番ですので、引き続き色々触って行ってみようと思います。こちらからは以上です。

参考情報

AWS Cloud Roadshow 2017 福岡