Google Cloud に統合された Dataform を使って BigQuery にテーブル&ビューを作成してみた。

2022.09.26

こんにちは、みかみです。

キッチンの窓の網戸に20cm級のナナフシがとまっていて、思わず見惚れてしまいました。

やりたいこと

  • Google Cloud に統合された Dataform をさわってみたい
  • Dataform をバッチ処理で使いたい
  • Cloud Composer から Dataform を実行したい

Dataform とは

SQL like なコード(SQLX)でテーブルやビュー作成クエリを記述することで、テーブル間の依存関係を管理しながら DWH に SQL を実行できる、データモデリングツールです。 以前は独立した SaaS サービスでしたが、2020年12月に Google 傘下に加わったことにより、現在は BigQuery データのモデリングツールとして、Google Cloud 管理コンソールから実行できるようになりました。

なお、2022/09 現在はまだプレビューのため、ご利用の際にはご留意ください。

Dataform の料金

Dataform 自体は無料で利用することができます。 ただし、BigQuery への SQL 実行にかかる料金や、Composer などのサービスと連携して利用する場合は連携サービスの利用料金が必要になります。

前提

課金が有効な Google Cloud プロジェクトがあり、Dataform API は有効化済みです。 また、Dataform リポジトリやワークスペースの作成、その他操作に必要な権限は持っているものとします。

Dataform リポジトリとワークスペースを準備

Google Cloud 管理コンソール Dataform 画面の「リポジトリを作成」リンクから、新規リポジトリを作成します。 リポジトリ ID に任意のリポジトリ名を入力して、プルダウンからリージョンを選択します。 現時点では、リージョンは europe-west4 または us-central1 のどちらかを選択可能なようです。

リポジトリが正常に作成できました。

続いてワークスペースを作成します。 リポジトリ一覧から作成したリポジトリのリンクをクリックし、「CREATE DEVELOPMENT WORKSPACE」リンクから、新規ワークスペースを作成します。

任意のワークスペース ID を入力して「CREATE」

作成したワークスペース に入り、「INITIALIZE WORKSPACE」ボタンからワークスペースを初期化します。

初期化が完了すると、必要な構成ファイルやサンプルコードが自動作成されました。

Dataform を手動実行

まずは、先ほど自動作成されたサンプルコードを手動実行してみます。

クイックスタートの記載にしたがって、Google Cloud 管理コンソール IAM 画面の「追加」リンクから、Dataform 実行用サービスアカウントを追加して、必要なロールを付与しました。

Dataform のワークスペース画面に戻り、画面上部の「START EXECUTION」プルダウンから「すべてのアクション」をクリック。

実行内容を確認して、「START EXECUTION」ボタンをクリック。

実行ログは、リポジトリ画面「WORKFLOW EXECUTION LOGS」タブから確認できます。

「詳細を表示」リンクからは、実際に BigQuery に対して実行されたクエリも確認できます。

クエリによると、US マルチリージョンの dataform データセットを作成し、サンプルコードで定義されていた first_viewsecond_view 2 つのビューを作成したようです。

BigQuery コンソールからも確認してみます。

期待通り、データセットとビューが新規作成されたことが確認できました。

Dataform を Cloud Composer から実行

管理コンソールからの手動実行は確認できましたが、実際のシステムではスケジューリングされたバッチ処理などで実行するケースが多いのではないでしょうか。 また、Dataform 単体ではなく、BigQuery にロードしたファイルデータに対して SQL を実行して、マートテーブルやビューを作成するようなユースケースが想定されるのではないかと思います。

ということで、Datafrom を Cloud Composer から実行したいと思います。 また、Dataform を実行する前に、GCS から BigQuery にファイルデータをロードする処理を追加し、ロードしたテーブルのデータからマートテーブルとビューを作成する処理を Dataform で実装してみます。

Composer から実行する、以下の Airflow DAG ファイルを作成しました。

from datetime import datetime
from airflow import models
from airflow.contrib.operators import gcs_to_bq
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

GCS_BUCKET = "dataform-test-us"
GCS_OBJECT = "avocado.csv"
LOAD_TABLE = "dataform.avocado_load"

DAG_ID = "dataform"
PROJECT_ID = "cm-da-mikami-yuki-xxxx"
REPOSITORY_ID = "dataform_sample"
REGION = "us-central1"
WORKSPACE_ID = "df-test-mikami"


