DigdagのREST APIを使って指定したプロジェクトのワークフロー名を取得する

2022.06.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

DigdagのREST APIを使って、特定のプロジェクトにあるワークフロー名の一覧を取得したかったので、やり方を調べて試してみました。

ワークフロー名を取得できれば、例えば以前書いた方法で、特定のワークフローの最新の実行結果を取得したりできます。

準備

今回のプロジェクトは、上記に記載した記事で作成した検証用プロジェクトを流用しました。一部追加したプロジェクトがあるため、全体の流れを改めて記載します。

前提

Digdagはバージョン0.10.4で検証しました。

REST APIの仕様は以下のドキュメントを参考にしました。

https://docs.digdag.io/api/

プロジェクトの用意

sample_projectディレクトリを作成し、以下のように.digファイルを作成しておきます。

sample_project
├── sample_workflow1.dig
├── sample_workflow2.dig
└── sample_workflow3.dig

sample_workflow1.digはなんでも良いですが、例えば以下のようなものを用意しておきます。

sample_workflow1.dig

+echo_hello:
   echo>: Hello world from workflow1!

sample_workflow2.digsample_workflow3.digも同様のものを用意しておきました。

今回は結果が分かりやすいよう、同様の内容でsample_project2プロジェクトも用意しておきました。

sample_project2
└── sample_workflow4.dig

次にDigdagサーバーを以下のコマンドで起動しておきます。今回はDBを保存するため、digdag_sampleという適当なディレクトリをカレントディレクトリに作成しておきました。

# -o でDBを保存するディレクトリを指定
# -n でポートを指定
digdag server -o ./digdag_sample -n 8081

これで、ブラウザからhttp://localhost:8081/でDigdagサーバーにアクセスできます。

最後にDigdagサーバーが立ち上がった状態で、プロジェクトをプッシュします。sample_projectおよびsample_project2という名前でプロジェクトを作成します。

# .digファイルがあるディレクトリに移動する
cd sample_project

# -e でエンドポイントを指定
digdag push sample_project -e localhost:8081

# sample_project2も同様
cd sample_project2
digdag push sample_project2 -e localhost:8081

プロジェクトは以下のようになります。

完成したプロジェクト

やってみる

APIリファレンスを確認すると、以下の2つのステップで進める必要がありそうでしたので、順番にやってみました。

  1. プロジェクトのIDを取得する
  2. プロジェクトのIDを指定してワークフローを取得する

1.プロジェクトのIDを取得する

/api/projectsAPIのクエリ文字列で情報が欲しいプロジェクトの名前を指定すると、指定したプロジェクトのものだけ取得できました。

import requests
params = {"name": "sample_project"}
r = requests.get("http://localhost:8081/api/projects", params=params)
r.text

とすると、結果は以下のようになりました。

'{"projects":[{"id":"1","name":"sample_project","revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","createdAt":"2022-03-25T11:30:30Z","updatedAt":"2022-03-25T11:30:30Z","deletedAt":null,"archiveType":"db","archiveMd5":"984wUQ1zY71GROiueCuaUA=="}]}'

ちなみに、クエリ文字列を指定しない場合は、すべてのプロジェクトが返ってきました。

import requests
r = requests.get("http://localhost:8081/api/projects")
r.text

とすると、結果は以下のようになりました。

'{"projects":[{"id":"1","name":"sample_project","revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","createdAt":"2022-03-25T11:30:30Z","updatedAt":"2022-03-25T11:30:30Z","deletedAt":null,"archiveType":"db","archiveMd5":"984wUQ1zY71GROiueCuaUA=="},{"id":"2","name":"sample_project2","revision":"ae8b6664-7d74-4aae-ad62-6ebd6c195d16","createdAt":"2022-06-03T03:02:42Z","updatedAt":"2022-06-03T03:03:33Z","deletedAt":null,"archiveType":"db","archiveMd5":"6j1ViFwdWthWY8GaQZdYOQ=="}]}'

2.プロジェクトのIDを指定してワークフローを取得する

/api/projects/{id}/workflowsAPIで取得できました。パスパラメータの{id}には、先ほど取得したプロジェクトのIDを入れました。下の例はf-stringを使いました。

import requests
id = "1"
r = requests.get(f"http://localhost:8081/api/projects/{id}/workflows")
r.text

とすると、結果は以下のようになりました。

'{"workflows":[{"id":"1","name":"sample_workflow3","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow3!"}}},{"id":"2","name":"sample_workflow2","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow2!"}}},{"id":"3","name":"sample_workflow1","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow1!"}}}]}'

ワークフロー名だけ欲しい時は、レスポンスを加工すれば取得できそうですね。

import json
import requests

id = "1"
r = requests.get(f"http://localhost:8081/api/projects/{id}/workflows")
r_json = json.loads(r.text)
[ workflow["name"] for workflow in r_json.get("workflows")]
# ['sample_workflow3', 'sample_workflow2', 'sample_workflow1']

最後に

今回はDigdagのREST APIを使って、プロジェクト名が分かっているときに、そのプロジェクトに属しているワークフロー名を取得してみました。ワークフロー名を取得できれば、そのワークフローの最新の実行結果を取得したりできたりするので、実行結果の把握がプログラム化できてよいですね。

参考になりましたら幸いです。

参考