Google Cloud に統合された Dataform を使って BigQuery にテーブル&ビューを作成してみた。
こんにちは、みかみです。
キッチンの窓の網戸に20cm級のナナフシがとまっていて、思わず見惚れてしまいました。
やりたいこと
- Google Cloud に統合された Dataform をさわってみたい
- Dataform をバッチ処理で使いたい
- Cloud Composer から Dataform を実行したい
Dataform とは
SQL like なコード(SQLX)でテーブルやビュー作成クエリを記述することで、テーブル間の依存関係を管理しながら DWH に SQL を実行できる、データモデリングツールです。 以前は独立した SaaS サービスでしたが、2020年12月に Google 傘下に加わったことにより、現在は BigQuery データのモデリングツールとして、Google Cloud 管理コンソールから実行できるようになりました。
なお、2022/09 現在はまだプレビューのため、ご利用の際にはご留意ください。
- Dataform
- Overview of Dataform | Dataform ドキュメント
- Dataform が Google Cloud の傘下に: BigQuery で SQL を使用してデータ変換をデプロイする | Google Cloud ブログ
- Google quietly acquires Dataform, the UK startup helping businesses manage data warehouses
- Dataformとdbtで楽するデータモデリング | Speaker Deck
Dataform の料金
Dataform 自体は無料で利用することができます。 ただし、BigQuery への SQL 実行にかかる料金や、Composer などのサービスと連携して利用する場合は連携サービスの利用料金が必要になります。
前提
課金が有効な Google Cloud プロジェクトがあり、Dataform API は有効化済みです。 また、Dataform リポジトリやワークスペースの作成、その他操作に必要な権限は持っているものとします。
Dataform リポジトリとワークスペースを準備
Google Cloud 管理コンソール Dataform 画面の「リポジトリを作成」リンクから、新規リポジトリを作成します。
リポジトリ ID に任意のリポジトリ名を入力して、プルダウンからリージョンを選択します。
現時点では、リージョンは europe-west4
または us-central1
のどちらかを選択可能なようです。
リポジトリが正常に作成できました。
続いてワークスペースを作成します。 リポジトリ一覧から作成したリポジトリのリンクをクリックし、「CREATE DEVELOPMENT WORKSPACE」リンクから、新規ワークスペースを作成します。
任意のワークスペース ID を入力して「CREATE」
作成したワークスペース に入り、「INITIALIZE WORKSPACE」ボタンからワークスペースを初期化します。
初期化が完了すると、必要な構成ファイルやサンプルコードが自動作成されました。
Dataform を手動実行
まずは、先ほど自動作成されたサンプルコードを手動実行してみます。
クイックスタートの記載にしたがって、Google Cloud 管理コンソール IAM 画面の「追加」リンクから、Dataform 実行用サービスアカウントを追加して、必要なロールを付与しました。
Dataform のワークスペース画面に戻り、画面上部の「START EXECUTION」プルダウンから「すべてのアクション」をクリック。
実行内容を確認して、「START EXECUTION」ボタンをクリック。
実行ログは、リポジトリ画面「WORKFLOW EXECUTION LOGS」タブから確認できます。
「詳細を表示」リンクからは、実際に BigQuery に対して実行されたクエリも確認できます。
クエリによると、US マルチリージョンの dataform
データセットを作成し、サンプルコードで定義されていた first_view
と second_view
2 つのビューを作成したようです。
BigQuery コンソールからも確認してみます。
期待通り、データセットとビューが新規作成されたことが確認できました。
Dataform を Cloud Composer から実行
管理コンソールからの手動実行は確認できましたが、実際のシステムではスケジューリングされたバッチ処理などで実行するケースが多いのではないでしょうか。 また、Dataform 単体ではなく、BigQuery にロードしたファイルデータに対して SQL を実行して、マートテーブルやビューを作成するようなユースケースが想定されるのではないかと思います。
ということで、Datafrom を Cloud Composer から実行したいと思います。 また、Dataform を実行する前に、GCS から BigQuery にファイルデータをロードする処理を追加し、ロードしたテーブルのデータからマートテーブルとビューを作成する処理を Dataform で実装してみます。
- サービスの概要 | Cloud Composer ドキュメント
- Cloud Composer の概要 | Cloud Composer ドキュメント
- Cloud Composer で Airflow 環境を立ち上げて、GCS から BigQuery へのデータロードを実行してみた | DevelopersIO
Composer から実行する、以下の Airflow DAG ファイルを作成しました。
from datetime import datetime from airflow import models from airflow.contrib.operators import gcs_to_bq from airflow.providers.google.cloud.operators.dataform import ( DataformCreateCompilationResultOperator, DataformCreateWorkflowInvocationOperator, ) GCS_BUCKET = "dataform-test-us" GCS_OBJECT = "avocado.csv" LOAD_TABLE = "dataform.avocado_load" DAG_ID = "dataform" PROJECT_ID = "cm-da-mikami-yuki-xxxx" REPOSITORY_ID = "dataform_sample" REGION = "us-central1" WORKSPACE_ID = "df-test-mikami" with models.DAG( DAG_ID, schedule_interval='@once', start_date=datetime(2022, 1, 1), catchup=False, tags=['dataform'], ) as dag: # load data to bigquery load_data = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( task_id='load_data', bucket=GCS_BUCKET, source_objects=[GCS_OBJECT], destination_project_dataset_table=LOAD_TABLE, skip_leading_rows=1, write_disposition='WRITE_TRUNCATE', autodetect=True, ) # execute dataform create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": "main", "workspace": ( f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/" f"workspaces/{WORKSPACE_ID}" ), }, ) create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}" }, ) load_data >> create_compilation_result >> create_workflow_invocation
※今回は動作確認億滴のため、スケジュール実行の設定は入れていません。また、一部伏せ字に変更しています。
ロードデータとして、Kaggle データセットからいただいてきた以下のアメリカのアボカドの価格と販売数データを、GCS に格納済みです。
GCS に配置した CSV ファイルデータはこんな感じです。
index,Date,AveragePrice,Total_Volume,_4046,_4225,_4770,Total_Bags,Small_Bags,Large_Bags,XLarge_Bags,type,year,region 0,2015-12-27,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany 1,2015-12-20,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany 2,2015-12-13,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany (省略) 9,2018-01-21,1.87,13766.76,1191.92,2452.79,727.94,9394.11,9351.8,42.31,0.0,organic,2018,WestTexNewMexico 10,2018-01-14,1.93,16205.22,1527.63,2981.04,727.01,10969.54,10919.54,50.0,0.0,organic,2018,WestTexNewMexico 11,2018-01-07,1.62,17489.58,2894.77,2356.13,224.53,12014.15,11988.14,26.01,0.0,organic,2018,WestTexNewMexico
Composer 環境構築および DAG ファイル配置手順は以下をご参照ください。
Composer 実行用のサービスアカウントには、Composer ワーカーと Dataform 編集者に追加して、GCS から BigQuery にデータロードするためのストレージ管理者と BigQuery 管理者ロールを付与しました。
続いて Dataform ワークスペースで、初期化時に自動作成されたサンプルコード(first_view.sqlx
, second_view.sqlx
)を削除し、ロードテーブルから年別&リージョン別のアボカド販売数と平均価格を集計したマートテーブルを作成する以下のコードを、sum_avocado.sqlx というファイル名で保存しました
config { type: "table", columns: { year: "Year of sale", region: "Region of sale", total: "Number of sold", price: "Average price", } } SELECT year, region, SUM(Total_Volume) AS total, AVG(AveragePrice) AS price FROM dataform.avocado_load group by year, region order by year, region
さらに、作成したマートテーブルからサンフランシスコの年別の平均価格を抽出する以下のビュー作成用のコードを、v_avocado_price_sfo.sqlx というファイル名で保存しました。
config { type: "view" } SELECT year, region, price from ${ref("sum_avocado")} WHERE region = 'SanFrancisco'
Composer から DAG を実行し、正常終了されたことを確認します。
BigQuery 管理コンソールからも、マートテーブルとビューが作成できたか確認してみます。
期待通り、Cloud Composer から Dataform を実行して、マートテーブルとビューを作成することができました。
まとめ(所感)
Dataform をさわったことはなく SQLX を書いたこともありませんでしたが、ワークスペース初期化時に自動作成されるのディレクトリ構成やサンプルコードとクイックスタートを参照しながら、簡単に実行することができました。
現状、スケジュール実行する場合は Composer や Workflow などの他のサービスと連携する必要があるようですが、BigQuery のスケジュールドクエリのように Dataform だけでもスケジュール実行できればもっと便利なのではないかと思いました。
もしくは、もし Dataform コードで参照するソーステーブルへのデータロードイベントをトリガに実行することができれば、GCS へのファイル Put イベントトリガに Cloud Functions で BigQuery にデータロードした後 Dataform でマートテーブル作成クエリ実行して・・・などと、イベントドリブンアーキテクチャを妄想してしまいました(現状でも、対象テーブルの Insert トリガー自前で実装して Dataform 実行する UDF たたいたら実現できるかなー?わくわくw
Google Cloud のデータ関連サービスには「Data」で始まるものが多く、どれが何をするものなのか混乱することも多々ありですが、実際に使ったことがあるサービスであれば名前と機能がしっかり結びつくと思います。 前述の通り、Dataform は簡単に実行することができるので、実際のシステムで使う予定がなくても、面白そうと思っていただけましたら、是非さわってみてください。
参考
- Dataform
- SQLX | Dataform
- Overview of Dataform | Dataform ドキュメント
- Create and execute a SQL workflow| Dataform ドキュメント
- サービスの概要 | Cloud Composer ドキュメント
- Cloud Composer の概要 | Cloud Composer ドキュメント
- Cloud Composer 2 で Apache Airflow DAG を実行する | Cloud Composer ドキュメント
- Dataform が Google Cloud の傘下に: BigQuery で SQL を使用してデータ変換をデプロイする | Google Cloud ブログ
- Google quietly acquires Dataform, the UK startup helping businesses manage data warehouses | Google Cloud ブログ
- Dataformとdbtで楽するデータモデリング | Speaker Deck
- 話題のデータパイプラインツール「Dataform」でBigQueryにアクセスしたら何ができるのかを確かめてみた | DevelopersIO