DynamoDBのトランザクションを試してみた #reinvent
先日のAWS re:Invent 2018においてDynamoDBのトランザクションがリリースされました。ということで、実際に試してみました。
実行環境
- MacBook Pro (13-inch, 2017, Four Thunderbolt 3 Ports)
- Python3
- boto3
- 東京リージョン
$ sw_vers ProductName: Mac OS X ProductVersion: 10.13.6 BuildVersion: 17G3025 $ python --version Python 3.6.6 $ pip list Package Version --------------- ------- boto3 1.9.58 botocore 1.12.58 docutils 0.14 jmespath 0.9.3 pip 18.1 python-dateutil 2.7.5 s3transfer 0.1.13 setuptools 39.0.1 six 1.11.0 urllib3 1.24.1
検証内容
今回はシナリオをシンプルにするため、単一テーブルの複数アイテムを同時更新した際に一部アイテムが事前条件を満たさず、結果的に全アイテムの書き込みに失敗するケースを確認したいと思います。具体的にはAccounts(口座)テーブルにDynamoさんとDBさんの2人分のアイテムを作成し、DynamoさんからDBさんへ振り込みをした際にエラーが発生するシナリオを確認したいと思います。
まずはソースを丸ごと貼っておきます。
import time import boto3 # 前処理。まず検証用にテーブルを作成する table_name = 'Accounts_{}'.format(time.time()) client = boto3.client('dynamodb', region_name='ap-northeast-1') response = client.create_table( TableName=table_name, KeySchema=[ { 'AttributeName': 'name', 'KeyType': 'HASH' }, ], AttributeDefinitions=[ { 'AttributeName': 'name', 'AttributeType': 'S' }, ], BillingMode='PAY_PER_REQUEST' ) print('create_table: TableName = {}'.format(response['TableDescription']['TableName'])) # テーブルの初期化作業の完了待ち table_exists_waiter = client.get_waiter('table_exists') table_exists_waiter.wait(TableName=table_name) # 次にアイテムを2件登録する # Dynamoさんの口座の残高を 100 に、DBさんの口座の残高を 50 に設定するイメージ response = client.put_item( TableName=table_name, Item={ 'name': {'S': 'Dynamo'}, 'balance': {'N': '100'} }, ReturnConsumedCapacity='INDEXES' ) print('put_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity'])) response = client.put_item( TableName=table_name, Item={ 'name': {'S': 'DB'}, 'balance': {'N': '50'} }, ReturnConsumedCapacity='INDEXES' ) print('put_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity'])) # 1件だけ更新する # Dynamoさんの口座から 20 引き出されたイメージ response = client.update_item( TableName=table_name, Key={ 'name': {'S': 'Dynamo'} }, AttributeUpdates={ 'balance': { 'Action': 'PUT', 'Value': {'N': '80'} } }, ReturnConsumedCapacity='INDEXES' ) print('update_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity'])) # 2件まとめて更新して両方共エラーになることを確認する # Dynamoさんの口座からDBさんの口座に 10 振り込む処理の裏側で同時に先程のDynamoさんの口座の引き落としが発生していたイメージ try: client.transact_write_items( ReturnConsumedCapacity='INDEXES', TransactItems=[ { 'Update': { 'TableName': table_name, 'Key': { 'name': {'S': 'Dynamo'} }, 'ConditionExpression': 'balance = :bc', 'UpdateExpression': 'SET balance = :bu', 'ExpressionAttributeValues': { ':bc': {'N': '100'}, # 元々は 100 の想定 ':bu': {'N': '90'}, # 100 - 10 = 90 } } }, { 'Update': { 'TableName': table_name, 'Key': { 'name': {'S': 'DB'} }, 'ConditionExpression': 'balance = :bc', 'UpdateExpression': 'SET balance = :bu', 'ExpressionAttributeValues': { ':bc': {'N': '50'}, # 元々は 50 の想定 ':bu': {'N': '60'}, # 50 + 10 = 60 } } } ] ) except Exception as e: print('transact_write_items: {}'.format(e)) # エラーになったため再度最新の値を取得して振り込み処理をやり直すイメージ response = client.transact_get_items( TransactItems=[ { 'Get': { 'TableName': table_name, 'Key': { 'name': {'S': 'Dynamo'} } } }, { 'Get': { 'TableName': table_name, 'Key': { 'name': {'S': 'DB'} } } } ] ) print('transact_get_items: Responses = {}'.format(response['Responses'])) # 再度振込処理を実行するイメージ response = client.transact_write_items( ReturnConsumedCapacity='INDEXES', TransactItems=[ { 'Update': { 'TableName': table_name, 'Key': { 'name': {'S': 'Dynamo'} }, 'ConditionExpression': 'balance = :bc', 'UpdateExpression': 'SET balance = :bu', 'ExpressionAttributeValues': { ':bc': {'N': '80'}, # 80 に修正 ':bu': {'N': '70'}, # 80 - 10 = 70 } } }, { 'Update': { 'TableName': table_name, 'Key': { 'name': {'S': 'DB'} }, 'ConditionExpression': 'balance = :bc', 'UpdateExpression': 'SET balance = :bu', 'ExpressionAttributeValues': { ':bc': {'N': '50'}, # 変更なし ':bu': {'N': '60'}, # 50 + 10 = 60 } } } ] ) print('transact_write_items: ConsumedCapacity = {}'.format(response['ConsumedCapacity'])) # 両方のアイテムが想定通り更新されていることを確認する response = client.transact_get_items( TransactItems=[ { 'Get': { 'TableName': table_name, 'Key': { 'name': {'S': 'Dynamo'} } } }, { 'Get': { 'TableName': table_name, 'Key': { 'name': {'S': 'DB'} } } } ] ) print('transact_get_items: Responses = {}'.format(response['Responses'])) # 後処理。テーブルを削除する response = client.delete_table(TableName=table_name) print('delete_table: TableName = {}'.format(response['TableDescription']['TableName']))
実行結果は以下のような出力になります。
create_table: TableName = Accounts_1543972687.451009 put_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}} put_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}} update_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}} transact_write_items: An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed, None] transact_get_items: Responses = [{'Item': {'name': {'S': 'Dynamo'}, 'balance': {'N': '80'}}}, {'Item': {'name': {'S': 'DB'}, 'balance': {'N': '50'}}}] transact_write_items: ConsumedCapacity = [{'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 4.0, 'WriteCapacityUnits': 4.0, 'Table': {'WriteCapacityUnits': 4.0, 'CapacityUnits': 4.0}}] transact_get_items: Responses = [{'Item': {'name': {'S': 'Dynamo'}, 'balance': {'N': '70'}}}, {'Item': {'name': {'S': 'DB'}, 'balance': {'N': '60'}}}] delete_table: TableName = Accounts_1543972687.451009
ソース自体はシンプルだと思うので解説は省きまして、実行結果のConsumedCapacityに注目したいと思います。put_itemやupdate_itemは当然ですが{'CapacityUnits': 1.0}
となっています。一方でtransact_write_itemsの場合は{'WriteCapacityUnits': 4.0, 'CapacityUnits': 4.0}
となっています。2件のアイテムを更新したのだから2
になるかと思ったらのその倍の4
になっていますよね。こちらが通常の更新処理とトランザクションを利用した場合の差分で、トランザクションを利用した場合は事前確認と実際のコミットで2倍のCapacityUnitsを消費する仕様になっています。そのため、トランザクションを利用する場合はいままでよりもCapacityUnitsを消費することにご注意ください。
CapacityUnitsも含めた留意事項の一覧は公式ドキュメントにまとまっています。実際にトランザクションを利用する前に一度目を通しておくとよいかと考えます。
Amazon DynamoDB Transactions: How it Works - Amazon DynamoDB
参考資料
- 新機能 – DynamoDB Transactions | Amazon Web Services ブログ
- 公式のリリースブログエントリーです。非常に分かりやすく簡潔にまとまっていますし、Node.jsのサンプルも載っています。
- DynamoDB Transactions Example - Amazon DynamoDB
- Javaによる複数テーブルを利用したサンプルが載っています。
- [レポート] DAT374 :[新機能!] Amazon DynamoDBのTransactionsを用いたモダンなアプリケーションの構築 #reinvent | DevelopersIO
- re:Invent 2018のDynamoDBのトランザクションに関するセッションです。ユースケースや注意点などが紹介されています。
- DynamoDBのトランザクションについてFAQ形式で答えてみる #reinvent | DevelopersIO
- FAQがまとまっています。
最後に
boto3にはDynamoDB.Table
というもう少し直感的に操作できるAPIが存在するのですが、transact_write_items
は複数テーブル横断の機能であるためDynamoDB.Table
の延長では実装が難しい気がします。。とはいえ、やはりソースが直感的ではないので抽象度の高いAPIが出るといいなと思いました。
(今回も検証のためにLow-level APIの使い方から調べることになったので。。)