
BigQuery では本当にトランザクション処理がサポートされていないのか確認してみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、みかみです。
RDBMS など多くのデータベースでは、複数の SQL をひとまとめで実行し、途中でエラーが発生した場合に実行前の状態に自動でロールバックしてくれる、トランザクション処理をサポートしています。
BigQuery のドキュメントには、以下の記載がありました。
各 DML ステートメントは、暗黙のトランザクションを開始します。つまり、成功した各 DML ステートメントの終了時に、ステートメントによる変更が自動的にコミットされます。複数ステートメントのトランザクションはサポートされていません。
やりたいこと
- BigQuery は 本当に複数 SQL 文のトランザクション処理をサポートしていないのか確認したい
- BigQuery では どんな方法でもトランザクション処理を実行できないのか確認したい
- BigQuery でトランザクション処理を実行したい場合、どういう方法があるのか考えてみたい
結論
2020年3月現在、BigQuery では複数 SQL 文のトランザクション処理をサポートしていません。
BigQuery に対して SQL を実行する I/F は複数あります。
が、どの実行方法でもエラー発生時に自動ロールバックは行われません。
そもそも BigQuery は OLAP(OnLine Analytical Processing) 向けの DWH であり、OLTP(OnLine Transaction Processing) で必要なトランザクション処理をサポートしていなくても、クリティカルな問題は発生しないはずです。
また、もし BigQuery でトランザクション処理を実行したい場合でも、SQL や 処理内容を検討することで、処理結果を担保することは可能です。
利用する側で、それぞれのデータベースサービスの特性を理解したうえで使用することが必要だと思いました。
前提
BigQuery には、以下の3レコードのデータが入ったテーブルがあるものとします。

また、Google Cloud SDK( bq コマンド )や、BigQuery クライアントライブラリ for Python( google-cloud-bigquery )、GCS クライアントライブラリ for Python( google-cloud-storage )の実行環境は準備済みであることを前提としています。
- Google Cloud SDK をAmazon Linux 2環境にインストール(スクリプトベース+検証コード利用) | Developers.IO
- クイックスタート: クライアント ライブラリの使用 | BigQuery ドキュメント
- google-cloud-storage | PyPI
複数 SQL 文の一括実行でトランザクションが効かないことを確認
以下の SQL 文を1ファイルに記載しました。
- table1 のバックアップテーブル( table1_bk )を作成
- table1 のデータを全件削除
- table1_bk から table1 に、x = 1 のレコードを INSERT
- table1 の x = 1 のレコードを、y = 'one' で UPDATE
- table1_bk を DROP
CREATE TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk` AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`; DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True; INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk` WHERE x = 1); UPDATE `cm-da-mikami-yuki-258308.ds_test.table1` SET y = 'one' WHERE x = 1; DROP TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk`;
bq query コマンドで、ファイル指定して実行してみます。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq query --use_legacy_sql=false < test_exec.sql Waiting on bqjob_r1d23da442ddb62ff_000001710fb62890_1 ... (6s) Current status: DONE
実行後のデータを確認してみると、SQL 操作の通り、テーブルデータが UPDATE され、バックアップテーブルはきちんと削除されています。

