ストリーミングインサートで重複レコードを削除しながら BigQuery にデータをロードしてみた

2020.06.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

BigQuery にデータをロードする場合、GCS や local のデータを BigQuery ジョブでロードするケースが多いと思いますが、他に tabledata.insertAll メソッドを使用してデータを1行ずつストリーミングインサートすることができます。

やりたいこと

  • ストリーミングインサート( tabledata.insertAll )の挙動を確認したい
  • ストリーミングインサートで insertId を指定した場合の挙動を確認したい
  • insertId 指定で、本当に重複レコードが BigQuery に格納されなくなるか確認したい

前提

BigQuery Python クライアントライブラリ( insert_rows_json )経由で tabledata.insertAll メソッドをコールして、ストリーミングインサートを実行します。

Python クライアントライブラリ実行環境は準備済みで、実行時に使用するサービスアカウントには BigQuery へのデータインサートが可能な権限を付与済みです。

ストリーミングの料金と制限事項

BigQuery ジョブで GCS などからデータをロードする場合、課金は発生しませんが、tabledata.insertAll でストリーミングインサートを行う場合には若干料金がかかります。 そのため、請求先情報を登録していない GCP アカウントでは、ストリーミングインサートの実行はエラーになるとのことです。 2020/06/24 時点の東京( ashia-northeast1 )の料金は、インサートデータ 200MB ごとに $0.012 とのことなので、動作確認程度ではそれほど気にする必要はないかと思いますが、実際に大量のストリーミングデータを処理する場合にはコスト面の考慮も必要かと思います。

また、1秒あたりの最大バイト数や最大行数、1リクエストあたりの最大行数やインサートデータ1行あたりのデータサイズなどの制限事項あるので、確認が必要です。

ストリーミングインサートを実行

まずは以下の Python コードで、テーブルを作成後 insert_rows_json を使って BigQuery にデータをストリーミングインサートしてみます。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animal_kind"
schema = [
    bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("kind", "STRING", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
errors = client.insert_rows_json(table, json_rows)
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
Created table cm-da-mikami-yuki-258308.dataset_1.animal_kind
New rows have been added.

GCP 管理コンソールからも、データがロードされたことが確認できました。

テーブル作成処理を省略して、同じデータをもう一度 insert してみます。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animal_kind"
table = client.get_table(table_id)

json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
errors = client.insert_rows_json(table, json_rows)
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
New rows have been added.

特に重複チェックキーを指定していないので、同じレコードが重複して insert されました。

重複チェックキーを指定

insertId を指定した場合のストリーミングインサートの挙動を確認してみます。

先ほど同様テーブル作成を行った後、row_ids パラメータで insert するデータの id 項目値を重複チェックキーに指定しました。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animals"
schema = [
    bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("kind", "STRING", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows])
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
Created table cm-da-mikami-yuki-258308.dataset_1.animals
New rows have been added.

次に、同じく id 項目値を row_ids に指定して、重複レコードと重複していないレコードを insert してみます。 指定したキーで重複チェックされれば、重複レコードは insert されず、重複していないレコードのみ insert されるはずです。

(省略)
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
json_rows = [{"id": 1, "kind": "dog"}, {"id": 3, "kind": "bird"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows])
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
New rows have been added.

期待通り、重複していないレコードだけ insert されたことが確認できました。

同じコードをもう一度実行してみます。 今度は2レコードとも id キーが重複しているため、insert されないはずです。

期待通り、テーブルデータに変化はありません。

なお、row_ids パラメータには、レコード数分のキーを配列で指定する必要があります。 ためしに、1レコード目のキーのみ指定し、2レコード目のキーは指定なしで実行してみます。

(省略)
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
json_rows = [{"id": 1, "kind": "dog"}, {"id": 3, "kind": "bird"}]
#errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows])
errors = client.insert_rows_json(table, json_rows, row_ids=[json_rows[0]['id']])
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
Traceback (most recent call last):
  File "streaming_insert.py", line 26, in <module>
    errors = client.insert_rows_json(table, json_rows, row_ids=[json_rows[0]['id']])
  File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2549, in insert_rows_json
    info["insertId"] = row_ids[index]
IndexError: list index out of range

insert するレコード数と row_ids で指定するキーの数が一致していない場合はエラーになりました。 パラメータの指定ミスで意図せずレコードが重複してしまうようなことはなさそうです。

さらに、以下の2点を確認してみます。

  • キーが重複するレコードを insert した場合、BigQuery に格納されるのは既にロード済みのレコードと新しく insert したレコードのどちらか
  • insert データ内にキー重複レコードがある場合でも重複チェックは行われるのか、どのレコードが insert されるのか

まずは、キーが重複するデータを insert し、すでに格納済みのレコードと新しく insert したレコードのどちらが BiQuery に格納されるか確認します。

(省略)
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 3, "kind": "bird"}]
json_rows = [{"id": 1, "kind": "inu"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows])
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
New rows have been added.

