Amazon DyndmoDBにたまったデータをDuckDBに取り込んでみた
はじめに
データ事業本部のkobayashiです。Amazon DynamoDBのテーブルに蓄積したデータをローカル端末上でSQLを使ってどうにかできないかと考えていたところローカルのDuckDBに取り込んで分析してみたらどうかと思い試してみたのでその内奥をまとめます。
DuckBDとは
DuckDBは、分析処理に特化したエンベデッド型のリレーショナルデータベースシステムです。SQLiteのように単一のプロセス内で動作しますが、OLAP(Online Analytical Processing)ワークロードに最適化されているのが特徴です。
- プロセス内で動作し外部サーバーが不要
- カラム指向ストレージでベクトル化実行による並列処理を行うため分析クエリを高速に処理
- Pandas、Apache ArrowなどのPythonデータ分析ライブラリとシームレスに連携
- 標準SQLに準拠しているのでSQL文で分析が可能
- メモリ効率が高く、大規模データセットでも高いパフォーマンスを発揮
といった特徴があり、 データサイエンティストやアナリストにとって、PandasデータフレームとSQLの両方を活用できる点が魅力的です。また、インストールも簡単でPythonではpip install duckdb
だけで始められることも利点です。
DynamoDBのデータをDuckDBに取り込む
では早速DynamoDBのデータをDuckDBに取り込んでみます。
今回行うのは愚直にDynamoDBのデータをScanしてDuckDBに取り込む方法です。この方法だとDynamoDBのScanを使っているのでキャパシティユニットを消費するので注意が必要なので、DynamoDBからAmazon S3 へのデータエクスポート機能を使ったほうが良いかと思います。
DynamoDBからDuckDBに取り込むコードになります。今回はListやMapは除外してスカラー型の数値、文字列、Boolのみを取り込むようにしています。
import boto3
import pandas as pd
import duckdb
from typing import List, Dict, Any
class DuckDBManager:
def __init__(self, db_path: str, is_read_only: bool = True):
"""
Args:
db_path: データベースファイルのパス
is_read_only: コネクションのタイプ
"""
self.db_path = db_path
self.is_read_only = is_read_only
self.connection = None
def __enter__(self):
"""コンテキストマネージャーの開始でデータベースに接続"""
try:
self.connection = duckdb.connect(self.db_path, read_only=self.is_read_only)
except Exception as e:
print(e)
raise
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
"""コンテキストマネージャーの終了で接続を閉じる"""
if self.connection:
try:
self.connection.close()
except Exception as e:
print(e)
def _deserialize_dynamodb_item(item: Dict[str, Any]) -> Dict[str, Any]:
"""DynamoDBのデータ型をPythonのネイティブ型に変換"""
result = {}
for key, value in item.items():
# DynamoDBの型に応じて変換
if "S" in value: # 文字列
result[key] = value["S"]
elif "N" in value: # 数値
result[key] = float(value["N"])
elif "BOOL" in value: # ブール値
result[key] = value["BOOL"]
return result
def fetch_from_dynamodb(table_name: str) -> List[Dict[str, Any]]:
"""DynamoDBからデータを取得"""
dynamodb = boto3.client("dynamodb")
items = []
paginator = dynamodb.get_paginator("scan")
try:
# ページネーションを使用して全データを取得
for page in paginator.paginate(TableName=table_name):
for item in page["Items"]:
deserialized_item = _deserialize_dynamodb_item(item)
items.append(deserialized_item)
return items
except Exception as e:
print(f"Error fetching data from DynamoDB: {e}")
raise
def load_to_duckdb(db_path: str, table_name: str):
"""データをDuckDBに読み込む"""
try:
data = fetch_from_dynamodb(table_name)
# DataFrameに変換
df = pd.DataFrame(data)
with DuckDBManager(db_path, False) as conn:
# DuckDBにテーブルとして保存
conn.register("temp_view", df)
conn.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} AS
SELECT * FROM temp_view
""")
except Exception as e:
print(f"Error loading data to DuckDB: {e}")
raise
def main():
load_to_duckdb("dynamodb.db", "テーブル名")
if __name__ == "__main__":
main()
上記のスクリプトを実行することでDyanmoDBのテーブルをローカル端末のDuckDBとしてdynamodb.dbファイルに取り込むことができます。
DuckDBManagerクラスは後続のDuckDBでSQLを使う際に再度使うためクラス化してwithブロック内で安全にDuckDBにクエリを実行出来るようにしてあります。
DynamoDBから取り込んだデータにDuckDBでSQLを使ってみた
では取り込んだデータに対してDuckDBでSQLを実行してみます。SQLを実行するには先程取り込みで使用したDuckDBManagerクラスを使って実行します。
例1: 日付別のアクティビティ集計
table_name = "{DuckDB上のテーブル名}"
with DuckDBManager("dynamodb.db") as conn:
# DuckDBでクエリを実行して結果をDataFrameに変換
result = conn.execute(f"""
SELECT
DATE_TRUNC('day', start_datetime::TIMESTAMP) as activity_date,
COUNT(*) as activity_count,
COUNT(DISTINCT user_id) as unique_users
FROM {table_name}
GROUP BY DATE_TRUNC('day', start_datetime::TIMESTAMP)
ORDER BY activity_date DESC
LIMIT 7;
""").fetchdf()
print(result)
実行結果
activity_date activity_count unique_users
0 2025-02-20 225 220
1 2025-02-19 221 219
2 2025-02-18 225 220
3 2025-02-17 219 219
4 2025-02-16 219 219
5 2025-02-15 219 219
6 2025-02-14 219 219
例1: ユーザーごとの行動パターン分析
table_name = "{DuckDB上のテーブル名}"
with DuckDBManager("dynamodb.db") as conn:
# DuckDBでクエリを実行して結果をDataFrameに変換
result = conn.execute(f"""
WITH user_stats AS (
SELECT
user_id,
COUNT(*) as total_actions,
MIN(timestamp::TIMESTAMP) as first_action,
MAX(timestamp::TIMESTAMP) as last_action
FROM {table_name}
GROUP BY user_id
)
SELECT
CASE
WHEN total_actions >= 100 THEN 'Heavy User'
WHEN total_actions >= 50 THEN 'Active User'
ELSE 'Light User'
END as user_category,
COUNT(*) as user_count,
AVG(total_actions) as avg_actions
FROM user_stats
GROUP BY 1
ORDER BY avg_actions DESC;
""").fetchdf()
print(result)
実行結果
user_category user_count avg_actions
0 Heavy User 245 250.5
1 Active User 678 72.3
2 Light User 2675 14.5
このような形でDynamoDBから取り込んだデータに対してDuckDBでSQLを実行して分析することができました。
まとめ
Amazon DynamoDBのテーブルに蓄積したデータをローカル端末上でSQLを使うためにローカル端末上のDuckDBに取り込んで分析してみました。
最小限のコードで簡単にデータを取り込め、直感的なSQLクエリでDynamoDBの非構造化データを分析することができました。また、PandasとシームレスにデータをやりとりできるためPythonでの分析作業も非常にスムーズだと思います。他にはローカル環境で完結するため、開発やテストのサイクルを素早く回せる点も実務で使う上で大きなメリットだと感じました。
最後まで読んで頂いてありがとうございました。