Google CloudのWorkflowsを使ってパブリックなAPIからデータを収集するワークフローを組んでみた〜CDKTFを添えて〜

2024.01.24

CX事業本部@大阪の岩田です。

Google CloudのWorkflowsから外部のAPIを連続で呼び出したいユースケースがあったので、練習のためにJikan APIを利用してワークフローを組んでみました。

やること

ワークフローで実施することはざっくり以下の通りです。

  • 制作会社の一覧を取得する
  • 取得した制作会社の一覧をループしながら以下の処理を実行
    • 該当の制作会社が制作したアニメを1件取得し、タイトルを変数に格納
  • 変数に格納した全てのアニメのタイトルをレスポンスとして返却する

ワークフローの詳細

今回構築したワークフローの全体像は以下の通りです。なお、本来実装すべきチェク処理やエラーハンドリング等は割愛しているので、もしこのコードを流用される場合はご注意ください。

- assign_vars:
    assign:
      - titles: []
      - api_endpoint_base: 'https://api.jikan.moe/v4/'
- main:
    steps:
    - fetch_producers:
        call: http.get
        args:
            url: '${api_endpoint_base + "producers"}'
            query:
              limit: 10
        result: fetch_producers_result
    - fetch_animes:
        for:
          value: producer
          in: ${fetch_producers_result.body.data}
          steps:
            - sleep:
                call: sys.sleep
                args:
                  seconds: 1
            - fetch_anime_by_producer:
                call: http.get
                args:
                  url: '${api_endpoint_base + "anime"}'
                  query:
                    producers: ${producer.mal_id}
                    limit: 1
                result: fetch_anime_by_producer_response
            # TODO 本来はレスポンスの要素数が1以上かのチェックが必要
            - assign_title:
                assign:
                  - titles: ${list.concat(titles, fetch_anime_by_producer_response.body.data[0].title)}
- return:
    return: ${titles}

可視化すると以下のようなフローになります。

ステップごとに詳細を見ていきましょう

変数の初期化(assign_vars)

やってることはシンプルに変数を準備するだけです。ポイントとしてtitlesという変数を定義しているのですが、後のループ処理の内側からアクセスできるように最初のステップで初期化しています。

- assign_vars:
    assign:
      - titles: []
      - api_endpoint_base: 'https://api.jikan.moe/v4/'

制作会社の一覧取得(fetch_producers)

以下の部分で制作会社の一覧を取得しています。

    - fetch_producers:
        call: http.get
        args:
            url: '${api_endpoint_base + "producers"}'
            query:
              limit: 10
        result: fetch_producers_result

この記述で`GET https://api.jikan.moe/v4/producers?liit=10`を実行し、レスポンスを`fetch_producers_result`という変数に格納&返却します。今回は検証目的なのでlimit=10 で取得件数を制限していますが、実際のユースケースではレスポンスに含まれるページング関連の情報をパースしてループするといった実装になるでしょう。

ループしながらアニメの情報を取得(fetch_animes)

以下の部分で先程取得した制作会社の情報をループで回しながらアニメの情報を取得していきます。

    - fetch_animes:
        for:
          value: producer
          in: ${fetch_producers_result.body.data}
          steps:
            - sleep:
                call: sys.sleep
                args:
                  seconds: 1
            - fetch_anime_by_producer:
                call: http.get
                args:
                  url: '${api_endpoint_base + "anime"}'
                  query:
                    producers: ${producer.mal_id}
                    limit: 1
                result: fetch_anime_by_producer_response
            # TODO 本来はレスポンスの要素数が1以上かのチェックが必要
            - assign_title:
                assign:
                  - titles: ${list.concat(titles, fetch_anime_by_producer_response.body.data[0].title)}

