はじめに
データアナリティクス事業本部のkobayashiです。
GoogleCloudのETLサービスのDataflowを調査する機会があり、Google Cloud Storage(以降GCS)にあるCSVファイルをBigQueryにロードする処理をPythonスクリプトで試してみましたのでまとめます。
Cloud Storage Text to BigQuery
DataflowはマネージドなApache Beam Runnerでプログラミング言語としてはJava,Python,Goを使うことができます。今回やろうとしている「Cloud Storage Text to BigQuery」に関してはGoogle提供のバッチテンプレートがありますが、このテンプレートはJavaで記述されているため使用せずPythonで実装してみます。
環境
- Python 3.9.10
- apache-beam 2.37.0
GCSからBigQueryへのロードを実装
Apache BeamのRunnerとして最終的にDataflowを使いますが、開発する際はRunnerはローカル実行をします。したがって Cloud Storage Text to BigQueryを実装するために以下の手順で進めます。
- GCSからBigQueryへロードするApache BeamのPythonスクリプトを記述する
- 記述したPythonスクリプトをローカル実行で試してみる
- Google CloudのDataflowをRunnerとしてPythonスクリプトを実行してみる
今回GCSからBigQueryに移行しますが、ソースとなるGCSのパスはgs://example_bucket_name/jp_weather_2021.csv
でターゲットとなるBigQueryのデータセット・テーブルはdata_set_test.jp_weather
です。
CSVの中身は以下の天候状況になります。
2021-11-13,11,名古屋,晴,10.8,0,9.7,3
2021-11-13,11,仙台,晴,11.3,0,8.1,1.8
2021-11-13,11,大阪,晴,12.1,0,8.8,5
2021-11-13,11,札幌,晴,7.8,0,1.7,7.5
2021-11-13,11,長野,晴,7.3,0,9.3,
2021-11-13,11,京都,晴,10.5,0,7.6,
2021-11-13,11,東京,晴,13.1,0,9.4,0.5
2021-11-13,11,横浜,晴,13.6,0,10.1,
2021-03-03,3,京都,晴,5.9,0,6.6,
2021-03-03,3,那覇,晴,18.4,0,8.5,4.3
2021-03-03,3,大阪,晴,6.8,0,8.2,5
2021-03-03,3,名古屋,晴,7.3,0,10.7,1
2021-03-03,3,横浜,晴,7.7,0,11,
2021-03-03,3,仙台,晴,3.1,0,9.7,4.3
....
データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量
となります。
BigQueryのテーブルは以下になります。
$ bq show --schema --format=prettyjson data_set_test.jp_weather
[
{
"mode": "NULLABLE",
"name": "date",
"type": "DATE"
},
{
"mode": "NULLABLE",
"name": "month",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "city",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "w_type",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "temperature",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "precipitation",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "sunlight",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "cloudage",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "created_time_ts_utc",
"type": "TIMESTAMP"
},
{
"mode": "NULLABLE",
"name": "created_time_dt_jst",
"type": "DATETIME"
}
]
Apache Beamのインストール
はじめにapache-beamのモジュールをインストールしておきます。ローカル実行だけを行うのであればPythonやApache Beamのバージョンは気にしなくても良いですが、Dataflowでも動かすためGoogle Cloudの公式ドキュメントからランタイムサポートを確認して起きます。(Apache Beam のランタイム サポート | Cloud Dataflow | Google Cloud )
今回はApache Beamのバーションは2.37.0を使いますのでpipでインストールします。
pip install apache-beam==2.37.0
pip install 'apache-beam [gcp]'
これで準備完了です。
Apache BeamのPythonスクリプトを記述
Dataflowでの処理をPythonで記述します。
gcs_to_bq.py
import logging
import csv
from datetime import datetime, timezone, timedelta
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
TZ_JST = timezone(timedelta(hours=9))
class BeamOptions:
def __init__(self,runner):
self.options = PipelineOptions()
# GoogleCloud Option
self.gcloud_options = self.options.view_as(GoogleCloudOptions)
self.gcloud_options.job_name = "{Dataflow名}"
self.gcloud_options.project = "{プロジェクト名}"
self.gcloud_options.temp_location = "gs://example_bucket_name/tmp" # 処理する際にGCSに一時ファイルを作成するのでその保管先のGCS URI
self.gcloud_options.region = "asia-northeast1"
# Setup Option
self.options.view_as(SetupOptions).save_main_session = True
# Standard Option
self.options.view_as(StandardOptions).runner = runner
def parse_file(element):
for line in csv.reader(
[element],
quotechar='"',
delimiter=",",
quoting=csv.QUOTE_ALL,
skipinitialspace=True,
):
return line
def convert_dict(element):
now_utc = datetime.now(timezone.utc)
now = datetime.now(TZ_JST)
return {
"date": element[0],
"month": element[1],
"city": element[2],
"w_type": element[3],
"temperature": element[4],
"precipitation": element[5],
"sunlight": element[6],
"cloudage": element[7],
# データの書き込み日時を付加する
"created_time_ts_utc": now_utc.strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_time_dt_jst": now.strftime("%Y-%m-%d %H:%M:%S.%f"),
}
# ジョブ実行時のメイン処理部分
def run(gcs_uri, table_spec, runner):
# パイプラインの生成
with beam.Pipeline(options=BeamOptions(runner).options) as p:
# GCSからファイル読み込み
raw_datas = p | "Read from GCS" >> beam.io.ReadFromText(gcs_uri)
# 変換処理
tran_datas = (
raw_datas
| "transform csv" >> beam.Map(parse_file)
| "transform dict" >> beam.Map(convert_dict)
)
# BigQueryへデータ登録
tran_datas | "Write to BigQuery" >> WriteToBigQuery(
table_spec,
create_disposition=BigQueryDisposition.CREATE_NEVER,
write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run(
"gs://example_bucket_name/jp_weather_2021.csv", # ソースとなるファイルのGCS URI
"data_set_test.jp_weather", # ターゲットとなるBigQueryのデータセット・テーブル
# runnerを指定する
"DirectRunner", # ローカル実行
# "DataflowRunner", # Cloud Dataflow実行
)
処理の中身を解説します。
- メイン処理のrun内でGCSからソースとなるCSVファイルからReadFromTextを使いデータを読み込みPCollection
raw_datas
を作成します。 - PCollection
raw_datas
にTransformを行い新たなPCollectiontran_datas
を作成します。- beam.Mapでcsvモジュールを使いcsvのパースを行います。処理はDataflowワーカーで行われるため
save_main_session
のオプションをTrue
にしてピクル化する必要があります。(よくある質問(NameError を処理するにはどうすればよいですか?) | Cloud Dataflow | Google Cloud ) - beam.MapでBigQueryへ書き込むようにデータをdict化します。
- dict化する際に処理日時を付加しています。
- beam.Mapでcsvモジュールを使いcsvのパースを行います。処理はDataflowワーカーで行われるため
- PCollection
tran_datas
をWriteToBigQueryを使いBigQueryに書き込みます。create_disposition=BigQueryDisposition.CREATE_NEVER
でターゲットテーブルがない場合は行わないwrite_disposition=BigQueryDisposition.WRITE_TRUNCATE
でターゲットテーブをTruncateしてからデータを書き込む
それではこのスクリプトをローカルとDataflowで実行してみます。
記述したPythonスクリプトをローカル実行
はじめにローカル実行を行ってみます。スクリプトの最後の部分でApache BeamのrunnerをDirectRunner
にしてからpythonを実行します。
$ python gcs_to_bq.py
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
....
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Imp_74)+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Fla_75))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseMonitorDestinationLoadJobs-Map_77))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-WaitForDestinationLoadJobs_78)
INFO:root:Job status: DONE
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-Impulse_13)+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-FlatMap-lambda-at-core_14))+(ref_AppliedPTransform_Write-to-BigQuery-BigQueryBatchFileLoads-ImpulseEmptyPC-Map-decode-_16)
処理が終わりBigQueryで確認するとデータが登録されていることがわかります。
$ bq query "select * from data_set_test.jp_weather limit 5"
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+
| date | month | city | w_type | temperature | precipitation | sunlight | cloudage | created_time_ts_utc | created_time_dt_jst |
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+
| 2021-11-13 | 11 | 京都 | 晴 | 10.5 | 0 | 7.6 | NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.662826 |
| 2021-03-03 | 3 | 京都 | 晴 | 5.9 | 0 | 6.6 | NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.663407 |
| 2021-11-14 | 11 | 京都 | 晴 | 12.1 | 0 | 3.6 | NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.665853 |
| 2021-03-04 | 3 | 京都 | 晴 | 10 | 0 | 6.1 | NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.666451 |
| 2021-11-15 | 11 | 京都 | 晴 | 12.8 | 0 | 7.9 | NULL | 2022-05-01 19:44:57 | 2022-05-02T04:44:57.668426 |
+------------+-------+------+--------+-------------+---------------+----------+----------+---------------------+----------------------------+
Google CloudのDataflowで実行
次にDataflowで実行してみます。スクリプトの最後の部分でApache BeamのrunnerをDataflowRunner
にしてからpythonを実行します。
$ python gcs_to_bq.py
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
...
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.791Z: JOB_MESSAGE_DEBUG: Executing success step success48
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.871Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.908Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T19:59:22.935Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.914Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.950Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-05-06T20:00:06.976Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2022-05-06_12_55_22-18196576572105466017 is in state JOB_STATE_DONE
スクリプトの実行ログではわかりにくいですが、Google CloudのDataflowのコンソールから確認するとDataflowで実行されていることがわかります。
Dataflowのジョブが作成され、Pipelineが実行中であることがわかります。
実行が完了するとジョブグラフにてジョブ情報がグラフでわかりやすく表現されています。
まとめ
Dataflowを使ってGCSにあるCSVファイルをBigQueryにロードする処理をPythonスクリプトで試してみました。単純にロードするだけなら非常に簡単な記述ですし、データを加工する際もApache BeamのParDoやMapなどのTransformを使い簡単に加工を行えるのでETLツールとしてその概念さえ理解すれば便利ではないでしょうか。機会があれば積極的に使っていきたいと思います。
最後まで読んで頂いてありがとうございました。