GCS の Parquet データを BigQuery から参照してみた ~ Parquet データのロードと外部テーブルによる参照~

2020.04.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

やりたいこと

  • GCS に配置した Parquet データを BigQuery で参照したい
  • BigQuery に Parquet データをロードしたい
  • BigQuery に Parquet データをロードする場合、意図通りのデータ型を自動検出してテーブル作成してくれるかどうか確認したい
  • BigQuery に Parquet データをロードする場合の制限事項を知りたい
  • パーティショニングされた Parquet データを BigQuery から参照するにはどうすればいいのか知りたい

Parquet データを準備

以下のサイトで動作確認用の CSV データを作成しました。

できたサンプルデータはこんな感じです。

id,name,kana,postal,tel,mail,password,ip,url,create_timestamp,reg_date,valid,bit,code
1,大和 貫一,ヤマト カンイチ,262-8517,080-6081-7739,pzRz6cx6B@test.org,mp5x_0uQ,150.110.95.198,http://sample.net,1988/2/2 7:09:28,1991/1/28,False,0,2607
2,篠原 茂志,シノハラ シゲシ,025-6473,090-7899-2524,BWeLhiJPN@test.jp,rzFsiiYl,16.169.224.193,http://test.jp,2008/11/15 8:42:14,1972/4/25,True,0,6750
3,近江 智恵子,オウミ チエコ,129-0310,080-6093-0838,qDnzBhS@test.net,QAOoF2NK,161.55.51.87,http://test.com,2016/4/14 18:47:33,2000/8/19,True,1,4208
4,藤原 長治,フヂワラ チョウジ,138-8408,090-7532-4618,CGxHpmaS2@sample.net,uwSE1kHr,171.132.206.6,http://sample.net,2018/11/18 12:18:54,1972/3/4,True,0,8333
5,湯川 秀明,ユカワ ヒデアキ,326-4046,080-1220-3065,IgJh4@example.com,lOuOvfr1,247.118.177.127,http://test.net,2015/11/26 23:13:43,1981/4/2,False,0,3527
6,仁平 章,ヒトヘイ アキラ,142-2086,080-8072-0869,IPMKUcb9J@test.co.jp,FkLBw3NQ,94.42.204.93,http://sample.com,2004/5/27 19:59:07,2012/5/17,True,0,7302
7,芦沢 治,アシザワ オサム,400-1426,080-1336-6892,R2wkyu@sample.net,orBU7WTv,15.158.107.210,http://sample.com,2008/10/28 18:02:33,1973/8/5,False,1,5045
8,甲田 隆之,コウダ タカユキ,001-5573,090-1434-8132,R8kzaAnh@sample.net,PbqK_QQf,84.66.207.170,http://example.net,2015/7/10 15:22:05,1972/11/4,False,0,3327
9,萩原 佳奈,ハギワラ ヨシナ,979-5021,090-8108-0632,XZcZdl0@test.co.jp,AR_OdmXO,13.55.171.197,http://example.com,2015/12/5 12:16:58,1987/3/13,False,1,279
10,大岡 桃香,オオオカ モモカ,691-8324,090-2101-2791,BTZSU@example.co.jp,J4rR75pj,11.97.118.39,http://sample.net,1993/1/25 8:29:46,2009/4/4,True,0,5110

※データ型やマルチバイト文字確認のために個人情報っぽい CSV になりましたが、全てツールで作成した疑似データです。

続いて、以下の Python コードで CSV → Parquet に変換しました。

import argparse
import os.path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

parser = argparse.ArgumentParser(description='conv file2parquet')
parser.add_argument('file', help='file name')
args = parser.parse_args()
file_name = os.path.splitext(os.path.basename(args.file))[0]
file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), args.file)

df = pd.read_csv(file_path)
table = pa.Table.from_pandas(df)
pq.write_table(table, './{}.parquet'.format(file_name))

Parquet データを BigQuery にロード

作成した Parquet ファイルを GCS に配置して、BigQuery にロードしてみます。

Python クライアントライブラリを使用した以下のコードを実行しました。

from google.cloud import bigquery

