データ統合基盤 CS アナリティクスで BigQuery にデータを差分ロードしてみた

2020.09.14

こんにちは、みかみです。

弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。

概要

実際の運用では、日々更新されていくデータを差分ロード(重複データは削除して、特定の項目値が新しいデータで上書き)したいという要件が発生することもあると思います。

BigQuery 管理コンソールの GUI やクライアントライブラリなどで Google Cloud Storage(以下 GCS )から Google BigQuery(以下 BigQuery ) にファイルデータをロードする場合、ロードオプションで全件洗替( WRITE_APPEND )や追加ロード( WRITE_APPEND )の指定が可能ですが、重複データの削除や特定のレコードを上書きしたい場合、別途処理が必要です。

本エントリでは、CSA を使って GCS のデータを BigQuery に差分ロードするために、構成要素(データ連携 + SQL )を組み合わせて実行するジョブを作成してみます。

連携データを一時テーブルに全件洗替でロードし、一時テーブルとターゲットテーブルのデータを SQL でマージして、最後に一時テーブルを SQL で削除します。

CSA JMCの挙動確認バージョン

当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。

  • CSA JMC v5.0.0

準備

動作確認に使用するデータとテーブルを準備します。

CSA の 構成要素設定、BigQuery接続設定の手順は割愛します。詳細は以下のエントリをご参照ください。

テストデータを作成

Wikipedia からいただいてきた 2020 年の台風データをもとに、2 つの CSV ファイルを作成しました。

1 ファイル目は既にターゲットテーブルにロード済み想定のデータです。 データの中身は台風の号数とアジア名、発生期間とピーク時の強さ( kt, hPa )、データ更新日の 7 項目です。

差分ロードする 2 ファイル目のデータは以下です。

差分ロード実行で、以下の結果となることを期待しています。

  • 1 行目の台風 6 号メーカラーは既にロード済みなのでテーブルデータは更新されない
  • 2 行目の台風 7 号ヒーゴスはデータ更新日が新しいので後からロードしたデータで上書きされる
  • 4, 5 行目の台風 8 号バービーと 9 号メイサークは未ロードのデータなのでテーブルに追加ロードされる

1 ファイル目を typhoonn_2020_base.csv、2 ファイル目を typhoonn_2020_diff.csv というファイル名で保存し、それぞれ GCS の以下のパスにアップロードしました。

  • gs://csa-mikami/typhoon_base/typhoonn_2020_base.csv
  • gs://csa-mikami/typhoon/typhoonn_2020_diff.csv

ターゲットテーブルを作成

データをロードするテーブルは BigQuery 管理画面から画面操作で作成することもできますが、データロード時にファイルフォーマットに合わせたテーブルを自動作成することもできます。 今回は CSA のデータ連携を実行して、テストデータロードと同時にテーブルも自動作成してしまいます。

CSA にログインしたら、「構成要素」メニューから「データ連携」をクリックし、「作成」タブをクリック。

データ連携構成要素の新規作成画面で、以下の項目を入力/選択して「保存」します。

  • データセット名:テーブルを作成するデータセット名を選択
  • テーブル名:新規作成するテーブル名を入力
  • バケット名:データファイルを配置したバケットを選択
  • GCSファイルパスを固定指定にする:ON
  • GCSファイルパス:「GUIで選択」ボタンをクリック後、データファイルを配置したパスを選択
  • 取り込み方式:全件洗替(WRITE_TRUNCATE)を選択
  • ファイルの種類:CSV/TSVを選択
  • ファイル囲み文字:囲み文字無を選択
  • 区切り文字:カンマを選択
  • GZip圧縮有無:無圧縮を選択
  • テーブルが存在しない場合新規作成する:ON

続いて「ジョブ」メニューから「ジョブ一覧」を選択後、ジョブ一覧画面で「ジョブの追加」ボタンをクリック。ジョブの追加ポップアップで「ジョブ名」を入力して「追加」します。

ジョブ詳細画面「構成要素」欄の「編集」ボタンをクリックし、構成要素の編集画面で先ほど作成した構成要素を「設定可能な構成要素」欄から「現在の構成要素」欄に移動して「保存」します。

ジョブ一覧画面に戻り、追加したジョブの右端の「▶」アイコンをクリックして手動実行します。 「今すぐジョブを実行」ポップアップで「確認」ボタンをクリックしたら、次の「ジョブ実行確認」ポップアップで「実行」ボタンをクリックします。

しばらく待って、ジョブ一覧画面の実行カウンター欄で「成功」がカウントアップされれば完了です。

BigQuery 管理画面からテーブルを確認してみます。

テーブルの新規作成とデータロードが正常に実行できました。

データ差分ロードジョブを追加して実行

データ連携と SQL の構成要素を登録し、登録した構成要素を実行するジョブを追加して実行してみます。

構成要素(データ連携)を追加

