Aurora DSQL Python Connector がリリースされたので、Lambda で性能を実測比較してみた (psycopg2 vs asyncpg)

Aurora DSQL Python Connector がリリースされたので、Lambda で性能を実測比較してみた (psycopg2 vs asyncpg)

Aurora DSQLのPython公式コネクタをLambdaで最速検証!psycopg2とasyncpgの性能比較に加え、x86 vs Gravitonの実測データも公開。接続オーバーヘッドをなくすコネクション再利用の手法と、本番運用で直面する「接続レート制限」の回避策をまとめました。
2026.02.20

2026年2月19日、Aurora DSQL 向けの公式コネクタが Go / Python / Node.js の3言語でリリースされました。

https://aws.amazon.com/jp/about-aws/whats-new/2026/02/aurora-dsql-launches-go-python-nodejs-connectors/

https://dev.classmethod.jp/articles/aurora-dsql-launches-go-python-nodejs-connectors/

Python 向けパッケージ aurora-dsql-python-connector は psycopg / psycopg2 / asyncpg の3ドライバに対応し、DSQL 接続時の IAM 認証トークン生成を自動化する透過的な認証レイヤーです。

これまで DSQL に Python から接続するには、boto3 で DSQL クライアントを作成し、generate_db_connect_admin_auth_token でトークンを生成し、それを psycopg2.connectpassword に渡す必要がありました。

import psycopg2
import boto3

def get_dsql_connection():
    client = boto3.client('dsql', region_name='us-west-2')
    token = client.generate_db_connect_admin_auth_token(DSQL_ENDPOINT, 'us-west-2')
    return psycopg2.connect(
        host=DSQL_ENDPOINT, port=5432, database='postgres',
        user='admin', password=token, sslmode='require'
    )

新コネクタを使うと、これが1行になります。

import aurora_dsql_psycopg2 as dsql

conn = dsql.connect(host=DSQL_ENDPOINT, region='us-west-2', user='admin')

boto3 呼び出し、トークン生成、sslmode 指定がすべて不要になりました。asyncpg でも同様に aurora_dsql_asyncpg をインポートするだけで使えます。

動機: なぜ Lambda で検証したのか

新コネクタのリリースを受けて、psycopg2 を使っている Lambda ワークロードでの改善効果を把握したいと考えました。現在の環境には3つの課題がありました。

psycopg2-binary の x86 向け Lambda レイヤーに依存しており、arm64(Graviton)に移行できないこと。一部の Lambda は Step Functions 経由で最大 230 分実行されるため、DSQL のコネクション最大寿命(60分)を超えてトークン期限切れが起きうること。そして複数ファイルに同じ get_dsql_connection() がコピペされ、保守性に問題があったことです。

新コネクタが asyncpg にも対応していたため、ドライバ変更も含めた改善効果を Lambda の実環境で測ってみることにしました。

なお、新コネクタは psycopg(v3)にも対応していますが、今回は既存環境の移行を想定し psycopg2 と asyncpg の2つに絞りました。

検証方法

テスト構成

SAM で Lambda 関数を2つ作成しました。x86_64 と arm64(Graviton)の各1つで、それぞれに psycopg2 と asyncpg を含むレイヤーを紐づけました。Runtime は Python 3.12、メモリ 256 MB、DSQL クラスターは Lambda と同じ us-west-2 です。検証後は sam delete でスタックごと削除できます。

テストパターン

以下の6パターンを計測しました。

# パターン アーキテクチャ 各反復の処理
1 psycopg2(毎回接続) x86_64 トークン自動生成 → TCP → TLS → PG認証 → SQL → 切断
2 asyncpg(毎回接続) x86_64 同上(ドライバが asyncpg)
3 asyncpg(コネクション再利用) x86_64 初回のみ接続確立、以降は同一コネクションを再利用
4 psycopg2(毎回接続) arm64 パターン1を Graviton で実行
5 asyncpg(毎回接続) arm64 パターン2を Graviton で実行
6 asyncpg(コネクション再利用) arm64 パターン3を Graviton で実行

「毎回接続」パターンは実運用を想定したものではなく、接続確立1回あたりのオーバーヘッドを正確に計測するための意図的なアンチパターンです。

測定方法