client = bigquery.Client()
dataset_id = 'dataset_2'

dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
uri = "gs://test-mikami/data_test/sample.parquet"
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_parquet"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(dataset_ref.table("load_parquet"))
print("Loaded {} rows.".format(destination_table.num_rows))
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py
Starting job d3bf92f2-3b30-4a19-b9b9-ef6ab8f5503e
Job finished.
Loaded 10 rows.

念のためデータを確認してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT * FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet ORDER BY id'
Waiting on bqjob_r6503d3677d28bcbd_00000171a0414e50_1 ... (0s) Current status: DONE
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+
| id |     name    |       kana        |  postal  |      tel      |         mail         | password |       ip        |        url         |  create_timestamp   | reg_date  | valid | bit | code |
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+
|  1 | 大和 貫一   | ヤマト カンイチ   | 262-8517 | 080-6081-7739 | pzRz6cx6B@test.org   | mp5x_0uQ | 150.110.95.198  | http://sample.net  | 1988/2/2 7:09:28    | 1991/1/28 | false |   0 | 2607 |
|  2 | 篠原 茂志   | シノハラ シゲシ   | 025-6473 | 090-7899-2524 | BWeLhiJPN@test.jp    | rzFsiiYl | 16.169.224.193  | http://test.jp     | 2008/11/15 8:42:14  | 1972/4/25 |  true |   0 | 6750 |
|  3 | 近江 智恵子 | オウミ チエコ     | 129-0310 | 080-6093-0838 | qDnzBhS@test.net     | QAOoF2NK | 161.55.51.87    | http://test.com    | 2016/4/14 18:47:33  | 2000/8/19 |  true |   1 | 4208 |
|  4 | 藤原 長治   | フヂワラ チョウジ | 138-8408 | 090-7532-4618 | CGxHpmaS2@sample.net | uwSE1kHr | 171.132.206.6   | http://sample.net  | 2018/11/18 12:18:54 | 1972/3/4  |  true |   0 | 8333 |
|  5 | 湯川 秀明   | ユカワ ヒデアキ   | 326-4046 | 080-1220-3065 | IgJh4@example.com    | lOuOvfr1 | 247.118.177.127 | http://test.net    | 2015/11/26 23:13:43 | 1981/4/2  | false |   0 | 3527 |
|  6 | 仁平 章     | ヒトヘイ アキラ   | 142-2086 | 080-8072-0869 | IPMKUcb9J@test.co.jp | FkLBw3NQ | 94.42.204.93    | http://sample.com  | 2004/5/27 19:59:07  | 2012/5/17 |  true |   0 | 7302 |
|  7 | 芦沢 治     | アシザワ オサム   | 400-1426 | 080-1336-6892 | R2wkyu@sample.net    | orBU7WTv | 15.158.107.210  | http://sample.com  | 2008/10/28 18:02:33 | 1973/8/5  | false |   1 | 5045 |
|  8 | 甲田 隆之   | コウダ タカユキ   | 001-5573 | 090-1434-8132 | R8kzaAnh@sample.net  | PbqK_QQf | 84.66.207.170   | http://example.net | 2015/7/10 15:22:05  | 1972/11/4 | false |   0 | 3327 |
|  9 | 萩原 佳奈   | ハギワラ ヨシナ   | 979-5021 | 090-8108-0632 | XZcZdl0@test.co.jp   | AR_OdmXO | 13.55.171.197   | http://example.com | 2015/12/5 12:16:58  | 1987/3/13 | false |   1 |  279 |
| 10 | 大岡 桃香   | オオオカ モモカ   | 691-8324 | 090-2101-2791 | BTZSU@example.co.jp  | J4rR75pj | 11.97.118.39    | http://sample.net  | 1993/1/25 8:29:46   | 2009/4/4  |  true |   0 | 5110 |
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+

正常にロードできています。

テーブルスキーマを確認してみると bool 型は自動判定してくれましたが、TIMESTAMP 型を判定してくれてないようです。。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet
[
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "kana",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "postal",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "tel",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "mail",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "password",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ip",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "create_timestamp",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "reg_date",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "valid",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "bit",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "code",
    "type": "INTEGER"
  }
]

