[初心者向け]Cloud Storageに保存したSQLをPython上で呼び出してみた

Cloud Storageに保存したSQLをPython上で呼び出してみました。
2023.05.16

クラスメソッド株式会社データアナリティクス事業本部所属のニューシロです。
今回はGoogle CloudのサービスであるCloud Storageに保存したSQLをPython上で呼び出してみました。SQLで操作する対象は、同じくGoogle CloudのサービスであるBigQueryです。

執筆のきっかけ

1つのファイルにあらゆるコードを書いてしまうと、どこか変更する際に誤って他の箇所にも触れてしまうかもしれません。場所を分けて管理することも大切です。
自身の学習のためにも、ひとまとめになっていたPythonとSQLを分割した結果を初心者向けに発信しようと思いました。

本題

準備

テーブルはBigQueryの一般公開データセットを使います。様々な種類のデータセットがあって面白いです。データセットの種類についても色々調べたので、今後こちらについても情報発信しようと思います。

今回は世界の検索トレンドワードを格納してあるテーブルGoogle Trendsから、日本のトレンドワードを調べてみました。ツールはGoogle CloudのCloud Shell Editorを使用しました。

対象のコード

こちらのコードを分割します。Pythonコードは以下のリンクを参考にしました。

main.py

from google.cloud import bigquery


client = bigquery.Client()

# 実行するクエリ
query = """
    SELECT
      refresh_date,
      rank,
      term
    FROM
      `bigquery-public-data.google_trends.international_top_terms`
    WHERE
      refresh_date >= DATE_SUB(CURRENT_DATE("Asia/Tokyo"), INTERVAL 1 MONTH)
      AND term LIKE "%ゼルダ%"
      AND country_name = "Japan"
    GROUP BY
      refresh_date,
      rank,
      term
    ORDER BY
      refresh_date DESC,
      rank
"""

query_job = client.query(query)

# 結果を出力
for row in query_job:
    print("{}, Rank={}, {}".format(row["refresh_date"], row["rank"], row["term"]))

このコードで、Python上のSQLを用いてBigQueryのデータを出力することが可能です。
ちなみにトリプルクォート内は、一ヶ月以内に"ゼルダ"という単語がトレンドに入った際の日付と順位を取得するSQLです。

しかし記事の初めで述べたように、Python上にSQLを書くと、このSQLを変更する際に誤って他の箇所まで変更してしまう可能性があります。

できればSQLは別の場所で管理したいです。今回はこのコードのSQL部分をCloud Storageに保存し、その上でPythonを用いてSQLを使用していきます。

分割する

以下のコードをsqlファイルとして、Cloud Storageに作成したバケット内に保存します。 先ほどのコードのSQL部分です。

trend_zelda_in_japan.sql

SELECT
  refresh_date,
  rank,
  term
FROM
  `bigquery-public-data.google_trends.international_top_terms`
WHERE
  refresh_date >= DATE_SUB(CURRENT_DATE("Asia/Tokyo"), INTERVAL 1 MONTH)
  AND term LIKE "%ゼルダ%"
  AND country_name = "Japan"
GROUP BY
  refresh_date,
  rank,
  term
ORDER BY
  refresh_date DESC,
  rank

こちらをCloud Storageに作成したバケット内に保存しておきます。

このSQLファイルtrend_zelda_in_japan.sqlを読みとるようにPythonコードを書き換えます。

main.py

from google.cloud import bigquery
from google.cloud import storage


# バケット名、オブジェクト名からオブジェクト情報取得
storage_client = storage.Client()
bucket = storage_client.get_bucket("devio_sql")
blob = bucket.blob("trend_zelda_in_japan.sql")

# 実行するSQLをテキストデータとして取得
query = blob.download_as_text()

# BigQueryでSQL実行
bigquery_client = bigquery.Client()
query_job = bigquery_client.query(query)

# 結果を出力
for row in query_job:
    print("{}, Rank={}, {}".format(row["refresh_date"], row["rank"], row["term"]))

bucket.blob()でオブジェクト情報を取得し、download_as_text()を用いてそのオブジェクトをテキストデータとして取得しています。

new_shiro@cloudshell:~ (new_shiro)$ /usr/bin/python /home/new_shiro/devtest/dev.py
2023-05-15, Rank=21, ゼルダの伝説
2023-05-14, Rank=6, ゼルダの伝説
2023-04-19, Rank=25, ゼルダの伝説
2023-04-18, Rank=17, ゼルダの伝説
2023-04-17, Rank=15, ゼルダの伝説
2023-04-16, Rank=16, ゼルダの伝説

(やはりトレンドに入っていますね。ちなみに私も買いました!)

ちゃんとCloud Storageのファイルを読み込んで、SQLを実行することができました。
このようにコードを分けておくことで、管理がしやすくなります。

補足

オブジェクトをダウンロードする際、文字コードにも気をつけましょう(特に日本語が含まれている場合!)。
今回は大丈夫でしたが、もし文字コード関係で不都合が起きたら、download_as_text()でテキストとして取得するのではなく、download_as_bytes()でbytes型で取得してdecode()で変換するのも良いかと思います。

まとめ

管理しやすい構成を心がけていきましょう!
以上です。ここまでお読みいただきありがとうございました。

引用・参照まとめ