【レポート】Workflow Engines Night #workflowenginesnight
DI部の川崎です。Digdag使ってます。
今日はDMM.comラボさんが開催された、ワークフローエンジンをテーマにした勉強会「Workflow Engines Night」に参加する機会がありましたので、Digdagの開発者古橋さんのセッションを中心に、参加レポートをお届けします。
勉強会のハッシュタグはこちら #workflowenginesnight
はじめに
会場は、六本木一丁目のDMM.comラボさんのオフィスでした。勉強会の概要を引用します。
概要
今回はワークフローエンジンをテーマに勉強会を開催致します。
DMM.comラボからはビッグデータ部で『Digdag』を実際に導入・運用した話、
またインフラ本部からは『StackStorm』による運用コスト・リスク低減の取り組みについてお話しをさせていただきます。
なお、今回は『DigDag』の開発者である 古橋 貞之氏が帰国のタイミング!
とのことでゲストとして参加いただけることとなりました。
開催日がとても近いのですが、直接話を聞ける機会ですのでぜひ皆様ご参加ください!
発表タイトルは、以下の通りでした。
『StackStorm ワークフローエンジンによる運用コスト・リスク低減の取り組み』
株式会社DMM.comラボ 大山 裕泰 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/stackstorm-wakuhuroenzinniyoruyun-yong-kosutorisukudi-jian-falsequ-rizu-mi)
『JenkinsからDigdagへ日次バッチを移行して幸せになるお話』
株式会社DMM.comラボ 鈴木 翔太 さん
→ 発表資料(https://speakerdeck.com/dmmlabo/jenkinskaradigdagheri-ci-batutiwoyi-xing-sitexing-seninaruohua)
以下で紹介する、古橋さんのセッション。
『Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理』
トレジャーデータ株式会社 古橋 貞之 さん
(※発表資料は以下でインライン表示)
LT『Digdagを仕事で使ってみて良かったこと、ハマったこと』
株式会社VASILY 塩崎 健弘 さん
→ 発表資料(https://speakerdeck.com/shiozaki/using-digdag-in-production-environment)
LT『とあるマーケティング部隊でのDigdagの活用事例』
@grimrose さん
→ 発表資料(https://gist.github.com/grimrose/5bea98db4c82056dad1ab84d6653308e)
Treasure DataにおけるDigdagによる大規模データ処理の自動化とエラー処理(トレジャーデータ株式会社 古橋 貞之さん)
[発表スライド]
早速、Digdag開発者の古橋さんのセッションについてレポートしていきます。
まずは、自己紹介。
シリコンバレーに住んでます。なぜかというと、トレジャーデータという会社をそこにつくったから、とのことでした。
(会場の参加者に挙手をお願い)
fluentdを知ってる人 → 9割の人が手を挙げる
MessagePack知ってる人 → 7割の人が手を挙げる
これから話す内容は、1歩立ちもどって、
そもそもなんで、Digdagを作ったのか?という話をします。
手作業を自動化するため。
定期的な実行はもちろんの事、
過去分を一括で処理したい、となると、
時刻を変数として処理する、という事が重要になる。
エラー処理、
失敗したところから、レジューム実行
実行時間可視化
タスク並列
ワークフローをテキストで記述、
GUIによるワークフロー開発
Pythonなどのコードを書かなくても使える
異なった環境で実行しても、再現性があるもの
このような機能を全部満たすものは、なかなか無かった。
Digdagなら、全部できます!
GUIだけ遅れているので、開発者募集中です。
Digdagを作る前に、ジョブ実行ツール、ワークフロー実行ツールを一通り調べた。
OSS、商用のエンジン
Makefileはプリミティブだけど、その機能を持つ。
ウリや強みを比較した。
チャレンジとして、いろいろなクラウド、オンプレのソフトウェア。
それぞれにAPIを持ち、扱うためのツールも違う。
裏側にあるデータベースもいろいろである。
以前は、MongoDBがよく使われていた。
元々使われていた、SQLServerなどの(従来型のデータベースに加え)
クラウドでデータ連携が行われるようになり、
次第に、ここに挙げた、Hadoop、カサンドラ、presto、Redshiftとデータベースがどんどん増えていき、
矢印がどんどん増えて、連携が難しくなった。
データの動きを、どのように管理していくか。
新しい技術として、Cosmos DBが出ました、DynamoDBが出ました、と次々と登場し、難しい問題になった。
ユースケースを見ていくと、
Ingest、Enrich、Model、Load、Utilize
Ingest:Fluentd/Embulkで処理
Enrich:botのデータ取り除く、地域データを追加、JSONファイルをフラットに展開
Model:ABテスト適用、データモデルつくる
Load:DBにロード、アプリから使えるようになる
Utilize:レコメンド、プロダクションのデータを使って、アプリが作れる
こんな感じなのが、典型的な分析アプリ。
このワンステップは、1つの処理ではとても終わらない
複数のアルゴリズムを使って、いいやつをとるとか。
複雑、エラー処理も必要で、大変。
昔は、シェルスクリプトでがんばる
1回実行するだけなら、何とかなるが、
それをプロダクション環境で、繰り返し実行するのはツライ。
エラーハンドリングの抜けがあったり。
Write one、Nobody Read!
エラーが起きても、アラート来ない。
気付かない。
レジュームできないから、最初からやり直し。
並列実行もできない。
ログ収集は、各サーバにログインして確認。
モジュール化できない、再利用できない
2箇所で同じ処理が書かれてる。
Digdagを作ったのは、こういった事を解決したい、
というのが大元のモチベーション。
ユーザーのユースケースの例。
Google Analyticsは、BigQueryにしかエクスポートできない、とか制限がある。
テーブルをTreasureDataに持ってくる。
数年分のデータ。
解析クエリを、10個くらい並列で実行して、
集約して、BIツールのTableauで見る。
レポートをSlackに通知。
クラウド3つ、加えてオンプレ。
Digdagを使えば、複数クラウドが現実的に利用できるようになった。
Digdagのコンセプト。Operator、Parameter、Task groups。
Operatorは、pgやs3、pyなど。
やり方、知識をパッケージしたもの。
pgの場合、複雑なトランザクション。
2回実行されるのを防ぐ仕組みが、組み込まれている。
内部で、クエリ投げて、更新してる。
検出する仕組みが入ってる。
JDBCタイムアウトや、リトライ。
S3も、そのまま実行すると、結構失敗が起きる。
リトライなしだとツライ。
知識をパッケージ化している。
パラメータ(引数)
ビルトイン引数、session_time
JavaScriptが使える。
http.last_contentを、パース
ワークフローのなかで、モジュール化ができ、
再利用可能になる。
Task
グループ化して、管理できる。
Operator library
tdオペレータ
databaseパラメータがセットされる。
tdで、クエリが投げられる。
プラグインになってるので、簡単に追加できる。
+記号がタスク
並べて、別のタスクを子ノードに。
ネストできるので、グループ化が可能
load
prosess
全体像を把握しやすい。
(画面の)+load_dataタスクは、_parallel: trueが指定されているので、真ん中の処理は並列で実行される。
タスクグルーピング
よくある、パイプライン。
依存関係の矢印がたくさん出てくる。
矢印を1個1個宣言することになる。
DAGベースの処理。
途中に処理を挟みたい、となると変更が必要な箇所が多い。
グループとして、扱って、そうすると、
矢印の数が減る。
ワークフローを読める。
スクリプトは読めないが、、
社内でワークフローを書く人は、エンジニアであることが多い。
パラメータ使って、処理を記述すれば、
「s3_wait」オペレータは、ファイルができるまで待つ。
${session_date}には日付が入る。
毎日S3でファイルを待ち受けて、
ファイルがアップされたら、
クエリを実行する、という処理が簡単に記述できる。
とてもパワフル。
RubyやPythonで書けば、何でもできるが。
Digdagのワークフローは、簡単に読める。
ワークフローを変えて欲しい、という依頼に
開発者じゃなくても対応できる。
パラメータ
td_for_eachオペレータ
テーブルの1行1行に対して、ぐるぐるループで実行できる。
アクティブのユーザを解析して、Eメールを送る、というような処理を簡単に自動化できる。
好みのテキストエディタで書いて、
Githubで管理できる。
Dockerを使い、
手元でも動けば、プロダクション環境のサーバでも動く、を実現できる。
特定のマシンじゃないと動かない、だと困る
どの環境でも動く。
Operatorプラグインをforkして、
別のプラグインを簡単に作れる。
AWSのコンテナのレジストリである、ECRに登録する。
(ECR: Amazon EC2 Container Registry)
ここで、デモ。
このWeb画面は、TreasureDataで提供しているバージョンのDigdagのサーバ。
左上のロゴだけ違う。
右上の「RETRY ALL」のボタンをクリック。
(実行中。画面上で「Timeline」のタスクが増えた。)
タスクを動的に増やせる。おもしろい機能。
「syndicate_loop」のタスクは、ループ処理。
「DAG」(有向非巡回グラフ、Directed Acyclic Graph)はループが書けない。(表現できない)
Digdagでは、DAGにタスクを動的に生成して、ループ処理を実現している。
ワークフロー定義のEditが、この画面でできる。
Digdagのワークフローの中では、JavaScriptが記述できる。
YMLなので、プログラムで生成もできる。
ワークフローを生成して、pushする。
850 active workflow
3,600 workflow 毎日実行される
task 28,000 (1つのワークフローは、平均7個か8個ぐらいのタスクで構成されている)
トータルでこれまでに、400,000 のワークフローが実行された。
具体的なユースケースを紹介。
顧客分析、アラート。
準備段階として、TreasureDataでクエリを実行。
その後、users.sqlのクエリを実行して、
結果に対して、1行ずつ処理を行う。
しきい値を越えてる人にメールを送る、という処理を行っている。
mail.txtがメールのテンプレート。パラメータを含めることができる。
メールに、クエリの結果を含めることができる。
リソース制限を越えた人にメールを送る。
td_loadオペレータ
Embulkを実行、
パラレルで実行される
Salesforceからリードを一時テーブルに読み込み、
アトミックにリネーム。
レポートを送る。
task_completeを実行して終了。
この処理を、15分に1回実行する。
Backend of BI app:Webのアプリ
2つのモード、all_mode / incremental_mode
ifオペレータで分岐処理
all_modeがtrueの場合は、こちらを処理し、
そうでない場合、incremental_modeの場合は、
(全部は書かれてないが)そちらの処理をする。
Spark appの結果を、
プロダクションにデプロイする例。
ここで、EMRオペレータを使っている。
HA構成について
TreasureDataでは、マルチテナントで運用しているので、
HAの要件を満たさないと困る。
taskの状態は、PostgreSQLに保存。
Digdag server側はステートレス。
Digdag serverを増やせば、
max-task-threads 100 と指定しているので、
1台あたり100並列
タスクは自動的に分割して実行される。
ログ
バッファに保存されて、指定のディレクトリのファイルにフラッシュされる。
digdag log <attempt_id> -f
で、tail -f のように、追跡表示できる。
ログは通常は、ローカルに保存される。
S3が利用できる環境では、
log-server.type=s3
で、S3に保存可能。
プラグイン構成になっているので、
新しいプラグインを作成して、保存先の処理を変更すれば、FTPサーバやSFTPサーバにも対応できる。
ぜひトライしてみてください!
以下、質疑応答:
Q1: Digdag使ってます。stdoutで出力される内容を、ログを一元管理する目的で、fluentdに出したいのだけど。
A1: Digdagには、
ロギングのプラグイン
ストレージのプラグイン
の2種類がある。
ロギングプラグインを変更して、共有ストレージに書く、というのが実現できるはず。
Q2: スケジューラモードとサーバーモードの違いがよくわからないのだけど?
A2: サーバーモード:リモートからのpushを待ってる。ワークフロー定義をDBで共有している。
スケジューラモード:手元での変更が、すぐ反映されるので便利。(ワークフロー定義を変更すると、スケジューラは自動的にdigファイルをリロードする)
実装はほぼ一緒。
Q3: REST APIを使いたいけど、仕様のドキュメントがない。仕様変更が心配。
A3: REST APIの使い方は、コードを読んでください。
サービスとして提供しているので、仕様はもう変えられなくなってきちゃった。
Q4: 同じワークフローを、パラメータを変えて複数実行しようとしたら、1回しか実行されなかった。なぜ?
A4: session: ある時刻、計画したもの。
attempt: 実際に実行されたもの。
Digdagでは、二重実行を避けるため、意図的に時刻と結びつけている。
途中からレジュームにも対応している。
同一のワークフローで、同一のセッションタイムでは、まとめられてしまう。(1つだけしか実行されない)
そのようなユースケースでは、
callオペレータで対応できる。
共通処理の部分をcallで呼び出す、
別々のワークフローを作ってください。
Q5: postgresの接続数が増えてしまうissueがあると聞いたが?
A5: 解決済み、トランザクションマネージャでプールされるようになった。
例えば、10個に限定することができる。
さいごに
時間の関係で、他の方のレポートが書けませんでしたが、とても興味深い内容でした。資料が公開されましたら、こちらにリンクする予定です。
勉強会終了後は、DMM.comラボさんの提供で簡単な懇親会が開かれ、参加者どうしで情報交換をすることができました。
とても有意義な勉強会を企画していただき、ありがとうございました!>DMM.comラボさん、発表者のみなさま。
個人的には、まだ使ったことがなかった、DigdagのWeb-UIのデモを見ることができて、参加した甲斐がありました。