次に、トランザクション処理確認のため、3. の INSERT 文でエラーを発生させてみます。
CREATE TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk` AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`; DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True; -- INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk` WHERE x = 1); INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_tmp` WHERE x = 1); UPDATE `cm-da-mikami-yuki-258308.ds_test.table1` SET y = 'one' WHERE x = 1; DROP TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk`;
もしファイル内の全 SQL が同一トランザクションで実行されるのであれば、エラーによりロールバックが発生して、テーブルデータ削除前の状態に戻るはずです。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq query --use_legacy_sql=false < test_exec.sql Waiting on bqjob_r437602cf323df969_000001710fc1c479_1 ... (2s) Current status: DONE Error in query string: Error processing job 'cm-da-mikami-yuki-258308:bqjob_r437602cf323df969_000001710fc1c479_1': Not found: Table cm-da-mikami-yuki-258308:ds_test.table1_tmp was not found in location asia-northeast1 at [4:1]
SQL 実行エラーが発生したことを確認してから、テーブルデータを見てみます。

テーブルデータが全件削除されたままとなり、ロールバックされていないことが確認できました。。
BigQuery scripting で複数の SQL 文を一括実行したらどうなるのか確認
BigQuery のドキュメントを確認していると、何やらトランザクション処理っぽい記載が!
BigQuery スクリプトを使用すると、1 回のリクエストで複数のステートメントを BigQuery に送信して、変数を使用したり、IF、WHILE などの制御フロー ステートメントを使用できます。
BEGIN
sql_statement_list
END;
説明
BEGIN は、宣言された変数が対応する END まで存在するステートメントのブロックを開始します。sql_statement_list は、それぞれがセミコロンで終わるゼロ個以上の SQL ステートメントのリストです。
先ほどと同じ、途中でエラーが発生する複数の SQL 文を、BEGIN と END で括って実行してみます。
BEGIN
    CREATE TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk` AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`;
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True;
    -- INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk` WHERE x = 1);
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_tmp` WHERE x = 1);
    UPDATE `cm-da-mikami-yuki-258308.ds_test.table1` SET y = 'one' WHERE x = 1;
    DROP TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk`;
END;
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq query --use_legacy_sql=false < test_exec_scripting.sql Waiting on bqjob_r673f7697e3baeb2f_000001710fd8003e_1 ... (2s) Current status: DONE Error in query string: Error processing job 'cm-da-mikami-yuki-258308:bqjob_r673f7697e3baeb2f_000001710fd8003e_1': Not found: Table cm-da-mikami-yuki-258308:ds_test.table1_tmp was not found in location asia-northeast1 at [5:5]
実行結果を確認してみると・・・

あれ? やっぱりロールバックされてない。。
もう少しドキュメントを読み進めてみます。
BEGIN
sql_statement_list
EXCEPTION WHEN ERROR THEN
sql_statement_list
END;
説明
BEGIN...EXCEPTION はステートメントのブロックを実行します。ステートメントのいずれかにエラーが発生した場合、スクリプトは残りのブロックをスキップし、EXCEPTION 句のステートメントを実行します。
BigQuery の BEGIN はやはりトランザクション開始の意味ではなく、エラーが発生しても自動ロールバックは行われないものの、例外処理を書くことができるので、ロールバック用の SQL を明記してあげれば良さそうです。
SQL ファイルを以下に修正して、再度実行してみます。
BEGIN
    CREATE TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk` AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`;
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True;
    -- INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk` WHERE x = 1);
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_tmp` WHERE x = 1);
    UPDATE `cm-da-mikami-yuki-258308.ds_test.table1` SET y = 'one' WHERE x = 1;
    DROP TABLE `cm-da-mikami-yuki-258308.ds_test.table1_bk`;
