DynamoDBのトランザクションを試してみた #reinvent

先日のAWS re:Invent 2018においてDynamoDBのトランザクションがリリースされました。ということで、実際に試してみました。
2018.12.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日のAWS re:Invent 2018においてDynamoDBのトランザクションがリリースされました。ということで、実際に試してみました。

[速報]待望のDynamoDBのトランザクションがリリースされました! #reinvent

実行環境

  • MacBook Pro (13-inch, 2017, Four Thunderbolt 3 Ports)
  • Python3
  • boto3
  • 東京リージョン
$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.13.6
BuildVersion:   17G3025
$ python --version
Python 3.6.6
$ pip list
Package         Version
--------------- -------
boto3           1.9.58
botocore        1.12.58
docutils        0.14
jmespath        0.9.3
pip             18.1
python-dateutil 2.7.5
s3transfer      0.1.13
setuptools      39.0.1
six             1.11.0
urllib3         1.24.1

検証内容

今回はシナリオをシンプルにするため、単一テーブルの複数アイテムを同時更新した際に一部アイテムが事前条件を満たさず、結果的に全アイテムの書き込みに失敗するケースを確認したいと思います。具体的にはAccounts(口座)テーブルにDynamoさんとDBさんの2人分のアイテムを作成し、DynamoさんからDBさんへ振り込みをした際にエラーが発生するシナリオを確認したいと思います。

まずはソースを丸ごと貼っておきます。

import time
import boto3

# 前処理。まず検証用にテーブルを作成する
table_name = 'Accounts_{}'.format(time.time())
client = boto3.client('dynamodb', region_name='ap-northeast-1')
response = client.create_table(
    TableName=table_name,
    KeySchema=[
        {
            'AttributeName': 'name',
            'KeyType': 'HASH'
        },
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'name',
            'AttributeType': 'S'
        },
    ],
    BillingMode='PAY_PER_REQUEST'
)
print('create_table: TableName = {}'.format(response['TableDescription']['TableName']))

# テーブルの初期化作業の完了待ち
table_exists_waiter = client.get_waiter('table_exists')
table_exists_waiter.wait(TableName=table_name)

# 次にアイテムを2件登録する
# Dynamoさんの口座の残高を 100 に、DBさんの口座の残高を 50 に設定するイメージ
response = client.put_item(
    TableName=table_name,
    Item={
        'name': {'S': 'Dynamo'},
        'balance': {'N': '100'}
    },
    ReturnConsumedCapacity='INDEXES'
)
print('put_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity']))
response = client.put_item(
    TableName=table_name,
    Item={
        'name': {'S': 'DB'},
        'balance': {'N': '50'}
    },
    ReturnConsumedCapacity='INDEXES'
)
print('put_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity']))

# 1件だけ更新する
# Dynamoさんの口座から 20 引き出されたイメージ
response = client.update_item(
    TableName=table_name,
    Key={
        'name': {'S': 'Dynamo'}
    },
    AttributeUpdates={
        'balance': {
            'Action': 'PUT',
            'Value': {'N': '80'}
        }
    },
    ReturnConsumedCapacity='INDEXES'
)
print('update_item: ConsumedCapacity = {}'.format(response['ConsumedCapacity']))

# 2件まとめて更新して両方共エラーになることを確認する
# Dynamoさんの口座からDBさんの口座に 10 振り込む処理の裏側で同時に先程のDynamoさんの口座の引き落としが発生していたイメージ
try:
    client.transact_write_items(
        ReturnConsumedCapacity='INDEXES',
        TransactItems=[
            {
                'Update': {
                    'TableName': table_name,
                    'Key': {
                        'name': {'S': 'Dynamo'}
                    },
                    'ConditionExpression': 'balance = :bc',
                    'UpdateExpression': 'SET balance = :bu',
                    'ExpressionAttributeValues': {
                        ':bc': {'N': '100'},  # 元々は 100 の想定
                        ':bu': {'N': '90'},  # 100 - 10 = 90
                    }
                }
            },
            {
                'Update': {
                    'TableName': table_name,
                    'Key': {
                        'name': {'S': 'DB'}
                    },
                    'ConditionExpression': 'balance = :bc',
                    'UpdateExpression': 'SET balance = :bu',
                    'ExpressionAttributeValues': {
                        ':bc': {'N': '50'},  # 元々は 50 の想定
                        ':bu': {'N': '60'},  # 50 + 10 = 60
                    }
                }
            }
        ]
    )
