この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
どうも!DA部の春田です。
BigQueryと言えば、複数ステートメントでのトランザクション処理がないことで有名かと思います。
上記のブログでは、SQLやbq
コマンドでの代用方法が記載されていますが、最近業務でPythonでのロールバック処理が必要となったので、本ブログではその時書いたPythonコードをご紹介します。
BQでロールバックを行うデコレータ関数
こちらが今回作成したコードです。
import uuid
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
class BqClient(bigquery.Client):
def __init__(self, project_id='haruta-takumi'):
super().__init__(project_id)
@classmethod
def rollback(cls, table_id):
'''
データロード時などにロールバックを適用させるデコレータ
ロード前のテーブルのバックアップとして取っておく
'''
bq = cls()
def wrapper(func):
def _wrapper(*args, **kwargs):
# 並列実行時にバックアップテーブルの重複を避けるためのユニーク
unique = str(uuid.uuid4())[:8]
bk_table_id = table_id + '_bk_' + unique
try:
bq.get_table(table_id)
except NotFound:
# テーブルが存在しない→バックアップの必要がないのでそのまま関数を起動
print('@rollback: Table {} is not found'.format(table_id))
func(*args, **kwargs)
else:
# テーブルが存在する→バックアップを作成
print('@rollback: Copy table for backup: {}'.format(bk_table_id))
copy_job = bq.copy_table(table_id, bk_table_id)
copy_job.result()
try:
func(*args, **kwargs)
except Exception as e:
# 関数失敗時→現在のテーブルを削除してバックアップを復元
print('@rollback: Rollback table')
bq.delete_table(table_id, not_found_ok=True)
copy_job = bq.copy_table(bk_table_id, table_id)
copy_job.result()
raise e
finally:
# 関数が成功しても失敗してもバックアップは最後に削除
print('@rollback: Delete backup table')
bq.delete_table(bk_table_id)
return _wrapper
return wrapper
ざっくり説明しますと、対象のテーブル(table_id
)が存在していれば、copy_table
メソッドを使用してバックアップを作成しておき、失敗時にはバックアップテーブルに切り替えを行う、というデコレータです。以下より、構成を解説していきます。
構成解説
導入部です。使用するライブラリは、標準ライブラリのuuid
と、Google公式のgoogle-cloud-bigquery
と例外処理用のNotFound
です。uuid
はバックアップテーブル名をユニークにするために使用します。GCPのPythonクライアントは非常に良くできているので、Clientクラスをそのまま継承させてしまいました。BqClientクラスを呼び出す時はproject_id
のみ引数で受け付けます。credentials
も欲しいなどあれば、各自で拡張してください。
import uuid
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
class BqClient(bigquery.Client):
def __init__(self, project_id='haruta-takumi'):
super().__init__(project_id)
デコレータ定義の部分です。BqClientクラスからも直接使えるように@classmethod
をくっつけておきます。デコレータの引数table_id
で指定したテーブルをバックアップしておくよう処理させます。バックアップのテーブル名はユニークになるようにuuidの一部を付与しておきます。
ちなみにこのロールバック処理は並列実行を想定したものではないです。実行中のテーブルをロックしているわけではないので、並列実行時に時系列でどの段階のテーブルがバックアップされるかは不明です。
@classmethod
def rollback(cls, table_id):
'''
データロード時などにロールバックを適用させるデコレータ
ロード前のテーブルのバックアップとして取っておく
'''
bq = cls()
def wrapper(func):
def _wrapper(*args, **kwargs):
# 並列実行時にバックアップテーブルの重複を避けるためのユニーク
unique = str(uuid.uuid4())[:8]
bk_table_id = table_id + '_bk_' + unique
try-except句からロールバック処理が始まります。最初にテーブルが存在するかどうかをget_tableメソッドで確認します。存在しない場合はNot Found
が返されるので、この場合は何もせずラッピングした関数をそのまま実行します。
それ以外の場合はelse
に分岐し、テーブルのバックアップを作成します。BigQueryにはテーブルコピーの機能があるので、copy_tableで対象テーブルをまるっとコピーしておきます。
その後、ラッピングした関数を実行し、エラーが発生した際は既存テーブルを削除し、バックアップテーブルを再コピーして復元します。成功・失敗どちらの場合でもバックアップテーブルが残っているので、finally
句で必ず削除させるようにしておきます。
try:
bq.get_table(table_id)
except NotFound:
# テーブルが存在しない→バックアップの必要がないのでそのまま関数を起動
print('@rollback: Table {} is not found'.format(table_id))
func(*args, **kwargs)
else:
# テーブルが存在する→バックアップを作成
print('@rollback: Copy table for backup: {}'.format(bk_table_id))
copy_job = bq.copy_table(table_id, bk_table_id)
copy_job.result()
try:
func(*args, **kwargs)
except Exception as e:
# 関数失敗時→現在のテーブルを削除してバックアップを復元
print('@rollback: Rollback table')
bq.delete_table(table_id, not_found_ok=True)
copy_job = bq.copy_table(bk_table_id, table_id)
copy_job.result()
raise e
finally:
# 関数が成功しても失敗してもバックアップは最後に削除
print('@rollback: Delete backup table')
bq.delete_table(bk_table_id)
ちなみにこのロールバック処理は、バックアップテーブルのサイズと保持していた時間に対して課金が発生されます。関数内部の処理時間が早いほど安くなるという感じですね。
実例
上記デコレータはこのような感じで使っています。独自メソッドがちらほらありますが、update_all
のやりたいことは「GCS上のParquetデータを、Parquetのデータ型を利用して動的にテーブル情報を決定し、BQのテーブルとしてロードする」です。(Parquetロードの詳細に関しては、今回は割愛します。)
def update_all(table_id, gs_source_uri):
@BqClient.rollback(table_id)
def _update_all(table_id, uri):
'''
全件ロード用の関数
Parquetのメタデータでスキーマ情報を上書きしつつデータをロードする
'''
print('----- Start loading all data -----')
bq = BqClient()
job_config = BqLoadJobConfig() # LoadJobConfigの継承クラス
# 既存テーブルを削除
bq.delete_table(table_id, not_found_ok=True)
# スキーマ情報なしの空テーブル作成
bq.create_init_table(table_id)
# スキーマ変更を受け付ける設定を付与
job_config.load_setting_for_parquet()
# Parquetのデータ型を利用して、スキーマ情報を上書きしつつBQのテーブルへロード
bq.load_table_from_uri_(table_id, uri, job_config)
print('----- End loading all data -----')
_update_all(table_id, gs_source_uri)
基本的には、table_id
単位で連続して実施する処理をまとめていきます。デコレータの引数にもラッピングする関数と同じtable_id
を与えたいため、いささか冗長な書き方になっていますが、ロールバックのような共通かつ、前後で挟まれるような処理はPythonのデコレータが活きてきますね。
上例では関数で書いてしまいましたが、もちろんBqClient
クラスのメソッドにデコレータを付与して定義するのもOKでしょう。どこに変数や引数を配置すべきかあれこれ考えましたが、もっと良い書き方があればご指摘ください。m(_ _)m
最後に
BigQueryヌルヌル使えて良いですね〜。