with models.DAG(
    DAG_ID,
    schedule_interval='@once',
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=['dataform'],
) as dag:

    # load data to bigquery
    load_data = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_data',
        bucket=GCS_BUCKET,
        source_objects=[GCS_OBJECT],
        destination_project_dataset_table=LOAD_TABLE,
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',
        autodetect=True,
    )

    # execute dataform
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

load_data >> create_compilation_result >> create_workflow_invocation

※今回は動作確認億滴のため、スケジュール実行の設定は入れていません。また、一部伏せ字に変更しています。

ロードデータとして、Kaggle データセットからいただいてきた以下のアメリカのアボカドの価格と販売数データを、GCS に格納済みです。

GCS に配置した CSV ファイルデータはこんな感じです。

index,Date,AveragePrice,Total_Volume,_4046,_4225,_4770,Total_Bags,Small_Bags,Large_Bags,XLarge_Bags,type,year,region
0,2015-12-27,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
(省略)
9,2018-01-21,1.87,13766.76,1191.92,2452.79,727.94,9394.11,9351.8,42.31,0.0,organic,2018,WestTexNewMexico
10,2018-01-14,1.93,16205.22,1527.63,2981.04,727.01,10969.54,10919.54,50.0,0.0,organic,2018,WestTexNewMexico
11,2018-01-07,1.62,17489.58,2894.77,2356.13,224.53,12014.15,11988.14,26.01,0.0,organic,2018,WestTexNewMexico

Composer 環境構築および DAG ファイル配置手順は以下をご参照ください。

Composer 実行用のサービスアカウントには、Composer ワーカーと Dataform 編集者に追加して、GCS から BigQuery にデータロードするためのストレージ管理者と BigQuery 管理者ロールを付与しました。

続いて Dataform ワークスペースで、初期化時に自動作成されたサンプルコード(first_view.sqlx, second_view.sqlx)を削除し、ロードテーブルから年別&リージョン別のアボカド販売数と平均価格を集計したマートテーブルを作成する以下のコードを、sum_avocado.sqlx というファイル名で保存しました

config {
  type: "table",
  columns: {
    year: "Year of sale",
    region: "Region of sale",
    total: "Number of sold",
    price: "Average price",
  }
}

SELECT
    year,
    region,
    SUM(Total_Volume) AS total,
    AVG(AveragePrice) AS price
FROM
    dataform.avocado_load
group by
    year,
    region
order by
    year,
    region

さらに、作成したマートテーブルからサンフランシスコの年別の平均価格を抽出する以下のビュー作成用のコードを、v_avocado_price_sfo.sqlx というファイル名で保存しました。

config { type: "view" }

SELECT
    year,
    region,
    price
from
    ${ref("sum_avocado")}
WHERE
    region = 'SanFrancisco'

Composer から DAG を実行し、正常終了されたことを確認します。

BigQuery 管理コンソールからも、マートテーブルとビューが作成できたか確認してみます。

期待通り、Cloud Composer から Dataform を実行して、マートテーブルとビューを作成することができました。

まとめ(所感)

Dataform をさわったことはなく SQLX を書いたこともありませんでしたが、ワークスペース初期化時に自動作成されるのディレクトリ構成やサンプルコードとクイックスタートを参照しながら、簡単に実行することができました。

現状、スケジュール実行する場合は Composer や Workflow などの他のサービスと連携する必要があるようですが、BigQuery のスケジュールドクエリのように Dataform だけでもスケジュール実行できればもっと便利なのではないかと思いました。

もしくは、もし Dataform コードで参照するソーステーブルへのデータロードイベントをトリガに実行することができれば、GCS へのファイル Put イベントトリガに Cloud Functions で BigQuery にデータロードした後 Dataform でマートテーブル作成クエリ実行して・・・などと、イベントドリブンアーキテクチャを妄想してしまいました(現状でも、対象テーブルの Insert トリガー自前で実装して Dataform 実行する UDF たたいたら実現できるかなー?わくわくw

Google Cloud のデータ関連サービスには「Data」で始まるものが多く、どれが何をするものなのか混乱することも多々ありですが、実際に使ったことがあるサービスであれば名前と機能がしっかり結びつくと思います。 前述の通り、Dataform は簡単に実行することができるので、実際のシステムで使う予定がなくても、面白そうと思っていただけましたら、是非さわってみてください。

参考