BigQuery のデータを「データ統合基盤 CS アナリティクス」を使って Google Cloud Storage にエクスポート

2020.09.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。

概要

Python クライアントライブラリを使用して BigQuery のテーブルデータを GCS にエクスポートすることができます。

本エントリでは、CSA に登録した Python プログラムをジョブ実行することで、BigQuery のデータを GCS に CSV ファイルでエクスポートしてみます。

また、CSA のジョブで SQL と Python プログラムを実行して、ターゲットテーブルから特定の条件で抽出した一部のデータだけを、同様に GCS に CSV ファイルとしてエクスポートしてみます。

CSA JMCの挙動確認バージョン

当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。

  • CSA JMC v5.0.0

前提

エクスポート対象のデータは、既に BigQuery のテーブルに格納されているものとします。

CSA の 構成要素設定、BigQuery接続設定手順は割愛します。詳細は以下のエントリをご参照ください。

また、GCP の管理コンソールから BigQuery API を有効に設定済みです。 クライアントライブラリを使用する場合、Google API 経由で BigQuery にアクセスする必要があるためです。

BigQuery テーブルの全データをエクスポート

Python プログラムを準備

BigQuery Python クライアントライブラリでテーブルデータを GCS にエクスポートする、以下のプログラムをローカル PC に保存しました。

from google.cloud import bigquery
import csa_env

def main():
    vars = csa_env.get().get('vars')
    bucket_name = vars.get('BUCKET')
    project = vars.get('PROJECT')
    dataset_id = vars.get('DATASET')
    table_id = vars.get('TABLE')

    destination_uri = "gs://{}/export/{}.csv".format(bucket_name, table_id)
    dataset_ref = bigquery.DatasetReference(project, dataset_id)
    table_ref = dataset_ref.table(table_id)

    client = bigquery.Client()
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
    )
    extract_job.result()

    print(
        "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
    )

バケット名、テーブル名などのリテラルは、実行引数として CSA 管理画面から指定したものをプログラム実行時に取得する想定です。

CSA にプログラムを登録してジョブを作成

CSA 管理画面「構成要素」メニュー「プログラム」から、Python プログラムを登録します。

「フォルダを選択」ボタンをクリックして、ローカル PC に保存したプログラムファイルを CSA にアップロードします。

続いて「ジョブ」メニュー「ジョブ一覧」から、プログラムを実行するジョブを追加します。

「ジョブの追加」ボタンをクリックして「ジョブ名」を入力して「追加」します。

画面下部「構成要素」欄の「編集」ボタンをクリックして、先ほど登録したプログラムを構成要素に設定します。

「現在の構成要素」欄に実行するプログラムを移動したら、「引数」ボタンをクリックして実行引数を追加します。

「保存」ボタンをクリックしてジョブを保存すれば、ジョブ作成完了です。

CSA ジョブを実行

ジョブ一覧画面から、作成したジョブを実行します。

実行開始のメッセージが表示されたのを確認して、しばらく待ちます。

実行履歴カウンターの「成功」がカウントアップされれば、ジョブ実行完了です。

GCS 管理コンソールからも、CSV ファイルが出力されていることが確認できました。

以下が出力対象の BigQuery のテーブルデータです。

ファイルをダウンロードして中身も確認してみます。

no,name,start_date,end_date,kt,hpa,update_timestamp
1,ヴォンフォン,2020-05-12,2020-05-16,85,960,2020-05-31 10:00:00 UTC
2,ヌーリ,2020-06-12,2020-06-14,40,996,2020-07-01 10:00:00 UTC
3,シンラコウ,2020-08-01,2020-08-02,35,992,2020-06-30 10:00:00 UTC
4,ハグピート,2020-08-01,2020-08-06,70,975,2020-08-31 10:00:00 UTC
5,チャンミー,2020-08-09,2020-08-11,45,996,2020-08-31 10:00:00 UTC
6,メーカラー,2020-08-10,2020-08-11,50,992,2020-08-31 10:00:00 UTC
7,ヒーゴス,2020-08-18,2020-08-20,55,992,2020-09-01 10:00:00 UTC
8,バービー,2020-08-22,,85,950,2020-09-01 10:00:00 UTC
9,メイサーク,2020-08-28,,,,2020-09-01 10:00:00 UTC

