DigdagのREST APIを使って特定のワークフローの最新の実行結果を取得する

2022.03.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

DigdagのREST APIを使って、特定のプロジェクトとワークフローを指定してattemptの詳細が取得できるのを知ったので試してみました。

やりたいこと

Digdagで多数のワークフローを動かしているときに、各ワークフローの最新の実行が成功したのかどうかをコマンドで確認したいです。

Digdagを使っていると、各種処理をワークフロー化してスケジュール実行している場合が多いと思いますが、ワークフローに改修を入れた場合に、すぐ不具合が分かるケースがあれば、しばらくスケジュールを流しておいて初めて分かるケースもあると思います。

もちろん通知設定をしていれば把握できることもありますが、例えばチェック用のワークフローから一括して結果を確認するような実装ができれば、チェック時にポチッとワークフローを実行してレポートがわりに結果を残しておけるので便利だなと思い、まず第一歩としてコマンドから確認する方法を調査・実験しました。

結論としては、DigdagのREST APIの仕様から、/api/attemptsにプロジェクト名・ワークフロー名をクエリ文字列で指定してGETすることで、指定したパラメータでフィルタをかけてattemptの詳細を返してくれることが分かりました。

準備

前提

digdagはバージョン0.10.4で検証しました。

REST APIの仕様は以下のドキュメントを参考にしました。

https://docs.digdag.io/api/

プロジェクトの用意

適当なディレクトリを作成し、以下のように.digファイルを作成しておきます。

sample_project
├── sample_workflow1.dig
├── sample_workflow2.dig
└── sample_workflow3.dig

sample_workflow1.digはなんでも良いですが、例えば以下のようなものを用意しておきます。

sample_workflow1.dig

+echo_hello:
   echo>: Hello world from workflow1!

sample_workflow2.digsample_workflow3.digも同様のものを用意しておきました。

次にDigdagサーバーを以下のコマンドで起動しておきます。今回はDBを保存するため、digdag_sampleという適当なディレクトリをカレントディレクトリに作成しておきました。

# -o でDBを保存するディレクトリを指定
# -n でポートを指定
digdag server -o ./digdag_sample -n 8081

これで、ブラウザからhttp://localhost:8081/でDigdagサーバーにアクセスできます。

最後にDigdagサーバーが立ち上がった状態で、プロジェクトをプッシュします。sample_projectという名前でプロジェクトを作成します。

# .digファイルがあるディレクトリに移動する
cd sample_project

# -e でエンドポイントを指定
digdag push sample_project -e localhost:8081

このようにプロジェクトが作成されるので、検証の準備はOKです。

プロジェクト作成例

ワークフローの実行

実行履歴を作るため、各ワークフローを実行しておきます。

各ワークフローの画面で、右上のRUNボタンをぽちぽち押しました。

ワークフローの実行

後で分かりやすいよう、ワークフローは1→2→3→2→1→3→1→2→1の順番で実行しておきました。

このようにSessionsが記録されました。

作成した実行履歴

最新のattemptの詳細を取得する

/api/attemptsprojectworkflowを指定することで取得できるのでやってみました。

今回はsample_projectプロジェクトのsample_workflow1ワークフローの実行結果を新しい順に取得できるか確認します。

サーバーが立ち上がった状態で、下記コマンドを実行しました。

curl "http://localhost:8081/api/attempts/?project=sample_project&workflow=sample_workflow1"

以下のようにレスポンスが取得できました。(結果は見にくかったのでフォーマットしてあります。)

{
    "attempts": [
        {
            "status": "success",
            "id": "9",
            "index": 1,
            "project": {
                "id": "1",
                "name": "sample_project"
            },
            "workflow": {
                "name": "sample_workflow1",
                "id": "3"
            },
            "sessionId": "9",
            "sessionUuid": "ed16e680-9f87-48e2-a3cb-b2406798cc53",
            "sessionTime": "2022-03-25T12:29:26+00:00",
            "retryAttemptName": null,
            "done": true,
            "success": true,
            "cancelRequested": false,
            "params": {},
            "createdAt": "2022-03-25T12:29:26Z",
            "finishedAt": "2022-03-25T12:29:26Z"
        },
        {
            "status": "success",
            "id": "7",
            "index": 1,
            "project": {
                "id": "1",
                "name": "sample_project"
            },
            "workflow": {
                "name": "sample_workflow1",
                "id": "3"
            },
            "sessionId": "7",
            "sessionUuid": "30ef8a46-54e8-45e4-ac54-9b68e9626b62",
            "sessionTime": "2022-03-25T12:29:18+00:00",
            "retryAttemptName": null,
            "done": true,
            "success": true,
            "cancelRequested": false,
            "params": {},
            "createdAt": "2022-03-25T12:29:18Z",
            "finishedAt": "2022-03-25T12:29:18Z"
        },
        {
            "status": "success",
            "id": "5",
            "index": 1,
            "project": {
                "id": "1",
                "name": "sample_project"
            },
            "workflow": {
                "name": "sample_workflow1",
                "id": "3"
            },
            "sessionId": "5",
            "sessionUuid": "f705ab3d-8efb-4641-97f5-4d491906c35b",
            "sessionTime": "2022-03-25T12:27:16+00:00",
            "retryAttemptName": null,
            "done": true,
            "success": true,
            "cancelRequested": false,
            "params": {},
            "createdAt": "2022-03-25T12:27:16Z",
            "finishedAt": "2022-03-25T12:27:16Z"
        },
        {
            "status": "success",
            "id": "1",
            "index": 1,
            "project": {
                "id": "1",
                "name": "sample_project"
            },
            "workflow": {
                "name": "sample_workflow1",
                "id": "3"
            },
            "sessionId": "1",
            "sessionUuid": "7ff3f5ff-67c8-43ea-8676-559a30d01179",
            "sessionTime": "2022-03-25T11:38:45+00:00",
            "retryAttemptName": null,
            "done": true,
            "success": true,
            "cancelRequested": false,
            "params": {},
            "createdAt": "2022-03-25T11:38:45Z",
            "finishedAt": "2022-03-25T11:38:45Z"
        }
    ]
}

レスポンスから配列の初めの方ほど新しいことが分かるので、例えばjqを使って一番最新の実行結果が成功したのかを取得してみます。

curl -s "http://localhost:8081/api/attempts/?project=sample_project&workflow=sample_workflow1" | jq '.attempts[0].status'

以下のように最新のattemptの実行結果だけ取得できました。

"success"

最後に

今回はDigdagのREST APIを使って、特定のワークフローの最新のattemptの詳細を取得してみました。ワークフローの実行結果のチェックにお悩みの方の参考になれば幸いです。

参考