日付フォーマットの問題かと、元の CSV ファイル日付項目もフォーマットをハイフン( - )区切りに変更してみましたが、一向に TIMESTAMP 型で検出してくれず。。

ドキュメント確認すると、Parquet → BigQuery ロード時の型変換に関する記載がありました。

ということは、Parquet ファイルできちんと型定義ができていなかった?

CSV から Parquet に変換する Python コードに Parquet のデータ型定義を追加して、再度 Parquet ファイルを作成しました。

(省略)
column_types = {
    'id': int64(),
    'name': string(),
    'kana': string(), 
    'postal': string(), 
    'tel': string(), 
    'password': string(), 
    'ip': string(), 
    'url': string(),
    'create_timestamp': timestamp('s'),
    'reg_date': timestamp('s'),
    'valid': bool_(),
    'bit': float32(),
    'code': float64(),
}
(省略)

実行してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py
Starting job 5e0aee11-aa45-4e5a-8514-75e64bbff435
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT create_timestamp, reg_date FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet_3 ORDER BY id'
Waiting on bqjob_r5b022bd287d25855_00000171a0e6b1da_1 ... (0s) Current status: DONE
+---------------------+---------------------+
|  create_timestamp   |      reg_date       |
+---------------------+---------------------+
| 1988-02-02 07:09:28 | 1991-01-28 00:00:00 |
| 2008-11-15 08:42:14 | 1972-04-25 00:00:00 |
| 2016-04-14 18:47:33 | 2000-08-19 00:00:00 |
| 2018-11-18 12:18:54 | 1972-03-04 00:00:00 |
| 2015-11-26 23:13:43 | 1981-04-02 00:00:00 |
| 2004-05-27 19:59:07 | 2012-05-17 00:00:00 |
| 2008-10-28 18:02:33 | 1973-08-05 00:00:00 |
| 2015-07-10 15:22:05 | 1972-11-04 00:00:00 |
| 2015-12-05 12:16:58 | 1987-03-13 00:00:00 |
| 1993-01-25 08:29:46 | 2009-04-04 00:00:00 |
+---------------------+---------------------+
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet_3
[
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "kana",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "postal",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "tel",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "mail",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "password",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ip",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "create_timestamp",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "reg_date",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "valid",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "bit",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "code",
    "type": "FLOAT"
  }
]

今度は、きちんと Parquet で定義したデータ型通りにテーブルスキーマが自動作成されました。

圧縮した Parquet データを BigQuery にロード

ドキュメントによると、Parquet データでは、以下の圧縮形式をサポートしているとのことです。

  • Snappy
  • GZip
  • LZO_1C and LZO_1X

SNAPPYGZIPLZ4ZSTD 4つの圧縮形式のファイルを準備しました。

それぞれ、BigQuery にロードしてみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py SNAPPY
Starting job d07720c7-107d-43bd-b08e-ff40caddca53
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py GZIP
Starting job 558c2d7d-4970-4591-b787-3442b811106c
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py LZ4
Starting job a6406969-5e65-4ec7-a932-0488ea8d7368
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py ZSTD
Starting job 6bd4a1a4-705a-423e-9ea0-cc6ab258735b
Job finished.
Loaded 10 rows.

LZ4ZSTD はドキュメントには記載がなかった形式ですが、ロード成功したようです。

データ内容も確認しましたが、問題なくロードできていました。

ドキュメントに記載のない圧縮形式でもロードはできましたが、データ型のバリエーションなど十分なサンプルデータではないので、サポート明記のある圧縮形式データを使用した方が無難でしょうか。

CSV と Parquet データロード時間を比較

体感的に、Parquet データのロードの場合、CSV や JSON データをロードした時よりも時間がかかっているように感じました。

ロードファイルが CSV の場合と、圧縮なしの Parquet の場合、SNAPPY 圧縮の Parquet の場合で、ロード処理処理時間にどのくらい差分があるのか確認してみます。

