BigQueryのテーブルをParquet出力する(Python / Apache Beam / Dataflow)

良い子のみなさーん!列指向でスキーマ情報も柔軟に変更できる万能なファイルフォーマットと言えばー?

せーの!?

「「「Parquet!!」」」

そんな男は黙ってApache Parquetな皆さんに向けて、BigQueryテーブルのParquet出力方法をご紹介していきたいと思います。

ちなみにですが、BigQuery自体にはなんとParquet出力のオプションがありません。2020年12月現在は、CSV、JSON、Avroのみです。Avroもリッチなファイルフォーマットですが、行指向のためそのまま分析業務に使用するのは不向きです。

BigQueryのテーブルをParquet出力するには、Google Cloud Dataflowを使用する必要があるのですが、その実装が結構骨折れるんですよね…。

Google Cloud DataflowとApache Beam

Dataflowは、データ処理・バッチ処理を得意としたサーバレスコンピューティングサービスです。バッチ処理において割と頭を使うプロビジョニングやスケーリングを自動化してくれます。

Dataflowの一番の特徴といえば、オープンソースのApache Beamをエンジンに採用している点でしょうか。Apache Beamはバッチ・ストリーミング処理のための統合プログラミングモデルで、大規模なデータ処理パイプラインをシンプルに記述することができます。

※出典:Apache Flink: Apache Beam: How Beam Runs on Top of Flink

興味深い話、Googleは過去OSSコミュニティとのイノベーションの共有に関して冷淡な姿勢を取っていたそうですが、このDataflowを開発する際にApache Beamを共同で立ち上げ、イノベーションを共有していく方向へ転換したそうです。

Apache BeamのコンセプトはDataflowの公式ドキュメントにまとめられているので、興味ある方は見てみてください。

今回はデータ変換処理を行わず、テーブルをParquet形式でエクスポートするだけなので、Apache Beamを深く理解していなくても実装可能です。

事前準備

  • 環境
    • Python 3.8.6
    • google-cloud-bigquery==2.4.0
    • google-apitools==0.5.31
    • pyarrow==1.0.1
    • apache-beam==2.24.0

このPythonライブラリのバージョン依存関係が結構シビアなんですよね。上に加えてpandasも使おうとすると更に競合関係が増えるかと思います。pip installで以下のようなErrorが出ても、動けばOKと思っておいてください。

apache-beam 2.24.0 requires pyarrow<0.18.0,>=0.15.1; python_version >= "3.0" or platform_system != "Windows", but you'll have pyarrow 1.0.1 which is incompatible.

BigQuery側にも適当にテーブルとデータを用意しておきます。まずはデータセットを作成します。今回はデータセット名はtmpで、ロケーションはUSにしておきました。

その後以下のクエリを発行して、BigQuery公式パブリックデータのwikipediaから100件だけ抽出してテーブルを作成します。

CREATE TABLE tmp.test_arquet_export AS
SELECT * FROM bigquery-public-data.samples.wikipedia LIMIT 100;

このtmp.test_parquet_exportのデータをParquet出力していきます。Parquetの出力には、Dataflowで使用するGCSのバケットが必要なので、test_parquet_exportという名前で作成しました。設定は全部デフォルトです。

続いて、使用するプロジェクトのBigQuery APIとDataflow APIを有効にしておきます。コンソールのAPI & Servicesから、ENABLE API AND SERVICESをクリックし、BigQuery APIとDataflow APIを追加してください。

最後に、権限の設定を行います。以下の権限を追加すればOKです。Dataflowを動かすための権限数って結構多いんですよねー。

使用するPythonコード

作成したPythonコードは以下の通りです。

# bq_parquet_export.py
import pyarrow as pa
import apache_beam as beam

from google.cloud import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions



class DataFlowOptions():
    def __init__(self, project_id, service_account_email, job_name, staging_location, temp_location):
        self.options = PipelineOptions()
        self.options.view_as(GoogleCloudOptions).project = project_id
        self.options.view_as(GoogleCloudOptions).job_name = job_name
        self.options.view_as(GoogleCloudOptions).staging_location = staging_location
        self.options.view_as(GoogleCloudOptions).temp_location = temp_location
        self.options.view_as(GoogleCloudOptions).region = 'us-central1'
        self.options.view_as(GoogleCloudOptions).service_account_email = service_account_email
        self.options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'  # スループットに合わせて自動でスケーリング
        self.options.view_as(StandardOptions).runner = 'DataflowRunner'  # データフローのみでの実行を想定


