【レポート】Workflow Engines Night #workflowenginesnight

【レポート】Workflow Engines Night #workflowenginesnight

Clock Icon2017.06.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

DI部の川崎です。Digdag使ってます。

今日はDMM.comラボさんが開催された、ワークフローエンジンをテーマにした勉強会「Workflow Engines Night」に参加する機会がありましたので、Digdagの開発者古橋さんのセッションを中心に、参加レポートをお届けします。

勉強会のハッシュタグはこちら #workflowenginesnight

はじめに

会場は、六本木一丁目のDMM.comラボさんのオフィスでした。勉強会の概要を引用します。

概要
今回はワークフローエンジンをテーマに勉強会を開催致します。
DMM.comラボからはビッグデータ部で『Digdag』を実際に導入・運用した話、
またインフラ本部からは『StackStorm』による運用コスト・リスク低減の取り組みについてお話しをさせていただきます。
なお、今回は『DigDag』の開発者である 古橋 貞之氏が帰国のタイミング!
とのことでゲストとして参加いただけることとなりました。
開催日がとても近いのですが、直接話を聞ける機会ですのでぜひ皆様ご参加ください!

発表タイトルは、以下の通りでした。

『StackStorm ワークフローエンジンによる運用コスト・リスク低減の取り組み』
株式会社DMM.comラボ 大山 裕泰 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/stackstorm-wakuhuroenzinniyoruyun-yong-kosutorisukudi-jian-falsequ-rizu-mi

『JenkinsからDigdagへ日次バッチを移行して幸せになるお話』
株式会社DMM.comラボ 鈴木 翔太 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/jenkinskaradigdagheri-ci-batutiwoyi-xing-sitexing-seninaruohua

以下で紹介する、古橋さんのセッション。
『Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理』
トレジャーデータ株式会社 古橋 貞之 さん
(※発表資料は以下でインライン表示)

