BigQueryテーブルのロールバック処理をPythonのデコレータで実装してみた

どうも!DA部の春田です。

BigQueryと言えば、複数ステートメントでのトランザクション処理がないことで有名かと思います。

上記のブログでは、SQLやbqコマンドでの代用方法が記載されていますが、最近業務でPythonでのロールバック処理が必要となったので、本ブログではその時書いたPythonコードをご紹介します。

BQでロールバックを行うデコレータ関数

こちらが今回作成したコードです。

import uuid
from google.cloud import bigquery
from google.api_core.exceptions import NotFound


class BqClient(bigquery.Client):
    def __init__(self, project_id='haruta-takumi'):
        super().__init__(project_id)

    @classmethod
    def rollback(cls, table_id):
        '''
        データロード時などにロールバックを適用させるデコレータ
        ロード前のテーブルのバックアップとして取っておく
        '''
        bq = cls()

        def wrapper(func):
            def _wrapper(*args, **kwargs):
                # 並列実行時にバックアップテーブルの重複を避けるためのユニーク
                unique = str(uuid.uuid4())[:8]
                bk_table_id = table_id + '_bk_' + unique
                try:
                    bq.get_table(table_id)
                except NotFound:
                    # テーブルが存在しない→バックアップの必要がないのでそのまま関数を起動
                    print('@rollback: Table {} is not found'.format(table_id))
                    func(*args, **kwargs)
                else:
                    # テーブルが存在する→バックアップを作成
                    print('@rollback: Copy table for backup: {}'.format(bk_table_id))
                    copy_job = bq.copy_table(table_id, bk_table_id)
                    copy_job.result()

                    try:
                        func(*args, **kwargs)
                    except Exception as e:
                        # 関数失敗時→現在のテーブルを削除してバックアップを復元
                        print('@rollback: Rollback table')
                        bq.delete_table(table_id, not_found_ok=True)
                        copy_job = bq.copy_table(bk_table_id, table_id)
                        copy_job.result()
                        raise e
                    finally:
                        # 関数が成功しても失敗してもバックアップは最後に削除
                        print('@rollback: Delete backup table')
                        bq.delete_table(bk_table_id)
            return _wrapper

        return wrapper

ざっくり説明しますと、対象のテーブル(table_id)が存在していれば、copy_tableメソッドを使用してバックアップを作成しておき、失敗時にはバックアップテーブルに切り替えを行う、というデコレータです。以下より、構成を解説していきます。

構成解説

導入部です。使用するライブラリは、標準ライブラリのuuidと、Google公式のgoogle-cloud-bigqueryと例外処理用のNotFoundです。uuidはバックアップテーブル名をユニークにするために使用します。GCPのPythonクライアントは非常に良くできているので、Clientクラスをそのまま継承させてしまいました。BqClientクラスを呼び出す時はproject_idのみ引数で受け付けます。credentialsも欲しいなどあれば、各自で拡張してください。

import uuid
from google.cloud import bigquery
from google.api_core.exceptions import NotFound


class BqClient(bigquery.Client):
    def __init__(self, project_id='haruta-takumi'):
        super().__init__(project_id)

デコレータ定義の部分です。BqClientクラスからも直接使えるように@classmethodをくっつけておきます。デコレータの引数table_idで指定したテーブルをバックアップしておくよう処理させます。バックアップのテーブル名はユニークになるようにuuidの一部を付与しておきます。

ちなみにこのロールバック処理は並列実行を想定したものではないです。実行中のテーブルをロックしているわけではないので、並列実行時に時系列でどの段階のテーブルがバックアップされるかは不明です。

    @classmethod
    def rollback(cls, table_id):
        '''
        データロード時などにロールバックを適用させるデコレータ
        ロード前のテーブルのバックアップとして取っておく
        '''
        bq = cls()

        def wrapper(func):
            def _wrapper(*args, **kwargs):
                # 並列実行時にバックアップテーブルの重複を避けるためのユニーク
                unique = str(uuid.uuid4())[:8]
                bk_table_id = table_id + '_bk_' + unique