for...in: ${fetch_producers_result.body.data}の指定で制作会社の一覧をイテレートしながらvalue: producerの指定した通り、制作会社の情報をproducerという変数に格納します。各ループの中ではAPIのレートリミット対策として1秒のsleepを挟んでからGET https://api.jikan.moe/v4/anime?producers=<制作会社のID>&limit=1相当のHTTPリクエストを発行しています。アニメの情報を取得するエンドポイントでは?producers=1,2,3のように複数の制作会社を指定できるのですが、今回はWorkflowsの練習目的なので、愚直にループしながら制作会社を1つ1つ指定する実装としています。

各イテレーションの最後には${list.concat(titles, fetch_anime_by_producer_response.body.data[0].title)} で変数titlesの末尾に当該イテレーションで取得したアニメのタイトルを追加しています。

CDKTFでデプロイしてみる

上記のワークフローをCDKTFでデプロイしてみます。今回利用したライブラリ等のバージョンは以下の通りです。

  • cdktf: 0.20.1
  • cdktf-cli: 0.20.1
  • @cdktf/provider-google: 13.1.0
  • terraform-provider-google: 5.12

main.tsの中身は以下の通りです。同じ階層に先程のyamlファイルがanime-workflow.yamlというファイル名で保存されている前提で組んでいます。

import { Construct } from "constructs";
import { App, TerraformStack, TerraformVariable } from "cdktf";
import { serviceAccount } from "@cdktf/provider-google";
import * as google from "@cdktf/provider-google";
import { readFileSync } from "fs";

class AnimeWorkFlowStack extends TerraformStack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    const projectId = new TerraformVariable(this, "projectId", {
      type: "string",
      description: "Google Cloud ProjectId",
    })

    new google.provider.GoogleProvider(this, 'GoogleProvider', {
      project: projectId.value
    });

    const workflowsSa = new serviceAccount.ServiceAccount(this, 'anime-workflows-sa', {
      accountId: 'anime-workflows-sa',
      displayName: 'anime workflows service account',
    });

    new google.projectIamMember.ProjectIamMember(this, 'bastion-instance-sa-to-cloudsql-client', {
      project: projectId.value,
      role: 'roles/logging.logWriter',
      member: workflowsSa.member,
    });

    const content = readFileSync('./anime-workflow.yaml').toString().replace(/\$/g, '$$$$')

    new google.workflowsWorkflow.WorkflowsWorkflow(this, 'anime-workflows', {
      description: 'anime workflows',
      serviceAccount: workflowsSa.email,
      name: 'anime-workflow',
      region: 'asia-northeast1',
      sourceContents: content
    })
  }
}

const app = new App();
new AnimeWorkFlowStack(app, "AnimeWorkFlow");
app.synth();

Google CloudのプロジェクトIDはTerraformのvariableで受け取るようにしているので、以下のコマンドでデプロイを行います。

TF_VAR_projectId=<Google CloudのプロジェクトID> npm run cdktf deploy

ワークフローを動かしてみる

デプロイできたら実際にマネコンからワークフローを実行してみます。

以下の通り実行に成功しました!

レスポンスは以下の通りでした

[
  "Naruto",
  "Full Metal Panic? Fumoffu",
  "Sunabouzu",
  "Cowboy Bebop: Tengoku no Tobira",
  ".hack//Sign",
  "Shinseiki Evangelion",
  "Hachimitsu to Clover",
  "Mushishi",
  "Tennis no Oujisama",
  "Trigun"
]

知ってるタイトルもあれば、知らないタイトルもあり...

詳細はよく分からないですが想定通り動いてそうです!

まとめ

Google CloudのWorkflowsは初めて触ったのですが、外部APIと連携する処理を簡単にワークフロー化できて面白かったです。今回試した程度であればコストは微々たるものですが、実際に外部APIを大量に呼び出すようなワークフローを実装する場合はWorkflowsの課金体系に注意が必要です。内部ステップや外部 HTTP 呼び出しに対して従量課金が発生するため、ループしながら1件ずつWorkflowsからHTTP呼び出しを実行するのではなく、ある程度のバッチ単位でCloud Functionsを呼び出してCloud Functionsから外部APIを呼び出すようなアーキテクチャも検討すると良さそうです。

参考