テーブル作成めんどくさくないですか? CS アナリティクスで BigQuery のテーブルを自動作成してみた

2021.02.02

こんにちは、みかみです。

弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。

概要

データベースにデータをロードする場合、ほとんどのデータベースではあらかじめロード先のテーブルを作成しておく必要がありますが、 BigQuery ではデータロード時に自動検出オプションを指定することにより、データファイルからスキーマを検出してテーブルを自動作成することができます。

特に BigQuery のテーブルではデータのネスト構造をサポートしており、子オブジェクトを持つ JSON ファイルデータなどをそのままロードすることができますが、 ファイル項目に合わせてネスト構造を持つテーブル定義を手動で作成するのは少し手間がかかります。 自動検出機能を使えば、データ項目の構造を意識することなく、ファイルデータをテーブルとして BigQuery に格納することができて便利です。

CSA でも GUI 画面操作だけで、ロードするファイルからスキーマを検出して自動でテーブル作成することができます。

前提

CSA の DWH 接続情報は設定済みです。詳細は以下のエントリをご参照ください。

また、動作確認のため、下記のサンプルデータを準備しました。

気象庁ホームページ 2011 年~ 2020 年桜の開花日のデータをもとに作成した、JSONL( JSON Lines )ファイルです。

{"place": "稚内", "data": {"years": ["2011-05-19", "2012-05-14", "2013-05-26", "2014-05-11", "2015-05-03", "2016-05-13", "2017-05-09", "2018-05-12", "2019-05-07", "2020-05-10"], "avg": "05-14"}, "kind": "えぞやまざくら"}
{"place": "旭川", "data": {"years": ["2011-05-09", "2012-05-02", "2013-05-18", "2014-05-02", "2015-04-27", "2016-05-03", "2017-05-03", "2018-04-30", "2019-05-01", "2020-05-03"], "avg": "05-05"}, "kind": "えぞやまざくら"}
{"place": "網走", "data": {"years": ["2011-05-16", "2012-05-03", "2013-05-25", "2014-05-07", "2015-04-30", "2016-05-08", "2017-05-05", "2018-05-02", "2019-05-05", "2020-05-07"], "avg": "05-11"}, "kind": "えぞやまざくら"}
(省略)
{"place": "宮古島", "data": {"years": ["2011-01-17", "2012-01-18", "2013-01-19", "2014-01-16", "2015-01-22", "2016-02-04", "2017-01-30", "2018-01-23", "2019-01-07", "2020-02-07"], "avg": "01-16"}, "kind": "ひかんざくら"}
{"place": "那覇", "data": {"years": ["2011-01-07", "2012-01-22", "2013-12-28", "2014-01-15", "2015-01-15", "2016-01-21", "2017-01-14", "2018-01-10", "2019-01-10", "2020-01-06"], "avg": "01-18"}, "kind": "ひかんざくら"}
{"place": "南大東島", "data": {"years": ["2011-01-24", "2012-01-17", "2013-01-07", "2014-01-07", "2015-01-15", "2016-02-09", "2017-01-27", "2018-01-07", "2019-01-25", "2020-02-06"], "avg": "01-20"}, "kind": "ひかんざくら"}

オブジェクト構造はこんな感じです。

{
	"place": "稚内",
	"data": {
		"years": [
			"2011-05-19",
			"2012-05-14",
			"2013-05-26",
			"2014-05-11",
			"2015-05-03",
			"2016-05-13",
			"2017-05-09",
			"2018-05-12",
			"2019-05-07",
			"2020-05-10"
		],
		"avg": "05-14"
	},
	"kind": "えぞやまざくら"
}

2011 年から 2020 年までのの開花日を配列とし、平均の開花日と一緒にネストしたデータ構造にしています。

作成した JSONL ファイルを、GCS にアップロードしました。

データ連携構成要素を作成

CSA 画面から「データ連携」の構成要素を作成します。

「構成要素」メニューから「データ連携」をクリックして、データ連携画面に遷移します。

データ連携画面「作成」タブから「データ連携」の構成要素を作成します。

「データセット名」をプルダウンから選択し、「テーブル名」に新規作成するテーブルの名前を入力します。 ロードするデータファイルの情報と取込方式を入力・選択していくと、スクロールした画面した方に「テーブルが存在しない場合新規作成する」という項目があります。

BigQuery 版 CSA では、この「テーブルが存在しない場合新規作成する」項目のチェックを ON にしておけば、データロード時に指定したテーブルが存在しなかった場合、ファイル項目を自動検出してテーブルも一緒に新規作成してくれます。

ジョブを作成して実行

先ほど作成した「データ連携」構成要素を実行するジョブを作成して、実際にデータをロードしてみます。

「ジョブ」メニューから「ジョブ一覧」画面に遷移して、「ジョブの追加」ボタンをクリックします。

任意の「ジョブ名」を入力したら、ジョブ詳細画面をスクロールして下の方にある「構成要素」項目の「編集」ボタンをクリック。

「設定可能な構成要素」の一覧から、先ほど作成した構成要素を「現在の構成要素」の枠にドラッグ&ドロップで移動して、「保存」ボタンをクリックします。

ジョブ詳細画面に戻ったら、画面の一番下にある「保存」ボタンをもう一度クリックするとジョブ作成完了です。

さて、いよいよジョブを実行してテーブルを作成してみますが、念のため、ジョブ実行前には BigQuery にテーブルがないことを確認しておきます。

ジョブ一覧画面から先ほど作成したジョブを手動実行してみます。

ジョブ実行が正常に終了したことを確認して

BigQuery 管理画面からテーブルを確認してみます。