データロード用の Python コードを、テーブルがない場合は新規作成、ある場合は追記モードでロードするよう変更し、ロード完了のタイミングで処理時間を出力するよう修正しました。

from google.cloud import bigquery
import time

t1 = time.time()
(省略)
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_csv"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished. -> {} sec".format(time.time()-t1))

destination_table = client.get_table(dataset_ref.table("load_csv"))
print("Loaded {} rows.".format(destination_table.num_rows))

まずは CSV ファイルをロードします。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 13369eaa-2784-4b52-8037-32ffccd9a883
Job finished. -> 1.900064468383789 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 0b222a3e-306a-4b1d-9622-6fd9af32d3d6
Job finished. -> 2.299731731414795 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 7562ade7-a855-4347-bf4e-9221526df9ca
Job finished. -> 1.862321376800537 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 3363bb79-65a5-4d0b-831b-17bb560c377c
Job finished. -> 1.6943325996398926 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 77b740d4-4c8a-46fb-8acd-3bcc5bb89c48
Job finished. -> 1.9005563259124756 sec
Loaded 50 rows.

クライアントライブラリ経由なので API 通信が必要になり、ネットワーク速度の影響もあるため純粋にデータロード時間だけというわけではありませんが、CSV ファイルの場合、ロード時間はだいたい 2 秒ほどでした。

次に、圧縮なしの Parquet ファイルを同様にロードしてみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job 0426ca2b-e9f2-4fe0-a90d-a804b4272d01
Job finished. -> 8.229813814163208 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job 7837e5a7-fe70-43cf-8538-75740b48be0b
Job finished. -> 8.890910625457764 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job b8a69558-b8e6-4ef8-b783-f0bf5b24d8c1
Job finished. -> 14.763366937637329 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job ef34757c-af56-47e3-aa5e-58ed0282f879
Job finished. -> 6.311228036880493 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job de632b3b-e65c-4c48-bbdf-20d964becc99
Job finished. -> 9.726810693740845 sec
Loaded 50 rows.

API通信の問題か外れ値はあるものの、約 8 秒ほどかかっています。。

最後に、SNAPPY 圧縮の Parquet データです。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 0a9c59a2-5b37-440f-a5c1-402fa37c29e0
Job finished. -> 11.509429454803467 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 72733262-a77c-445f-810b-374169abf989
Job finished. -> 6.40911602973938 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 091a6961-3420-4cf6-be80-3da06206522f
Job finished. -> 6.870298385620117 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 8c3061ca-d7b6-4248-8b7f-e24dcba9496f
Job finished. -> 10.682614088058472 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job a12066af-c910-4dbb-97be-38a46f3e9fc2
Job finished. -> 11.705443859100342 sec
Loaded 50 rows.

圧縮なしの Parquet よりも、1 秒くらい多く時間がかかっているようです。。

列指向の BigQuery なので、同じく列指向フォーマットの Parquet の方がロード処理が速いかと思いましたが、データロード時にはおそらく型変換などのために一度ファイルデータをパースする必要があるため、シンプルなフォーマットのソースデータファイルの方がより高速に処理できるようです。

GCS 上でパティショニングされた Parquet ファイルをロード

Hive や Spark で使用される Parquet ファイルは、ディレクトリパスに年、月、日などの指定を含むパーティショニング状態で配置されることが多いと思います。

BigQuery へのデータロードでは、Parquet に限らず、パーティショニングデータのロードをオプションで指定することができます。

以下の、hive_partitioning オプション指定のある Python コードで Parquet データをロードしてみます。

※2020/04 現在、Python クライアントライブラリでは hive_partitioning オプションはベータ版とのことです。

(省略)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
opt = bigquery.external_config.HivePartitioningOptions()
opt.mode = 'AUTO'
opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/'
job_config.hive_partitioning = opt
uri = 'gs://test-mikami/data_test/ext_parquet/*'
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_parquet_partition"), job_config=job_config
)  # API request
(省略)

GCS には、/dt=2020-04-21//dt=2020-04-22//dt=2020-04-23/ の3つのパスの配下に、それぞれ1ファイルずつ配置してある状態です。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet
gs://test-mikami/data_test/ext_parquet/:
gs://test-mikami/data_test/ext_parquet/

gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet

実行してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 1c9fcdf3-89dd-4a43-9323-0c114eb87451
Job finished.
Loaded 10 rows.

パス分割されていた全ての配置ファイルのデータがロードされていることが確認できました。

hive_partitioning オプションを指定する場合、Parquet ファイルの配置パスは Hive パーティショニングレイアウトに従う必要があります。

試しに、パスが不正なファイルを追加してみます。

uri_prefix の直下と dt=YYYY-MM-DD を含まないパスの下にも、Parquet ファイルを配置しました。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet
gs://test-mikami/data_test/ext_parquet/:
gs://test-mikami/data_test/ext_parquet/
gs://test-mikami/data_test/ext_parquet/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet

gs://test-mikami/data_test/ext_parquet/work/:
gs://test-mikami/data_test/ext_parquet/work/
gs://test-mikami/data_test/ext_parquet/work/sample_p1.parquet

ロード実行してみると

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 8bf21632-6f36-432e-aa2b-14290245d773
Traceback (most recent call last):
  File "load_parquet_partition.py", line 21, in <module>
    load_job.result()  # Waits for table load to complete.
  File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 818, in result
    return super(_AsyncJob, self).result(timeout=timeout)
  File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/api_core/future/polling.py", line 127, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Partition keys should be invariant from table creation across all partitions, with the number of partition keys held constant with invariant names.  Expected 0 partition keys ([]), but 1 ([dt]) were encountered along path /bigstore/test-mikami/data_test/ext_parquet/dt=2020-04-22.

パーティションキーが不正だと怒られました。。

なお、ロード対象のソースデータファイルをワイルドカードで指定すれば、パーティションキーにかかわらず、複数ファイルを一度にロードすることもできます。

(省略)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
#opt = bigquery.external_config.HivePartitioningOptions()
#opt.mode = 'AUTO'
#opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/'
#job_config.hive_partitioning = opt
uri = 'gs://test-mikami/data_test/ext_parquet/*'
load_job = client.load_table_from_uri(
#    uri, dataset_ref.table("load_parquet_partition"), job_config=job_config
    uri, dataset_ref.table("load_parquet_multi"), job_config=job_config
)  # API request
(省略)
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 1e93d04f-ead5-4dbe-8b6f-28adae1aef43
Job finished.
Loaded 16 rows.

ファイルストレージ上のパーティショニングレイアウトに一致するファイルのみをロードしたい場合は、パーティショニングオプションを指定しておくと良さそうです。

パーティショニングされた Parquet ファイルを参照する外部テーブルを作成

BigQuery では、GCS 上のファイルなどの外部リソースに対して、通常のテーブル同様に SQL クエリを実行することができます。

先ほどは GCS から BigQuery にデータをロードして参照しましたが、GCS にパーティショニングされている Parquet ファイルをロードなしで直接参照する、外部テーブルを定義してみます。

※現時点では、Python クライアントライブラリ経由での Parquet ファイルの外部テーブル作成は未サポートのようです。

BigQuery 管理画面から、「ソースデータパーティショニング」を指定して、「外部テーブル」を作成しました。

SQL を実行してデータを確認してみます。

複数のパスに配置された全ての Parquet ファイルのデータが参照可能なことが確認できました。

外部テーブル作成後に GCS に dt=YYYY-MM-DD フォーマットの新しいパスを作成し、Parquet ファイルを追加して、再度 BigQuery テーブルに SQL クエリを実行してみます。

後から追加した Parquet ファイルのデータも BigQuery の外部テーブルで参照できることが確認できました。

まとめ(所感)

特にデータ分析業務では Parquet ファイルを扱うケースも多々あるかと思います。

BigQuery で外部テーブルを定義しておけば、例えば他のシステムからデイリーで Parquet ファイルが出力される場合など、ロード処理なしに GCS 上のパーティショニングデータを BigQuery のテーブルデータとして参照できるので、便利なのではないかと思いました。

参考