BigQueryのSparkストアドプロシージャでGCSのファイルを操作する(プレビュー)

Google Cloud Next '22でプレビューとなったApache Spark用BigQueryストアドプロシージャで、 Cloud Storage上のファイルの形式変換と、 ファイルパスをカラムに追加してのテーブルへの取込みを試す。
2022.11.07

データアナリティクス事業本部、池田です。
少し前に、BigQuery上で Apache Sparkストアドプロシージャを構築できる機能が プレビュー となりました。

BigQuery で無制限のワークロードを構築: SQL 以外の言語を使用した新機能

今回は、このSparkのストアドプロシージャでCloud Storage(以下GCS)に配置されたファイルを操作してみます。

この機能は、執筆時点(2022/11/07)では申請が必要なプレビューとなります。 フォームからの申請が必要でしたが、 私の場合は2022/10/14に簡単な入力で申請をして、 2022/10/19には承認メールが届き利用できました。

実装については、以下の公式のガイドを参考に進めました。
Work with stored procedures for Apache Spark

メタストアSpark 履歴サーバーといった機能もあるようですが、 今回は触りません… シンプル?な「GCS上のCSVをJSONにして書き出すパターン」と 「GCS上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加するパターン」 をまずは実装してみました。


※当ブログの以下の操作は、プロジェクトのオーナー権限を持つアカウントで、 コマンドは Cloud Shell 、 SQLはBigQueryのコンソールで実行しています。

接続の作成

BigQuery Connection API を有効化します。


bqコマンドでUSマルチリージョンにSparkの接続を作成します。

bq mk --connection --connection_type='SPARK' \
--location=US \
spark-conn

接続に紐づくサービスアカウントが必要になるので、bqコマンドで取得します。

bq show --connection us.spark-conn

この結果が↓

Connection {プロジェクトID}.us.spark-conn

             name                    friendlyName   description    Last modified    type    hasCredential                                            properties                                           
 ---------------------------------- -------------- ------------- ----------------- ------- --------------- ---------------------------------------------------------------------------------------------- 
  {プロジェクト番号}.us.spark-conn                                07 Nov 01:30:17   SPARK   False           {"serviceAccountId": "{プロジェクト番号を含む文字列}@gcp-sa-bigquery-consp.iam.gserviceaccount.com"}

↑一番右にサービスアカウントがありました。

↓接続作成後、コンソール上だとこんな感じ。 こちらでもサービスアカウントは確認できそうです。

この接続を使って、2パターンのストアドプロシージャを作成します。

ストアドプロシージャ1:GCS上のCSVをJSONにして書き出す

なんとなくのイメージでGCS上のファイルを扱うことが多そうな気がしたので、 GCSを絡めたインプットとアウトプットを試してみます。
また、呼び出し時のパラメータの渡し方も確認してみます。

※ただGCSと読み書きするだけであれば、 LOAD DATAEXPORT DATA構文などで十分 です。実際の利用時にはSparkでやりたい処理が間に入るイメージかなと思います。

実装

まず、ストアドプロシージャはデータセット配下に作成するので、 データセットを作成します。

CREATE SCHEMA my_spark
OPTIONS (
    location = 'US'
);

あとはストアドプロシージャを作成します。

GCS上に.pyファイルを置く方法もあるようですが、 今回はインライン コードで記述しています。
内容は、引数で指定されたGCSバケットのCSVファイルをロードし、別パスにJSONとして書き出すだけです。

CREATE PROCEDURE my_spark.my_spark_gcs_sample(sample STRING, bucket STRING)
WITH CONNECTION `us.spark-conn`
OPTIONS (
    engine="SPARK"
)
LANGUAGE PYTHON AS R"""
import os
from pyspark.sql import SparkSession


# 引数出力
print("PROCEDURE START. param: {}".format(os.environ["BIGQUERY_PROC_PARAM.sample"].strip("'\"")))
bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"")

# セッションを作成
spark = SparkSession.builder.appName("my_spark_sample").getOrCreate()

# GCSを参照
df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/")
# データ表示
df.show()
# GCS出力
df.write.json(f"gs://{bucket_name}/outputs/")
"""
;