期待通り、BigQuery テーブルの全てのデータが CSV ファイルにエクスポートされたことが確認できました。

BigQuery テーブルの一部のデータをエクスポート

Python クライアントでテーブルの一部のデータのみ SELECT してエクスポートする場合、一度エクスポート対象データを別テーブルに格納する必要があります。

先ほどと同じデータエクスポートプログラムの実行前後に、エクスポート対象の一時テーブルを作成する SQL と一時テーブルを削除する SQL を実行します。

SQL を準備

対象テーブルから start_date2020-08-01 以降の update_timestamp カラムを除いたデータを SELECT して新しく一時テーブルを作成する以下の SQL を、ローカル PC に保存しました。

CREATE TABLE {{ vars.DATASET }}.{{ vars.TABLE }} AS (
SELECT
  * EXCEPT(update_timestamp)
FROM
  {{ vars.DATASET }}.{{ vars.TABLE_SRC }}
WHERE
  start_date >= '2020-08-01'
)

データセット名とテーブル名は、プログラム実行時と同様、CSA のジョブ登録時に実行引数として指定する想定です。

作成した一時テーブルデータのエクスポートは、先ほどと同じ Python プログラムで実行します。

エクスポート完了後に一時テーブルを削除する以下の SQL も準備しました。

DROP TABLE IF EXISTS {{ vars.DATASET }}.{{ vars.TABLE }}

CSA に SQL を登録してジョブを作成

「構成要素」メニュー「SQL」から、準備した SQL を CSA に登録します。

「フォルダを選択」ボタンをクリックし、SQL ファイルを ローカル PC から CSV にアップロードします。

続いてジョブを追加します。

一時テーブル作成 SQL、 データエクスポート実行プログラム、一時テーブル削除 SQL の順に構成要素を設定したら、「引数」ボタンをクリックします。

各構成要素に、以下の実行引数を指定しました。

  • 一時テーブル作成 SQL 実行引数
    • DATASET:csa_mikami
    • TABLE_SRC:typhoon_2020
    • TABLE:typhoon_2020_tmp
  • データエクスポート実行プログラム実行引数
    • BUCKET:csa-mikami
    • PROJECT:csa-dev-v5
    • DATASET:csa_mikami
    • TABLE:typhoon_2020_tmp
  • 一時テーブル削除 SQL 実行引数
    • DATASET:csa_mikami
    • TABLE:typhoon_2020_tmp

CSA ジョブを実行

ジョブ保存後、手動実行してみます。

ジョブ実行が正常終了したことを確認後、GCS にも CSV ファイルがエクスポートされていることが確認できました。

想定通りであれば、以下のデータが CSV ファイルに出力されたはずです。

CSV ファイルの中身は以下です。

no,name,start_date,end_date,kt,hpa
7,ヒーゴス,2020-08-18,2020-08-20,55,992
6,メーカラー,2020-08-10,2020-08-11,50,992
3,シンラコウ,2020-08-01,2020-08-02,35,992
4,ハグピート,2020-08-01,2020-08-06,70,975
5,チャンミー,2020-08-09,2020-08-11,45,996
9,メイサーク,2020-08-28,,,
8,バービー,2020-08-22,,85,950

期待通りの抽出データが CSV ファイルにエクスポートされたことが確認できました。

まとめ

CSA では Python プログラムや SQL などを組み合わせて、GUI 操作で簡単にジョブを作成することができます。

プログラムや SQL で使用するテーブル名やバケット名などのリテラルは、プレースホルダーで記載して CSA 管理画面からジョブ実行引数やサイト変数などで自由に指定することができるので、必要な処理に合わせて構成要素を使いまわすことも可能です。

BigQuery にも対応可能な統合データ分析基盤 CSA について、少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!