BigQuery に日時データをロードすると UTC に変換されてしまう問題に「データ統合基盤 CS アナリティクス」で対応する

2020.10.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。

概要

BigQuery のデフォルトタイムゾーンは UTC なので、タイムゾーン情報がない日時データを TIMESTAMP 型でロードすると、BigQuery に格納されたデータのタイムゾーンは UTC になってしまいます。

そのため、日時データを JST で BigQuery にロードするには、ロード前のデータにタイムゾーン情報を付与するか、テーブルカラムのデータ型を TIMESTAMP ではなく DATETIME 型にしてロードする必要があります。

テーブルデータ型 ロード前データ BigQuery 格納データ ロード前後の
データ整合性
TIMESTAMP 2020-10-01 09:00:00 2020-10-01 09:00:00 UTC NG
TIMESTAMP 2020-10-01T09:00:00+09:00 2020-10-01 00:00:00 UTC OK
DATETIME 2020-10-01 09:00:00 2020-10-01T09:00:00 OK
DATETIME 2020-10-01T09:00:00+09:00 なし(ロードエラー) NG

CSA では以下の構成要素を画面操作で登録し、登録した構成要素を組み合わせてジョブを作成して実行することが可能です。

  • データ連携:ファイルストレージから DWH へのデータロード
  • SQL:SQL の実行
  • プログラム:Python プログラムの実行

今回は CSA のジョブを作成して、前述の 2 つの方法で日時データを BigQuery にロードしてみます。

CSA JMCの挙動確認バージョン

当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。

  • CSA JMC v5.0.0

前提

CSA の 構成要素設定、BigQuery接続設定手順は割愛します。詳細は以下のエントリをご参照ください。

プログラム、SQL、データ連携構成要素の CSA への登録手順およびジョブ作成・実行手順も省きますので、詳細は過去エントリをご参照ください。

また、CSA からはクライアントライブラリを使用して Google API 経由で BigQuery および GCS にアクセスするため、GCP の管理コンソールから BigQuery API と Cloud Storage API を有効に設定する必要があります。

日時データを BigQuery にロードした結果を確認

初めに、タイムゾーンが付与されていない日時データをそのまま BigQuery にロードした場合の結果を確認してみます。

確認用に以下の CSV データを準備しました。

ロードと同時に新規でテーブル作成をするように指定して、BigQuery にデータロードした結果は以下です。

BigQuery ではデータロード時にファイルフォーマットに合わせて自動でデータ型を判断し、新規テーブルを作成することが可能です。日時データの場合、テーブルのデータ型は TIMESTAMP 型で自動検出してくれました。

TIMESTAMP 型はタイムゾーン付きのデータ型なので、ロード前のデータにタイムゾーン情報がない場合は、BigQuery のデフォルトタイムゾーンである UTC が付与されてしまいます。

なお、CLI などから SQL を実行してデータを参照する場合には、タイムゾーンは明示されません。

gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \
> 'SELECT col_timestamp FROM csa_mikami.load_timestamp LIMIT 3'
Waiting on bqjob_r7bcf5afd59db54bf_00000174f891d316_1 ... (0s) Current status: DONE
+---------------------+
|    col_timestamp    |
+---------------------+
| 2020-06-28 15:18:31 |
| 1973-01-12 05:32:31 |
| 1996-10-24 16:06:47 |
+---------------------+

試しに使ってみる、またはアドホック分析など限定的な用途にしか使用しない場合、BigQuery 格納データのタイムゾーンと実際のデータのタイムゾーンが異なっていても気にする必要はないケースもあると思いますが、やはり本番運用で保守性などを考慮すると BigQuery にも連携データと同じタイムゾーンで日時データを格納しておきたいものです。。

ロードデータにタイムゾーンを付与してから BigQuery にロード

日時データに JST のタイムゾーンを付与する Python プログラムを実行し、フォーマット変換したデータファイルを BigQuery にロードします。

Python プログラムの実行とデータロード処理は、CSA から画面操作で登録し、ジョブを作成して実行します。

GCS に配置してあるデータファイルを取得し、日時項目に JST のタイムゾーンを付与する以下の Python プログラムを CSA に登録します。

from google.cloud import storage
from io import BytesIO
import pandas as pd
from datetime import datetime, timedelta, timezone
import os
import csa_env

def main():
    # get parameters
    vars = csa_env.get().get('vars')
    bucket_name = vars.get('BUCKET')
    path_src = vars.get('PATH_SRC')
    path_dst = vars.get('PATH_DST')

    # get blob objects
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = client.list_blobs(bucket_name, prefix=path_src)
    for blob in blobs:
        if blob.name == path_src:
            continue
        print(blob.name)
        blob = bucket.blob(blob.name)
        content = blob.download_as_string()
        df = pd.read_csv(BytesIO(content))
        print('before: {}'.format(df['col_timestamp'][0]))

        # formate timestamp
        jst = timezone(timedelta(hours=+9), 'JST')
        df['col_timestamp'] = df['col_timestamp'].map(lambda x: datetime.strptime(x, '%Y/%m/%d %H:%M:%S').replace(tzinfo=jst).isoformat())
        print('after : {}'.format(df['col_timestamp'][0]))

        # upload formatted file
        file_name = os.path.basename(blob.name)
        blob = bucket.blob('{}{}'.format(path_dst, file_name))
        blob.upload_from_string(data=df.to_csv(index=False))

GCS バケット名、ファイル取得元のパス、フォーマット変換後のファイル配置パスは、後ほど CSA のジョブ作成画面から実行引数で指定します。

続いて、フォーマット変換後のデータファイルを BigQuery にロードする「データ連携」構成要素を登録しました。