新しく insert したレコードが格納されました。

続いて insert データ内にキー重複レコードがある場合の挙動を確認してみます。

(省略)
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 3, "kind": "bird"}]
#json_rows = [{"id": 1, "kind": "inu"}]
json_rows = [{"id": 4, "kind": "dog"}, {"id": 4, "kind": "rabbit"}, {"id": 4, "kind": "hedgehog"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows])
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert.py
New rows have been added.

insert データ内のキー重複もちゃんとチェックされ、そのうちの最新(一番最後の)レコードが BigQuery に insert されました。

テンプレートテーブルを使用

ストリーミングインサートでは、テンプレートテーブルの suffix をパラメータで指定可能です。

以下の Python コードで、テンプレートテーブルをパラメータ指定した場合の挙動を確認してみます。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animals"
table = client.get_table(table_id)

json_rows = [{"id": 5, "kind": "pig"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows], template_suffix ='mikami')
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert_temp.py
New rows have been added.

オリジナルのテーブルにはデータは insert されていません。

パラメータで指定した suffix のテーブルが新規作成され、データが insert されました。

続いて、オリジナルテーブルとテンプレートテーブル間では重複チェックが行われるかどうか確認するため、オリジナルテーブルに格納されているのと同じデータを insert してみます。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animals"
table = client.get_table(table_id)

#json_rows = [{"id": 5, "kind": "pig"}]
json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows], template_suffix ='mikami')
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert_temp.py
New rows have been added.

先ほど同様、オリジナルテーブルのデータには変更がありません。

テンプレートテーブルにはデータが insert されました。

さらに、テンプレートテーブル内でもちゃんと重複チェックが行われるか確認してみます。

from google.cloud import bigquery

client = bigquery.Client()

table_id = "cm-da-mikami-yuki-258308.dataset_1.animals"
table = client.get_table(table_id)

#json_rows = [{"id": 5, "kind": "pig"}]
#json_rows = [{"id": 1, "kind": "dog"}, {"id": 2, "kind": "cat"}]
json_rows = [{"id": 1, "kind": "wanko"}, {"id": 2, "kind": "cat"}]
errors = client.insert_rows_json(table, json_rows, row_ids=[row['id'] for row in json_rows], template_suffix ='mikami')
if errors == []:
    print("New rows have been added.")
(test_bq) [ec2-user@ip-10-0-43-239 test_stream]$ python streaming_insert_temp.py
New rows have been added.

テンプレートテーブルでも、ちゃんと後勝ちで重複チェックが実行されていることが確認できました。

ストリーミングデータ insert 後にオリジナルテーブルと結合などの処理をしたり、suffix に日付を指定して日付単位でテーブルを分割したりする場合に便利そうです。

まとめ(所感)

ストリーミングデータを扱う場合、データ送信元起因の重複データの考慮が必要なケースが多いと思います。

BigQuery ではトランザクション処理がサポートされていないため、エラー発生時のロールバック含めて自前で重複レコード削除処理を実装するのは少し手間がかかりますが、tabledata.insertAll メソッドで insertId を指定するだけで、自動的に最新(後から insert した)データ優先で重複削除してロードすることができました。

他にもオリジナルテーブルを元にしたテンプレートテーブルの自動作成もでき、ストリーミングデータ処理に適した便利なインターフェースだと思いました。

参考