try-except句からロールバック処理が始まります。最初にテーブルが存在するかどうかをget_tableメソッドで確認します。存在しない場合はNot Foundが返されるので、この場合は何もせずラッピングした関数をそのまま実行します。

それ以外の場合はelseに分岐し、テーブルのバックアップを作成します。BigQueryにはテーブルコピーの機能があるので、copy_tableで対象テーブルをまるっとコピーしておきます。

その後、ラッピングした関数を実行し、エラーが発生した際は既存テーブルを削除し、バックアップテーブルを再コピーして復元します。成功・失敗どちらの場合でもバックアップテーブルが残っているので、finally句で必ず削除させるようにしておきます。

                try:
                    bq.get_table(table_id)
                except NotFound:
                    # テーブルが存在しない→バックアップの必要がないのでそのまま関数を起動
                    print('@rollback: Table {} is not found'.format(table_id))
                    func(*args, **kwargs)
                else:
                    # テーブルが存在する→バックアップを作成
                    print('@rollback: Copy table for backup: {}'.format(bk_table_id))
                    copy_job = bq.copy_table(table_id, bk_table_id)
                    copy_job.result()

                    try:
                        func(*args, **kwargs)
                    except Exception as e:
                        # 関数失敗時→現在のテーブルを削除してバックアップを復元
                        print('@rollback: Rollback table')
                        bq.delete_table(table_id, not_found_ok=True)
                        copy_job = bq.copy_table(bk_table_id, table_id)
                        copy_job.result()
                        raise e
                    finally:
                        # 関数が成功しても失敗してもバックアップは最後に削除
                        print('@rollback: Delete backup table')
                        bq.delete_table(bk_table_id)

ちなみにこのロールバック処理は、バックアップテーブルのサイズと保持していた時間に対して課金が発生されます。関数内部の処理時間が早いほど安くなるという感じですね。

実例

上記デコレータはこのような感じで使っています。独自メソッドがちらほらありますが、update_allのやりたいことは「GCS上のParquetデータを、Parquetのデータ型を利用して動的にテーブル情報を決定し、BQのテーブルとしてロードする」です。(Parquetロードの詳細に関しては、今回は割愛します。)

def update_all(table_id, gs_source_uri):

    @BqClient.rollback(table_id)
    def _update_all(table_id, uri):
        '''
        全件ロード用の関数
        Parquetのメタデータでスキーマ情報を上書きしつつデータをロードする
        '''
        print('----- Start loading all data -----')
        bq = BqClient()
        job_config = BqLoadJobConfig()  # LoadJobConfigの継承クラス

        # 既存テーブルを削除
        bq.delete_table(table_id, not_found_ok=True)
        # スキーマ情報なしの空テーブル作成
        bq.create_init_table(table_id)

        # スキーマ変更を受け付ける設定を付与
        job_config.load_setting_for_parquet()
        # Parquetのデータ型を利用して、スキーマ情報を上書きしつつBQのテーブルへロード
        bq.load_table_from_uri_(table_id, uri, job_config)
        print('----- End loading all data -----')

    _update_all(table_id, gs_source_uri)

基本的には、table_id単位で連続して実施する処理をまとめていきます。デコレータの引数にもラッピングする関数と同じtable_idを与えたいため、いささか冗長な書き方になっていますが、ロールバックのような共通かつ、前後で挟まれるような処理はPythonのデコレータが活きてきますね。

上例では関数で書いてしまいましたが、もちろんBqClientクラスのメソッドにデコレータを付与して定義するのもOKでしょう。どこに変数や引数を配置すべきかあれこれ考えましたが、もっと良い書き方があればご指摘ください。m(_ _)m

最後に

BigQueryヌルヌル使えて良いですね〜。