無事、新しいテーブルが自動作成され、フィルデータも正常にロードできました。

自動作成したテーブルの DDL を作成

テーブル定義をファイル管理したい場合も多いと思います。

BigQuery では bq show --schema コマンドで、JSON 形式のテーブル定義を取得することができます。

bq コマンドや Google クライアントライブラリを使用してプログラムから BigQuery などにアクセスする場合、Cloud Shell を利用すると環境構築の手間がかからず便利です。

GCP 管理画面上部のアイコンをクリックするだけで、すぐに Cloud Shell を使いはじめることができます。

Cloud Shell から bq show --schema コマンドで取得したテーブル定義が以下です。

gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq show --schema --format=prettyjson csa_mikami.sakura_json
[
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "avg",
        "type": "STRING"
      },
      {
        "description": "%E4Y-%m-%d",
        "mode": "REPEATED",
        "name": "years",
        "type": "DATE"
      }
    ],
    "mode": "NULLABLE",
    "name": "data",
    "type": "RECORD"
  },
  {
    "mode": "NULLABLE",
    "name": "kind",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "place",
    "type": "STRING"
  }
]

取得した JSON スキーマは、bq load コマンドによるデータロード時や bq mk --table コマンドでのテーブル作成時、BigQuery 管理画面からテーブルを作成する場合にもスキーマ定義を「テキストとして編集」チェックを ON にして入力することで指定可能です。

また、もしテーブル定義を CREATE TABLE 文で保存しておきたい場合には、プログラムから DDL を作成することも可能です。

試しに Google クライアントライブラリを使用してテーブルスキーマを取得し、DDL を作成して SQL ファイルとして GCS に保存する、以下の Python プログラムを作成しました。

from google.cloud import bigquery
from google.cloud import storage

def get_data_type(d_type):
    if d_type == 'INTEGER':
        ret = 'INT64'
    elif d_type == 'FLOAT':
        ret = 'FLOAT64'
    elif d_type == 'BOOLEAN':
        ret = 'BOOL'
    elif d_type == 'RECORD':
        ret = 'STRUCT'
    else:
        ret = d_type
    return ret

def get_column_schema(column):
    if column.mode == "REPEATED":
        schema = "ARRAY<{}>".format(get_data_type(column.field_type))
    elif column.mode == "REQUIRED":
        schema = "{} NOT NULL".format(get_data_type(column.field_type))
    else:
        schema = get_data_type(column.field_type)
    return schema

def get_column_schema_record(column, indent="    "):
    ddl_sub = "<{}\n{}>".format(create_schema(column.fields, indent+"    "), indent)
    if column.mode == "REPEATED":
        schema = "ARRAY<STRUCT{}>".format(ddl_sub)
    elif column.mode == "REQUIRED":
        schema = "STRUCT{} NOT NULL".format(ddl_sub)
    else:
        schema = "STRUCT{}".format(ddl_sub)
    return schema

def create_schema(schema, indent="    "):
    ddl = ""
    for column in schema:
        column_name = column.name
        column_schema = get_column_schema_record(column, indent) if column.field_type == "RECORD" else get_column_schema(column)
        column_description = " OPTIONS(description='{}')".format(column.description) if column.description else ""
        ddl += "\n{}{} {}{},".format(indent, column_name, column_schema, column_description)
    ddl = ddl[:-1]
    return ddl


table_name = "csa_mikami.sakura_json"
client = bigquery.Client()
table = client.get_table(table_name)
ddl = "CREATE TABLE IF NOT EXISTS {}.{}.{}({}\n);".format(table.project, table.dataset_id, table.table_id, create_schema(table.schema))
print(ddl)

bucket_name = "csa-mikami"
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob_name = "{}.sql".format(table.table_id)
blob = bucket.blob(blob_name)
blob.upload_from_string(ddl)
print("DDL {} created.".format(blob_name))

Cloud Shell からプログラムを実行してみます。

gcp_da_user@cloudshell:~/sample (csa-dev-v5)$ python3 sakura_schema2ddl.py
CREATE TABLE IF NOT EXISTS csa-dev-v5.csa_mikami.sakura_json(
    data STRUCT<
        avg STRING,
        years ARRAY<DATE> OPTIONS(description='%E4Y-%m-%d')
    >,
    kind STRING,
    place STRING
);
DDL sakura_json.sql created.

プログラムが正常に実行できたので、GCS に DDL が出力されているかどうか確認してみます。

出力ファイルの中身も確認してみます。

CREATE TABLE IF NOT EXISTS csa-dev-v5.csa_mikami.sakura_json(
    data STRUCT<
        avg STRING,
        years ARRAY<DATE> OPTIONS(description='%E4Y-%m-%d')
    >,
    kind STRING,
    place STRING
);

無事、DDL をファイル出力することができました。

なお、このような Google クライアントライブラリを使用した Python プログラムは、CSA から実行することも可能です。

CSA からの Python プログラム実行方法は以下のエントリをご参照ください。

CSA ではジョブ実行引数やサイト変数の登録も可能なので、テーブル名をプレースホルダーとして記載した Python プログラムを登録しておいて、テーブルを新規で自動作成するたびにプログラムを実行して DDL を出力することも可能です。

まとめ

CSA を使ってデータロード時にテーブルを自動作成する方法をご紹介しました。

CSA では Python プログラムや SQL などを組み合わせて、GUI 操作で簡単にバッチジョブを作成することができます。

CSA から実行する Python プログラムでは Google クライアントライブラリを使うこともできるので、要件に合わせて柔軟にジョブを組み立てて実行することが可能です。

統合データ分析基盤 CSA は、BigQuery の他に Redshift と Snowflakeにも対応しております。 少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!