class BqClient(bigquery.Client):
    def __init__(self, project_id, service_account_email):
        self.project_id = project_id
        self.service_account_email = service_account_email
        super().__init__(self.project_id)

    def unload_table_as_parquet(self, table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output):
        staging_location = 'gs://{}/{}'.format(gs_bucket, gs_staging)
        temp_location = 'gs://{}/{}'.format(gs_bucket, gs_temp)
        output_location = 'gs://{}/{}'.format(gs_bucket, gs_output)
        query = "SELECT * FROM {}".format(table_id)
        print('Start unload from {}'.format(table_id))
        print('               to {}'.format(output_location))
        # BQのデータ型 → Parquetのデータ型にマッピング
        data_type_mapping = {
            'STRING': pa.string(),
            'BYTES': pa.string(),
            'INTEGER': pa.int64(),
            'NUMERIC': pa.decimal128(18, 2),  # fixed_len_byte_array化を避けるための18,2
            'FLOAT': pa.float64(),
            'BOOLEAN': pa.bool_(),
            'TIMESTAMP': pa.timestamp(unit='s'),
            'DATE': pa.date64(),
            'DATETIME': pa.timestamp(unit='s'),
            #'ARRAY': pa.list_(),
            #'STRUCT': pa.struct()
        }
        table = self.get_table(table_id)
        print([(f.name, f.field_type) for f in table.schema])
        parquet_schema = pa.schema(
            [pa.field(f.name, data_type_mapping[f.field_type]) for f in table.schema])
        # オプションの設定
        options = DataFlowOptions(self.project_id, self.service_account_email, job_name, staging_location, temp_location)
        p = beam.Pipeline(options=options.options)
        # DataFlowのPipelineを定義
        # Apache Beamは記法がシェルっぽく独特
        (
            p
            | 'Input: ReadTable' >> beam.io.gcp.bigquery.ReadFromBigQuery(
                query=query,
                use_standard_sql=True)
            | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix=output_location,
                schema=parquet_schema,
                file_name_suffix='.parquet')
        )

        print('Run pipeline')
        print('Query: {}'.format(query))
        result = p.run()
        result.wait_until_finish()
        print('Pipeline successfully finished')


if __name__ == '__main__':
    table_id = 'tmp.test_parquet_export'
    job_name = 'test-parquet-export'
    gs_bucket = 'test_parquet_export'
    gs_staging = 'dataflow/staging/'
    gs_temp = 'dataflow/temp/'
    gs_output = 'dataflow/output/'
    bq = BqClient(project_id='haruta-takumi', service_account_email='**************@haruta-takumi.iam.gserviceaccount.com')
    bq.unload_table_as_parquet(table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output)

解説は後回しとしまして、とりあえず実行してみます。if __name__ == '__main__':内の各種変数を設定してもらえれば、python bq_parquet_export.pyでデフォルトのサービスアカウントのPathから認証情報を自動で取得してDataflowワーカーが起動します。Dataflowのコンソールからログの確認も可能です。

出力ファイルは指定したGCSバケットに配置されます。ダウンロードして中身を確認してみると、ちゃんとParquetで出力されていることが確認できます。

$ parquet-tools head dataflow_output_-00000-of-00001.parquet
title = Cricket
id = 25675557
language = 
wp_namespace = 0
revision_id = 171533936
contributor_ip = 128.36.84.151
timestamp = 1195079595
num_characters = 56665

title = Sepp Herberger
id = 1465240
language = 
wp_namespace = 0
revision_id = 58775695
contributor_ip = 132.187.253.19
timestamp = 1150385515
num_characters = 1295

title = Necronomicon (H. R. Giger)
id = 10972018
language = 
wp_namespace = 0
revision_id = 209477280
contributor_ip = 87.41.57.166
timestamp = 1209657004
num_characters = 1490

title = Daisy Meadows
id = 8627561
language = 
wp_namespace = 0
revision_id = 247988524
contributor_ip = 86.156.164.72
timestamp = 1225118943
num_characters = 3271

title = Angela Bassett
id = 508609
language = 
wp_namespace = 0
revision_id = 156106750
contributor_ip = 209.116.125.226
timestamp = 1189105044
num_characters = 12867

デモが見れたところで、コードの構成を解説していきます。

コード解説

使用するライブラリは、先ほどご紹介した通り、pyarrowapache_beambigqueryの3つです。pyarrowはParquet出力、apache_beamはDataFlowの制御、bigqueryはテーブルの読み込みに使用しています。

Apache BeamでDataflowを起動する場合は、かなりの数のオプションの指定が必要です。今回はクラスのインスタンス変数としてまとめてしまいました。Parquet出力用の関数は、bigquery.Clientを継承したBqClientクラスに定義していきます。

