データ統合基盤 CS アナリティクスで Google Cloud クライアントライブラリを使用した Python プログラムを実行してみる

2020.09.18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。

概要

Google Cloud Platform(以下 GCP )では、Google BigQuery(以下 BigQuery )や Google Cloud Storage(以下 GCS )などの各リソースを Google Cloud API で操作することができます。 Google Cloudクライアントライブラリを使えば Google Cloud API を簡単に呼び出すことができ、各言語のプログラムで CGP リソースの操作を実行することができます。

CSA ではジョブで Python プログラムを実行することが可能で、BigQuery Python クライアントライブラリ( google-cloud-storage )と GCS Python クライアントライブラリ( google-cloud-storage )はデフォルトでインストール済みです。

今回は BigQuery にデータロードした後、クライアントライブラリを使用した Python プログラムで、テーブルを別データセットにコピーして GCS 上のファイルを別フォルダに移動します。

CSA JMCの挙動確認バージョン

当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。

  • CSA JMC v5.0.0

前提

CSA の 構成要素設定、BigQuery接続設定、データ連携構成要素登録の手順は割愛します。詳細は以下のエントリをご参照ください。

また、Google クライアントライブラリは Google Cloud API 経由で GCP にアクセスするため、GCP 側で使用するリソースの API を有効にする必要があります。

今回はクライアントライブラリで BigQuery と GCS を操作するため、BigQuery API と Cloud Storage API を有効に設定済みです。

Python プログラムを準備

以下の Python プログラムをローカル PC に保存しました。

from google.cloud import bigquery
from google.cloud import storage

from datetime import datetime as dt
import os

def copy_table(source_table, destination_table):
    client = bigquery.Client()

    job = client.copy_table(source_table, destination_table)
    job.result()
    print("A copy of the table created.")

def copy_blob(bucket_name, blob_name, destination_blob_name):
    client = storage.Client()

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob_copy = bucket.copy_blob(
        blob, bucket, destination_blob_name
    )
    print(
        "Blob {} in bucket {} copied to blob {} in bucket {}.".format(
            blob.name,
            bucket.name,
            blob_copy.name,
            bucket.name,
        )
    )

def delete_blob(bucket_name, blob_name):
    client = storage.Client()

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.delete()

    print("Blob {} deleted.".format(blob_name))

def main():
    # copy BigQyery Table
    project = "csa-dev-v5"
    dataset = "csa_mikami"
    dataset_dst = "csa_prd"
    table = "typhoon_2020"
    source_table = "{}.{}.{}".format(project, dataset, table)
    destination_table = "{}.{}.{}".format(project, dataset_dst, table)
    copy_table(source_table, destination_table)

    # move GCS file
    bucket_name = "csa-mikami"
    blob_name = "typhoon_2020/typhoon_2020.csv"
    destination_blob_name = os.path.join("backup", os.path.split(blob_name)[0], dt.now().strftime("%Y%m%d"), os.path.split(blob_name)[1])
    copy_blob(bucket_name, blob_name, destination_blob_name)
    delete_blob(bucket_name, blob_name)

CSA から Python プログラムを実行する場合、main 関数を記述する必要があります。 また、CSA に登録した Python プログラムから GCP にアクセスする場合、CSA の構成要素設定で指定したサービスアカウントを使用するため、サービスアカウントにプログラムに記述した操作を実行する権限があるかどうかもご確認ください。

以下のコードで、BigQuery クライアントライブラリを使用して、csa-dev-v5.csa_mikami.typhoon_2020 テーブルを csa-dev-v5.csa_prd.typhoon_2020 にコピーします。

from google.cloud import bigquery
(省略)
def copy_table(source_table, destination_table):
    client = bigquery.Client()

    job = client.copy_table(source_table, destination_table)
    job.result()
    print("A copy of the table created.")
(省略)
def main():
    # copy BigQyery Table
    project = "csa-dev-v5"
    dataset = "csa_mikami"
    dataset_dst = "csa_prd"
    table = "typhoon_2020"
    source_table = "{}.{}.{}".format(project, dataset, table)
    destination_table = "{}.{}.{}".format(project, dataset_dst, table)
    copy_table(source_table, destination_table)
(省略)

BigQuery Python クライアントライブラリのコードは GCP の公式ドキュメントやクライアントライブラリのリファレンスもご参照ください。

そして以下で、Cloud Strage Python クライアントライブラリを使って、GCS の gs://csa-mikami/typhoon_2020/typhoon_2020.csv ファイルを gs://csa-mikami/backup/typhoon_2020/YYYYMMDD/typhoon_2020.csv にコピーした後削除します。

(省略)
from google.cloud import storage

from datetime import datetime as dt
import os
(省略)
def copy_blob(bucket_name, blob_name, destination_blob_name):
    client = storage.Client()

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob_copy = bucket.copy_blob(
        blob, bucket, destination_blob_name
    )
    print(
        "Blob {} in bucket {} copied to blob {} in bucket {}.".format(
            blob.name,
            bucket.name,
            blob_copy.name,
            bucket.name,
        )
    )

def delete_blob(bucket_name, blob_name):
    client = storage.Client()

    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.delete()

    print("Blob {} deleted.".format(blob_name))

def main():
(省略)
    # move GCS file
    bucket_name = "csa-mikami"
    blob_name = "typhoon_2020/typhoon_2020.csv"
    destination_blob_name = os.path.join("backup", os.path.split(blob_name)[0], dt.now().strftime("%Y%m%d"), os.path.split(blob_name)[1])
    copy_blob(bucket_name, blob_name, destination_blob_name)
    delete_blob(bucket_name, blob_name)

Python プログラムを CSA に登録

「構成要素」メニュー「プログラム」をクリックします。

プログラム一覧画面で「フォルダを選択」ボタンから、ローカル PC に保存した Python ファイルをアップロードします。

画面上部に「アップロードしました。」メッセージが表示されれば、アップロード完了です。

CSA のジョブを作成して実行

「ジョブ」メニュー「ジョブ一覧」から、ジョブ一覧画面に遷移します。

「ジョブの追加」ボタンをクリックし、あらかじめ登録しておいたデータ連携とプログラムを実行するジョブを新規追加します。

ジョブが追加できたら、一覧画面から実行します。

しばらく待ち、「実行履歴カウンター」の「成功」がカウントアップされれば完了です。

プログラム内に記載した print文は、ジョブ実行履歴画面から遷移可能なジョブ実行ログ画面で確認することができます。

GCP 管理コンソールからも、実行結果を確認してみます。

期待通り、BigQuery のテーブルが別データセットにコピーされています。

また、GCS のファイルが別フォルダにコピーされ、元ファイルは削除されていることが確認できました。

まとめ

CSA では、Python(3.7)のプログラムを登録してジョブの実行が可能で、以下の Python ライブラリがデフォルトでインストール済みです

  • google-cloud-bigquery
  • google-cloud-storage
  • pandas
  • requests
  • scipy
  • psycopg2
  • jinja2
  • boto3 ( AWS SDK for Python )

上記以外の Python ライブラリも、「サイト設定」->「プログラム実行設定」画面から追加することで、CSA のプログラム実行で使用することができます。

CSA のジョブで Python プログラムを実行することで、ロードデータを整形したり、ロード済みのデータファイルを削除したり、ジョブの処理内容を柔軟に実装することが可能です。

BigQuery にも対応可能な統合データ分析基盤 CSA について、少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!