Aurora DSQL Python Connector がリリースされたので、Lambda で性能を実測比較してみた (psycopg2 vs asyncpg)
2026年2月19日、Aurora DSQL 向けの公式コネクタが Go / Python / Node.js の3言語でリリースされました。
Python 向けパッケージ aurora-dsql-python-connector は psycopg / psycopg2 / asyncpg の3ドライバに対応し、DSQL 接続時の IAM 認証トークン生成を自動化する透過的な認証レイヤーです。
これまで DSQL に Python から接続するには、boto3 で DSQL クライアントを作成し、generate_db_connect_admin_auth_token でトークンを生成し、それを psycopg2.connect の password に渡す必要がありました。
import psycopg2
import boto3
def get_dsql_connection():
client = boto3.client('dsql', region_name='us-west-2')
token = client.generate_db_connect_admin_auth_token(DSQL_ENDPOINT, 'us-west-2')
return psycopg2.connect(
host=DSQL_ENDPOINT, port=5432, database='postgres',
user='admin', password=token, sslmode='require'
)
新コネクタを使うと、これが1行になります。
import aurora_dsql_psycopg2 as dsql
conn = dsql.connect(host=DSQL_ENDPOINT, region='us-west-2', user='admin')
boto3 呼び出し、トークン生成、sslmode 指定がすべて不要になりました。asyncpg でも同様に aurora_dsql_asyncpg をインポートするだけで使えます。
動機: なぜ Lambda で検証したのか
新コネクタのリリースを受けて、psycopg2 を使っている Lambda ワークロードでの改善効果を把握したいと考えました。現在の環境には3つの課題がありました。
psycopg2-binary の x86 向け Lambda レイヤーに依存しており、arm64(Graviton)に移行できないこと。一部の Lambda は Step Functions 経由で最大 230 分実行されるため、DSQL のコネクション最大寿命(60分)を超えてトークン期限切れが起きうること。そして複数ファイルに同じ get_dsql_connection() がコピペされ、保守性に問題があったことです。
新コネクタが asyncpg にも対応していたため、ドライバ変更も含めた改善効果を Lambda の実環境で測ってみることにしました。
なお、新コネクタは psycopg(v3)にも対応していますが、今回は既存環境の移行を想定し psycopg2 と asyncpg の2つに絞りました。
検証方法
テスト構成
SAM で Lambda 関数を2つ作成しました。x86_64 と arm64(Graviton)の各1つで、それぞれに psycopg2 と asyncpg を含むレイヤーを紐づけました。Runtime は Python 3.12、メモリ 256 MB、DSQL クラスターは Lambda と同じ us-west-2 です。検証後は sam delete でスタックごと削除できます。
テストパターン
以下の6パターンを計測しました。
| # | パターン | アーキテクチャ | 各反復の処理 |
|---|---|---|---|
| 1 | psycopg2(毎回接続) | x86_64 | トークン自動生成 → TCP → TLS → PG認証 → SQL → 切断 |
| 2 | asyncpg(毎回接続) | x86_64 | 同上(ドライバが asyncpg) |
| 3 | asyncpg(コネクション再利用) | x86_64 | 初回のみ接続確立、以降は同一コネクションを再利用 |
| 4 | psycopg2(毎回接続) | arm64 | パターン1を Graviton で実行 |
| 5 | asyncpg(毎回接続) | arm64 | パターン2を Graviton で実行 |
| 6 | asyncpg(コネクション再利用) | arm64 | パターン3を Graviton で実行 |
「毎回接続」パターンは実運用を想定したものではなく、接続確立1回あたりのオーバーヘッドを正確に計測するための意図的なアンチパターンです。
測定方法
クエリには SELECT 1 を使用し、DB 側の処理を最小化しました。1回の Lambda invoke 内で30回反復し、各反復で「接続確立時間」と「クエリ実行時間」を time.perf_counter() で個別に計測しました。各アーキテクチャで複数回 invoke し(x86: 6回、arm64: 3回)、中央値を採用しました。
計測はウォームスタート時のハンドラ内ループで、Init Duration は含みません。「接続時間」は IAM トークン生成 + TCP 接続 + TLS ハンドシェイク + PostgreSQL 認証の合計です。
参考値: コールドスタート
| x86_64 | arm64 | |
|---|---|---|
| Init Duration | 149ms | 123ms |
| 初回接続 | 約1.0〜1.5秒 | 約1.0秒 |
| コールドスタート合計 | 約1.2〜1.7秒 | 約1.1〜1.2秒 |
API Gateway のデフォルトタイムアウト(29秒)には十分な余裕がありました。
以下は Lambda のライフサイクルと、コネクション確立タイミングの概念図です。
今回の検証で計測したのは、このウォームスタート時のハンドラ内ループです。
検証結果
x86_64 Lambda(中央値、6回実行)
| パターン | 接続(avg) | クエリ(avg) |
|---|---|---|
| psycopg2(毎回接続) | 873.0ms | 12.95ms |
| asyncpg(毎回接続) | 807.7ms | 7.53ms |
| asyncpg(コネクション再利用) | 0.02ms | 2.50ms |
arm64(Graviton)Lambda(中央値、3回実行)
| パターン | 接続(avg) | クエリ(avg) |
|---|---|---|
| psycopg2(毎回接続) | 673.6ms | 12.56ms |
| asyncpg(毎回接続) | 633.1ms | 6.67ms |
| asyncpg(コネクション再利用) | 0.02ms | 2.44ms |
考察
ドライバ比較: 接続差は小さく、クエリ差は明確でした
接続確立時間のドライバ間の差は x86 で 7%、arm64 で 6% と小さいものでした。接続時間の大部分は IAM トークン生成(boto3 経由の STS 呼び出し)が占めており、ドライバ固有の処理はその一部に過ぎないためです。
一方、クエリ実行時間には明確な差が出ました。asyncpg は x86 で 42%、arm64 で 47% 速い結果でした。PostgreSQL のバイナリプロトコルと Cython による結果パースが効いています。
ただし Lambda 環境では接続確立(600〜870ms)が処理時間の大部分を占めるため、クエリの数ミリ秒差は全体に対して誤差レベルです。
コネクション再利用: 接続コストをゼロにできました
コネクション再利用パターンでは、接続取得が 600〜870ms から 0.02ms に短縮されました。確立済みのコネクションをそのまま使うため、TCP 接続もトークン生成も発生しません。
実装としては asyncpg.create_pool(min_size=1) を使用しましたが、Lambda は1リクエストを直列に処理するため、実質的にはグローバル変数で単一コネクションを保持するのと同じ効果です。Lambda ではプールライブラリを使わず、以下のようにグローバルスコープでコネクションを保持するのがシンプルです。
import aurora_dsql_asyncpg as dsql
conn = None
async def handler(event, context):
global conn
if conn is None or conn.is_closed():
conn = await dsql.connect(host=DSQL_ENDPOINT, region='us-west-2', user='admin')
row = await conn.fetchrow("SELECT ...")
return row
ウォームスタート時にはグローバル変数が維持されるため、2回目以降の invoke では接続確立をスキップできます。
アーキテクチャ比較: arm64 が接続確立で 22〜23% 速い結果でした
psycopg2 で 23%(873→674ms)、asyncpg で 22%(808→633ms)の短縮でした。IAM トークン生成の署名計算など CPU バウンドな処理で Graviton が速く処理できたと考えられます。クエリ実行時間はネットワーク I/O が支配的なため、ほぼ同等でした。
Lambda × DSQL 運用の注意点
接続レート制限: 最大の罠
DSQL には接続レート 100接続/秒のハードリミットがあります。Lambda がスパイクアクセスでスケールアウトすると、各コンテナが一斉にコネクションを張りにいき、100/sec の上限に一瞬で到達します。RDS Proxy のような中継地点が DSQL にはないため、Lambda 側での制御が必須です。
対策としては、Reserved Concurrency で同時実行数を制限する、接続エラー時にエクスポネンシャルバックオフでリトライする、SQS や EventBridge を挟んでスパイクを吸収する、といった方法が考えられます。
DSQL の主な制約と Lambda での影響をまとめます。
| DSQL 制約 | 値 | Lambda での影響 |
|---|---|---|
| 接続レート | 100接続/秒 | 同時スケールアウト時に最も危険 |
| コネクション最大寿命 | 60分 | 長時間バッチでは再接続が必要 |
| トランザクション最大時間 | 5分 | 大量 INSERT は分割が必要 |
| トランザクション最大行数 | 3,000行 | バッチサイズの設計に影響 |
| 最大接続数 | 10,000 | Lambda の大規模運用では意識が必要 |
asyncpg を採用すべきか
asyncpg のクエリ実行は 42〜47% 速い結果でしたが、Lambda での採用判断は速度だけでは決まりません。
asyncpg が適するのは、arm64 移行で psycopg2 の x86 レイヤー依存を断ち切りたい場合、asyncio.gather で複数クエリを並列実行する場合、バッチ処理でコネクション再利用が重要な場合です。
一方、1リクエスト1〜2クエリの単純な処理なら psycopg2 コネクタで十分です。SQL の書き換えが不要で導入コストが低く、手動トークン生成の排除とボイラープレート削減の恩恵はそれだけで得られます。
asyncpg に移行する場合の注意点
asyncpg を採用する場合、いくつかのコード変更が必要です。
SQL のプレースホルダが %s から $1, $2, ... に変わり、パラメータはタプルではなく位置引数で渡します。
# psycopg2
cur.execute("SELECT * FROM t WHERE id = %s AND status = %s", (article_id, 'pending'))
row = cur.fetchone()
# asyncpg
row = await conn.fetchrow("SELECT * FROM t WHERE id = $1 AND status = $2", article_id, 'pending')
メソッド名は fetchone → fetchrow、fetchall → fetch に変わります。
トランザクションの扱いも異なります。psycopg2 は autocommit=False がデフォルトで暗黙的にトランザクションが開始されますが、asyncpg は autocommit がデフォルトです。明示的にトランザクションを使う場合は async with conn.transaction(): で囲みます。
# psycopg2
conn.commit()
# asyncpg
async with conn.transaction():
await conn.execute("INSERT ...")
await conn.execute("UPDATE ...")
DSQL のトランザクション最大時間は5分のため、asyncpg の autocommit デフォルトの方がこの制限に引っかかりにくく、安全側に倒れます。
まとめ
Lambda 上での検証を通じて、新コネクタの効果を確認できました。
接続確立のコストは Lambda 環境では 600〜870ms と大きく、コネクション再利用でゼロにできます。arm64 では接続確立が x86 比で 22〜23% 速くなり、コスト面でも 20% の削減が見込めます。コネクタが IAM トークンを自動生成するため、長時間バッチでの認証切れリスクも解消されます。
DSQL への接続方法を探しているなら、まずは既存コードを変えずに導入できる psycopg2 コネクタから試すのがよいと思います。arm64 移行やバッチ処理の最適化が視野に入るなら、asyncpg コネクタへのステップアップを検討してみてください。
なお、今回の計測は同一リージョン(us-west-2)でのウォームスタート時の値で、30回反復 × 複数回 invoke の中央値です。ネットワーク環境や DSQL クラスターの負荷状態により結果は変動します。
参考情報: 検証用コード
検証に使用した SAM テンプレートと Lambda ハンドラを掲載します。sam build && sam deploy でデプロイし、sam remote invoke で実行、sam delete で撤去できます。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Aurora DSQL Connector Benchmark on Lambda
Parameters:
DsqlEndpoint:
Type: String
DsqlRegion:
Type: String
Default: us-west-2
BenchIterations:
Type: String
Default: '30'
Globals:
Function:
Runtime: python3.12
Timeout: 300
MemorySize: 256
Environment:
Variables:
DSQL_ENDPOINT: !Ref DsqlEndpoint
DSQL_REGION: !Ref DsqlRegion
BENCH_ITERATIONS: !Ref BenchIterations
Resources:
Psycopg2Layer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: dsql-bench-psycopg2-x86
ContentUri: layers/psycopg2-x86/
CompatibleRuntimes: [python3.12]
CompatibleArchitectures: [x86_64]
Metadata:
BuildMethod: python3.12
BuildArchitecture: x86_64
AsyncpgLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: dsql-bench-asyncpg-arm64
ContentUri: layers/asyncpg-arm64/
CompatibleRuntimes: [python3.12]
CompatibleArchitectures: [arm64]
Metadata:
BuildMethod: python3.12
BuildArchitecture: arm64
BenchX86:
Type: AWS::Serverless::Function
Properties:
FunctionName: dsql-bench-x86
Handler: handler.lambda_handler
CodeUri: src/
Architectures: [x86_64]
Layers: [!Ref Psycopg2Layer]
Policies:
- Statement:
- Effect: Allow
Action: dsql:DbConnectAdmin
Resource: !Sub arn:aws:dsql:${DsqlRegion}:${AWS::AccountId}:cluster/*
BenchArm64:
Type: AWS::Serverless::Function
Properties:
FunctionName: dsql-bench-arm64
Handler: handler.lambda_handler
CodeUri: src/
Architectures: [arm64]
Layers: [!Ref AsyncpgLayer]
Policies:
- Statement:
- Effect: Allow
Action: dsql:DbConnectAdmin
Resource: !Sub arn:aws:dsql:${DsqlRegion}:${AWS::AccountId}:cluster/*
handler.py
import asyncio
import json
import os
import platform
import statistics
import time
DSQL_ENDPOINT = os.environ['DSQL_ENDPOINT']
DSQL_REGION = os.environ.get('DSQL_REGION', 'us-west-2')
ITERATIONS = int(os.environ.get('BENCH_ITERATIONS', '30'))
QUERY = 'SELECT 1'
ARCH = platform.machine()
def fmt(s):
return round(s * 1000, 2)
def calc_stats(connect_times, query_times):
n = len(connect_times)
total = sum(connect_times) + sum(query_times)
return {
'connect_avg_ms': fmt(statistics.mean(connect_times)),
'connect_p50_ms': fmt(statistics.median(connect_times)),
'connect_p95_ms': fmt(sorted(connect_times)[int(n * 0.95)]),
'query_avg_ms': fmt(statistics.mean(query_times)),
'query_p50_ms': fmt(statistics.median(query_times)),
'total_ms': fmt(total),
'qps': round(n / total, 1),
}
def bench_psycopg2():
import aurora_dsql_psycopg2 as dsql
ct, qt = [], []
for _ in range(ITERATIONS):
t0 = time.perf_counter()
conn = dsql.connect(host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin')
t1 = time.perf_counter()
ct.append(t1 - t0)
cur = conn.cursor()
cur.execute(QUERY)
cur.fetchall()
qt.append(time.perf_counter() - t1)
cur.close()
conn.close()
return {'pattern': 'psycopg2 (every connect)', **calc_stats(ct, qt)}
async def bench_asyncpg_no_pool():
import aurora_dsql_asyncpg as dsql
ct, qt = [], []
for _ in range(ITERATIONS):
t0 = time.perf_counter()
conn = await dsql.connect(host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin')
t1 = time.perf_counter()
ct.append(t1 - t0)
await conn.fetch(QUERY)
qt.append(time.perf_counter() - t1)
await conn.close()
return {'pattern': 'asyncpg (every connect)', **calc_stats(ct, qt)}
async def bench_asyncpg_reuse():
import aurora_dsql_asyncpg as dsql
t0 = time.perf_counter()
pool = await dsql.create_pool(
host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin',
min_size=1, max_size=3)
pool_create = time.perf_counter() - t0
ct, qt = [], []
for _ in range(ITERATIONS):
t1 = time.perf_counter()
async with pool.acquire() as conn:
t2 = time.perf_counter()
ct.append(t2 - t1)
await conn.fetch(QUERY)
qt.append(time.perf_counter() - t2)
await pool.close()
stats = calc_stats(ct, qt)
stats['pool_create_ms'] = fmt(pool_create)
return {'pattern': 'asyncpg (connection reuse)', **stats}
def lambda_handler(event, context):
results = []
results.append(bench_psycopg2())
loop = asyncio.get_event_loop()
results.append(loop.run_until_complete(bench_asyncpg_no_pool()))
results.append(loop.run_until_complete(bench_asyncpg_reuse()))
body = {
'arch': ARCH,
'runtime': f"python{platform.python_version()}",
'iterations': ITERATIONS,
'query': QUERY,
'endpoint': DSQL_ENDPOINT,
'results': results,
}
print(json.dumps(body, ensure_ascii=False, indent=2))
return body