import pyarrow as pa
import apache_beam as beam

from google.cloud import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions


class DataFlowOptions():
    def __init__(self, project_id, service_account_email, job_name, staging_location, temp_location):
        self.options = PipelineOptions()
        self.options.view_as(GoogleCloudOptions).project = project_id
        self.options.view_as(GoogleCloudOptions).job_name = job_name
        self.options.view_as(GoogleCloudOptions).staging_location = staging_location
        self.options.view_as(GoogleCloudOptions).temp_location = temp_location
        self.options.view_as(GoogleCloudOptions).region = 'us-central1'
        self.options.view_as(GoogleCloudOptions).service_account_email = service_account_email
        self.options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'  # スループットに合わせて自動でスケーリング
        self.options.view_as(StandardOptions).runner = 'DataflowRunner'  # データフローのみでの実行を想定


class BqClient(bigquery.Client):
    def __init__(self, project_id, service_account_email):
        self.project_id = project_id
        self.service_account_email = service_account_email
        super().__init__(self.project_id)

Parquet出力用のメソッドです。前半部分ではDataflowで使用するGCSのパスの指定や、SQLクエリの作成を行っています。その後、BigQueryのカラム型をParquetのカラム型に変換するためのマッパーを作成します。この調節が結構大変なポイントですね…。使用できる型がpyarrowのバージョンにかなり依存しているので、実際に導入される際は、Apache Arrowの仕様、Parquetの仕様、BigQueryの仕様の3つをよく照らし合わせてください。

BqClientのget_tableで出力するテーブルのスキーマ情報を取得し、Parquet用のカラム型に変換していきます。

    def unload_table_as_parquet(self, table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output):
        staging_location = 'gs://{}/{}'.format(gs_bucket, gs_staging)
        temp_location = 'gs://{}/{}'.format(gs_bucket, gs_temp)
        output_location = 'gs://{}/{}'.format(gs_bucket, gs_output)
        query = "SELECT * FROM {}".format(table_id)
        print('Start unload from {}'.format(table_id))
        print('               to {}'.format(output_location))
        # BQのデータ型 → Parquetのデータ型にマッピング
        data_type_mapping = {
            'STRING': pa.string(),
            'BYTES': pa.string(),
            'INTEGER': pa.int64(),
            'NUMERIC': pa.decimal128(18, 2),  # fixed_len_byte_array化を避けるための18,2
            'FLOAT': pa.float64(),
            'BOOLEAN': pa.bool_(),
            'TIMESTAMP': pa.timestamp(unit='s'),
            'DATE': pa.date64(),
            'DATETIME': pa.timestamp(unit='s'),
            #'ARRAY': pa.list_(),
            #'STRUCT': pa.struct()
        }
        table = self.get_table(table_id)
        print([(f.name, f.field_type) for f in table.schema])
        parquet_schema = pa.schema(
            [pa.field(f.name, data_type_mapping[f.field_type]) for f in table.schema])

そして、先ほど定義したDataFlowOptionsクラスを使用してオプションを作成し、Apache Beamのパイプラインを定義します。このパイプラインの書き方がシェルを意識しているようで、かなり独特です。

Input: ReadTableはDataflow側で表示されるステップ名となり、ここではBigQueryのテーブルを読み込んでいます。ReadFromBigQuery には実に様々なオプションが用意されていますが、SQLを使用するのが一番融通が効くかなと思います。Output: Export to Parquetでは、pyarrowを用いたParquet出力を定義しており、先ほどマッピングしたスキーマ情報と出力先を指定します。

パイプラインを定義したら、run()メソッドで非同期実行し、wait_until_finish()を続けて実行することでジョブが完了するまで待機することができます。

        # オプションの設定
        options = DataFlowOptions(self.project_id, self.service_account_email, job_name, staging_location, temp_location)
        p = beam.Pipeline(options=options.options)
        # DataFlowのPipelineを定義
        # Apache Beamは記法がシェルっぽく独特
        (
            p
            | 'Input: ReadTable' >> beam.io.gcp.bigquery.ReadFromBigQuery(
                query=query,
                use_standard_sql=True)
            | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix=output_location,
                schema=parquet_schema,
                file_name_suffix='.parquet')
        )

        print('Run pipeline')
        print('Query: {}'.format(query))
        result = p.run()
        result.wait_until_finish()
        print('Pipeline successfully finished')

Parquet出力の所感

ここまで実装するのに、正直かなり時間がかかりました。Apache Beamのクセが強く保守性があまり良くないので、今後BigQueryの機能としてParquet出力が登場したら、そっちに切り替えたいところですね。