最後に、登録した構成要素を Python プログラム→データ連携の順に実行するジョブを作成し、実行します。

CSA の実行ログ画面から、ジョブが正常に完了したことが確認できました。

GCP 管理コンソールからも、フォーマット変換後のファイルデータと BigQuery にロードされたデータを確認してみます。

BigQuery の格納データは UTC になるので一見わかりにくいですが、付与したタイムゾーン( JST )に合わせて、ロード前のデータの日時から -09:00 の UTC で BigQuery に格納されたことが確認できました。

SQL で参照する場合には、FORMAT_TIMESTAMP 関数で JST に変換する必要があるのでご注意ください。

gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \
> 'SELECT FORMAT_TIMESTAMP("%Y-%m-%d %H:%M:%S", col_timestamp, "Asia/Tokyo") AS col_timestamp FROM csa_mikami.load_timestamp LIMIT 3'
Waiting on bqjob_r2a939e99c3db9982_00000174f8bc1c31_1 ... (0s) Current status: DONE   
+---------------------+
|    col_timestamp    |
+---------------------+
| 2020-06-28 15:18:31 |
| 1973-01-12 05:32:31 |
| 1996-10-24 16:06:47 |
+---------------------+

TIMESTAMP 型ではなく DATETIME 型で BigQuery にロード

他に、複数のタイムゾーンにまたがるデータをロードする可能性がない場合には、日時データを TIMESTAMP 型ではなく DATETIME 型で BigQuery にロードする方法もあります。

DATETIME 型であればタイムゾーン情報が付与されないため、ロード前後でデータの整合がずれることはありません。 しかし、ロードと同時にファイルフォーマットを判定して自動的にテーブルを新規作成すると日時データのデータ型は TIMESTAMP 型になってしまうので、日時項目のカラムを明示的に DATETIME 型で定義したテーブルを作成してからファイルデーをロードする必要があります。

また、DATETIME 型にロードする場合、データの日時フォーマットがスラッシュ区切りだとエラーになってしまいます。 日時項目のカラムを DATETIME 型で定義した CREATE TABLE の SQL と、日時データを YYYY-MM-DD hh:mm:ss フォーマットに変換してからロードする CSA のジョブを作成して実行してみます。

日時項目を DATETIME 型で定義したテーブルを作成する、以下の SQL を CSA に登録しました。 データセット名、テーブル名は実行引数で指定するため、プレースホルダーで記載しています。

CREATE TABLE IF NOT EXISTS {{ vars.DATASET }}.{{ vars.TABLE }} (
    col_date DATE,
    col_timestamp DATETIME
)

また、日時データフォーマット変換と BigQuery へのロードを実行する以下の Python プログラムを CSA に登録しました。 バケット名、テーブル名などのリテラルは、SQL 同様ジョブの実行引数で指定します。

from google.cloud import storage
from google.cloud import bigquery
from io import BytesIO, StringIO
import pandas as pd
from datetime import datetime as dt
import csa_env

def main():
    # get parameters
    vars = csa_env.get().get('vars')
    bucket_name = vars.get('BUCKET')
    path_src = vars.get('PATH_SRC')
    dataset_id = vars.get('DATASET')
    table_id = vars.get('TABLE')

    # get blob objects
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = client.list_blobs(bucket_name, prefix=path_src)
    for blob in blobs:
        if blob.name == path_src:
            continue
        print(blob.name)
        blob = bucket.blob(blob.name)
        content = blob.download_as_string()
        df = pd.read_csv(BytesIO(content))

        # formate timestamp
        df['col_date'] = df['col_date'].map(lambda x: dt.strptime(x, '%Y/%m/%d').strftime('%Y-%m-%d'))
        df['col_timestamp'] = df['col_timestamp'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S'))
        print('formatted:\n{}'.format(df))

        # load data to bigquery
        client_bigquery = bigquery.Client()
        job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
        load_job = client_bigquery.load_table_from_file(
            StringIO(df.to_csv(header=False, index=False)),
            '{}.{}'.format(dataset_id, table_id),
            job_config=job_config,
        )
        print("Starting job {}".format(load_job.job_id))
        load_job.result()
        print("Job finished.")

SQL と Python プログラムを実行するジョブを作成して実行します。

CSA の実行ログから、ジョブが正常に実行されたことを確認できました。

BigQuery 管理コンソールからも、ロード済みのデータを確認してみます。

DATATIME 型のテーブルが作成され、TIMESTAMP 型とは異なり、ロードデータにはタイムゾーンが付与されません。

CLI から SQL を実行してデータを参照してみます。

gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \
> 'SELECT col_timestamp FROM csa_mikami.load_datetime LIMIT 3'
Waiting on bqjob_r344c44fecbf00e45_00000175019695ef_1 ... (0s) Current status: DONE   
+---------------------+
|    col_timestamp    |
+---------------------+
| 2020-06-28T15:18:31 |
| 1973-01-12T05:32:31 |
| 1996-10-24T16:06:47 |
+---------------------+

DATATIME 型にはタイムゾーン情報が付与されないため FORMAT_TIMESTAMP 関数を使う必要はなく、ロード前のデータの日時がそのままロードされていることが確認できました。

まとめ

CSA では SQL や Python プログラムファイルを登録して、画面操作で簡単にジョブを作成・実行することが可能です。 実行引数が指定できるので、プレースホルダーを使用した汎用的な SQL やプログラムを登録しておけば、データの種類が増えた場合にジョブを追加するのも簡単です。 ジョブのスケジュール実行や実行通知設定も可能なので、毎時、日次、月次などの定時バッチの運用にも便利です。

BigQuery にも対応可能な統合データ分析基盤 CSA について、少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!