EXCEPTION WHEN ERROR THEN
    IF (SELECT COUNT(*) FROM `cm-da-mikami-yuki-258308.ds_test.table1`) <> (SELECT COUNT(*) FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk`) THEN
        DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True;
        INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` (SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_bk`);
    END IF;
    DROP TABLE IF EXISTS `cm-da-mikami-yuki-258308.ds_test.table1_bk`;
END;
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq query --use_legacy_sql=false < test_exec_scripting_rollback.sql Waiting on bqjob_rd869310151deee5_0000017110327247_1 ... (5s) Current status: DONE
クエリが実行されていること、およびテーブルデータがロールバックされていることが確認できました。


なお、ドキュメントに記載の通り、2020年3月現在、BigQuery scripting はベータ版とのことです。 ご使用の際にはどうぞご留意ください。
bq load コマンドの --replace オプション指定
利用可能なケースは限られますが、例えば、GCS 上のファイルから BigQuery の既存テーブルに、全件洗い替えでデータをロードするケースを想定しました。
全件洗い替えでロードするためにロード済みのデータを DELETE してしまうと、もしロードエラーが発生した場合、ロールバックが行われないため、既存データが消失してしまいます。
GCS に以下のデータファイルを配置しました。
3,san 4,yon 5,go
これを bq load コマンドで普通に BigQuery にロードすると、追加書き込みとなるため、ロード前に別途ロード済みデータを全件削除する必要が出てくるのですが、
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq load ds_test.table1 gs://test-mikami/test_data_3-5.txt x:integer,y:string Waiting on bqjob_r6d85a33c2104fca5_000001710bbf8ecc_1 ... (1s) Current status: DONE

bq load コマンドに、--replace オプションを付けることにより、明示的に DELETE を実行することなく全件洗い替えでデータをロードすることができます。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq load --replace ds_test.table1 gs://test-mikami/test_data_3-5.txt x:integer,y:string Waiting on bqjob_r1569f7801413f1e7_000001710bca6171_1 ... (1s) Current status: DONE

念のため、--replace オプション指定時のロードエラーケースも確認してみます。 スキーマ定義でデータ型の指定を不正にして、わざとロードエラーを発生させてみました。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ bq load ds_test.table1 gs://test-mikami/test_data_3-5.txt x:integer,y:integer BigQuery error in load operation: Error processing job 'cm-da-mikami-yuki-258308:bqjob_r35ed00df49dad5bd_000001710bd9568f_1': Provided Schema does not match Table cm-da-mikami-yuki-258308:ds_test.table1. Field y has changed type from STRING to INTEGER
既存データには影響ないことを確認できました。

実行する SQL 文を検討してみる
BigQuery では複数 SQL のトランザクション処理はサポートされていませんが、SQL 1文の中での ACID は担保されているため、複数の SQL 文で実行している処理を1文にまとめられれば、エラーが発生したとしても既存データに影響は出ないはずです。
例えば GCS 上のファイルから BigQuery の既存テーブルに、重複レコードは削除したうえでデータを差分ロードするケースを想定しました。
- 既存テーブルから重複レコードを削除
- ファイルデータをロード
の手順を踏んでしまうと、もし2. でエラーが発生した場合、1. で削除した既存データが消失したままとなってしまいます。
table1 がロード先の既存テーブル、table1_temp にロード対象のファイルデータがロードされているものとして、以下のSQL 文を実行するイメージです。
DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE x in (SELECT x FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp`); INSERT INTO table1 SELECT * FROM table1_temp;
上記、DELETE + INSERT の2つの SQL は、MERGE 構文で1文にすることができます。
MERGE table1 org
    USING table1_temp tmp
    ON org.x = tmp.x AND org.y = tmp.y
    WHEN NOT MATCHED THEN
        INSERT(x, y) VALUES(x, y)
;
GCS に配置した以下のデータファイルを MERGE 構文を使って差分ロードしてみます。
3,san 4,yon 5,go
以下の Python コードを準備しました。
from google.cloud import bigquery
import subprocess
import json
project_id='cm-da-mikami-yuki-258308'
data_set='ds_test'
table_name='table1'
table_temp='{}_temp'.format(table_name)
schema_json = '{}_schema.json'.format(table_name)
load_file = 'gs://test-mikami/test_data_3-5.txt'
client = bigquery.Client()
try:
    # 一時テーブルに GCS データをロード
    ## テーブルスキーマを取得
    cmd_get_schema = 'bq show --schema --format=prettyjson {}.{} > {}'.format(data_set, table_name, schema_json)
    subprocess.call(cmd_get_schema, shell=True)
    ## データロード
    cmd = 'bq load {}.{} {} {}'.format(data_set, table_temp, load_file, schema_json)
    subprocess.call(cmd, shell=True)
    # 本テーブルデータを差分更新
    ## カラム名を取得
    with open(schema_json, 'r', encoding='utf-8') as f:
        schema = f.read()
    d = json.loads(schema)
    columns = [i.get('name') for i in d if i.get('name')]
    ## MERGEクエリ作成
    query_on = ''
    for c in columns:
        if not query_on:
            query_on = query_on + 'ON org.{0} = tmp.{0}'.format(c)
        else:
            query_on = query_on + ' AND org.{0} = tmp.{0}'.format(c)
    query = 'MERGE {0}.{1} org USING {0}.{2} tmp {3} WHEN NOT MATCHED THEN INSERT({4}) VALUES({4})'.format(data_set, table_name, table_temp, query_on, ','.join(columns))
    ## MERGEクエリ実行
    query_job = client.query(query)
except Exception as e:
    raise e
finally:
    # 一時テーブル削除
    query_drop = 'DROP TABLE IF EXISTS `{}.{}.{}`'.format(project_id, data_set, table_temp)
    query_job = client.query(query_drop)
    # テーブルスキーマファイル削除
    subprocess.call('rm -f {}'.format(schema_json), shell=True)
実行してみると、想定通り、差分の2レコードが追加されたことが確認できました。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ python test_load_diff.py Waiting on bqjob_r35cda804d4629b14_000001710c45a13a_1 ... (1s) Current status: DONE

続いて、わざと不正な MERGE クエリを実行して、エラー発生時に既存データが変更されないことを確認してみます。
前述の Python コードの一部を以下に変更して実行してみました。
(省略)
    ## MERGEクエリ作成
    query_on = ''
    for c in columns:
#        if not query_on:
#            query_on = query_on + 'ON org.{0} = tmp.{0}'.format(c)
#        else:
#            query_on = query_on + ' AND org.{0} = tmp.{0}'.format(c)
        query_on = query_on + 'ON org.{0} = tmp.{0}'.format(c)
    query = 'MERGE {0}.{1} org USING {0}.{2} tmp {3} WHEN NOT MATCHED THEN INSERT({4}) VALUES({4})'.format(data_set, table_name, table_temp, query_on, ','.join(columns))
    ## MERGEクエリ実行
    query_job = client.query(query)
(省略)
クエリ実行履歴で SQL が実行エラーになったことを確認後、

既存データにも影響が出ていないことが確認できました。

処理内容を検討してみる(バックアップ作成)
処理完了まで既存テーブルのバックアップを保持しておけば、エラー発生時にも明示的にロールバック可能です。
先ほどと同じく、差分更新でデータをロードするケースを想定しています。
以下の BigQuery スクリプトを記載した SQL ファイルと、Python スクリプトを準備しました。
BEGIN
    CREATE OR REPLACE TEMP TABLE table1_bk AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`;
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE x in (SELECT x FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp`);
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp`;
EXCEPTION WHEN ERROR THEN
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True;
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` SELECT * FROM table1_bk;
END;
import subprocess
project_id='cm-da-mikami-yuki-258308'
data_set='ds_test'
table_name='table1'
table_temp='{}_temp'.format(table_name)
schema_json = '{}_schema.json'.format(table_name)
load_file = 'gs://test-mikami/test_data_3-5.txt'
sql_file = 'test_load_diff_backup.sql'
try:
    # 一時テーブルに GCS データをロード
    ## テーブルスキーマを取得
    cmd_get_schema = 'bq show --schema --format=prettyjson {}.{} > {}'.format(data_set, table_name, schema_json)
    subprocess.call(cmd_get_schema, shell=True)
    ## データロード
    cmd = 'bq load {}.{} {} {}'.format(data_set, table_temp, load_file, schema_json)
    subprocess.call(cmd, shell=True)
    # SQLファイルを実行
    cmd = 'bq query --use_legacy_sql=false < {}'.format(sql_file)
    subprocess.call(cmd, shell=True)