クエリには SELECT 1 を使用し、DB 側の処理を最小化しました。1回の Lambda invoke 内で30回反復し、各反復で「接続確立時間」と「クエリ実行時間」を time.perf_counter() で個別に計測しました。各アーキテクチャで複数回 invoke し(x86: 6回、arm64: 3回)、中央値を採用しました。

計測はウォームスタート時のハンドラ内ループで、Init Duration は含みません。「接続時間」は IAM トークン生成 + TCP 接続 + TLS ハンドシェイク + PostgreSQL 認証の合計です。

参考値: コールドスタート

x86_64 arm64
Init Duration 149ms 123ms
初回接続 約1.0〜1.5秒 約1.0秒
コールドスタート合計 約1.2〜1.7秒 約1.1〜1.2秒

API Gateway のデフォルトタイムアウト(29秒)には十分な余裕がありました。

以下は Lambda のライフサイクルと、コネクション確立タイミングの概念図です。

今回の検証で計測したのは、このウォームスタート時のハンドラ内ループです。

検証結果

x86_64 Lambda(中央値、6回実行)

パターン 接続(avg) クエリ(avg)
psycopg2(毎回接続) 873.0ms 12.95ms
asyncpg(毎回接続) 807.7ms 7.53ms
asyncpg(コネクション再利用) 0.02ms 2.50ms

arm64(Graviton)Lambda(中央値、3回実行)

パターン 接続(avg) クエリ(avg)
psycopg2(毎回接続) 673.6ms 12.56ms
asyncpg(毎回接続) 633.1ms 6.67ms
asyncpg(コネクション再利用) 0.02ms 2.44ms

考察

ドライバ比較: 接続差は小さく、クエリ差は明確でした

接続確立時間のドライバ間の差は x86 で 7%、arm64 で 6% と小さいものでした。接続時間の大部分は IAM トークン生成(boto3 経由の STS 呼び出し)が占めており、ドライバ固有の処理はその一部に過ぎないためです。

一方、クエリ実行時間には明確な差が出ました。asyncpg は x86 で 42%、arm64 で 47% 速い結果でした。PostgreSQL のバイナリプロトコルと Cython による結果パースが効いています。

ただし Lambda 環境では接続確立(600〜870ms)が処理時間の大部分を占めるため、クエリの数ミリ秒差は全体に対して誤差レベルです。

コネクション再利用: 接続コストをゼロにできました

コネクション再利用パターンでは、接続取得が 600〜870ms から 0.02ms に短縮されました。確立済みのコネクションをそのまま使うため、TCP 接続もトークン生成も発生しません。

実装としては asyncpg.create_pool(min_size=1) を使用しましたが、Lambda は1リクエストを直列に処理するため、実質的にはグローバル変数で単一コネクションを保持するのと同じ効果です。Lambda ではプールライブラリを使わず、以下のようにグローバルスコープでコネクションを保持するのがシンプルです。

import aurora_dsql_asyncpg as dsql

conn = None

async def handler(event, context):
    global conn
    if conn is None or conn.is_closed():
        conn = await dsql.connect(host=DSQL_ENDPOINT, region='us-west-2', user='admin')
    row = await conn.fetchrow("SELECT ...")
    return row

ウォームスタート時にはグローバル変数が維持されるため、2回目以降の invoke では接続確立をスキップできます。

アーキテクチャ比較: arm64 が接続確立で 22〜23% 速い結果でした

psycopg2 で 23%(873→674ms)、asyncpg で 22%(808→633ms)の短縮でした。IAM トークン生成の署名計算など CPU バウンドな処理で Graviton が速く処理できたと考えられます。クエリ実行時間はネットワーク I/O が支配的なため、ほぼ同等でした。

Lambda × DSQL 運用の注意点

接続レート制限: 最大の罠

DSQL には接続レート 100接続/秒のハードリミットがあります。Lambda がスパイクアクセスでスケールアウトすると、各コンテナが一斉にコネクションを張りにいき、100/sec の上限に一瞬で到達します。RDS Proxy のような中継地点が DSQL にはないため、Lambda 側での制御が必須です。

対策としては、Reserved Concurrency で同時実行数を制限する、接続エラー時にエクスポネンシャルバックオフでリトライする、SQS や EventBridge を挟んでスパイクを吸収する、といった方法が考えられます。

DSQL の主な制約と Lambda での影響をまとめます。

