この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部、池田です。
少し前に、BigQuery上で Apache Spark
でストアドプロシージャを構築できる機能が プレビュー
となりました。
【 BigQuery で無制限のワークロードを構築: SQL 以外の言語を使用した新機能 】
Spark in BigQueryが発表?BigQueryの画面でSparkジョブを書いて、BigQueryのストアドプロシージャとしてSparkを実行。料金はBigQueryのスロットに統合。Pythonで書きたい処理をストアド化したり、非構造化データやクレンジングもBigQueryで出来る。https://t.co/yos3CZ42ca#GoogleCloudNext pic.twitter.com/XX9qXVVoJz
— Yuta.H (@yutah_3) October 11, 2022
今回は、このSparkのストアドプロシージャでCloud Storage(以下GCS)に配置されたファイルを操作してみます。
この機能は、執筆時点(2022/11/07)では申請が必要なプレビューとなります。 フォームからの申請が必要でしたが、 私の場合は2022/10/14に簡単な入力で申請をして、 2022/10/19には承認メールが届き利用できました。
実装については、以下の公式のガイドを参考に進めました。
【 Work with stored procedures for Apache Spark 】
メタストアやSpark 履歴サーバーといった機能もあるようですが、 今回は触りません… シンプル?な「GCS上のCSVをJSONにして書き出すパターン」と 「GCS上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加するパターン」 をまずは実装してみました。
※当ブログの以下の操作は、プロジェクトのオーナー権限を持つアカウントで、 コマンドは Cloud Shell 、 SQLはBigQueryのコンソールで実行しています。
接続の作成
BigQuery Connection API
を有効化します。
bqコマンドでUSマルチリージョンにSparkの接続を作成します。
bq mk --connection --connection_type='SPARK' \
--location=US \
spark-conn
接続に紐づくサービスアカウントが必要になるので、bqコマンドで取得します。
bq show --connection us.spark-conn
この結果が↓
Connection {プロジェクトID}.us.spark-conn
name friendlyName description Last modified type hasCredential properties
---------------------------------- -------------- ------------- ----------------- ------- --------------- ----------------------------------------------------------------------------------------------
{プロジェクト番号}.us.spark-conn 07 Nov 01:30:17 SPARK False {"serviceAccountId": "{プロジェクト番号を含む文字列}@gcp-sa-bigquery-consp.iam.gserviceaccount.com"}
↑一番右にサービスアカウントがありました。
↓接続作成後、コンソール上だとこんな感じ。
こちらでもサービスアカウントは確認できそうです。
この接続を使って、2パターンのストアドプロシージャを作成します。
ストアドプロシージャ1:GCS上のCSVをJSONにして書き出す
なんとなくのイメージでGCS上のファイルを扱うことが多そうな気がしたので、
GCSを絡めたインプットとアウトプットを試してみます。
また、呼び出し時のパラメータの渡し方も確認してみます。
※ただGCSと読み書きするだけであれば、 LOAD DATA
や EXPORT DATA
の
構文などで十分
です。実際の利用時にはSparkでやりたい処理が間に入るイメージかなと思います。
実装
まず、ストアドプロシージャはデータセット配下に作成するので、 データセットを作成します。
CREATE SCHEMA my_spark
OPTIONS (
location = 'US'
);
あとはストアドプロシージャを作成します。
GCS上に.pyファイルを置く方法もあるようですが、
今回はインライン コードで記述しています。
内容は、引数で指定されたGCSバケットのCSVファイルをロードし、別パスにJSONとして書き出すだけです。
CREATE PROCEDURE my_spark.my_spark_gcs_sample(sample STRING, bucket STRING)
WITH CONNECTION `us.spark-conn`
OPTIONS (
engine="SPARK"
)
LANGUAGE PYTHON AS R"""
import os
from pyspark.sql import SparkSession
# 引数出力
print("PROCEDURE START. param: {}".format(os.environ["BIGQUERY_PROC_PARAM.sample"].strip("'\"")))
bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"")
# セッションを作成
spark = SparkSession.builder.appName("my_spark_sample").getOrCreate()
# GCSを参照
df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/")
# データ表示
df.show()
# GCS出力
df.write.json(f"gs://{bucket_name}/outputs/")
"""
;
呼び出し時の引数は、
os.environ["BIGQUERY_PROC_PARAM.{引数名}"]
のように参照できるそうですが、
私の場合文字列が " "
で括られてしまっていたので、
strip()
を噛ませています。
readのcsv() でファイルをロードし、 writeのjson() でそのまま書き出しています。
↓コンソール上はこんな感じ。
このパターンの場合、接続のサービスアカウントにGCSバケットを扱う権限が必要になるので、
前述の公式ガイドに倣って、 roles/storage.objectAdmin
を付与します。
gsutil iam ch serviceAccount:{サービスアカウント}:objectAdmin \
gs://{バケット名}
この権限が無いと呼び出し時に Permission 'storage.objects.get' denied
みたいなエラーになりました。
ロールの扱いだけ上手くできれば、割とさくっと実装できました。
実行
インプットのファイルは3行ほどのヘッダー有りのCSVです。
gs://{バケット名}/inputs/foo.csv
col1,col2,col3
aaa,123,true
bbb,456,false
CALL
で引数を渡しつつ呼び出します。
CALL my_spark.my_spark_gcs_sample('Sparkのストアドプロシージャを使ってみる', '{バケット名}');
↑ print()
や df.show()
の結果はBigQueryのコンソールからも確認できるようです。便利!
↓完了すると、成功を示すと思われる _SUCCESS
という空ファイルと、
データのJSONファイルが出力されていました。
gs://{バケット名}/outputs/part-00000-5732b15d-accb-4020-bd4c-96264fe3e7f1-c000.json
{"col1":"aaa","col2":"123","col3":"true"}
{"col1":"bbb","col2":"456","col3":"false"}
私の場合はコンソール上での実行から100秒くらいで完了になりました。 ※あくまでプレビュー段階です。
ストアドプロシージャ2:GSC上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加する
もう一つのパターンは、 BigQueryのテーブルにロードするのですが、 どのファイルからロードされたのか分かるように、 元のファイルのパスをカラムとして追加してみます。
実装
内容は、引数で指定されたGCSバケットのCSVファイル群を、 ファイルパスをデータセットにカラムとして追加してから、 別の引数として指定されたBigQueryテーブルに出力します。
CREATE PROCEDURE my_spark.my_spark_import_sample(bucket STRING, table STRING)
WITH CONNECTION `us.spark-conn`
OPTIONS (
engine="SPARK"
)
LANGUAGE PYTHON AS R"""
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
# 引数
bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"")
table_name = os.environ["BIGQUERY_PROC_PARAM.table"].strip("'\"")
# セッションを作成
spark = SparkSession.builder.appName("my_spark_import_sample").getOrCreate()
# GCSを参照
df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/")
# パスを設定
df = df.withColumn("uri", input_file_name())
# BQへ格納
df.write.format("bigquery") \
.option("writeMethod", "direct") \
.save(table_name)
"""
;
input_file_name()
でさくっと取れてしまうんですね…知りませんでした…
このパターンの場合、接続のサービスアカウントに前章のGCSの権限に加えて、
BigQueryを扱う権限が必要になるので、
前述の公式ガイドに倣って、取込み先のデータセットで roles/bigquery.admin
を付与します。
GRANT `roles/bigquery.admin`
ON SCHEMA my_spark
TO 'serviceAccount:{サービスアカウント}';
これが無いと、実行時に Access Denied
的なことを言われます。
実行
インプットのファイルは、前章の3行のCSVを複製して、 2ファイルをロードしてみます。
実行してみます。(事前にテーブルは作成していません。)
CALL my_spark.my_spark_import_sample('{バケット名}', 'my_spark.imported');
↑ファイルのパスがカラムとして追加されて、ロードできています!
私の場合はコンソール上での実行から2分弱くらいで完了になりました。 ※あくまでプレビュー段階です。
おわりに
いろいろ使えそうな機能ですね。
今回使わなかった機能や、 データ量が多い時の所要時間なども気になりますね。 気が向いたらブログにします。