[Python] ijsonライブラリで大容量JSON(7GB)をストリーミング処理してみた

[Python] ijsonライブラリで大容量JSON(7GB)をストリーミング処理してみた

2025.10.31

こんにちは。サービス開発室の武田です。

AWS Price List APIのJSONファイルは巨大です。EC2の価格リストは約7GBもあります(2025年10月時点で6.94GB)。どのようにこのファイルをLambda環境で処理しようかと困っていたのですが、Pythonのストリーミング型JSONパーサ「ijson」を使うことで解決できました。今回は実際のプロジェクトでの活用事例と実装パターンをまとめます。

やりたかったこと

AWS Price List APIから最新の価格情報を定期的に取得して、EC2インスタンスタイプの情報を抽出・分析するサーバーレスシステムを作っていました。ただ、提供されるJSONファイルの大きさが想像以上で、従来の方法では処理できません。

EC2価格リストのJSONファイルは次のような特徴があります(リンク先は巨大なので注意)。

https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json

  • ファイルサイズ: 約7GB(2025年10月時点で6.94GB)
    • 2015年: 39.90 MB
    • 2016年: 43.04 MB
    • 2017年: 96.47 MB
    • 2018年: 204.50 MB
    • 2019年: 582.61 MB
    • 2020年: 1.22 GB
    • 2021年: 1.54 GB
    • 2022年: 2.59 GB
    • 2023年: 3.71 GB
    • 2024年: 5.29 GB
    • 2025年: 6.94 GB(10月時点)
    • 新しいインスタンスタイプやリージョンの追加により年々増加傾向
  • データ量: 約40万件の製品情報、1000万行以上
  • 更新頻度: 非定期(新インスタンスタイプ追加時など)

Lambda環境でこのファイルを処理しようとすると、いくつか問題が出てきます。

  • ファイル全体をメモリに読み込むと数GBのメモリを消費する
  • JSON全体のパースに時間がかかる
  • メモリ使用量とCPU時間に応じて課金される
  • 実際に必要なのはインスタンスタイプ名だけなのに全部読む必要がある

ijsonライブラリ

ijsonは、Pythonで大容量JSONファイルをストリーミング処理するためのライブラリです。ファイル全体をメモリに読み込まず、イベントストリーム形式で逐次的にパースしてくれます。

次の特徴があります。

  • ファイルサイズにかかわらず、一定のメモリ使用量
  • 必要なデータが見つかった時点で処理を停止できる
  • パフォーマンスに応じた実装を自動選択
    • yajl2_c: 最速(C拡張)
    • yajl2_cffi: 高速(CFFI経由)
    • yajl2: 中速(ctypes経由)
    • python: 低速(純Python実装)

依存関係

[project]
dependencies = [
    "ijson>=3.4.0",
]

ijsonは利用可能な最速のバックエンドを自動選択します。私たちのプロジェクトでは、yajl2_cバックエンド(最速)が利用されています。

実装パターン

パターン1: メタデータのみ抽出(早期終了)

重複処理を避けるため、まずファイルのバージョン情報のみを取得します。

import ijson
import urllib.request

def extract_metadata_only(url: str, timeout: int = 60) -> dict[str, str | None]:
    """Extract only version and publicationDate from AWS Price List JSON.

    早期終了により、数GBのファイルから数KB分のみ読み取って処理完了。
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        version = None
        publication_date = None

        # イベントストリーム形式でパース
        parser = ijson.parse(response)

        for prefix, event, value in parser:
            # トップレベルのversionフィールドを検出
            if prefix == "version" and event == "string":
                version = value
            # トップレベルのpublicationDateフィールドを検出
            elif prefix == "publicationDate" and event == "string":
                publication_date = value

            # 両方見つかったら即座に終了(重要!)
            if version and publication_date:
                break

        return {
            "version": version,
            "publicationDate": publication_date,
        }

このパターンの利点は、ファイル先頭数KB分だけ読んで終われることです。約7GBのダウンロードが不要になるので、処理時間は数秒以内、メモリ使用量も数MB程度で済みます。重複バージョンの場合はフル処理をスキップできるので効率的です。

パターン2: 特定データのストリーミング抽出

必要なデータ(インスタンスタイプ名)のみを抽出します。

def stream_extract_instance_types(url: str, timeout: int = 900) -> dict[str, Any]:
    """Extract instance types from AWS Price List JSON using streaming parser.

    メモリに全体を展開せず、必要な情報のみを逐次抽出。
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        instance_types = set()
        version = None
        publication_date = None
        total_products = 0
        instance_products = 0

        parser = ijson.parse(response)
        current_product_family = None

        for prefix, event, value in parser:
            # メタデータ取得
            if prefix == "version" and event == "string":
                version = value
            elif prefix == "publicationDate" and event == "string":
                publication_date = value

            # products配下の処理
            elif prefix.startswith("products."):
                parts = prefix.split(".")

                # products.{SKU} の終了(1製品完了)
                if len(parts) == 2 and event == "end_map":
                    total_products += 1
                    current_product_family = None

                # products.{SKU}.productFamily の値
                elif len(parts) == 3 and parts[2] == "productFamily" and event == "string":
                    current_product_family = value
                    # "Compute Instance" または "Compute Instance (bare metal)"
                    if value.startswith("Compute Instance"):
                        instance_products += 1

                # products.{SKU}.attributes.instanceType の値
                elif (
                    len(parts) == 4
                    and parts[2] == "attributes"
                    and parts[3] == "instanceType"
                    and event == "string"
                ):
                    # Compute Instanceファミリーの場合のみ追加
                    if current_product_family and current_product_family.startswith("Compute Instance"):
                        instance_types.add(value)

        return {
            "instanceTypes": sorted(instance_types),
            "version": version,
            "publicationDate": publication_date,
            "totalProducts": total_products,
            "instanceProducts": instance_products,
        }

