
pytestでBigQueryのフィクスチャーを使って簡単にSmallテストをする
はじめに
データ事業本部のkobayashiです。
Google BigQueryを使用したアプリケーションを開発する際、ローカル環境でのSmallテストが課題になることがあります。本番環境のBigQueryを使用するとコストがかかり、またテストデータの管理も複雑になっていまします。
そのような課題を解決するためにPytestのプラグインであるpytest-databasesを使用して、BigQueryのテストを効率的に行う方法を試してみました。
pytest-databasesとは
pytest-databasesは、pytestテストのための「すぐに使えるデータベースフィクスチャ」を提供するプラグインです。PostgreSQL、MySQL、BigQueryなど、様々なデータベースをサポートしています。仕組みとしてはDockerコンテナを使用してBigQueryエミュレータを起動し、分離されたテスト環境でpytestを実行する仕組みになります。
主な特徴:
- Pytestのフィクスチャーとして指定することで簡単に使える
- 様々なデータベースに対応
- 対応している主なデータベース
- Postgres: Version 12, 13, 14, 15, 16 and 17 are available
- MySQL: Version 5.6, 5.7 and 8 are available
- Oracle: Version 18c XE and 23C Free are available
- SQL Server: Version 2022 is available
- Google AlloyDB Omni: Simplified Omni installation for easy testing.
- Google Spanner: The latest cloud-emulator from Google is available
- Google BigQuery: Unofficial BigQuery emulator
- Redis: Latest version
- Valkey: Latest version
- Elasticsearch: Version 7 and 8 are available
- Azure blob storage: Via azurite
 
