BigQueryでDELETEとINSERTをスケジューリングして冪等なデータマート作成をしてみた
データアナリティクス事業本部の鈴木です。
DELETEとINSERTを使った冪等なデータマート作成クエリをBigQueryにスケジューリングしてみたのでご紹介します。
前提
今回は、以下のような3層構成のデータウェアハウスがあったとして、データウェアハウス層の2つのテーブルを使い、データマート層のテーブルを日時作成するような例を考えています。特にデータウェアハウス層のテーブルには日付のカラムがあり、スケジュール実行時に該当のデータを抽出して加工し、データマート層に当日分として格納するようなケースです。
処理はBigQueryのスケジューリング機能を使って実行する想定です。
なお、処理の途中で失敗した場合にロールバックしたいため、マルチステートメントのトランザクション機能を使っています。この機能は記事執筆時点ではプレビューであるためご注意ください。
※ 2022年10月のリリースでGAとなりました。(2020/10/21追記)
準備
検証に使うテーブルとデータを準備しておきます。
まず、asia-northeast1
に、sample_dataset
というデータセットを作成しました。
その下に、テーブルを作成します。
まずデータウェアハウス層のテーブルです。sample_dataset.log
はアクセスログ、sample_dataset.type
はデータマートを作成する際に結合するマスターデータのようなイメージです。sample_dataset.log
は時間が経つとどんどん積み上がっていく想定なので、run_date
カラムでパーティション分割しておきました。
CREATE TABLE sample_dataset.log ( ip STRING, port INTEGER, run_date DATE ) PARTITION BY run_date OPTIONS( require_partition_filter=true ); CREATE TABLE sample_dataset.type ( port INTEGER, type STRING ); INSERT `sample_dataset.log` (ip, port, run_date) VALUES('172.16.22.56', 8888, '2022-06-24'), ('172.21.104.134', 443, '2022-06-24'), ('172.30.105.13', 443, '2022-06-23'); INSERT `sample_dataset.type` (port, type) VALUES(8888, 'TCP'), (80, 'HTTP'), (8000, 'RDP'), (443, 'SSH');
データマート層のテーブルも作成します。
CREATE TABLE sample_dataset.result ( ip STRING, port INTEGER, type STRING, run_date DATE ) PARTITION BY run_date OPTIONS( require_partition_filter=true );
最終的に以下のような構成になります。
log
テーブルの中身はこのようになっています。特にrun_date
カラムはスケジュールする処理の中で参照するので重要です。
type
テーブルの中身はこのようになっています。
やってみる
クエリをスケジューリングする
今回は以下のようなクエリを作成しました。
-- トランザクションはプレビューです BEGIN TRANSACTION; -- TEMPテーブルを作成する CREATE TEMP TABLE stg_table(ip STRING, port INT, type STRING) AS -- 対象データから一時データを作成する SELECT lt.ip as ip, lt.port as port, tt.type as type FROM sample_dataset.log lt LEFT JOIN sample_dataset.type tt ON lt.port = tt.port WHERE lt.run_date = DATE(@run_time, 'Asia/Tokyo'); -- 重複箇所を削除する DELETE FROM `sample_dataset.result` mart WHERE mart.run_date = DATE(@run_time, 'Asia/Tokyo'); -- 新しい結果をInsertする INSERT `sample_dataset.result` (ip, port, type, run_date) SELECT ip, port, type, DATE(@run_time, 'Asia/Tokyo') run_date FROM stg_table; -- 一時テーブルを削除する DROP TABLE stg_table; COMMIT TRANSACTION;
ポイントは以下です。
- マルチステートメントのトランザクション機能で失敗した際にロールバックされるようにしています。
CREATE TEMP TABLE
で一時テーブルを作成しました。今回の実装だとご利益は処理がみやすくなる程度ですが、DELETE
する際に一時テーブルを条件に使うようなケースを想定しています。一時テーブルは念の為、処理の最後に削除するようにしています。@run_time
パラメータを使って更新する日付を指定しています。このパラメータは実行した時点での日時が入ります。過去分を入れたい場合はバックフィル機能を使うことができます。
上記のクエリをコンソールからスケジューリングします。スケジュールされたクエリを新規作成を押し、必要な設定を入力します。
名前は分かりやすいものを、頻度と実行時間は、検証なので適当な時間を入れておきました。
ロケーションはデータセットと同じところにしました。
動作を確認する
早速クエリを実行してみます。作成したスケジュールされたクエリの詳細画面から、バックフィルのスケジュール構成
を押します。ポップアップのうち、特定の期間で実行する
を選択し、開始日時と終了日時を選びました。
OKを推してバックフィルをスケジュールし、終了するまで待ってみます。
実行が終わったら、sample_dataset.result
のプレビューから6/24分の結果が入っていることを確認できました。
全く同じ操作をもう一度やってみても結果は同じでした。
次に、以下のSQLを実行してsample_dataset.log
のデータを一部消して、結果に反映されるか確認しました。
DELETE FROM `sample_dataset.log` WHERE port = 8888 and run_date = '2022-06-24';
同様の設定でバックフィルを実行して、結果を確認すると、以下のように削除したデータはデータマート側でも削除されました。
最後に別の日付でもバックフィルを実行し、6/24の分に影響がないことを確認しました。以下のように6/23の分のデータが作成されるようにバックフィルをスケジュールします。
実行が終わるのを待ってデータをみてみると、6/23分のデータが入っていることが確認できました。
最後に
DELETEとINSERTを使った冪等なデータマート作成クエリをBigQueryにスケジューリングしてみたのでご紹介しました。
今回はクエリパラメータで日付を持ち、スケジューリングやバックフィルの機能で処理する日付を制御しました。ロジック修正時のテーブルのデータ補正をする際は、バックフィルだと1日ずつの処理になってしまいとても時間がかかる可能性があるので、洗い替え用のSQLを用意しておいた方がよいかもしれません。
毎日の実行や数日分の再実行は、BigQueryの機能を使って簡単に実現できるのでとても便利ですね!
参考
- クエリのスケジューリング | BigQuery | Google Cloud
- Multi-statement transactions | BigQuery | Google Cloud
- BigQuery でトランザクション処理がサポート開始!! | DevelopersIO
- BigQuery のスケジューリングクエリって何がうれしいの? ~ 毎時 GCS データを集計するユースケースを想定して動かしてみた~ | DevelopersIO
- 転送の操作 | BigQuery Data Transfer Service | Google Cloud
- 分割テーブルの概要 | BigQuery | Google Cloud