呼び出し時の引数は、 os.environ["BIGQUERY_PROC_PARAM.{引数名}"] のように参照できるそうですが、 私の場合文字列が " " で括られてしまっていたので、 strip() を噛ませています。

readのcsv() でファイルをロードし、 writeのjson() でそのまま書き出しています。

↓コンソール上はこんな感じ。


このパターンの場合、接続のサービスアカウントにGCSバケットを扱う権限が必要になるので、 前述の公式ガイドに倣って、 roles/storage.objectAdmin を付与します。

gsutil iam ch serviceAccount:{サービスアカウント}:objectAdmin \
gs://{バケット名}

この権限が無いと呼び出し時に Permission 'storage.objects.get' denied みたいなエラーになりました。

ロールの扱いだけ上手くできれば、割とさくっと実装できました。

実行

インプットのファイルは3行ほどのヘッダー有りのCSVです。

gs://{バケット名}/inputs/foo.csv

col1,col2,col3
aaa,123,true
bbb,456,false

CALL で引数を渡しつつ呼び出します。
CALL my_spark.my_spark_gcs_sample('Sparkのストアドプロシージャを使ってみる', '{バケット名}');

print()df.show() の結果はBigQueryのコンソールからも確認できるようです。便利!

↓完了すると、成功を示すと思われる _SUCCESS という空ファイルと、 データのJSONファイルが出力されていました。

gs://{バケット名}/outputs/part-00000-5732b15d-accb-4020-bd4c-96264fe3e7f1-c000.json

{"col1":"aaa","col2":"123","col3":"true"}
{"col1":"bbb","col2":"456","col3":"false"}

私の場合はコンソール上での実行から100秒くらいで完了になりました。 ※あくまでプレビュー段階です。

ストアドプロシージャ2:GSC上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加する

もう一つのパターンは、 BigQueryのテーブルにロードするのですが、 どのファイルからロードされたのか分かるように、 元のファイルのパスをカラムとして追加してみます。

実装

内容は、引数で指定されたGCSバケットのCSVファイル群を、 ファイルパスをデータセットにカラムとして追加してから、 別の引数として指定されたBigQueryテーブルに出力します。

CREATE PROCEDURE my_spark.my_spark_import_sample(bucket STRING, table STRING)
WITH CONNECTION `us.spark-conn`
OPTIONS (
    engine="SPARK"
)
LANGUAGE PYTHON AS R"""
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name


# 引数
bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"")
table_name = os.environ["BIGQUERY_PROC_PARAM.table"].strip("'\"")

# セッションを作成
spark = SparkSession.builder.appName("my_spark_import_sample").getOrCreate()

# GCSを参照
df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/")

# パスを設定
df = df.withColumn("uri", input_file_name())

# BQへ格納
df.write.format("bigquery") \
        .option("writeMethod", "direct") \
        .save(table_name)
"""
;

input_file_name() でさくっと取れてしまうんですね…知りませんでした…


このパターンの場合、接続のサービスアカウントに前章のGCSの権限に加えて、 BigQueryを扱う権限が必要になるので、 前述の公式ガイドに倣って、取込み先のデータセットで roles/bigquery.admin を付与します。

GRANT `roles/bigquery.admin`
ON SCHEMA my_spark
TO 'serviceAccount:{サービスアカウント}';

これが無いと、実行時に Access Denied 的なことを言われます。

実行

インプットのファイルは、前章の3行のCSVを複製して、 2ファイルをロードしてみます。


実行してみます。(事前にテーブルは作成していません。)
CALL my_spark.my_spark_import_sample('{バケット名}', 'my_spark.imported');

↑ファイルのパスがカラムとして追加されて、ロードできています!

私の場合はコンソール上での実行から2分弱くらいで完了になりました。 ※あくまでプレビュー段階です。

おわりに

いろいろ使えそうな機能ですね。

今回使わなかった機能や、 データ量が多い時の所要時間なども気になりますね。 気が向いたらブログにします。

関連情報/参考にさせていただいたページ