Cloud WorkflowsでBigQueryのクエリ結果を使ってみる

2022.11.24

はじめに

データアナリティクス事業本部のkobayashiです。

GoogleCloudのWorkflowsでBigQueryを扱うブログを何本か書きましたが今回はBigQueryへクエリを実行してその結果を扱ってみたいと思います。

WorkflowsでBigQueryのクエリ結果を扱う

Workflowsで使うことのできるBigQueryコネクタの中でgoogleapis.bigquery.v2.jobs.queryを使うことでBigQueryでクエリを実行した結果をWorkflows中で扱うことができます。

具体的には実行結果はgoogleapis.bigquery.v2.jobs.queryを実行したStepのresultとして出力でき、それを後続の処理で扱う事ができます。クエリ結果はQueryResponse型 で帰ってきますのでこの中から必要な値を取り出して使います。

QueryResponseでは代表として以下のような値があリます。

  • erorrs
    • クエリ実行中に発生したエラーメッセージ
  • jobComplete
    • クエリ実行が完了したかどうか
  • rows
    • クエリ結果
  • schema
    • クエリ結果のスキーマ
  • totalRows
    • クエリ結果セットの合計行数

他にも取得できる値があるので詳しくは公式ドキュメントをご確認ください。

今回は一般公開データセットの「Google Trends - International」からサンプルのSQLにある国別に最も検索されている検索語句を取得できる「What are the top search terms across the globe for the latest available data?」のクエリを使ってみたいと思います。

Workflowsを実行する

以下がWorkflowsのyamlになります。内容は以下のようになります。

ExecBqQueryステップでサンプルクエリを実行しその結果をquery_resultに入れています。後続の各ステップではクエリ結果の入ったquery_resultから値をlogに出力しています。

query_resultQueryResponse型 になるのでQueryResponse型のドキュメントを確認しつつ出力する値を指定します。

main:
  params: [ args ]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
    - ExecBqQuery:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            query: |
                SELECT
                  country_name,
                  term,
                  ARRAY_AGG(STRUCT(rank,week) ORDER BY week DESC, refresh_date DESC LIMIT 1) x
                FROM
                    `bigquery-public-data.google_trends.international_top_terms`
                WHERE
                    refresh_date = 
                        (SELECT
                            MAX(refresh_date)
                        FROM
                        `bigquery-public-data.google_trends.international_top_terms`)
                        AND
                     week = 
                        (SELECT
                            MAX(week)
                        FROM
                        `bigquery-public-data.google_trends.international_top_terms`
                        )
                GROUP BY    
                    country_name, term
                ORDER BY
                    (SELECT country_name FROM UNNEST(x)), (SELECT rank FROM UNNEST(x))
            useLegacySql: false
        result: query_result
    - query_result_schema:
        call: sys.log
        args:
#          クエリ結果のスキーマ
          text: ${query_result.schema}
          severity: INFO
    - query_result_totalrows:
        call: sys.log
        args:
#          クエリ結果の件数
          text: ${query_result.totalRows}
          severity: INFO
    - query_result_rows:
        call: sys.log
        args:
#          クエリ結果の1行目の1列目(country_name)の値
          text: ${query_result.rows[0].f[0].v}
          severity: INFO
    - query_result_rows3:
        call: sys.log
        args:
#          クエリ結果の1行目の値
          text: ${query_result.rows[0]}
          severity: INFO
    - query_result_rows4:
        call: sys.log
        args:
#          クエリ結果全体
          text: ${query_result.rows}
          severity: INFO
    - query_result_queryresult:
        call: sys.log
        args:
#          QueryResponseすべて
          text: ${query_result}
          severity: INFO

ではこのコードを実行してみます。

$ gcloud workflows deploy bq_query --source=bq_query.yml --service-account bq_query@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1
$ gcloud workflows run bq_query --location asia-northeast1

Workflowsの実行が成功するとログにそれぞれ出力結果が表示されます。

#   クエリ結果のスキーマ
{"fields":[{"mode":"NULLABLE","name":"country_name","type":"STRING"},{"mode":"NULLABLE","name":"term","type":"STRING"},{"fields":[{"mode":"NULLABLE","name":"rank","type":"INTEGER"},{"mode":"NULLABLE","name":"week","type":"DATE"}],"mode":"REPEATED","name":"x","type":"RECORD"}]}
#   クエリ結果の件数
1023
#   クエリ結果の1行目の1列目(country_name)の値
Argentina
#   クエリ結果の1行目の値
{"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]}
#   クエリ結果全体
[{"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]},{"f":[{"v":"Argentina"},{"v":"Sanatorio Finochietto"},{"v":[{"v":{"f":[{"v":"2"},{"v":"2022-11-06"}]}}]}]},{"f":[{"v":"Argentina"},{"v":"A Q hora juega Argentina"},{"v":[{"v":{"f":[{"v":"3"},{"v":"2022-11-06"}]}}]}]},...]
#   QueryResponseすべて
{"cacheHit":true,"jobComplete":true,"jobReference":{"jobId":"job_Rp_WyYBSPaychMVqYEBvjw2Cnxs3","location":"US","projectId":"kobayashi-masahiro"},"kind":"bigquery#queryResponse","rows":[{"f":[{"v":"Argentina"},{"v":"Polonia"},{"v":[{"v":{"f":[{"v":"1"},{"v":"2022-11-06"}]}}]}]},...

クエリ結果がjson型式で出力されていることがわかります。rows全体も出力していますが、これは最大応答サイズで切られてしまいますので厳密に全件を使うことはできません。追加の行がある場合はQueryResponseのjobReferenceに含まれるjobId,locationを指定してgoogleapis.bigquery.v2.jobs.getQueryResultsコネクタを使うことで取得できます。ただし、Workflowsで最大応答サイズを超えたすべてのクエリ結果を使うことはほぼないと思います。

Method: googleapis.bigquery.v2.jobs.getQueryResults  |  Workflows  |  Google Cloud

まとめ

Cloud WorkflowsでBigQueryへクエリを実行してその結果を扱ってみました。クエリ結果全体は使うことはないと思いますが、クエリ結果の件数やエラーなどに応じて後続の処理を分岐させるといった使い方で活躍するのではないでしょうか。

最後まで読んで頂いてありがとうございました。