Cloud DataflowでGCS→BQのパイプラインをPythonで試してみる

2022.05.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

GoogleCloudのETLサービスのDataflowを調査する機会があり、Google Cloud Storage(以降GCS)にあるCSVファイルをBigQueryにロードする処理をPythonスクリプトで試してみましたのでまとめます。

Cloud Storage Text to BigQuery

DataflowはマネージドなApache Beam Runnerでプログラミング言語としてはJava,Python,Goを使うことができます。今回やろうとしている「Cloud Storage Text to BigQuery」に関してはGoogle提供のバッチテンプレートがありますが、このテンプレートはJavaで記述されているため使用せずPythonで実装してみます。

環境

  • Python 3.9.10
  • apache-beam 2.37.0

GCSからBigQueryへのロードを実装

Apache BeamのRunnerとして最終的にDataflowを使いますが、開発する際はRunnerはローカル実行をします。したがって Cloud Storage Text to BigQueryを実装するために以下の手順で進めます。

  1. GCSからBigQueryへロードするApache BeamのPythonスクリプトを記述する
  2. 記述したPythonスクリプトをローカル実行で試してみる
  3. Google CloudのDataflowをRunnerとしてPythonスクリプトを実行してみる

今回GCSからBigQueryに移行しますが、ソースとなるGCSのパスはgs://example_bucket_name/jp_weather_2021.csvでターゲットとなるBigQueryのデータセット・テーブルはdata_set_test.jp_weatherです。

CSVの中身は以下の天候状況になります。

2021-11-13,11,名古屋,晴,10.8,0,9.7,3
2021-11-13,11,仙台,晴,11.3,0,8.1,1.8
2021-11-13,11,大阪,晴,12.1,0,8.8,5
2021-11-13,11,札幌,晴,7.8,0,1.7,7.5
2021-11-13,11,長野,晴,7.3,0,9.3,
2021-11-13,11,京都,晴,10.5,0,7.6,
2021-11-13,11,東京,晴,13.1,0,9.4,0.5
2021-11-13,11,横浜,晴,13.6,0,10.1,
2021-03-03,3,京都,晴,5.9,0,6.6,
2021-03-03,3,那覇,晴,18.4,0,8.5,4.3
2021-03-03,3,大阪,晴,6.8,0,8.2,5
2021-03-03,3,名古屋,晴,7.3,0,10.7,1
2021-03-03,3,横浜,晴,7.7,0,11,
2021-03-03,3,仙台,晴,3.1,0,9.7,4.3
....

データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量となります。

BigQueryのテーブルは以下になります。

$ bq show --schema --format=prettyjson data_set_test.jp_weather
[
  {
    "mode": "NULLABLE",
    "name": "date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "month",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "city",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "w_type",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "temperature",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "precipitation",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "sunlight",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "cloudage",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "created_time_ts_utc",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "created_time_dt_jst",
    "type": "DATETIME"
  }
]

Apache Beamのインストール