except Exception as e:
    raise e
finally:
    # 一時テーブル削除
    cmd = 'bq rm -f {}.{}'.format(data_set, table_temp)
    subprocess.call(cmd, shell=True)
    # テーブルスキーマファイル削除
    subprocess.call('rm -f {}'.format(schema_json), shell=True)
上記 Python 実行で、ファイルデータが差分ロードされることを確認します。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ python test_load_diff_backup.py Waiting on bqjob_r135268effbdb7364_000001711652e6f7_1 ... (1s) Current status: DONE Waiting on bqjob_r17acf64bc3414e43_000001711652faae_1 ... (3s) Current status: DONE

次に、エラーケース確認のため、INSERT 文でエラーになるように SQL ファイルを変更して実行してみます。
BEGIN
    CREATE OR REPLACE TEMP TABLE table1_bk AS SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1`;
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE x in (SELECT x FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp`);
--    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp`;
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` SELECT * FROM `cm-da-mikami-yuki-258308.ds_test.table1_temp_err`;
EXCEPTION WHEN ERROR THEN
    DELETE FROM `cm-da-mikami-yuki-258308.ds_test.table1` WHERE True;
    INSERT INTO `cm-da-mikami-yuki-258308.ds_test.table1` SELECT * FROM table1_bk;
END;
エラー発生時の例外は BigQuery スクリプトで Catch してロールバックしているので、クエリ実行結果としては「成功」となりますが、SQL が実行されたことを確認し

