BigQueryでDELETEとINSERTをスケジューリングして冪等なデータマート作成をしてみた

2022.06.27

データアナリティクス事業本部の鈴木です。

DELETEとINSERTを使った冪等なデータマート作成クエリをBigQueryにスケジューリングしてみたのでご紹介します。

前提

今回は、以下のような3層構成のデータウェアハウスがあったとして、データウェアハウス層の2つのテーブルを使い、データマート層のテーブルを日時作成するような例を考えています。特にデータウェアハウス層のテーブルには日付のカラムがあり、スケジュール実行時に該当のデータを抽出して加工し、データマート層に当日分として格納するようなケースです。

データウェアハウスの例

処理はBigQueryのスケジューリング機能を使って実行する想定です。

なお、処理の途中で失敗した場合にロールバックしたいため、マルチステートメントのトランザクション機能を使っています。この機能は記事執筆時点ではプレビューであるためご注意ください。

※ 2022年10月のリリースでGAとなりました。(2020/10/21追記)

準備

検証に使うテーブルとデータを準備しておきます。

まず、asia-northeast1に、sample_datasetというデータセットを作成しました。

データセットの作成

その下に、テーブルを作成します。

まずデータウェアハウス層のテーブルです。sample_dataset.logはアクセスログ、sample_dataset.typeはデータマートを作成する際に結合するマスターデータのようなイメージです。sample_dataset.logは時間が経つとどんどん積み上がっていく想定なので、run_dateカラムでパーティション分割しておきました。

CREATE TABLE sample_dataset.log
(
    ip STRING,
    port INTEGER,
    run_date DATE
)
PARTITION BY
    run_date
OPTIONS(
  require_partition_filter=true
);

CREATE TABLE sample_dataset.type
(
    port INTEGER,
    type STRING
);

INSERT `sample_dataset.log` (ip, port, run_date)
VALUES('172.16.22.56', 8888, '2022-06-24'),
      ('172.21.104.134', 443, '2022-06-24'),
      ('172.30.105.13', 443, '2022-06-23');

INSERT `sample_dataset.type` (port, type)
VALUES(8888, 'TCP'),
      (80, 'HTTP'),
      (8000, 'RDP'),
      (443, 'SSH');

データマート層のテーブルも作成します。

CREATE TABLE sample_dataset.result
(
    ip STRING,
    port INTEGER,
    type STRING,
    run_date DATE
)
PARTITION BY
    run_date
OPTIONS(
  require_partition_filter=true
);

最終的に以下のような構成になります。

テーブルの作成結果

logテーブルの中身はこのようになっています。特にrun_dateカラムはスケジュールする処理の中で参照するので重要です。

logテーブルの内容

typeテーブルの中身はこのようになっています。 typeテーブルの内容

やってみる

クエリをスケジューリングする

今回は以下のようなクエリを作成しました。

-- トランザクションはプレビューです
BEGIN TRANSACTION;

-- TEMPテーブルを作成する
CREATE TEMP TABLE stg_table(ip STRING, port INT, type STRING)
AS
  -- 対象データから一時データを作成する
  SELECT
    lt.ip as ip,
    lt.port as port,
    tt.type as type
  FROM sample_dataset.log lt
  LEFT JOIN sample_dataset.type tt
  ON lt.port = tt.port
  WHERE lt.run_date = DATE(@run_time, 'Asia/Tokyo');

-- 重複箇所を削除する
DELETE FROM `sample_dataset.result` mart
WHERE mart.run_date = DATE(@run_time, 'Asia/Tokyo');

-- 新しい結果をInsertする
INSERT `sample_dataset.result` (ip, port, type, run_date)
SELECT 
  ip,
  port,
  type,
  DATE(@run_time, 'Asia/Tokyo') run_date
FROM stg_table;

-- 一時テーブルを削除する
DROP TABLE stg_table;

COMMIT TRANSACTION;

ポイントは以下です。

  • マルチステートメントのトランザクション機能で失敗した際にロールバックされるようにしています。
  • CREATE TEMP TABLEで一時テーブルを作成しました。今回の実装だとご利益は処理がみやすくなる程度ですが、DELETEする際に一時テーブルを条件に使うようなケースを想定しています。一時テーブルは念の為、処理の最後に削除するようにしています。
  • @run_timeパラメータを使って更新する日付を指定しています。このパラメータは実行した時点での日時が入ります。過去分を入れたい場合はバックフィル機能を使うことができます。

上記のクエリをコンソールからスケジューリングします。スケジュールされたクエリを新規作成を押し、必要な設定を入力します。

名前は分かりやすいものを、頻度と実行時間は、検証なので適当な時間を入れておきました。

コンソールから設定する

クエリのスケジューリング1

ロケーションはデータセットと同じところにしました。

クエリのスケジューリング2

動作を確認する

早速クエリを実行してみます。作成したスケジュールされたクエリの詳細画面から、バックフィルのスケジュール構成を押します。ポップアップのうち、特定の期間で実行するを選択し、開始日時と終了日時を選びました。

バックフィル実行

OKを推してバックフィルをスケジュールし、終了するまで待ってみます。

実行が終わったら、sample_dataset.resultのプレビューから6/24分の結果が入っていることを確認できました。

6月24日分の結果1回目

全く同じ操作をもう一度やってみても結果は同じでした。

6月24日分の結果2回目

次に、以下のSQLを実行してsample_dataset.logのデータを一部消して、結果に反映されるか確認しました。

DELETE FROM `sample_dataset.log`
WHERE port = 8888 and run_date = '2022-06-24';

同様の設定でバックフィルを実行して、結果を確認すると、以下のように削除したデータはデータマート側でも削除されました。

6月24日分の結果削除後

最後に別の日付でもバックフィルを実行し、6/24の分に影響がないことを確認しました。以下のように6/23の分のデータが作成されるようにバックフィルをスケジュールします。

6月23日分の設定

実行が終わるのを待ってデータをみてみると、6/23分のデータが入っていることが確認できました。

6月23日分の結果1回目

最後に

DELETEとINSERTを使った冪等なデータマート作成クエリをBigQueryにスケジューリングしてみたのでご紹介しました。

今回はクエリパラメータで日付を持ち、スケジューリングやバックフィルの機能で処理する日付を制御しました。ロジック修正時のテーブルのデータ補正をする際は、バックフィルだと1日ずつの処理になってしまいとても時間がかかる可能性があるので、洗い替え用のSQLを用意しておいた方がよいかもしれません。

毎日の実行や数日分の再実行は、BigQueryの機能を使って簡単に実現できるのでとても便利ですね!

参考