メモリ使用量は数十MB程度で、ファイルサイズに依存しません。約40万製品から数千のインスタンスタイプを抽出しますが、必要なデータのみをsetで保持するので重複も自動で除去されます。メタデータも同時に取得できるので、追加のHTTPリクエストも不要です。

Lambda関数での実装

実際にLambda関数でこれらのパターンを組み合わせてみました。

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Lambda handler for EC2 pricing data extraction."""
    url = "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"

    # ステップ1: メタデータのみ抽出(軽量・高速)
    metadata = extract_metadata_only(url)
    version = metadata["version"]

    # バージョンチェック: 既に処理済みならスキップ
    if check_version_exists(s3_client, bucket, version):
        return {"skipped": True, "version": version}

    # ステップ2: フルストリーミング処理(必要な場合のみ)
    extracted = stream_extract_instance_types(url)
    instance_types = extracted["instanceTypes"]

    # S3へ保存
    save_full_list(s3_client, bucket, version, instance_types)

    return {
        "success": True,
        "version": version,
        "totalCount": len(instance_types)
    }

まずメタデータだけ取得して、バージョンが変わっていなければ処理をスキップします。新しいバージョンの場合だけフルでストリーミング処理を実行する形です。

パフォーマンス比較

従来の方法とijsonでどれくらい違うのか比較してみました。

従来の方法(json.load)

# ❌ メモリ効率が悪い
with urllib.request.urlopen(url) as response:
    data = response.read().decode("utf-8")  # 約7GBをメモリに展開
    price_list = json.loads(data)           # さらにPythonオブジェクトに展開

この方法だと、Pythonオブジェクトへの展開時にメモリが数倍に膨れ上がります。想定では数十GB以上が必要になってしまい、Lambda最大メモリ10GBでは処理できません。

ijsonの方法(ストリーミング)

# ✅ メモリ効率的
with urllib.request.urlopen(url) as response:
    parser = ijson.parse(response)  # ストリーミング処理
    # 逐次的にイベントを処理

ijsonではメモリ使用量は数十MB程度で一定です。ファイルダウンロードと同時並行でパースが進むので、Lambda設定は256MB〜512MBで十分動きます。

効果まとめ

指標 json.load ijson 改善効果
メモリピーク 数十GB以上(処理不可) 60MB程度 99%以上削減
Lambda実行時間 - 数分程度 処理可能に
Lambda実行コスト 不可能 大幅削減

まとめ

ijsonを使うことで、Lambda環境でも7GBのJSONファイルを処理できました。最後にどんなときijsonを使うべきか整理しておきます。

ijsonを使うべきケース

  • 大容量JSONファイル(数十MB以上、特にGB級)
  • 必要なデータが一部のみ
  • Lambda等のメモリ制約がある環境
  • 早期終了が可能な処理
  • ストリーミング処理が可能なデータ構造

従来の方法(json.load)でいいケース

  • 小容量JSONファイル(数MB以下)
  • データ全体が必要
  • ランダムアクセスが必要
  • JSON全体の構造解析が必要
  • シンプルな実装を優先

実際に得られた効果

今回のプロジェクトでは次の効果が得られました。

  1. メモリ効率: 99%以上削減(想定数十GB以上 → 60MB程度)
  2. 処理可能性: 従来は不可能だった処理が可能に
  3. Lambda設定: 最小構成で処理可能(256MB〜512MB)
  4. コスト: Lambda実行コストを大幅削減
  5. 重複処理回避: メタデータ早期抽出により、不要な処理をスキップ

同じような課題に直面している方の参考になれば幸いです。

参考資料

この記事をシェアする

FacebookHatena blogX

関連記事