この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Digdag が Apache License 2.0 の元でオープンソース化されましたよ! さぁ試すんだ…! 今すぐにでも! https://t.co/Uzc4a5GLCe ドキュメント:https://t.co/PF8wy5KHln
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
という訳で試してみました。注目度の高かったワークフローエンジン『Digdag』がついにOSS化されました!Githubリポジトリ及びドキュメントは以下となります。
目次
インストール
環境の準備
今回は手っ取り早く触ってみよう!という事でEC2環境(Amazon Linux)を用意しました。
$ ssh -i xxxxxxx.pem ec2-user@xx.xxx.xxx.xx
Last login: Wed Jun 15 15:03:28 2016 from xxx.xxx.xxx.xxx
__| __|_ )
_| ( / Amazon Linux AMI
___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2016.03-release-notes/
$ sudo yum -y update
Digdagのインストール実施
導入はとても簡単。ソースコードを入手し、実行権限を付与。
$ sudo curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
100 18.6M 100 18.6M 0 0 1201k 0 0:00:15 0:00:15 --:--:-- 690k
$
$ sudo chmod +x /usr/local/bin/digdag
$
digdag --helpを実行してみます。おや?エラーが出ますね...
$ digdag --help
Exception in thread "main" java.lang.UnsupportedClassVersionError: io/digdag/cli/Main : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:803)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
$
このエラーに関する記述もドキュメントに記載されていました。Java8(8u72)以上の環境が必要となるようです。
$ java -version
java version "1.7.0_101"
OpenJDK Runtime Environment (amzn-2.6.6.1.67.amzn1-x86_64 u101-b00)
OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
既存Java環境をアンインストールし、
$ sudo yum -y remove java
$ java -version
-bash: /usr/bin/java: そのようなファイルやディレクトリはありません
RPMパッケージを入手、新しいバージョンのJavaをインストール。
$ sudo rpm -ivh jdk-8u92-linux-x64.rpm
準備しています... ################################# [100%]
更新中 / インストール中...
1:jdk1.8.0_92-2000:1.8.0_92-fcs ################################# [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
$
再度digdag --helpコマンドを実施。今度はちゃんとヘルプが表示されました!これでインストール完了です。
$ digdag --help
2016-06-15 15:35:29 +0000: Digdag v0.8.1
Usage: digdag <command> [options...]
Local-mode commands:
new <path> create a new workflow project
r[un] <workflow.dig> run a workflow
c[heck] show workflow definitions
sched[uler] run a scheduler server
selfupdate update digdag to the latest version
Server-mode commands:
server start digdag server
Client-mode commands:
push <project-name> create and upload a new revision
start <project-name> <name> start a new session attempt of a workflow
retry <attempt-id> retry a session
kill <attempt-id> kill a running session attempt
backfill start sessions of a schedule for past times
reschedule skip sessions of a schedule to a future time
log <attempt-id> show logs of a session attempt
workflows [project-name] [name] show registered workflow definitions
schedules show registered schedules
sessions show sessions for all workflows
sessions <project-name> show sessions for all workflows in a project
sessions <project-name> <name> show sessions for a workflow
session <session-id> show a single session
attempts show attempts for all sessions
attempts <session-id> show attempts for a session
attempt <attempt-id> show a single attempt
tasks <attempt-id> show tasks of a session attempt
version show client and server version
Options:
-L, --log PATH output log messages to a file (default: -)
-l, --log-level LEVEL log level (error, warn, info, debug or trace)
-X KEY=VALUE add a performance system config
-c, --config PATH.properties Configuration file (default: /home/ec2-user/.config/digdag/config)
Use `<command> --help` to see detailed usage of a command.
$
サンプルワークフローの実行
サンプルワークフローの実行についてもドキュメントでカバーされていますのでこちらも早速試してみます。
digdag initでワークフローの初期化/作成、
$ digdag init cmdag
2016-06-15 15:38:34 +0000: Digdag v0.8.1
Creating cmdag/.gitignore
Creating cmdag/tasks/shell_sample.sh
Creating cmdag/tasks/repeat_hello.sh
Creating cmdag/tasks/__init__.py
Creating cmdag/cmdag.dig
Done. Type `cd cmdag` and then `digdag run cmdag.dig` to run the workflow. Enjoy!
作成されたフォルダに移動し、digdag runコマンドで拡張子*.digを指定して実行。幾つかログが表示された後、処理が正常終了しました。
$ cd cmdag/
$ digdag run cmdag.dig
2016-06-15 15:40:44 +0000: Digdag v0.8.1
2016-06-15 15:40:46 +0000 [WARN] (main): Using a new session time 2016-06-15T00:00:00+00:00.
2016-06-15 15:40:46 +0000 [INFO] (main): Using session .digdag/status/20160615T000000+0000.
2016-06-15 15:40:46 +0000 [INFO] (main): Starting a new session project id=1 workflow name=cmdag session_time=2016-06-15T00:00:00+00:00
2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step1): sh>: tasks/shell_sample.sh
Step1 of session 2016-06-15T00:00:00+00:00
2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step2+worker1): sh>: tasks/repeat_hello.sh
Hello, world! from process 3334
2016-06-15 15:40:47 +0000 [INFO] (0019@+cmdag+step2+worker2): sh>: tasks/repeat_hello.sh
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
2016-06-15 15:40:51 +0000 [INFO] (0019@+cmdag+step3): py>: tasks.MyWorkflow.step3
Step3 of session 2016-06-15T00:00:00+00:00
Success. Task state is saved at .digdag/status/20160615T000000+0000 directory.
* Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
* Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
$
実行結果も気になりますが、まずは実行時に指定したファイルを見てみましょう。非常にシンプルな設定内容で何やらジョブ等が設定されています。step2の部分は_parallel: trueと指定があるので並行処理をさせているというのも把握出来ますね。
$ cat cmdag.dig
timezone: UTC
_export:
hello: "Hello, world!"
+step1:
sh>: tasks/shell_sample.sh
+step2:
_parallel: true
+worker1:
sh>: tasks/repeat_hello.sh
+worker2:
sh>: tasks/repeat_hello.sh
+step3:
# defined at tasks/__init__.py
py>: tasks.MyWorkflow.step3
$
設定ファイルで指定されていた実行ファイルの内容も確認して見ます。*.digファイルで定義した変数(hello)がこちらのファイルで受け渡されて出力されている様ですね。
$ cd tasks/
$ ll
合計 16
-rw-rw-r-- 1 ec2-user ec2-user 159 6月 15 15:38 __init__.py
-rw-rw-r-- 1 ec2-user ec2-user 578 6月 15 15:40 __init__.pyc
-rwxrw-r-- 1 ec2-user ec2-user 167 6月 15 15:38 repeat_hello.sh
-rwxrw-r-- 1 ec2-user ec2-user 49 6月 15 15:38 shell_sample.sh
$ cat shell_sample.sh
#!/bin/sh
echo "Step1 of session $session_time"
$ cat repeat_hello.sh
#!/bin/sh
echo "$hello from process $$"
sleep 1
echo "$hello from process $$"
sleep 1
echo "$hello from process $$"
sleep 1
echo "$hello from process $$"
sleep 1
$
タスクのステータスに関する情報が格納されているディレクトリに移動して中身を見てみます。非常にシンプルなタスクだったのか、詳細な内容までは出力されていませんが、処理それぞれの内容(*.yml)にstate: "success"という情報が出力されているのが確認出来ます。
$ pwd
/home/ec2-user/cmdag/.digdag/status/20160615T000000+0000
$ ll
合計 24
-rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step1.yml
-rw-rw-r-- 1 ec2-user ec2-user 156 6月 15 15:40 +cmdag+step2+worker1.yml
-rw-rw-r-- 1 ec2-user ec2-user 156 6月 15 15:40 +cmdag+step2+worker2.yml
-rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step2.yml
-rw-rw-r-- 1 ec2-user ec2-user 148 6月 15 15:40 +cmdag+step3.yml
-rw-rw-r-- 1 ec2-user ec2-user 142 6月 15 15:40 +cmdag.yml
$ cat +cmdag.yml
fullName: "+cmdag"
state: "success"
result:
subtaskConfig: {}
exportParams: {}
storeParams: {}
report:
inputs: []
outputs: []
$ cat +cmdag+step1.yml
fullName: "+cmdag+step1"
state: "success"
result:
subtaskConfig: {}
exportParams: {}
storeParams: {}
report:
inputs: []
outputs: []
$ cat +cmdag+step2+worker1.yml
fullName: "+cmdag+step2+worker1"
state: "success"
result:
subtaskConfig: {}
exportParams: {}
storeParams: {}
report:
inputs: []
outputs: []
$
その他ドキュメントの内容について
『Getting Started』に関する部分は上記の内容となります。公式ドキュメントにはその他以下の様なセクションで内容が構成されています。読み応えがありますがとても分かり易く解説されている印象です。
- Architecture
- Workflow definition
- Scheduling workflow
- Operators
- Command reference
- Language API - Python
- Language API - Ruby
- Release Notes
Digdagの生みの親:古橋さんの関連ツイートが特徴を端的に紹介されているので、併せて引用させて頂きます。
Digdag、まずはEmbulkによるETL処理の自動化に最適。複数のデータソースから並列または直列にデータロード→日付ごとにテーブル作成→一次集計とJOIN…という処理を直感的に記述できる。 #digdag pic.twitter.com/IDEV2eVbjZ
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
パラメータ化が可能。引数やファイルから受け取ったパラメータを、設定ファイルや引数に埋め込んでからコマンドを実行できる。同じような処理を同じテーブルや同じデータソースに対して適用したい場合に効果的。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
for_each> や if> などのフロー制御ができ、プログラマブルなワークフローを組み立てられる。外部のスクリプトを呼んでパラメータをとってきて、その値に応じて次のアクションを変えたり、ループさせたりできる。https://t.co/Y7v21U3Sek #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
Dockerイメージを指定してタスクを実行できる、スケジューラ内蔵なのでcronがいらない、分散実行に対応、実行時間超過/失敗時のエラー通知、etc etc。今ワークフローエンジンをちゃんと作るとこうなるよね、という機能がほぼあるはず。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
正直なところ、コードはEmbulkより良くできているので、メンテナンスが楽そう…チームメンバーも2人いるし。 #digdag
— Sadayuki Furuhashi (@frsyuki) 2016年6月15日
参考資料
更にDigdagを理解する上での助けとなる情報は以下の通り。過去イベントで発表されたスライド資料及び動画、また、古橋さんの事前インタビュー記事も必見です。
スライド資料: 分散ワークフローエンジン『DigDag』の実装 at Tokyo RubyKaigi #11
YouTube動画: 【基調講演】分散ワークフローエンジン『Digdag』の実装
関連インタビュー記事
まとめ
以上、新しくリリースされたTreasure Data社によるOSSプロジェクト『Digdag』に関する『やってみた』エントリでした。上記で試した内容はまだほんの触りの部分ですので、これから折を見てドキュメントを読みながらDigdagによるワークフロー構築を実践して行きたいと思います!こちらからは以上です。