一時テーブルに全件洗替でデータロードする構成要素を作成します。

テーブル準備時と同様、「構成要素」メニュー「データ連携」からデータ連携一覧画面に遷移後、作成タブをクリック。 以下の項目を入力/選択してデータ連携の構成要素を追加します。

  • データセット名:テーブルを作成したデータセット名を選択
  • テーブル名:[ターゲットテーブル名]_stg を入力
  • バケット名:データファイルを配置したバケットを選択
  • GCSファイルパスを固定指定にする:ON
  • GCSファイルパス:「GUIで選択」ボタンをクリック後、データファイルを配置したパスを選択
  • 取り込み方式:全件洗替(WRITE_TRUNCATE)を選択
  • ファイルの種類:CSV/TSVを選択
  • ファイル囲み文字:囲み文字無を選択
  • 区切り文字:カンマを選択
  • GZip圧縮有無:無圧縮を選択
  • テーブルが存在しない場合新規作成する:ON

データ連携の構成要素が作成できました。

構成要素(SQL)を追加

一時テーブルにロードしたデータをターゲットテーブルにマージする SQL と、マージ完了後に一時テーブルを削除する SQL を登録します。

以下の SQL を merge_typhoon.sql というファイル名でローカル PC に保存しました。

MERGE
  csa_mikami.typhoon target
  USING
    csa_mikami.typhoon_stg stg 
  ON
    target.no = stg.no
    AND target.name = stg.name
WHEN MATCHED AND target.update_timestamp < stg.update_timestamp THEN
  UPDATE SET
    `no` = stg.no,
    name = stg.name,
    start_date = stg.start_date,
    end_date = stg.end_date,
    kt = stg.kt,
    hpa = stg.hpa,
    update_timestamp = stg.update_timestamp
WHEN NOT MATCHED THEN 
  INSERT(`no`, name, start_date, end_date, kt, hpa, update_timestamp)
  VALUES(`no`, name, start_date, end_date, kt, hpa, update_timestamp)

noname の値が一致するレコードがある場合は、もし一時テーブルの update_timestamp の方が大きい場合はターゲットテーブルのレコードを一時テーブルのレコードで上書きします。 また、noname が一致しない場合は、一時テーブルのレコードをターゲットテーブルに追加します。

また、以下の SQL を drop_typhoon_stg.sql というファイル名で保存しました。

DROP TABLE IF EXISTS csa_mikami.typhoon_stg

typhoon_stg テーブルがあれば DROP する SQL です。

この 2 つの SQL ファイルを、「構成要素」メニュー「SQL」からアップロードします。

「フォルダを選択」ボタンをクリックして、ローカル PC の SQL ファイルを保存したフォルダを選択します。

アップロード確認ポップアップが表示されるので、「アップロード」をクリック。

アップロードするファイルを確認して「アップロード」をクリック。

SQL ファイルが登録できました。

ジョブを追加

登録したデータ連携と SQL を実行するジョブを追加します。

「ジョブ」メニュー「ジョブ一覧」をクリックして、ジョブ一覧画面で「ジョブの追加」ボタンをクリック。ジョブの追加ポップアップでジョブ名を入力して「追加」します。

ジョブ詳細画面下部の「構成要素」欄の「編集」ボタンをクリックして構成要素を設定します。

構成要素の編集画面で、先ほど登録した構成要素を実行したい順番に登録して「保存」します。

ジョブ詳細画面上部に「ジョブ XXX を更新しました。」のメッセージが表示されれば、登録完了です。

ジョブ一覧画面からも、ジョブが新規作成されたことが確認できました。

ジョブを実行

追加したジョブを実行してみます。

ジョブ一覧画面「リンク」列の「▶」アイコンをクリックして手動実行します。

画面上部に実行開始メッセージが表示されたのを確認して、しばらく待ちます。

実行履歴カウンターでジョブ実行「成功」がカウントアップされたので、BigQuery 管理画面からテーブルデータを確認してみます。

こちらがジョブ実行前のテーブルデータです。 no 1~7 までの 7 レコードが格納され、no 7 のレコードの kthpa 項目値は null で、update_timestamp 項目値は 2020-08-31 10:00:00

ジョブ実行後のテーブルデータが以下です。

期待通り、no 7 のレコードがロードデータで上書きされ、重複レコードの no 6 のデータの更新はなく、差分の no 8, 9 のレコードが追加ロードされたことが確認できました。

まとめ

CSA では以下の構成要素を組み合わせて、必要なジョブを簡単に作成することができます。

  • データ連携
  • SQL
  • Python プログラム

前の構成要素の実行結果に従って後続の構成要素の実行制御も可能ですし、またジョブ実行完了時やエラー発生時にメール通知する機能も利用することができるので、要件に合わせてジョブの実装を簡単にカスタマイズできます。

BigQuery にも対応可能な統合データ分析基盤 CSA について、少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!