はじめにapache-beamのモジュールをインストールしておきます。ローカル実行だけを行うのであればPythonやApache Beamのバージョンは気にしなくても良いですが、Dataflowでも動かすためGoogle Cloudの公式ドキュメントからランタイムサポートを確認して起きます。(Apache Beam のランタイム サポート  |  Cloud Dataflow  |  Google Cloud

今回はApache Beamのバーションは2.37.0を使いますのでpipでインストールします。

pip install apache-beam==2.37.0
pip install 'apache-beam [gcp]'

これで準備完了です。

Apache BeamのPythonスクリプトを記述

Dataflowでの処理をPythonで記述します。

gcs_to_bq.py

import logging
import csv
from datetime import datetime, timezone, timedelta

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

TZ_JST = timezone(timedelta(hours=9))


class BeamOptions:
    def __init__(self,runner):
        self.options = PipelineOptions()
        # GoogleCloud Option
        self.gcloud_options = self.options.view_as(GoogleCloudOptions)
        self.gcloud_options.job_name = "{Dataflow名}"
        self.gcloud_options.project = "{プロジェクト名}"
        self.gcloud_options.temp_location = "gs://example_bucket_name/tmp" # 処理する際にGCSに一時ファイルを作成するのでその保管先のGCS URI
        self.gcloud_options.region = "asia-northeast1"
        # Setup Option
        self.options.view_as(SetupOptions).save_main_session = True
        # Standard Option
        self.options.view_as(StandardOptions).runner = runner


def parse_file(element):
    for line in csv.reader(
        [element],
        quotechar='"',
        delimiter=",",
        quoting=csv.QUOTE_ALL,
        skipinitialspace=True,
    ):
        return line


def convert_dict(element):
    now_utc = datetime.now(timezone.utc)
    now = datetime.now(TZ_JST)
    return {
        "date": element[0],
        "month": element[1],
        "city": element[2],
        "w_type": element[3],
        "temperature": element[4],
        "precipitation": element[5],
        "sunlight": element[6],
        "cloudage": element[7],
        # データの書き込み日時を付加する
        "created_time_ts_utc": now_utc.strftime("%Y-%m-%d %H:%M:%S.%f"),
        "created_time_dt_jst": now.strftime("%Y-%m-%d %H:%M:%S.%f"),
    }


# ジョブ実行時のメイン処理部分
def run(gcs_uri, table_spec, runner):
    # パイプラインの生成
    with beam.Pipeline(options=BeamOptions(runner).options) as p:

        # GCSからファイル読み込み
        raw_datas = p | "Read from GCS" >> beam.io.ReadFromText(gcs_uri)
        # 変換処理
        tran_datas = (
            raw_datas
            | "transform csv" >> beam.Map(parse_file)
            | "transform dict" >> beam.Map(convert_dict)
        )

        # BigQueryへデータ登録
        tran_datas | "Write to BigQuery" >> WriteToBigQuery(
            table_spec,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run(
        "gs://example_bucket_name/jp_weather_2021.csv", # ソースとなるファイルのGCS URI
        "data_set_test.jp_weather", # ターゲットとなるBigQueryのデータセット・テーブル
        # runnerを指定する
        "DirectRunner",  # ローカル実行
        # "DataflowRunner",  # Cloud Dataflow実行
    )

処理の中身を解説します。

  1. メイン処理のrun内でGCSからソースとなるCSVファイルからReadFromTextを使いデータを読み込みPCollectionraw_datasを作成します。
  2. PCollectionraw_datasにTransformを行い新たなPCollectiontran_datasを作成します。
    1. beam.Mapでcsvモジュールを使いcsvのパースを行います。処理はDataflowワーカーで行われるためsave_main_sessionのオプションをTrueにしてピクル化する必要があります。(よくある質問(NameError を処理するにはどうすればよいですか?)  |  Cloud Dataflow  |  Google Cloud
    2. beam.MapでBigQueryへ書き込むようにデータをdict化します。
    3. dict化する際に処理日時を付加しています。
  3. PCollectiontran_datasをWriteToBigQueryを使いBigQueryに書き込みます。
    1. create_disposition=BigQueryDisposition.CREATE_NEVERでターゲットテーブルがない場合は行わない
    2. write_disposition=BigQueryDisposition.WRITE_TRUNCATEでターゲットテーブをTruncateしてからデータを書き込む

それではこのスクリプトをローカルとDataflowで実行してみます。

記述したPythonスクリプトをローカル実行

はじめにローカル実行を行ってみます。スクリプトの最後の部分でApache BeamのrunnerをDirectRunnerにしてからpythonを実行します。

$ python gcs_to_bq.py

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
....
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Imp_74)+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Fla_75))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Map_77))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-WaitForDestinationLoadJobs_78)
INFO:root:Job status: DONE
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-Impulse_13)+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-FlatMap-lambda-at-core_14))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-Map-decode-_16)

処理が終わりBigQueryで確認するとデータが登録されていることがわかります。

$ bq query "select * from data_set_test.jp_weather limit 5" 
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+
|    date    | month | city | w_type | temperature | precipitation | sunlight | cloudage | created_time_ts_utc |    created_time_dt_jst     |
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+
| 2021-11-13 |    11 | 京都 | 晴     |        10.5 |             0 |      7.6 |     NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.662826 |
| 2021-03-03 |     3 | 京都 | 晴     |         5.9 |             0 |      6.6 |     NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.663407 |
| 2021-11-14 |    11 | 京都 | 晴     |        12.1 |             0 |      3.6 |     NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.665853 |
| 2021-03-04 |     3 | 京都 | 晴     |          10 |             0 |      6.1 |     NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.666451 |
| 2021-11-15 |    11 | 京都 | 晴     |        12.8 |             0 |      7.9 |     NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.668426 |
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+

Google CloudのDataflowで実行

次にDataflowで実行してみます。スクリプトの最後の部分でApache BeamのrunnerをDataflowRunnerにしてからpythonを実行します。

$ python gcs_to_bq.py

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
...
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.791Z: JOB_MESSAGE_DEBUG: Executing success step success48
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.871Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.908Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.935Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.914Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.950Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.976Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2022-05-06_12_55_22-18196576572105466017 is in state JOB_STATE_DONE

スクリプトの実行ログではわかりにくいですが、Google CloudのDataflowのコンソールから確認するとDataflowで実行されていることがわかります。

Dataflowのジョブが作成され、Pipelineが実行中であることがわかります。

実行が完了するとジョブグラフにてジョブ情報がグラフでわかりやすく表現されています。

まとめ

Dataflowを使ってGCSにあるCSVファイルをBigQueryにロードする処理をPythonスクリプトで試してみました。単純にロードするだけなら非常に簡単な記述ですし、データを加工する際もApache BeamのParDoやMapなどのTransformを使い簡単に加工を行えるのでETLツールとしてその概念さえ理解すれば便利ではないでしょうか。機会があれば積極的に使っていきたいと思います。

最後まで読んで頂いてありがとうございました。