
【レポート】Workflow Engines Night #workflowenginesnight
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
DI部の川崎です。Digdag使ってます。
今日はDMM.comラボさんが開催された、ワークフローエンジンをテーマにした勉強会「Workflow Engines Night」に参加する機会がありましたので、Digdagの開発者古橋さんのセッションを中心に、参加レポートをお届けします。
勉強会のハッシュタグはこちら #workflowenginesnight
はじめに
会場は、六本木一丁目のDMM.comラボさんのオフィスでした。勉強会の概要を引用します。
概要
今回はワークフローエンジンをテーマに勉強会を開催致します。
DMM.comラボからはビッグデータ部で『Digdag』を実際に導入・運用した話、
またインフラ本部からは『StackStorm』による運用コスト・リスク低減の取り組みについてお話しをさせていただきます。
なお、今回は『DigDag』の開発者である 古橋 貞之氏が帰国のタイミング!
とのことでゲストとして参加いただけることとなりました。
開催日がとても近いのですが、直接話を聞ける機会ですのでぜひ皆様ご参加ください!
発表タイトルは、以下の通りでした。
『StackStorm ワークフローエンジンによる運用コスト・リスク低減の取り組み』
株式会社DMM.comラボ 大山 裕泰 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/stackstorm-wakuhuroenzinniyoruyun-yong-kosutorisukudi-jian-falsequ-rizu-mi)
『JenkinsからDigdagへ日次バッチを移行して幸せになるお話』
株式会社DMM.comラボ 鈴木 翔太 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/jenkinskaradigdagheri-ci-batutiwoyi-xing-sitexing-seninaruohua)
以下で紹介する、古橋さんのセッション。
『Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理』
トレジャーデータ株式会社 古橋 貞之 さん
(※発表資料は以下でインライン表示)
LT『Digdagを仕事で使ってみて良かったこと、ハマったこと』
株式会社VASILY 塩崎 健弘 さん
→ 発表資料(https://speakerdeck.com/shiozaki/using-digdag-in-production-environment)
LT『とあるマーケティング部隊でのDigdagの活用事例』
@grimrose さん
→ 発表資料(https://gist.github.com/grimrose/5bea98db4c82056dad1ab84d6653308e)
Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理(トレジャーデータ株式会社 古橋 貞之さん)
[発表スライド]
![]()
早速、Digdag開発者の古橋さんのセッションについてレポートしていきます。
![]()
まずは、自己紹介。
シリコンバレーに住んでます。なぜかというと、トレジャーデータという会社をそこにつくったから、とのことでした。
(会場の参加者に挙手をお願い)
fluentdを知ってる人 → 9割の人が手を挙げる
MessagePack知ってる人 → 7割の人が手を挙げる
![]()
これから話す内容は、1歩立ちもどって、
そもそもなんで、Digdagを作ったのか?という話をします。
手作業を自動化するため。
![]()
定期的な実行はもちろんの事、
過去分を一括で処理したい、となると、
時刻を変数として処理する、という事が重要になる。
エラー処理、
失敗したところから、レジューム実行
実行時間可視化
タスク並列
ワークフローをテキストで記述、
GUIによるワークフロー開発
Pythonなどのコードを書かなくても使える
異なった環境で実行しても、再現性があるもの
このような機能を全部満たすものは、なかなか無かった。
Digdagなら、全部できます!
GUIだけ遅れているので、開発者募集中です。
![]()
Digdagを作る前に、ジョブ実行ツール、ワークフロー実行ツールを一通り調べた。
OSS、商用のエンジン
Makefileはプリミティブだけど、その機能を持つ。
ウリや強みを比較した。
![]()
チャレンジとして、いろいろなクラウド、オンプレのソフトウェア。
それぞれにAPIを持ち、扱うためのツールも違う。
![]()
裏側にあるデータベースもいろいろである。
以前は、MongoDBがよく使われていた。
元々使われていた、SQLServerなどの(従来型のデータベースに加え)
クラウドでデータ連携が行われるようになり、
次第に、ここに挙げた、Hadoop、カサンドラ、presto、Redshiftとデータベースがどんどん増えていき、
矢印がどんどん増えて、連携が難しくなった。
データの動きを、どのように管理していくか。
![]()
新しい技術として、Cosmos DBが出ました、DynamoDBが出ました、と次々と登場し、難しい問題になった。
![]()
ユースケースを見ていくと、
Ingest、Enrich、Model、Load、Utilize
Ingest:Fluentd/Embulkで処理
Enrich:botのデータ取り除く、地域データを追加、JSONファイルをフラットに展開
Model:ABテスト適用、データモデルつくる
Load:DBにロード、アプリから使えるようになる
Utilize:レコメンド、プロダクションのデータを使って、アプリが作れる
こんな感じなのが、典型的な分析アプリ。
このワンステップは、1つの処理ではとても終わらない
複数のアルゴリズムを使って、いいやつをとるとか。
複雑、エラー処理も必要で、大変。
![]()
昔は、シェルスクリプトでがんばる
1回実行するだけなら、何とかなるが、
それをプロダクション環境で、繰り返し実行するのはツライ。
エラーハンドリングの抜けがあったり。
Write one、Nobody Read!
エラーが起きても、アラート来ない。
気付かない。
レジュームできないから、最初からやり直し。
並列実行もできない。
ログ収集は、各サーバにログインして確認。
モジュール化できない、再利用できない
2箇所で同じ処理が書かれてる。
![]()
Digdagを作ったのは、こういった事を解決したい、
というのが大元のモチベーション。
![]()
ユーザーのユースケースの例。
Google Analyticsは、BigQueryにしかエクスポートできない、とか制限がある。
テーブルをTreasureDataに持ってくる。
数年分のデータ。
解析クエリを、10個くらい並列で実行して、
集約して、BIツールのTableauで見る。
レポートをSlackに通知。
クラウド3つ、加えてオンプレ。
Digdagを使えば、複数クラウドが現実的に利用できるようになった。
![]()
Digdagのコンセプト。Operator、Parameter、Task groups。
Operatorは、pgやs3、pyなど。
やり方、知識をパッケージしたもの。
pgの場合、複雑なトランザクション。
2回実行されるのを防ぐ仕組みが、組み込まれている。
内部で、クエリ投げて、更新してる。
検出する仕組みが入ってる。
JDBCタイムアウトや、リトライ。
S3も、そのまま実行すると、結構失敗が起きる。
リトライなしだとツライ。
知識をパッケージ化している。
パラメータ(引数)
ビルトイン引数、session_time
JavaScriptが使える。
http.last_contentを、パース
ワークフローのなかで、モジュール化ができ、
再利用可能になる。
Task
グループ化して、管理できる。
![]()
Operator library
tdオペレータ
databaseパラメータがセットされる。
tdで、クエリが投げられる。
プラグインになってるので、簡単に追加できる。
![]()
+記号がタスク
並べて、別のタスクを子ノードに。
ネストできるので、グループ化が可能
load
prosess
全体像を把握しやすい。
(画面の)+load_dataタスクは、_parallel: trueが指定されているので、真ん中の処理は並列で実行される。
![]()
タスクグルーピング
よくある、パイプライン。
依存関係の矢印がたくさん出てくる。
矢印を1個1個宣言することになる。
DAGベースの処理。
途中に処理を挟みたい、となると変更が必要な箇所が多い。
![]()
グループとして、扱って、そうすると、
矢印の数が減る。
ワークフローを読める。
スクリプトは読めないが、、
![]()
社内でワークフローを書く人は、エンジニアであることが多い。
パラメータ使って、処理を記述すれば、
「s3_wait」オペレータは、ファイルができるまで待つ。
${session_date}には日付が入る。
毎日S3でファイルを待ち受けて、
ファイルがアップされたら、
クエリを実行する、という処理が簡単に記述できる。
とてもパワフル。
RubyやPythonで書けば、何でもできるが。
Digdagのワークフローは、簡単に読める。
ワークフローを変えて欲しい、という依頼に
開発者じゃなくても対応できる。
![]()
パラメータ
td_for_eachオペレータ
テーブルの1行1行に対して、ぐるぐるループで実行できる。
アクティブのユーザを解析して、Eメールを送る、というような処理を簡単に自動化できる。
![]()
好みのテキストエディタで書いて、
Githubで管理できる。
Dockerを使い、
手元でも動けば、プロダクション環境のサーバでも動く、を実現できる。
特定のマシンじゃないと動かない、だと困る
どの環境でも動く。
![]()
Operatorプラグインをforkして、
別のプラグインを簡単に作れる。
AWSのコンテナのレジストリである、ECRに登録する。
(ECR: Amazon EC2 Container Registry)
![]()
ここで、デモ。
![]()
このWeb画面は、TreasureDataで提供しているバージョンのDigdagのサーバ。
左上のロゴだけ違う。
右上の「RETRY ALL」のボタンをクリック。
(実行中。画面上で「Timeline」のタスクが増えた。)
タスクを動的に増やせる。おもしろい機能。
「syndicate_loop」のタスクは、ループ処理。
「DAG」(有向非巡回グラフ、Directed Acyclic Graph)はループが書けない。(表現できない)
Digdagでは、DAGにタスクを動的に生成して、ループ処理を実現している。
![]()
![]()
![]()
![]()
![]()
ワークフロー定義のEditが、この画面でできる。
Digdagのワークフローの中では、JavaScriptが記述できる。
YMLなので、プログラムで生成もできる。
ワークフローを生成して、pushする。
![]()
![]()
![]()
![]()
850 active workflow
3,600 workflow 毎日実行される
task 28,000 (1つのワークフローは、平均7個か8個ぐらいのタスクで構成されている)
トータルでこれまでに、400,000 のワークフローが実行された。
![]()
具体的なユースケースを紹介。
顧客分析、アラート。
準備段階として、TreasureDataでクエリを実行。
その後、users.sqlのクエリを実行して、
結果に対して、1行ずつ処理を行う。
しきい値を越えてる人にメールを送る、という処理を行っている。
![]()
mail.txtがメールのテンプレート。パラメータを含めることができる。
メールに、クエリの結果を含めることができる。
リソース制限を越えた人にメールを送る。
![]()
td_loadオペレータ
Embulkを実行、
パラレルで実行される
Salesforceからリードを一時テーブルに読み込み、
アトミックにリネーム。
レポートを送る。
task_completeを実行して終了。
この処理を、15分に1回実行する。
![]()
Backend of BI app:Webのアプリ
2つのモード、all_mode / incremental_mode
ifオペレータで分岐処理
all_modeがtrueの場合は、こちらを処理し、
そうでない場合、incremental_modeの場合は、
(全部は書かれてないが)そちらの処理をする。
![]()
Spark appの結果を、
プロダクションにデプロイする例。
ここで、EMRオペレータを使っている。
![]()
![]()
HA構成について
TreasureDataでは、マルチテナントで運用しているので、
HAの要件を満たさないと困る。
taskの状態は、PostgreSQLに保存。
Digdag server側はステートレス。
![]()
Digdag serverを増やせば、
max-task-threads 100 と指定しているので、
1台あたり100並列
タスクは自動的に分割して実行される。
![]()
ログ
バッファに保存されて、指定のディレクトリのファイルにフラッシュされる。
digdag log <attempt_id> -f
で、tail -f のように、追跡表示できる。
![]()
ログは通常は、ローカルに保存される。
S3が利用できる環境では、
log-server.type=s3
で、S3に保存可能。
プラグイン構成になっているので、
新しいプラグインを作成して、保存先の処理を変更すれば、FTPサーバやSFTPサーバにも対応できる。
ぜひトライしてみてください!
![]()
以下、質疑応答:
Q1: Digdag使ってます。stdoutで出力される内容を、ログを一元管理する目的で、fluentdに出したいのだけど。
A1: Digdagには、
ロギングのプラグイン
ストレージのプラグイン
の2種類がある。
ロギングプラグインを変更して、共有ストレージに書く、というのが実現できるはず。
Q2: スケジューラモードとサーバーモードの違いがよくわからないのだけど?
A2: サーバーモード:リモートからのpushを待ってる。ワークフロー定義をDBで共有している。
スケジューラモード:手元での変更が、すぐ反映されるので便利。(ワークフロー定義を変更すると、スケジューラは自動的にdigファイルをリロードする)
実装はほぼ一緒。
Q3: REST APIを使いたいけど、仕様のドキュメントがない。仕様変更が心配。
A3: REST APIの使い方は、コードを読んでください。
サービスとして提供しているので、仕様はもう変えられなくなってきちゃった。
Q4: 同じワークフローを、パラメータを変えて複数実行しようとしたら、1回しか実行されなかった。なぜ?
A4: session: ある時刻、計画したもの。
attempt: 実際に実行されたもの。
Digdagでは、二重実行を避けるため、意図的に時刻と結びつけている。
途中からレジュームにも対応している。
同一のワークフローで、同一のセッションタイムでは、まとめられてしまう。(1つだけしか実行されない)
そのようなユースケースでは、
callオペレータで対応できる。
共通処理の部分をcallで呼び出す、
別々のワークフローを作ってください。
Q5: postgresの接続数が増えてしまうissueがあると聞いたが?
A5: 解決済み、トランザクションマネージャでプールされるようになった。
例えば、10個に限定することができる。
さいごに
時間の関係で、他の方のレポートが書けませんでしたが、とても興味深い内容でした。資料が公開されましたら、こちらにリンクする予定です。
勉強会終了後は、DMM.comラボさんの提供で簡単な懇親会が開かれ、参加者どうしで情報交換をすることができました。
とても有意義な勉強会を企画していただき、ありがとうございました!>DMM.comラボさん、発表者のみなさま。
個人的には、まだ使ったことがなかった、DigdagのWeb-UIのデモを見ることができて、参加した甲斐がありました。









