Amazon DyndmoDBにたまったデータをDuckDBに取り込んでみた

Amazon DyndmoDBにたまったデータをDuckDBに取り込んでみた

Clock Icon2025.02.28

はじめに

データ事業本部のkobayashiです。Amazon DynamoDBのテーブルに蓄積したデータをローカル端末上でSQLを使ってどうにかできないかと考えていたところローカルのDuckDBに取り込んで分析してみたらどうかと思い試してみたのでその内奥をまとめます。

DuckBDとは

DuckDBは、分析処理に特化したエンベデッド型のリレーショナルデータベースシステムです。SQLiteのように単一のプロセス内で動作しますが、OLAP(Online Analytical Processing)ワークロードに最適化されているのが特徴です。

  • プロセス内で動作し外部サーバーが不要
  • カラム指向ストレージでベクトル化実行による並列処理を行うため分析クエリを高速に処理
  • Pandas、Apache ArrowなどのPythonデータ分析ライブラリとシームレスに連携
  • 標準SQLに準拠しているのでSQL文で分析が可能
  • メモリ効率が高く、大規模データセットでも高いパフォーマンスを発揮

といった特徴があり、 データサイエンティストやアナリストにとって、PandasデータフレームとSQLの両方を活用できる点が魅力的です。また、インストールも簡単でPythonではpip install duckdbだけで始められることも利点です。

DynamoDBのデータをDuckDBに取り込む

では早速DynamoDBのデータをDuckDBに取り込んでみます。
今回行うのは愚直にDynamoDBのデータをScanしてDuckDBに取り込む方法です。この方法だとDynamoDBのScanを使っているのでキャパシティユニットを消費するので注意が必要なので、DynamoDBからAmazon S3 へのデータエクスポート機能を使ったほうが良いかと思います。

DynamoDBからDuckDBに取り込むコードになります。今回はListやMapは除外してスカラー型の数値、文字列、Boolのみを取り込むようにしています。

d2d.py
import boto3
import pandas as pd
import duckdb
from typing import List, Dict, Any

class DuckDBManager:
    def __init__(self, db_path: str, is_read_only: bool = True):
        """
        Args:
            db_path: データベースファイルのパス
            is_read_only: コネクションのタイプ
        """
        self.db_path = db_path
        self.is_read_only = is_read_only
        self.connection = None

    def __enter__(self):
        """コンテキストマネージャーの開始でデータベースに接続"""
        try:
            self.connection = duckdb.connect(self.db_path, read_only=self.is_read_only)
        except Exception as e:
            print(e)
            raise
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """コンテキストマネージャーの終了で接続を閉じる"""
        if self.connection:
            try:
                self.connection.close()
            except Exception as e:
                print(e)

def _deserialize_dynamodb_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """DynamoDBのデータ型をPythonのネイティブ型に変換"""
    result = {}
    for key, value in item.items():
        # DynamoDBの型に応じて変換
        if "S" in value:  # 文字列
            result[key] = value["S"]
        elif "N" in value:  # 数値
            result[key] = float(value["N"])
        elif "BOOL" in value:  # ブール値
            result[key] = value["BOOL"]
    return result

def fetch_from_dynamodb(table_name: str) -> List[Dict[str, Any]]:
    """DynamoDBからデータを取得"""
    dynamodb = boto3.client("dynamodb")

    items = []
    paginator = dynamodb.get_paginator("scan")

    try:
        # ページネーションを使用して全データを取得
        for page in paginator.paginate(TableName=table_name):
            for item in page["Items"]:
                deserialized_item = _deserialize_dynamodb_item(item)
                items.append(deserialized_item)

        return items

    except Exception as e:
        print(f"Error fetching data from DynamoDB: {e}")
        raise

def load_to_duckdb(db_path: str, table_name: str):
    """データをDuckDBに読み込む"""
    try:
        data = fetch_from_dynamodb(table_name)
        # DataFrameに変換
        df = pd.DataFrame(data)

        with DuckDBManager(db_path, False) as conn:
            # DuckDBにテーブルとして保存
            conn.register("temp_view", df)
            conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} AS 
                SELECT * FROM temp_view
            """)

    except Exception as e:
        print(f"Error loading data to DuckDB: {e}")
        raise

def main():
    load_to_duckdb("dynamodb.db", "テーブル名")

if __name__ == "__main__":
    main()

上記のスクリプトを実行することでDyanmoDBのテーブルをローカル端末のDuckDBとしてdynamodb.dbファイルに取り込むことができます。
DuckDBManagerクラスは後続のDuckDBでSQLを使う際に再度使うためクラス化してwithブロック内で安全にDuckDBにクエリを実行出来るようにしてあります。

DynamoDBから取り込んだデータにDuckDBでSQLを使ってみた

では取り込んだデータに対してDuckDBでSQLを実行してみます。SQLを実行するには先程取り込みで使用したDuckDBManagerクラスを使って実行します。

例1: 日付別のアクティビティ集計

table_name = "{DuckDB上のテーブル名}"

with DuckDBManager("dynamodb.db") as conn:
    # DuckDBでクエリを実行して結果をDataFrameに変換
    result = conn.execute(f"""
        SELECT 
            DATE_TRUNC('day', start_datetime::TIMESTAMP) as activity_date,
            COUNT(*) as activity_count,
            COUNT(DISTINCT user_id) as unique_users
        FROM {table_name}
        GROUP BY DATE_TRUNC('day', start_datetime::TIMESTAMP)
        ORDER BY activity_date DESC
        LIMIT 7;
    """).fetchdf()
    print(result)

実行結果

  activity_date  activity_count  unique_users
0    2025-02-20             225           220
1    2025-02-19             221           219
2    2025-02-18             225           220
3    2025-02-17             219           219
4    2025-02-16             219           219
5    2025-02-15             219           219
6    2025-02-14             219           219

例1: ユーザーごとの行動パターン分析

table_name = "{DuckDB上のテーブル名}"

with DuckDBManager("dynamodb.db") as conn:
    # DuckDBでクエリを実行して結果をDataFrameに変換
    result = conn.execute(f"""
        WITH user_stats AS (
            SELECT 
                user_id,
                COUNT(*) as total_actions,
                MIN(timestamp::TIMESTAMP) as first_action,
                MAX(timestamp::TIMESTAMP) as last_action
            FROM {table_name}
            GROUP BY user_id
        )
        SELECT 
            CASE 
                WHEN total_actions >= 100 THEN 'Heavy User'
                WHEN total_actions >= 50 THEN 'Active User'
                ELSE 'Light User'
            END as user_category,
            COUNT(*) as user_count,
            AVG(total_actions) as avg_actions
        FROM user_stats
        GROUP BY 1
        ORDER BY avg_actions DESC;
    """).fetchdf()
    print(result)

実行結果

  user_category  user_count  avg_actions
0    Heavy User         245       250.5
1    Active User        678         72.3    
2    Light User        2675         14.5

このような形でDynamoDBから取り込んだデータに対してDuckDBでSQLを実行して分析することができました。

まとめ

Amazon DynamoDBのテーブルに蓄積したデータをローカル端末上でSQLを使うためにローカル端末上のDuckDBに取り込んで分析してみました。
最小限のコードで簡単にデータを取り込め、直感的なSQLクエリでDynamoDBの非構造化データを分析することができました。また、PandasとシームレスにデータをやりとりできるためPythonでの分析作業も非常にスムーズだと思います。他にはローカル環境で完結するため、開発やテストのサイクルを素早く回せる点も実務で使う上で大きなメリットだと感じました。

最後まで読んで頂いてありがとうございました。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.