DSQL 制約 Lambda での影響
接続レート 100接続/秒 同時スケールアウト時に最も危険
コネクション最大寿命 60分 長時間バッチでは再接続が必要
トランザクション最大時間 5分 大量 INSERT は分割が必要
トランザクション最大行数 3,000行 バッチサイズの設計に影響
最大接続数 10,000 Lambda の大規模運用では意識が必要

asyncpg を採用すべきか

asyncpg のクエリ実行は 42〜47% 速い結果でしたが、Lambda での採用判断は速度だけでは決まりません。

asyncpg が適するのは、arm64 移行で psycopg2 の x86 レイヤー依存を断ち切りたい場合、asyncio.gather で複数クエリを並列実行する場合、バッチ処理でコネクション再利用が重要な場合です。

一方、1リクエスト1〜2クエリの単純な処理なら psycopg2 コネクタで十分です。SQL の書き換えが不要で導入コストが低く、手動トークン生成の排除とボイラープレート削減の恩恵はそれだけで得られます。

asyncpg に移行する場合の注意点

asyncpg を採用する場合、いくつかのコード変更が必要です。

SQL のプレースホルダが %s から $1, $2, ... に変わり、パラメータはタプルではなく位置引数で渡します。

# psycopg2
cur.execute("SELECT * FROM t WHERE id = %s AND status = %s", (article_id, 'pending'))
row = cur.fetchone()

# asyncpg
row = await conn.fetchrow("SELECT * FROM t WHERE id = $1 AND status = $2", article_id, 'pending')

メソッド名は fetchonefetchrowfetchallfetch に変わります。

トランザクションの扱いも異なります。psycopg2 は autocommit=False がデフォルトで暗黙的にトランザクションが開始されますが、asyncpg は autocommit がデフォルトです。明示的にトランザクションを使う場合は async with conn.transaction(): で囲みます。

# psycopg2
conn.commit()

# asyncpg
async with conn.transaction():
    await conn.execute("INSERT ...")
    await conn.execute("UPDATE ...")

DSQL のトランザクション最大時間は5分のため、asyncpg の autocommit デフォルトの方がこの制限に引っかかりにくく、安全側に倒れます。

まとめ

Lambda 上での検証を通じて、新コネクタの効果を確認できました。

接続確立のコストは Lambda 環境では 600〜870ms と大きく、コネクション再利用でゼロにできます。arm64 では接続確立が x86 比で 22〜23% 速くなり、コスト面でも 20% の削減が見込めます。コネクタが IAM トークンを自動生成するため、長時間バッチでの認証切れリスクも解消されます。

DSQL への接続方法を探しているなら、まずは既存コードを変えずに導入できる psycopg2 コネクタから試すのがよいと思います。arm64 移行やバッチ処理の最適化が視野に入るなら、asyncpg コネクタへのステップアップを検討してみてください。

なお、今回の計測は同一リージョン(us-west-2)でのウォームスタート時の値で、30回反復 × 複数回 invoke の中央値です。ネットワーク環境や DSQL クラスターの負荷状態により結果は変動します。

参考情報: 検証用コード

検証に使用した SAM テンプレートと Lambda ハンドラを掲載します。sam build && sam deploy でデプロイし、sam remote invoke で実行、sam delete で撤去できます。

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Aurora DSQL Connector Benchmark on Lambda

Parameters:
  DsqlEndpoint:
    Type: String
  DsqlRegion:
    Type: String
    Default: us-west-2
  BenchIterations:
    Type: String
    Default: '30'

Globals:
  Function:
    Runtime: python3.12
    Timeout: 300
    MemorySize: 256
    Environment:
      Variables:
        DSQL_ENDPOINT: !Ref DsqlEndpoint
        DSQL_REGION: !Ref DsqlRegion
        BENCH_ITERATIONS: !Ref BenchIterations