既存データも正常にロールバックされていることが確認できました。

処理内容を検討してみる(サーバ側で処理)
BigQuery での DELETE や INSERT、MERGE などの DML の実行には、1日あたりのクエリ実行数に上限があるそうです。
データ操作言語(DML)ステートメントには以下の上限が適用されます。
テーブルごとの INSERT、UPDATE、DELETE、MERGE の各ステートメントを合わせた 1 日あたりの最大数 - 1,000
サーバ側のリソース次第でもありますが、あらかじめサーバで必要なトランザクション処理実行済みのデータを BigQuery に再ロードしてあげれば、DML の上限や BigQuery のトランザクションを気にする必要はありません。
なお、1日あたりのデータロードにも同様に制限がありますが、こちらはまず上限に引っかかることはなさそうです。
BigQuery にデータを読み込むときは、以下の各上限が適用されます。
1 日あたりのテーブルあたり読み込みジョブ数 - 1,000(失敗を含む)
1 日あたりのプロジェクトあたり読み込みジョブ数 - 100,000 個(失敗を含む)
先ほどと同じく、GCS に配置されているファイルを差分更新で ロードするケースを想定しています。
処理内容としては、以下のイメージです。
- 既存テーブルデータを取得
- GCS ファイルデータを取得
- 1.と2. をマージ
- 3. を既存テーブルに全件洗い替えでロード
以下の Python スクリプトを準備しました。
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
from io import BytesIO
project_id='cm-da-mikami-yuki-258308'
data_set='ds_test'
table_name='table1'
bucket_name = 'test-mikami'
load_file_path = 'test_data_3-5.txt'
try:
    # GCSからファイルデータを取得
    gcs = storage.Client()
    bucket = gcs.get_bucket(bucket_name)
    blob = bucket.get_blob(load_file_path)
    contents = blob.download_as_string()
    df_file = pd.read_csv(BytesIO(contents), names=('x', 'y'), header=None)
    # BigQueryから既存テーブルデータを取得
    bq = bigquery.Client()
    sql = "SELECT * FROM `{}.{}.{}`".format(project_id, data_set, table_name)
    df = bq.query(sql).to_dataframe()
    # ファイルデータをマージして重複行削除
    df = pd.concat([df, df_file], ignore_index=True).drop_duplicates()
    # BigQueryにデータをロード
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("x", bigquery.enums.SqlTypeNames.INTEGER),
            bigquery.SchemaField("y", bigquery.enums.SqlTypeNames.STRING),
        ],
        write_disposition="WRITE_TRUNCATE",
    )
    job = bq.load_table_from_dataframe(
        df, '{}.{}'.format(data_set, table_name), job_config=job_config
    )
    job.result()
except Exception as e:
    raise e
実行後、テーブルデータを確認してみます。
(test_bq) [ec2-user@ip-10-0-43-239 ~]$ python test_load_diff_merged.py (test_bq) [ec2-user@ip-10-0-43-239 ~]$

意図通り、BigQuery 側でトランザクション処理実行することなく、データを差分ロードすることができました。
まとめ(所感)
個人的に、これまでの業務では RDBMS に始まり PostgreSQL 互換の AWS Redshift をメインに触っていたので、BigQuery でトランザクション処理をサポートしていないことに初めは少しびっくりしました。
ですが、それは私が BigQuery の特性を理解していないかっただけですし、また BigQuery もドキュメントに「ベータ版」の記載が多々見られるように、日々新しい機能サポートが追加されている状況です。
データベース移行やクラウド移行などよく聞くワードではありますが、使う側として、それぞれのデータベースサービスの特性を見極めることが大切だと感じました。







