DigdagのREST APIを使って特定のワークフローの最新の実行結果を取得する
データアナリティクス事業本部の鈴木です。
DigdagのREST APIを使って、特定のプロジェクトとワークフローを指定してattemptの詳細が取得できるのを知ったので試してみました。
やりたいこと
Digdagで多数のワークフローを動かしているときに、各ワークフローの最新の実行が成功したのかどうかをコマンドで確認したいです。
Digdagを使っていると、各種処理をワークフロー化してスケジュール実行している場合が多いと思いますが、ワークフローに改修を入れた場合に、すぐ不具合が分かるケースがあれば、しばらくスケジュールを流しておいて初めて分かるケースもあると思います。
もちろん通知設定をしていれば把握できることもありますが、例えばチェック用のワークフローから一括して結果を確認するような実装ができれば、チェック時にポチッとワークフローを実行してレポートがわりに結果を残しておけるので便利だなと思い、まず第一歩としてコマンドから確認する方法を調査・実験しました。
結論としては、DigdagのREST APIの仕様から、/api/attempts
にプロジェクト名・ワークフロー名をクエリ文字列で指定してGETすることで、指定したパラメータでフィルタをかけてattemptの詳細を返してくれることが分かりました。
準備
前提
digdagはバージョン0.10.4で検証しました。
REST APIの仕様は以下のドキュメントを参考にしました。
プロジェクトの用意
適当なディレクトリを作成し、以下のように.digファイルを作成しておきます。
sample_project ├── sample_workflow1.dig ├── sample_workflow2.dig └── sample_workflow3.dig
sample_workflow1.dig
はなんでも良いですが、例えば以下のようなものを用意しておきます。
+echo_hello: echo>: Hello world from workflow1!
sample_workflow2.dig
とsample_workflow3.dig
も同様のものを用意しておきました。
次にDigdagサーバーを以下のコマンドで起動しておきます。今回はDBを保存するため、digdag_sample
という適当なディレクトリをカレントディレクトリに作成しておきました。
# -o でDBを保存するディレクトリを指定 # -n でポートを指定 digdag server -o ./digdag_sample -n 8081
これで、ブラウザからhttp://localhost:8081/
でDigdagサーバーにアクセスできます。
最後にDigdagサーバーが立ち上がった状態で、プロジェクトをプッシュします。sample_project
という名前でプロジェクトを作成します。
# .digファイルがあるディレクトリに移動する cd sample_project # -e でエンドポイントを指定 digdag push sample_project -e localhost:8081
このようにプロジェクトが作成されるので、検証の準備はOKです。
ワークフローの実行
実行履歴を作るため、各ワークフローを実行しておきます。
各ワークフローの画面で、右上のRUN
ボタンをぽちぽち押しました。
後で分かりやすいよう、ワークフローは1→2→3→2→1→3→1→2→1
の順番で実行しておきました。
このようにSessionsが記録されました。
最新のattemptの詳細を取得する
/api/attempts
でproject
とworkflow
を指定することで取得できるのでやってみました。
今回はsample_project
プロジェクトのsample_workflow1
ワークフローの実行結果を新しい順に取得できるか確認します。
サーバーが立ち上がった状態で、下記コマンドを実行しました。
curl "http://localhost:8081/api/attempts/?project=sample_project&workflow=sample_workflow1"
以下のようにレスポンスが取得できました。(結果は見にくかったのでフォーマットしてあります。)
{ "attempts": [ { "status": "success", "id": "9", "index": 1, "project": { "id": "1", "name": "sample_project" }, "workflow": { "name": "sample_workflow1", "id": "3" }, "sessionId": "9", "sessionUuid": "ed16e680-9f87-48e2-a3cb-b2406798cc53", "sessionTime": "2022-03-25T12:29:26+00:00", "retryAttemptName": null, "done": true, "success": true, "cancelRequested": false, "params": {}, "createdAt": "2022-03-25T12:29:26Z", "finishedAt": "2022-03-25T12:29:26Z" }, { "status": "success", "id": "7", "index": 1, "project": { "id": "1", "name": "sample_project" }, "workflow": { "name": "sample_workflow1", "id": "3" }, "sessionId": "7", "sessionUuid": "30ef8a46-54e8-45e4-ac54-9b68e9626b62", "sessionTime": "2022-03-25T12:29:18+00:00", "retryAttemptName": null, "done": true, "success": true, "cancelRequested": false, "params": {}, "createdAt": "2022-03-25T12:29:18Z", "finishedAt": "2022-03-25T12:29:18Z" }, { "status": "success", "id": "5", "index": 1, "project": { "id": "1", "name": "sample_project" }, "workflow": { "name": "sample_workflow1", "id": "3" }, "sessionId": "5", "sessionUuid": "f705ab3d-8efb-4641-97f5-4d491906c35b", "sessionTime": "2022-03-25T12:27:16+00:00", "retryAttemptName": null, "done": true, "success": true, "cancelRequested": false, "params": {}, "createdAt": "2022-03-25T12:27:16Z", "finishedAt": "2022-03-25T12:27:16Z" }, { "status": "success", "id": "1", "index": 1, "project": { "id": "1", "name": "sample_project" }, "workflow": { "name": "sample_workflow1", "id": "3" }, "sessionId": "1", "sessionUuid": "7ff3f5ff-67c8-43ea-8676-559a30d01179", "sessionTime": "2022-03-25T11:38:45+00:00", "retryAttemptName": null, "done": true, "success": true, "cancelRequested": false, "params": {}, "createdAt": "2022-03-25T11:38:45Z", "finishedAt": "2022-03-25T11:38:45Z" } ] }
レスポンスから配列の初めの方ほど新しいことが分かるので、例えばjqを使って一番最新の実行結果が成功したのかを取得してみます。
curl -s "http://localhost:8081/api/attempts/?project=sample_project&workflow=sample_workflow1" | jq '.attempts[0].status'
以下のように最新のattemptの実行結果だけ取得できました。
"success"
最後に
今回はDigdagのREST APIを使って、特定のワークフローの最新のattemptの詳細を取得してみました。ワークフローの実行結果のチェックにお悩みの方の参考になれば幸いです。