Resources:
  Psycopg2Layer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: dsql-bench-psycopg2-x86
      ContentUri: layers/psycopg2-x86/
      CompatibleRuntimes: [python3.12]
      CompatibleArchitectures: [x86_64]
    Metadata:
      BuildMethod: python3.12
      BuildArchitecture: x86_64

  AsyncpgLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: dsql-bench-asyncpg-arm64
      ContentUri: layers/asyncpg-arm64/
      CompatibleRuntimes: [python3.12]
      CompatibleArchitectures: [arm64]
    Metadata:
      BuildMethod: python3.12
      BuildArchitecture: arm64

  BenchX86:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: dsql-bench-x86
      Handler: handler.lambda_handler
      CodeUri: src/
      Architectures: [x86_64]
      Layers: [!Ref Psycopg2Layer]
      Policies:
        - Statement:
            - Effect: Allow
              Action: dsql:DbConnectAdmin
              Resource: !Sub arn:aws:dsql:${DsqlRegion}:${AWS::AccountId}:cluster/*

  BenchArm64:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: dsql-bench-arm64
      Handler: handler.lambda_handler
      CodeUri: src/
      Architectures: [arm64]
      Layers: [!Ref AsyncpgLayer]
      Policies:
        - Statement:
            - Effect: Allow
              Action: dsql:DbConnectAdmin
              Resource: !Sub arn:aws:dsql:${DsqlRegion}:${AWS::AccountId}:cluster/*
handler.py
import asyncio
import json
import os
import platform
import statistics
import time

DSQL_ENDPOINT = os.environ['DSQL_ENDPOINT']
DSQL_REGION = os.environ.get('DSQL_REGION', 'us-west-2')
ITERATIONS = int(os.environ.get('BENCH_ITERATIONS', '30'))
QUERY = 'SELECT 1'
ARCH = platform.machine()

def fmt(s):
    return round(s * 1000, 2)

def calc_stats(connect_times, query_times):
    n = len(connect_times)
    total = sum(connect_times) + sum(query_times)
    return {
        'connect_avg_ms': fmt(statistics.mean(connect_times)),
        'connect_p50_ms': fmt(statistics.median(connect_times)),
        'connect_p95_ms': fmt(sorted(connect_times)[int(n * 0.95)]),
        'query_avg_ms': fmt(statistics.mean(query_times)),
        'query_p50_ms': fmt(statistics.median(query_times)),
        'total_ms': fmt(total),
        'qps': round(n / total, 1),
    }

def bench_psycopg2():
    import aurora_dsql_psycopg2 as dsql
    ct, qt = [], []
    for _ in range(ITERATIONS):
        t0 = time.perf_counter()
        conn = dsql.connect(host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin')
        t1 = time.perf_counter()
        ct.append(t1 - t0)
        cur = conn.cursor()
        cur.execute(QUERY)
        cur.fetchall()
        qt.append(time.perf_counter() - t1)
        cur.close()
        conn.close()
    return {'pattern': 'psycopg2 (every connect)', **calc_stats(ct, qt)}

async def bench_asyncpg_no_pool():
    import aurora_dsql_asyncpg as dsql
    ct, qt = [], []
    for _ in range(ITERATIONS):
        t0 = time.perf_counter()
        conn = await dsql.connect(host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin')
        t1 = time.perf_counter()
        ct.append(t1 - t0)
        await conn.fetch(QUERY)
        qt.append(time.perf_counter() - t1)
        await conn.close()
    return {'pattern': 'asyncpg (every connect)', **calc_stats(ct, qt)}

async def bench_asyncpg_reuse():
    import aurora_dsql_asyncpg as dsql
    t0 = time.perf_counter()
    pool = await dsql.create_pool(
        host=DSQL_ENDPOINT, region=DSQL_REGION, user='admin',
        min_size=1, max_size=3)
    pool_create = time.perf_counter() - t0
    ct, qt = [], []
    for _ in range(ITERATIONS):
        t1 = time.perf_counter()
        async with pool.acquire() as conn:
            t2 = time.perf_counter()
            ct.append(t2 - t1)
            await conn.fetch(QUERY)
            qt.append(time.perf_counter() - t2)
    await pool.close()
    stats = calc_stats(ct, qt)
    stats['pool_create_ms'] = fmt(pool_create)
    return {'pattern': 'asyncpg (connection reuse)', **stats}

def lambda_handler(event, context):
    results = []
    results.append(bench_psycopg2())
    loop = asyncio.get_event_loop()
    results.append(loop.run_until_complete(bench_asyncpg_no_pool()))
    results.append(loop.run_until_complete(bench_asyncpg_reuse()))
    body = {
        'arch': ARCH,
        'runtime': f"python{platform.python_version()}",
        'iterations': ITERATIONS,
        'query': QUERY,
        'endpoint': DSQL_ENDPOINT,
        'results': results,
    }
    print(json.dumps(body, ensure_ascii=False, indent=2))
    return body

参考リンク

この記事をシェアする

FacebookHatena blogX

関連記事