except Exception as e:
    print('transact_write_items: {}'.format(e))

# エラーになったため再度最新の値を取得して振り込み処理をやり直すイメージ
response = client.transact_get_items(
    TransactItems=[
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'Dynamo'}
                }
            }
        },
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'DB'}
                }
            }
        }
    ]
)
print('transact_get_items: Responses = {}'.format(response['Responses']))

# 再度振込処理を実行するイメージ
response = client.transact_write_items(
    ReturnConsumedCapacity='INDEXES',
    TransactItems=[
        {
            'Update': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'Dynamo'}
                },
                'ConditionExpression': 'balance = :bc',
                'UpdateExpression': 'SET balance = :bu',
                'ExpressionAttributeValues': {
                    ':bc': {'N': '80'},  # 80 に修正
                    ':bu': {'N': '70'},  # 80 - 10 = 70
                }
            }
        },
        {
            'Update': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'DB'}
                },
                'ConditionExpression': 'balance = :bc',
                'UpdateExpression': 'SET balance = :bu',
                'ExpressionAttributeValues': {
                    ':bc': {'N': '50'},  # 変更なし
                    ':bu': {'N': '60'},  # 50 + 10 = 60
                }
            }
        }
    ]
)
print('transact_write_items: ConsumedCapacity = {}'.format(response['ConsumedCapacity']))

# 両方のアイテムが想定通り更新されていることを確認する
response = client.transact_get_items(
    TransactItems=[
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'Dynamo'}
                }
            }
        },
        {
            'Get': {
                'TableName': table_name,
                'Key': {
                    'name': {'S': 'DB'}
                }
            }
        }
    ]
)
print('transact_get_items: Responses = {}'.format(response['Responses']))

# 後処理。テーブルを削除する
response = client.delete_table(TableName=table_name)
print('delete_table: TableName = {}'.format(response['TableDescription']['TableName']))

実行結果は以下のような出力になります。

create_table: TableName = Accounts_1543972687.451009
put_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}}
put_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}}
update_item: ConsumedCapacity = {'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 1.0, 'Table': {'CapacityUnits': 1.0}}
transact_write_items: An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed, None]
transact_get_items: Responses = [{'Item': {'name': {'S': 'Dynamo'}, 'balance': {'N': '80'}}}, {'Item': {'name': {'S': 'DB'}, 'balance': {'N': '50'}}}]
transact_write_items: ConsumedCapacity = [{'TableName': 'Accounts_1543972687.451009', 'CapacityUnits': 4.0, 'WriteCapacityUnits': 4.0, 'Table': {'WriteCapacityUnits': 4.0, 'CapacityUnits': 4.0}}]
transact_get_items: Responses = [{'Item': {'name': {'S': 'Dynamo'}, 'balance': {'N': '70'}}}, {'Item': {'name': {'S': 'DB'}, 'balance': {'N': '60'}}}]
delete_table: TableName = Accounts_1543972687.451009

ソース自体はシンプルだと思うので解説は省きまして、実行結果のConsumedCapacityに注目したいと思います。put_itemやupdate_itemは当然ですが{'CapacityUnits': 1.0}となっています。一方でtransact_write_itemsの場合は{'WriteCapacityUnits': 4.0, 'CapacityUnits': 4.0}となっています。2件のアイテムを更新したのだから2になるかと思ったらのその倍の4になっていますよね。こちらが通常の更新処理とトランザクションを利用した場合の差分で、トランザクションを利用した場合は事前確認と実際のコミットで2倍のCapacityUnitsを消費する仕様になっています。そのため、トランザクションを利用する場合はいままでよりもCapacityUnitsを消費することにご注意ください。

CapacityUnitsも含めた留意事項の一覧は公式ドキュメントにまとまっています。実際にトランザクションを利用する前に一度目を通しておくとよいかと考えます。

Amazon DynamoDB Transactions: How it Works - Amazon DynamoDB

参考資料

最後に

boto3にはDynamoDB.Tableというもう少し直感的に操作できるAPIが存在するのですが、transact_write_itemsは複数テーブル横断の機能であるためDynamoDB.Tableの延長では実装が難しい気がします。。とはいえ、やはりソースが直感的ではないので抽象度の高いAPIが出るといいなと思いました。
(今回も検証のためにLow-level APIの使い方から調べることになったので。。)