DigdagのREST APIを使って指定したプロジェクトのワークフロー名を取得する
データアナリティクス事業本部の鈴木です。
DigdagのREST APIを使って、特定のプロジェクトにあるワークフロー名の一覧を取得したかったので、やり方を調べて試してみました。
ワークフロー名を取得できれば、例えば以前書いた方法で、特定のワークフローの最新の実行結果を取得したりできます。
準備
今回のプロジェクトは、上記に記載した記事で作成した検証用プロジェクトを流用しました。一部追加したプロジェクトがあるため、全体の流れを改めて記載します。
前提
Digdagはバージョン0.10.4で検証しました。
REST APIの仕様は以下のドキュメントを参考にしました。
プロジェクトの用意
sample_project
ディレクトリを作成し、以下のように.digファイルを作成しておきます。
sample_project ├── sample_workflow1.dig ├── sample_workflow2.dig └── sample_workflow3.dig
sample_workflow1.dig
はなんでも良いですが、例えば以下のようなものを用意しておきます。
+echo_hello: echo>: Hello world from workflow1!
sample_workflow2.dig
とsample_workflow3.dig
も同様のものを用意しておきました。
今回は結果が分かりやすいよう、同様の内容でsample_project2
プロジェクトも用意しておきました。
sample_project2 └── sample_workflow4.dig
次にDigdagサーバーを以下のコマンドで起動しておきます。今回はDBを保存するため、digdag_sample
という適当なディレクトリをカレントディレクトリに作成しておきました。
# -o でDBを保存するディレクトリを指定 # -n でポートを指定 digdag server -o ./digdag_sample -n 8081
これで、ブラウザからhttp://localhost:8081/
でDigdagサーバーにアクセスできます。
最後にDigdagサーバーが立ち上がった状態で、プロジェクトをプッシュします。sample_project
およびsample_project2
という名前でプロジェクトを作成します。
# .digファイルがあるディレクトリに移動する cd sample_project # -e でエンドポイントを指定 digdag push sample_project -e localhost:8081 # sample_project2も同様 cd sample_project2 digdag push sample_project2 -e localhost:8081
プロジェクトは以下のようになります。
やってみる
APIリファレンスを確認すると、以下の2つのステップで進める必要がありそうでしたので、順番にやってみました。
- プロジェクトのIDを取得する
- プロジェクトのIDを指定してワークフローを取得する
1.プロジェクトのIDを取得する
/api/projects
APIのクエリ文字列で情報が欲しいプロジェクトの名前を指定すると、指定したプロジェクトのものだけ取得できました。
import requests params = {"name": "sample_project"} r = requests.get("http://localhost:8081/api/projects", params=params) r.text
とすると、結果は以下のようになりました。
'{"projects":[{"id":"1","name":"sample_project","revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","createdAt":"2022-03-25T11:30:30Z","updatedAt":"2022-03-25T11:30:30Z","deletedAt":null,"archiveType":"db","archiveMd5":"984wUQ1zY71GROiueCuaUA=="}]}'
ちなみに、クエリ文字列を指定しない場合は、すべてのプロジェクトが返ってきました。
import requests r = requests.get("http://localhost:8081/api/projects") r.text
とすると、結果は以下のようになりました。
'{"projects":[{"id":"1","name":"sample_project","revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","createdAt":"2022-03-25T11:30:30Z","updatedAt":"2022-03-25T11:30:30Z","deletedAt":null,"archiveType":"db","archiveMd5":"984wUQ1zY71GROiueCuaUA=="},{"id":"2","name":"sample_project2","revision":"ae8b6664-7d74-4aae-ad62-6ebd6c195d16","createdAt":"2022-06-03T03:02:42Z","updatedAt":"2022-06-03T03:03:33Z","deletedAt":null,"archiveType":"db","archiveMd5":"6j1ViFwdWthWY8GaQZdYOQ=="}]}'
2.プロジェクトのIDを指定してワークフローを取得する
/api/projects/{id}/workflows
APIで取得できました。パスパラメータの{id}
には、先ほど取得したプロジェクトのIDを入れました。下の例はf-stringを使いました。
import requests id = "1" r = requests.get(f"http://localhost:8081/api/projects/{id}/workflows") r.text
とすると、結果は以下のようになりました。
'{"workflows":[{"id":"1","name":"sample_workflow3","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow3!"}}},{"id":"2","name":"sample_workflow2","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow2!"}}},{"id":"3","name":"sample_workflow1","project":{"id":"1","name":"sample_project"},"revision":"70f162c7-b248-49cb-85b3-2ab3fa948285","timezone":"UTC","config":{"+echo_hello":{"echo>":"Hello world from workflow1!"}}}]}'
ワークフロー名だけ欲しい時は、レスポンスを加工すれば取得できそうですね。
import json import requests id = "1" r = requests.get(f"http://localhost:8081/api/projects/{id}/workflows") r_json = json.loads(r.text) [ workflow["name"] for workflow in r_json.get("workflows")] # ['sample_workflow3', 'sample_workflow2', 'sample_workflow1']
最後に
今回はDigdagのREST APIを使って、プロジェクト名が分かっているときに、そのプロジェクトに属しているワークフロー名を取得してみました。ワークフロー名を取得できれば、そのワークフローの最新の実行結果を取得したりできたりするので、実行結果の把握がプログラム化できてよいですね。
参考になりましたら幸いです。