DynamoDB Accelerator (DAX)でのトランザクション処理について検証してみた
はじめに
みなさんこんにちは、先日DynamoDBのトランザクションの仕組みを改めて確認していると
「DynamoDB アクセラレーター (DAX) でのトランザクション API の使用」 という項目を発見しました。DAXに対してもトランザクションが有効なのかと思い調べてみると、だいぶ前にサポートしていました。
今回は上記公式ドキュメントの以下の2点の記述について具体的に検証を通して動きを確認してみます。
DAX は、TransactWriteItems オペレーション内の各項目に対してバックグラウンドで TransactGetItems をコールします。これにより、追加の読み込み容量単位が消費されます。
要約: DAXのトランザクション経由で更新した項目は裏側で自動的にDAXにキャッシュしてくれるのでその分のRCU消費するよ!
TransactGetItems コールは、項目がローカルにキャッシュされることなく DAX を通過します。これは、DAX の強い整合性のある読み込み API と同じです。
要約: TransactGetItemsコールはDynamoDBに「強い整合性のある読み込み」として取得しに行くしキャッシュもしないよ(キャッシュバイパス)
検証ポイント
- TransactWriteItems のバックグラウンドキャッシュ更新
- TransactGetItems のキャッシュバイパス動作
構成図
以下の構成をSAMで作成し、検証しました。
作成リソース
リソース | 説明 |
---|---|
DynamoDB テーブル x2 | Products (商品情報) と Orders (注文情報) |
DAX クラスター | test-cluster (t3.small × 2ノード) |
VPC | DAXクラスターとLambda関数用 |
サブネット x2 | 2つのアベイラビリティゾーンに配置 |
セキュリティグループ | DAXポート(8111)へのアクセスを許可 |
Lambda Layer | amazon-dax-client ライブラリを含む |
Lambda関数 x3 | TransactWriteItems の検証/ TransactGetItems の検証 / 業務的なコード |
DAXクラスター用IAMロール | DynamoDBへのアクセス |
Lambda関数用IAMロール | DynamoDBアクセス、VPCアクセス |
SAMフォルダ構成
.
├── README.md
├── __init__.py
├── functions
│ ├── ecommerce_test
│ │ └── app.py
│ ├── transact_get_test
│ │ └── app.py
│ └── transact_write_test
│ └── app.py
├── layers
│ └── amazon-dax-client
│ └── requirements.txt
├── samconfig.toml
└── template.yaml
テンプレートファイル
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: DAX Transaction Test
Parameters:
Project:
Type: String
Default: dax-transaction-test
Globals:
Function:
Timeout: 30
MemorySize: 256
Runtime: python3.12
Layers:
- !Ref DaxClientLayer
Environment:
Variables:
DAX_ENDPOINT: !GetAtt DaxCluster.ClusterDiscoveryEndpoint
VpcConfig:
SecurityGroupIds:
- !GetAtt DaxSecurityGroup.GroupId
SubnetIds:
- !Ref Subnet1
- !Ref Subnet2
Resources:
## Layer
DaxClientLayer:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: !Sub "${Project}-dax-client-layer"
ContentUri: layers/amazon-dax-client/
CompatibleArchitectures:
- x86_64
CompatibleRuntimes:
- python3.12
Metadata:
BuildMethod: python3.12
# DynamoDBテーブル
ProductsTable:
Type: AWS::DynamoDB::Table
DeletionPolicy: Delete
UpdateReplacePolicy: Delete
Properties:
TableName: !Sub "${Project}-products"
BillingMode: PROVISIONED
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
AttributeDefinitions:
- AttributeName: ProductId
AttributeType: S
KeySchema:
- AttributeName: ProductId
KeyType: HASH
Tags:
- Key: Project
Value: !Ref Project
OrdersTable:
Type: AWS::DynamoDB::Table
DeletionPolicy: Delete
UpdateReplacePolicy: Delete
Properties:
TableName: !Sub "${Project}-orders"
BillingMode: PROVISIONED
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
AttributeDefinitions:
- AttributeName: OrderId
AttributeType: S
KeySchema:
- AttributeName: OrderId
KeyType: HASH
Tags:
- Key: Project
Value: !Ref Project
# VPC設定(DAXに必要)
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.0.0.0/16
EnableDnsSupport: true
EnableDnsHostnames: true
Tags:
- Key: Project
Value: !Ref Project
- Key: Name
Value: !Sub "${Project}-vpc"
# サブネットx2
Subnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.0.0.0/24
AvailabilityZone: !Select [0, !GetAZs ""]
Tags:
- Key: Project
Value: !Ref Project
- Key: Name
Value: !Sub "${Project}-subnet-1"
Subnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.0.1.0/24
AvailabilityZone: !Select [1, !GetAZs ""]
Tags:
- Key: Project
Value: !Ref Project
- Key: Name
Value: !Sub "${Project}-subnet-2"
# DAX関連リソース
DaxSubnetGroup:
Type: AWS::DAX::SubnetGroup
Properties:
SubnetGroupName: !Sub "${Project}-subnet-group"
SubnetIds:
- !Ref Subnet1
- !Ref Subnet2
DaxSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for DAX cluster
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 8111
ToPort: 8111
CidrIp: 10.0.0.0/16
Tags:
- Key: Project
Value: !Ref Project
- Key: Name
Value: !Sub "${Project}-sg"
DaxServiceRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${Project}-dax-role"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: dax.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
Tags:
- Key: Project
Value: !Ref Project
DaxCluster:
Type: AWS::DAX::Cluster
Properties:
ClusterName: !Sub "test-cluster"
NodeType: dax.t3.small
ReplicationFactor: 2
IAMRoleARN: !GetAtt DaxServiceRole.Arn
SecurityGroupIds:
- !GetAtt DaxSecurityGroup.GroupId
SubnetGroupName: !Ref DaxSubnetGroup
# Lambda用IAMロール
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub "${Project}-lambda-role"
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
Tags:
- Key: Project
Value: !Ref Project
# Lambda関数
TransactWriteTestFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${Project}-write"
CodeUri: functions/transact_write_test/
Handler: app.handler
Role: !GetAtt LambdaExecutionRole.Arn
Environment:
Variables:
PRODUCTS_TABLE: !Ref ProductsTable
Tags:
Project: !Ref Project
TransactGetTestFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${Project}-get"
CodeUri: functions/transact_get_test/
Handler: app.handler
Role: !GetAtt LambdaExecutionRole.Arn
Environment:
Variables:
PRODUCTS_TABLE: !Ref ProductsTable
Tags:
Project: !Ref Project
EcommerceTestFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub "${Project}-ecommerce"
CodeUri: functions/ecommerce_test/
Handler: app.handler
Role: !GetAtt LambdaExecutionRole.Arn
Environment:
Variables:
PRODUCTS_TABLE: !Ref ProductsTable
ORDERS_TABLE: !Ref OrdersTable
Tags:
Project: !Ref Project
Outputs:
DaxEndpoint:
Value: !GetAtt DaxCluster.ClusterDiscoveryEndpoint
ProductsTable:
Value: !Ref ProductsTable
OrdersTable:
Value: !Ref OrdersTable
ところで以下のAZの指定方法いいですね。初めて知りました。下記のブログで詳しい詳細が載ってます
AvailabilityZone: !Select [1, !GetAZs ""]
検証①:TransactWriteItems のバックグラウンドキャッシュ更新
DAXにてTransactWriteItemsを行うとDynamoDBに項目を保存した後バックグラウンドでDAXのキャッシュを更新してくれるはずです。なので以下のコードで検証しました
- Productsテーブルに対して2つの項目をDAX経由でTransactWriteItemsする
- 保存した値をすぐにDAX経由で読み取る
- CloudWatch メトリクスで以下項目を確認する
- DynamoDB : ConsumedWriteCapacityUnits(消費WCU)
- DynamoDB : ConsumedReadCapacityUnits(消費RCU)
- DAX : ItemCacheHits(キャッシュヒット数)
- DAX : ItemCacheMisses(キャッシュミス数)
Lambda関数
Productsテーブルに 2つの商品をトランザクションでDAX経由で保存しその後取得する簡易コードです。
ちなみにpythonにおけるDAXの操作は「boto3」ではできません。今回は「amazon-dax-client ver 2.0.3」を使用しています。
import os
import time
from amazondax import AmazonDaxClient
# 環境変数から取得
DAX_ENDPOINT = os.environ['DAX_ENDPOINT']
PRODUCTS_TABLE = os.environ['PRODUCTS_TABLE']
# DAXクライアント
dax = AmazonDaxClient(endpoint_url=DAX_ENDPOINT)
def handler(event, context):
# 書き込むテストデータ
products = [
{
'ProductId': {'S': 'dax-phone'},
'Name': {'S': 'スマートフォン'},
'Price': {'N': '60000'},
'Stock': {'N': '30'}
},
{
'ProductId': {'S': 'dax-laptop'},
'Name': {'S': 'ノートパソコン'},
'Price': {'N': '150000'},
'Stock': {'N': '15'}
}
]
# DAX 経由で TransactWriteItems を実行
dax.transact_write_items(
TransactItems=[
{'Put': {'TableName': PRODUCTS_TABLE, 'Item': products[0]}},
{'Put': {'TableName': PRODUCTS_TABLE, 'Item': products[1]}}
]
)
# DAX経由のGetItem を実行
for p in products:
key = {'ProductId': p['ProductId']}
result = dax.get_item(TableName=PRODUCTS_TABLE, Key=key)
print(f"DAX Cache Read: {result.get('Item')}")
結果
Cloudwatchメトリクスにて以下の結果になりました
メトリクス名 | 値 | 説明 |
---|---|---|
ConsumedWriteCapacityUnits | 4 | DynamoDB での書き込みキャパシティ使用量。2項目 × 2WCU(トランザクション) |
ConsumedReadCapacityUnits | 1 | DynamoDB 側での読み取りキャパシティ使用量(DAX経由のバックグラウンド読み込み) |
ItemCacheHits (DAX) | 0 | DAX キャッシュからのヒット数 |
ItemCacheMisses (DAX) | 1 | DAX でのキャッシュミス数 |
トランザクションで更新後すぐにDAXを読み込んでもキャッシュヒットしませんでした。これはトランザクション更新後バックグラウンドにて非同期でDAXに数秒以内にキャッシュが書き込まれるという仕様なので、キャッシュが更新される前に値を読み込んだことがきっかけだと思われます。なので上記の関数の商品を変えてトランザクションと読み込みに1秒の遅延を挟んでみます
import os
import time
from amazondax import AmazonDaxClient
# 環境変数から取得
DAX_ENDPOINT = os.environ['DAX_ENDPOINT']
PRODUCTS_TABLE = os.environ['PRODUCTS_TABLE']
# DAXクライアント
dax = AmazonDaxClient(endpoint_url=DAX_ENDPOINT)
def handler(event, context):
# 書き込むテストデータ
products = [
{
'ProductId': {'S': 'dax-desktop'},
'Name': {'S': 'デスクトップ'},
'Price': {'N': '120000'},
'Stock': {'N': '30'}
},
{
'ProductId': {'S': 'dax-filter'},
'Name': {'S': 'フィルター'},
'Price': {'N': '10000'},
'Stock': {'N': '5'}
}
]
# DAX 経由で TransactWriteItems を実行
dax.transact_write_items(
TransactItems=[
{'Put': {'TableName': PRODUCTS_TABLE, 'Item': products[0]}},
{'Put': {'TableName': PRODUCTS_TABLE, 'Item': products[1]}}
]
)
# バックグラウンドキャッシュ更新を待つ(例:1秒)
time.sleep(1)
# キャッシュ効果を検証:DAX経由のGetItemを実行
for p in products:
key = {'ProductId': p['ProductId']}
result = dax.get_item(TableName=PRODUCTS_TABLE, Key=key)
print(f"DAX Cache Read: {result.get('Item')}")
上記コードで実行すると適切にキャッシュヒットし、バックグラウンドでDAXにキャッシュが更新されていることを検証できました。
ただ予想外だったのは「ConsumedReadCapacityUnits」の値が1だったことです。以下公式ドキュメントの 「トランザクションの容量管理」 という項目にて以下のような記述がされており、予想では2つの項目を書き込む場合はWCUと同じく「4RCU」が消費される予想でした。
DynamoDB アクセラレーター (DAX) を使用していた場合、TransactWriteItems のコールで項目ごとに 2 つの読み込み容量単位 (RCU) も使用します。
ただ、結果としては何度やっても「1RCU」しか消費しませんでした。これに関しての公式的な記述はどこにも見つかりませんでしたが、予想としては書き込むサイズが小さい場合は裏側で効率的にトランザクションをまとめてくれているのかもしれません。
検証②:TransactGetItems のキャッシュバイパス動作
続いてはTransactGetItemsの動作確認です。下記公式ドキュメントの 「強力な整合性のあるトランザクション読み込み」 項目には「強い整合性のある読み込み」動作と同じくDAX内部のキャッシュを無視してDynamoDB内に項目を確認しに行き、その値をキャッシュせずに返すのでRCUのみが消費される旨が記載されています。なのでその動作を確認します。
Lambda関数
Productsテーブルから先ほど保存した2つの項目に関してトランザクションで読み込むだけのシンプルなコードです。
import os
from amazondax import AmazonDaxClient
# 環境変数
DAX_ENDPOINT = os.environ.get('DAX_ENDPOINT')
PRODUCTS_TABLE = os.environ.get('PRODUCTS_TABLE', 'Products')
# DAXクライアント初期化
dax = AmazonDaxClient(endpoint_url=DAX_ENDPOINT)
def handler(event, context):
# DAX経由でトランザクション読み取り実行
# キャッシュをバイパスしてDynamoDBから直接取得になるはず
dax.transact_get_items(
TransactItems=[
{
'Get': {
'TableName': PRODUCTS_TABLE,
'Key': {'ProductId': {'S': 'dax-phone'}}
}
},
{
'Get': {
'TableName': PRODUCTS_TABLE,
'Key': {'ProductId': {'S': 'dax-laptop'}}
}
}
]
)
結果
Cloudwatchメトリクスにて以下の結果になりました
メトリクス名 | 値 | 説明 |
---|---|---|
ConsumedWriteCapacityUnits | 0 | DynamoDB での書き込みキャパシティ使用量 |
ConsumedReadCapacityUnits | 4 | DynamoDB 側での読み取りキャパシティ使用量 2項目 × 2RCU(トランザクション) |
ItemCacheHits (DAX) | 0 | 予想通りキャッシュ関係の値は記録されない |
ItemCacheMisses (DAX) | 0 | 予想通りキャッシュ関係の値は記録されない |
これは各項目について準備用とコミット用のRCUがそれぞれ各項目について消費されるという公式ドキュメント通りの動きになりました。DAX経由で「TransactGetItems」を行っても実質DynamoDBに「強い整合性のある読み込み」を行うということです。
まとめ
検証内容 | 期待される挙動 | 実際の動作 |
---|---|---|
TransactWriteItems(直後にGetItem) | 書き込み後、DAXがバックグラウンドでTransactGetItemsを行い、キャッシュが更新される → DAXから取得できる | ItemCacheHits=0 ItemCacheMisses=1 ConsumedReadCapacityUnits = 1 ConsumedWriteCapacityUnits = 4 |
TransactWriteItems(1秒待機後にGetItem) | バックグラウンドキャッシュ更新後にDAXからのキャッシュヒット | ItemCacheHits=1 ItemCacheMisses=0 ConsumedReadCapacityUnits = 1 ConsumedWriteCapacityUnits = 4 |
TransactGetItems | キャッシュバイパス、DynamoDB本体から強整合性読み込み、キャッシュには記録されない | キャッシュヒット/ミスともに無記録ConsumedReadCapacityUnits = 4 ConsumedWriteCapacityUnits = 0 |
-
TransactWriteItems によるDAX 経由での更新時WCUは予想通りの消費になったが、裏側でのRCUの消費が実装によっては最適化される可能性がある。
-
トランザクション経由の書き込み後確実にDAXから値を取得したい場合、すぐにDAXキャッシュから値が取得できるとは限らず、キャッシュ更新に短時間かかるため少し待ってから読み取るのが望ましい。
最後に
今回はDAXでのトランザクション処理について検証してみました。さらっと公式ドキュメントに記載してある項目でも検証してみると色々なパターンが出てきて奥が深いですね。よりもっと動作を確認するためにもトランザクションのサイズや回数を増やして検証してみることが大事だと感じました。