LT『Digdagを仕事で使ってみて良かったこと、ハマったこと』
株式会社VASILY 塩崎 健弘 さん
→ 発表資料(https://speakerdeck.com/shiozaki/using-digdag-in-production-environment

LT『とあるマーケティング部隊でのDigdagの活用事例』
@grimrose さん
→ 発表資料(https://gist.github.com/grimrose/5bea98db4c82056dad1ab84d6653308e

Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理(トレジャーデータ株式会社 古橋 貞之さん)

[発表スライド]

thumb_P_20170607_203008_vHDR_Auto_1024
早速、Digdag開発者の古橋さんのセッションについてレポートしていきます。

thumb_P_20170607_203014_vHDR_Auto_1024
まずは、自己紹介。
シリコンバレーに住んでます。なぜかというと、トレジャーデータという会社をそこにつくったから、とのことでした。
(会場の参加者に挙手をお願い)
fluentdを知ってる人 → 9割の人が手を挙げる
MessagePack知ってる人 → 7割の人が手を挙げる

thumb_P_20170607_203057_vHDR_Auto_1024
これから話す内容は、1歩立ちもどって、
そもそもなんで、Digdagを作ったのか?という話をします。
手作業を自動化するため。

thumb_P_20170607_203136_vHDR_Auto_1024
定期的な実行はもちろんの事、
過去分を一括で処理したい、となると、
時刻を変数として処理する、という事が重要になる。

エラー処理、
失敗したところから、レジューム実行
実行時間可視化
タスク並列

ワークフローをテキストで記述、
GUIによるワークフロー開発
Pythonなどのコードを書かなくても使える
異なった環境で実行しても、再現性があるもの

このような機能を全部満たすものは、なかなか無かった。
Digdagなら、全部できます!
GUIだけ遅れているので、開発者募集中です。

thumb_P_20170607_203317_vHDR_Auto_1024
Digdagを作る前に、ジョブ実行ツール、ワークフロー実行ツールを一通り調べた。
OSS、商用のエンジン
Makefileはプリミティブだけど、その機能を持つ。
ウリや強みを比較した。

thumb_P_20170607_203335_vHDR_Auto_1024
チャレンジとして、いろいろなクラウド、オンプレのソフトウェア。
それぞれにAPIを持ち、扱うためのツールも違う。

thumb_P_20170607_203410_vHDR_Auto_1024
裏側にあるデータベースもいろいろである。
以前は、MongoDBがよく使われていた。
元々使われていた、SQLServerなどの(従来型のデータベースに加え)
クラウドでデータ連携が行われるようになり、
次第に、ここに挙げた、Hadoop、カサンドラ、presto、Redshiftとデータベースがどんどん増えていき、
矢印がどんどん増えて、連携が難しくなった。
データの動きを、どのように管理していくか。

thumb_P_20170607_203457_vHDR_Auto_1024
新しい技術として、Cosmos DBが出ました、DynamoDBが出ました、と次々と登場し、難しい問題になった。

thumb_P_20170607_203513_vHDR_Auto_1024
ユースケースを見ていくと、
Ingest、Enrich、Model、Load、Utilize
Ingest:Fluentd/Embulkで処理
Enrich:botのデータ取り除く、地域データを追加、JSONファイルをフラットに展開
Model:ABテスト適用、データモデルつくる
Load:DBにロード、アプリから使えるようになる
Utilize:レコメンド、プロダクションのデータを使って、アプリが作れる

こんな感じなのが、典型的な分析アプリ。
このワンステップは、1つの処理ではとても終わらない
複数のアルゴリズムを使って、いいやつをとるとか。
複雑、エラー処理も必要で、大変。

thumb_P_20170607_203650_vHDR_Auto_1024
昔は、シェルスクリプトでがんばる
1回実行するだけなら、何とかなるが、
それをプロダクション環境で、繰り返し実行するのはツライ。
エラーハンドリングの抜けがあったり。

Write one、Nobody Read!
エラーが起きても、アラート来ない。
気付かない。
レジュームできないから、最初からやり直し。
並列実行もできない。
ログ収集は、各サーバにログインして確認。
モジュール化できない、再利用できない
2箇所で同じ処理が書かれてる。

thumb_P_20170607_203811_vHDR_Auto_1024
Digdagを作ったのは、こういった事を解決したい、
というのが大元のモチベーション。

thumb_P_20170607_203820_vHDR_Auto_1024
ユーザーのユースケースの例。
Google Analyticsは、BigQueryにしかエクスポートできない、とか制限がある。
テーブルをTreasureDataに持ってくる。
数年分のデータ。
解析クエリを、10個くらい並列で実行して、
集約して、BIツールのTableauで見る。
レポートをSlackに通知。

クラウド3つ、加えてオンプレ。
Digdagを使えば、複数クラウドが現実的に利用できるようになった。

thumb_P_20170607_203936_vHDR_Auto_1024
Digdagのコンセプト。Operator、Parameter、Task groups。

Operatorは、pgやs3、pyなど。
やり方、知識をパッケージしたもの。
pgの場合、複雑なトランザクション。
2回実行されるのを防ぐ仕組みが、組み込まれている。
内部で、クエリ投げて、更新してる。
検出する仕組みが入ってる。
JDBCタイムアウトや、リトライ。

S3も、そのまま実行すると、結構失敗が起きる。
リトライなしだとツライ。
知識をパッケージ化している。

パラメータ(引数)
ビルトイン引数、session_time
JavaScriptが使える。
http.last_contentを、パース
ワークフローのなかで、モジュール化ができ、
再利用可能になる。

Task
グループ化して、管理できる。

thumb_P_20170607_204159_vHDR_Auto_1024
Operator library
tdオペレータ
databaseパラメータがセットされる。
tdで、クエリが投げられる。
プラグインになってるので、簡単に追加できる。

thumb_P_20170607_204245_vHDR_Auto_1024
+記号がタスク
並べて、別のタスクを子ノードに。
ネストできるので、グループ化が可能
load
prosess
全体像を把握しやすい。

(画面の)+load_dataタスクは、_parallel: trueが指定されているので、真ん中の処理は並列で実行される。

thumb_P_20170607_204319_vHDR_Auto_1024
タスクグルーピング
よくある、パイプライン。

依存関係の矢印がたくさん出てくる。
矢印を1個1個宣言することになる。
DAGベースの処理。

途中に処理を挟みたい、となると変更が必要な箇所が多い。

thumb_P_20170607_204410_vHDR_Auto_1024
グループとして、扱って、そうすると、
矢印の数が減る。
ワークフローを読める。
スクリプトは読めないが、、

thumb_P_20170607_204431_vHDR_Auto_1024
社内でワークフローを書く人は、エンジニアであることが多い。
パラメータ使って、処理を記述すれば、
「s3_wait」オペレータは、ファイルができるまで待つ。

${session_date}には日付が入る。
毎日S3でファイルを待ち受けて、
ファイルがアップされたら、
クエリを実行する、という処理が簡単に記述できる。
とてもパワフル。

RubyやPythonで書けば、何でもできるが。
Digdagのワークフローは、簡単に読める。
ワークフローを変えて欲しい、という依頼に
開発者じゃなくても対応できる。

thumb_P_20170607_204543_vHDR_Auto_1024
パラメータ
td_for_eachオペレータ
テーブルの1行1行に対して、ぐるぐるループで実行できる。
アクティブのユーザを解析して、Eメールを送る、というような処理を簡単に自動化できる。

thumb_P_20170607_204626_vHDR_Auto_1024
好みのテキストエディタで書いて、
Githubで管理できる。

Dockerを使い、
手元でも動けば、プロダクション環境のサーバでも動く、を実現できる。

特定のマシンじゃないと動かない、だと困る
どの環境でも動く。

thumb_P_20170607_204718_vHDR_Auto_1024
Operatorプラグインをforkして、
別のプラグインを簡単に作れる。

AWSのコンテナのレジストリである、ECRに登録する。
(ECR: Amazon EC2 Container Registry)

thumb_P_20170607_204747_vHDR_Auto_1024
ここで、デモ。

thumb_P_20170607_204754_vHDR_Auto_1024
このWeb画面は、TreasureDataで提供しているバージョンのDigdagのサーバ。
左上のロゴだけ違う。

右上の「RETRY ALL」のボタンをクリック。

(実行中。画面上で「Timeline」のタスクが増えた。)

タスクを動的に増やせる。おもしろい機能。
「syndicate_loop」のタスクは、ループ処理。
「DAG」(有向非巡回グラフ、Directed Acyclic Graph)はループが書けない。(表現できない)
Digdagでは、DAGにタスクを動的に生成して、ループ処理を実現している。

thumb_P_20170607_204824_vHDR_Auto_1024
thumb_P_20170607_204831_vHDR_Auto_1024
thumb_P_20170607_204836_vHDR_Auto_1024
thumb_P_20170607_204902_vHDR_Auto_1024
thumb_P_20170607_204924_vHDR_Auto_1024
ワークフロー定義のEditが、この画面でできる。

Digdagのワークフローの中では、JavaScriptが記述できる。
YMLなので、プログラムで生成もできる。
ワークフローを生成して、pushする。

thumb_P_20170607_204927_vHDR_Auto_1024
thumb_P_20170607_205005_vHDR_Auto_1024
thumb_P_20170607_205033_vHDR_Auto_1024
thumb_P_20170607_205036_vHDR_Auto_1024
850 active workflow
3,600 workflow 毎日実行される
task 28,000 (1つのワークフローは、平均7個か8個ぐらいのタスクで構成されている)
トータルでこれまでに、400,000 のワークフローが実行された。

thumb_P_20170607_205105_vHDR_Auto_1024
具体的なユースケースを紹介。
顧客分析、アラート。
準備段階として、TreasureDataでクエリを実行。
その後、users.sqlのクエリを実行して、
結果に対して、1行ずつ処理を行う。
しきい値を越えてる人にメールを送る、という処理を行っている。

thumb_P_20170607_205154_vHDR_Auto_1024
mail.txtがメールのテンプレート。パラメータを含めることができる。
メールに、クエリの結果を含めることができる。
リソース制限を越えた人にメールを送る。

thumb_P_20170607_205223_vHDR_Auto_1024
td_loadオペレータ
Embulkを実行、
パラレルで実行される
Salesforceからリードを一時テーブルに読み込み、
アトミックにリネーム。
レポートを送る。

task_completeを実行して終了。
この処理を、15分に1回実行する。

thumb_P_20170607_205308_vHDR_Auto_1024
Backend of BI app:Webのアプリ
2つのモード、all_mode / incremental_mode
ifオペレータで分岐処理
all_modeがtrueの場合は、こちらを処理し、
そうでない場合、incremental_modeの場合は、
(全部は書かれてないが)そちらの処理をする。

thumb_P_20170607_205400_vHDR_Auto_1024
Spark appの結果を、
プロダクションにデプロイする例。
ここで、EMRオペレータを使っている。

thumb_P_20170607_205422_vHDR_Auto_1024
thumb_P_20170607_205435_vHDR_Auto_1024
HA構成について
TreasureDataでは、マルチテナントで運用しているので、
HAの要件を満たさないと困る。
taskの状態は、PostgreSQLに保存。
Digdag server側はステートレス。

thumb_P_20170607_205436_vHDR_Auto_1024
Digdag serverを増やせば、
max-task-threads 100 と指定しているので、
1台あたり100並列
タスクは自動的に分割して実行される。

thumb_P_20170607_205502_vHDR_Auto_1024
ログ
バッファに保存されて、指定のディレクトリのファイルにフラッシュされる。
digdag log <attempt_id> -f
で、tail -f のように、追跡表示できる。

thumb_P_20170607_205529_vHDR_Auto_1024
ログは通常は、ローカルに保存される。
S3が利用できる環境では、
log-server.type=s3
で、S3に保存可能。

プラグイン構成になっているので、
新しいプラグインを作成して、保存先の処理を変更すれば、FTPサーバやSFTPサーバにも対応できる。
ぜひトライしてみてください!

thumb_P_20170607_205557_vHDR_Auto_1024
以下、質疑応答:

Q1: Digdag使ってます。stdoutで出力される内容を、ログを一元管理する目的で、fluentdに出したいのだけど。
A1: Digdagには、
ロギングのプラグイン
ストレージのプラグイン
の2種類がある。
ロギングプラグインを変更して、共有ストレージに書く、というのが実現できるはず。

Q2: スケジューラモードとサーバーモードの違いがよくわからないのだけど?
A2: サーバーモード:リモートからのpushを待ってる。ワークフロー定義をDBで共有している。
スケジューラモード:手元での変更が、すぐ反映されるので便利。(ワークフロー定義を変更すると、スケジューラは自動的にdigファイルをリロードする)
実装はほぼ一緒。

Q3: REST APIを使いたいけど、仕様のドキュメントがない。仕様変更が心配。
A3: REST APIの使い方は、コードを読んでください。
サービスとして提供しているので、仕様はもう変えられなくなってきちゃった。

Q4: 同じワークフローを、パラメータを変えて複数実行しようとしたら、1回しか実行されなかった。なぜ?
A4: session: ある時刻、計画したもの。
attempt: 実際に実行されたもの。

Digdagでは、二重実行を避けるため、意図的に時刻と結びつけている。
途中からレジュームにも対応している。
同一のワークフローで、同一のセッションタイムでは、まとめられてしまう。(1つだけしか実行されない)

そのようなユースケースでは、
callオペレータで対応できる。
共通処理の部分をcallで呼び出す、
別々のワークフローを作ってください。

Q5: postgresの接続数が増えてしまうissueがあると聞いたが?
A5: 解決済み、トランザクションマネージャでプールされるようになった。
例えば、10個に限定することができる。

さいごに

時間の関係で、他の方のレポートが書けませんでしたが、とても興味深い内容でした。資料が公開されましたら、こちらにリンクする予定です。
勉強会終了後は、DMM.comラボさんの提供で簡単な懇親会が開かれ、参加者どうしで情報交換をすることができました。

とても有意義な勉強会を企画していただき、ありがとうございました!>DMM.comラボさん、発表者のみなさま。

個人的には、まだ使ったことがなかった、DigdagのWeb-UIのデモを見ることができて、参加した甲斐がありました。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.