- 分離されたテスト環境をDockerベースで実現
pytest-databasesを使ったBigQueryテスト
では早速BigQueryテストを行ってみます。
環境
今回使用した環境は以下の通りです。
Python 3.11.5
pytest 8.3.1
pytest-databases 0.14.0
google-cloud-bigquery  3.11.4
Docker 28.1.1
インストール
BigQueryのモジュールとpytest-databasesプラグインをインストールします。
$ pip install google-cloud-bigquery
$ pip install "pytest-databases[bigquery]"
基本的な使い方
はじめにテスト対象となるBigQueryを使用するアプリケーションコードを用意します。
from google.cloud import bigquery
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class SalesAnalyzer:
    """売上データを分析するクラス"""
    def __init__(self, client: bigquery.Client, dataset_id: str):
        self.client = client
        self.dataset_id = dataset_id
    def create_sales_table(self) -> None:
        """売上テーブルを作成"""
        table_id = f"{self.client.project}.{self.dataset_id}.sales"
        schema = [
            bigquery.SchemaField("sale_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("product_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("amount", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("sale_date", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("region", "STRING", mode="REQUIRED"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table = self.client.create_table(table)
        logger.info(f"テーブル {table_id} を作成しました")
    def insert_sales_data(self, sales_data: List[Dict[str, Any]]) -> None:
        """売上データを挿入"""
        table_id = f"{self.client.project}.{self.dataset_id}.sales"
        errors = self.client.insert_rows_json(table_id, sales_data)
        if errors:
            raise Exception(f"データ挿入エラー: {errors}")
        logger.info(f"{len(sales_data)}件のデータを挿入しました")
    def get_sales_data(self) -> List[Dict[str, Any]]:
        """地域別売上サマリーを取得"""
        query = f"""
        SELECT 
            *
        FROM `{self.client.project}.{self.dataset_id}.sales`
        """
        query_job = self.client.query(query)
        results = query_job.result()
        return [dict(row) for row in results]
    def get_regional_sales_summary(self) -> List[Dict[str, Any]]:
        """地域別売上サマリーを取得"""
        query = f"""
        SELECT 
            region,
            COUNT(*) as transaction_count,
            SUM(amount) as total_sales,
            AVG(amount) as avg_sales,
            MAX(amount) as max_sale,
            MIN(amount) as min_sale
        FROM `{self.client.project}.{self.dataset_id}.sales`
        GROUP BY region
        ORDER BY total_sales DESC
        """
        query_job = self.client.query(query)
        results = query_job.result()
        return [dict(row) for row in results]
    def get_monthly_sales_trend(self) -> List[Dict[str, Any]]:
        """月別売上トレンドを取得"""
        query = f"""
        SELECT 
            FORMAT_DATE('%Y-%m', sale_date) as month,
            COUNT(*) as transaction_count,
            SUM(amount) as total_sales
        FROM `{self.client.project}.{self.dataset_id}.sales`
        GROUP BY month
        ORDER BY month
        """
        query_job = self.client.query(query)
        results = query_job.result()
        return [dict(row) for row in results]
    def get_top_products(self, limit: int = 10) -> List[Dict[str, Any]]:
        """売上上位の商品を取得"""
        query = f"""
        SELECT 
            product_name,
            COUNT(*) as sales_count,
            SUM(amount) as total_revenue,
            ROUND(AVG(amount), 2) as avg_price
        FROM `{self.client.project}.{self.dataset_id}.sales`
        GROUP BY product_name
        ORDER BY total_revenue DESC
        LIMIT {limit}
        """
        query_job = self.client.query(query)
        results = query_job.result()
        return [dict(row) for row in results]
処理内容としては
- データセットに売上テーブルを作成する
- 作成した売上テーブルに売上データをInsertする
- 売上テーブルのデータを取得する
- 地域別売上サマリーを取得する
- 月別売上トレンドを取得する
- 売上上位の商品を取得する
です。
テストの作成
ではこのスクリプトのテストを記述していきます。
まず、conftest.pyでpytestプラグインを有効にする必要があります。
pytest_plugins = ["pytest_databases.docker.bigquery"]
次に pytest-databasesを使用してBigQueryのテストを作成します。
import pytest
from google.cloud import bigquery
from pytest_databases.docker.bigquery import BigQueryService
from bigquery_app import SalesAnalyzer
import os
@pytest.fixture
def bigquery_client(bigquery_service: BigQueryService):
    """BigQueryクライアントを作成するフィクスチャ"""
    # エミュレータの接続情報を環境変数に設定
    os.environ["BIGQUERY_EMULATOR_HOST"] = (
        f"http://{bigquery_service.host}:{bigquery_service.port}"
    )
    client = bigquery.Client(
        project=bigquery_service.project,
        # エミュレータ使用時は認証情報をスキップ
        credentials=None,
    )
    return client
@pytest.fixture
def test_dataset(bigquery_client: bigquery.Client):
    """テスト用データセットを作成するフィクスチャ"""
    dataset_id = "test_sales_dataset"
    dataset = bigquery.Dataset(f"{bigquery_client.project}.{dataset_id}")
    dataset.location = "US"
    # データセットを作成
    dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
    yield dataset_id
    # テスト後にクリーンアップ
    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
def test_create_sales_table(bigquery_client: bigquery.Client, test_dataset: str):
    """売上テーブルの作成をテスト"""
    analyzer = SalesAnalyzer(bigquery_client, test_dataset)
    # テーブルを作成
    analyzer.create_sales_table()
    # テーブルが存在することを確認
    table_id = f"{bigquery_client.project}.{test_dataset}.sales"
    table = bigquery_client.get_table(table_id)
    assert table is not None
    assert len(table.schema) == 5
    assert table.schema[0].name == "sale_id"
    assert table.schema[1].name == "product_name"
    assert table.schema[2].name == "amount"
    assert table.schema[3].name == "sale_date"
    assert table.schema[4].name == "region"
def test_insert_and_query_sales_data(
    bigquery_client: bigquery.Client, test_dataset: str
):
    """売上データの挿入とクエリをテスト"""
    analyzer = SalesAnalyzer(bigquery_client, test_dataset)
    # テーブルを作成
    analyzer.create_sales_table()
    # テストデータを準備
    test_sales_data = [
        {
            "sale_id": "S001",
            "product_name": "ノートPC",
            "amount": 120000.0,
            "sale_date": "2024-01-15",
            "region": "関東",
        },
        {
            "sale_id": "S002",
            "product_name": "モニター",
            "amount": 35000.0,
            "sale_date": "2024-01-16",
            "region": "関西",
        },
        {
            "sale_id": "S003",
            "product_name": "キーボード",
            "amount": 8000.0,
            "sale_date": "2024-01-17",
            "region": "関東",
        },
        {
            "sale_id": "S004",
            "product_name": "ノートPC",
            "amount": 115000.0,
            "sale_date": "2024-02-01",
            "region": "関西",
        },
        {
            "sale_id": "S005",
            "product_name": "マウス",
            "amount": 3500.0,
            "sale_date": "2024-02-05",
            "region": "中部",
        },
    ]
    # データを挿入
    analyzer.insert_sales_data(test_sales_data)
    # 地域別売上サマリーを取得
    regional_summary = analyzer.get_regional_sales_summary()
    assert len(regional_summary) == 3  # 関東、関西、中部
    # 関東の売上を確認
    kanto_sales = next(r for r in regional_summary if r["region"] == "関東")
    assert kanto_sales["transaction_count"] == 2
    assert kanto_sales["total_sales"] == 128000.0
    assert kanto_sales["avg_sales"] == 64000.0
    # 関西の売上を確認
    kansai_sales = next(r for r in regional_summary if r["region"] == "関西")
    assert kansai_sales["transaction_count"] == 2
    assert kansai_sales["total_sales"] == 150000.0
def test_monthly_sales_trend(bigquery_client: bigquery.Client, test_dataset: str):
    """月別売上トレンドの取得をテスト"""
    analyzer = SalesAnalyzer(bigquery_client, test_dataset)
    # テーブルを作成
    analyzer.create_sales_table()
    # 複数月のテストデータを準備
    test_sales_data = [
        {
            "sale_id": f"S{i:03d}",
            "product_name": "商品A",
            "amount": 10000.0 * (i % 3 + 1),
            "sale_date": f"2024-{(i % 3) + 1:02d}-{(i % 28) + 1:02d}",
            "region": "関東",
        }
        for i in range(15)
    ]
    analyzer.insert_sales_data(test_sales_data)
    # 月別トレンドを取得
    monthly_trend = analyzer.get_monthly_sales_trend()
    assert len(monthly_trend) == 3  # 1月、2月、3月
    assert all("month" in record for record in monthly_trend)
    assert all("total_sales" in record for record in monthly_trend)
    # 月の順序が正しいことを確認
    months = [record["month"] for record in monthly_trend]
    assert months == sorted(months)
def test_top_products(bigquery_client: bigquery.Client, test_dataset: str):
    """売上上位商品の取得をテスト"""
    analyzer = SalesAnalyzer(bigquery_client, test_dataset)
    # テーブルを作成
    analyzer.create_sales_table()
    # 商品別のテストデータを準備
    products = ["ノートPC", "モニター", "キーボード", "マウス", "USBメモリ"]
    test_sales_data = []
    for i in range(20):
        test_sales_data.append(
            {
                "sale_id": f"S{i:03d}",
                "product_name": products[i % len(products)],
                "amount": 10000.0 * (len(products) - (i % len(products))),
                "sale_date": "2024-01-15",
                "region": "関東",
            }
        )
    analyzer.insert_sales_data(test_sales_data)
    # 上位3商品を取得
    top_products = analyzer.get_top_products(limit=3)
    assert len(top_products) == 3
    assert top_products[0]["product_name"] == "ノートPC"
    assert top_products[0]["sales_count"] == 4
    assert top_products[0]["total_revenue"] == 200000.0
    # 売上順になっていることを確認
    revenues = [p["total_revenue"] for p in top_products]
    assert revenues == sorted(revenues, reverse=True)
先ほど作成したアプリケーションファイルの各処理
- データセットに売上テーブルを作成する
- 作成した売上テーブルに売上データをInsertする
- 地域別売上サマリーを取得する
- 月別売上トレンドを取得する
- 売上上位の商品を取得する
に対してテストを記述しています。
pytest-databasesの使い方ですが、はじめに以下の2つのPytestのフィクスチャーを作成します。
- bigquery_clientでBiqQueryClientを作成する
- test_datasetでテスト用データセットを作成する
ここで作成したフィクスチャーを使って
analyzer = SalesAnalyzer(bigquery_client, test_dataset)
の形で売上データを分析するクラスをインスタンス化しそれぞれの関数のテストを実行しています。
テストの実行
pytest-databasesはDockerベースで動くのでテストを実行する前に、Dockerが起動していることを確認します。
$ docker --version
Docker version 28.1.1, build 4eba377
$ pytest test_bigquery_app.py -v
============================= test session starts ==============================
collecting ... collected 4 items
test_bigquery_app.py::test_create_sales_table PASSED                     [ 25%]
test_bigquery_app.py::test_insert_and_query_sales_data PASSED            [ 50%]
test_bigquery_app.py::test_monthly_sales_trend PASSED                    [ 75%]
test_bigquery_app.py::test_top_products PASSED                           [100%]
============================== 4 passed in 10.77s ==============================
このような形で実際のBiqQueryのリソースを使わずにテストを実行することができます。
複数のデータセットを使用したテスト
複数のデータセットやプロジェクトをまたいだテストも可能です。
import pytest
from google.cloud import bigquery
@pytest.fixture
def multi_datasets(bigquery_client: bigquery.Client):
    """複数のデータセットを作成"""
    datasets = ["raw_data", "processed_data", "analytics"]
    created_datasets = []
    for ds_name in datasets:
        dataset = bigquery.Dataset(f"{bigquery_client.project}.{ds_name}")
        dataset.location = "US"
        dataset = bigquery_client.create_dataset(dataset, exists_ok=True)
        created_datasets.append(ds_name)
    yield created_datasets
    # クリーンアップ
    for ds_name in created_datasets:
        bigquery_client.delete_dataset(ds_name, delete_contents=True, not_found_ok=True)
def test_cross_dataset_query(bigquery_client: bigquery.Client, multi_datasets):
    """複数データセット間のクエリをテスト"""
    # raw_dataにテーブルを作成
    table_id = f"{bigquery_client.project}.raw_data.source_table"
    schema = [
        bigquery.SchemaField("id", "STRING"),
        bigquery.SchemaField("value", "INTEGER"),
    ]
    table = bigquery.Table(table_id, schema=schema)
    bigquery_client.create_table(table)
    # データを挿入
    rows = [{"id": "1", "value": 100}, {"id": "2", "value": 200}]
    bigquery_client.insert_rows_json(table_id, rows)
    # processed_dataに集計結果を作成
    query = f"""
    CREATE OR REPLACE TABLE `{bigquery_client.project}.processed_data.summary` AS
    SELECT 
        COUNT(*) as record_count,
        SUM(value) as total_value
    FROM `{bigquery_client.project}.raw_data.source_table`
    """
    query_job = bigquery_client.query(query)
    query_job.result()  # クエリの完了を待つ
    # 結果を確認
    result_query = f"""
    SELECT * FROM `{bigquery_client.project}.processed_data.summary`
    """
    results = list(bigquery_client.query(result_query).result())
    assert len(results) == 1
    assert results[0]["record_count"] == 2
    assert results[0]["total_value"] == 300
$ pytest test_multi_dataset.py -v
============================= test session starts ==============================
collecting ... collected 1 item
test_multi_dataset.py::test_cross_dataset_query PASSED                   [100%]
============================== 1 passed in 1.40s ===============================
まとめ
pytest-databasesを使用することで、BigQueryのテストを効率的に行えることがわかりました。特に以下の点が便利だと感じました。
- Dockerベースのエミュレータにより、本番環境を使わずにローカルでテストが可能
- pytest標準のフィクスチャとして提供されるため、既存のテストフレームワークとの統合が容易
- セットアップとクリーンアップが自動化されており、テストの独立性が保たれる
- 実際のBigQuery APIと同じインターフェースでテストが書ける
BigQueryを使用したアプリケーションの開発において、テスト環境の構築は重要な課題です。pytest-databasesを活用することで、コストを抑えながら信頼性の高いテストを実装できるようになります。
最後まで読んで頂いてありがとうございました。












