Python(Boto3)でDynamoDB上のデータの「アップサート」と「条件付き書き込み」を一度にできるのか確認してみた

2020.03.23

こんにちは、CX事業本部の若槻です。

案件でIoTデバイスのステート管理をDynamoDB上のテーブルで行うことになり、そのときに以下のような仕様が要求されました。(実際よりは簡潔化しています)

  • デバイスは自身のステート変更時にステート情報をAWSに送信してくる。
  • テーブル(state_table)では以下のAttributionを管理する。
    • deviceId(String):デバイス識別ID(Partition Key)
    • state(Boolean):デバイスのステートのブール値
    • deviceTimeStamp(String):ステート変更時にデバイス側で記録されるタイムスタンプ
  • 新しいステートデータが送信されてくるたびに、そのデバイスのdeviceIdのキーのstatedeviceTimeStampの値をDynamoDB上で更新する。
  • 新規デバイスのステートデータは初回のステート送信時に初めて作成されるようにする。
  • システムの都合上、このステートデータへの書き込み処理はIoT RuleとLambdaの2経路から行われる。

このとき、Lambda側のステートデータの書き込み処理の実装は以下のように行う必要があります。

  • キーがあれば更新、なければ新規作成をする(アップサート
  • キーの更新は、新規データのdeviceTimaStamp属性の値が既存データより小さければ行わない(条件付き書き込み

Lambdaの実装はPythonとBoto3を利用するため、Boto3でDynamoDBのデータの「アップサート」と「条件付き書き込み」を一度にできるのか確認してみました。

確認してみた

DynamoDB.Tableクラスのupdate_item()メソッド使って、ステートデータの「アップサート」と「条件付き書き込み」を一度に行う以下のようなスクリプトを作成しました。

import boto3
from boto3.resources.base import ServiceResource
from botocore.exceptions import ClientError

dynamodb_resource = boto3.resource('dynamodb')

# ステートデータ
device_id = '<デバイスID>'
state = '<ステート>'
device_time_stamp = '<タイムスタンプ>'

def upsate_state(device_id: str, state: bool, device_time_stamp: str, dynamodb_resource: ServiceResource) -> None:
    table = dynamodb_resource.Table('device_state')
    try:
        option = {
            'Key': {'deviceId': device_id},
            'ConditionExpression': '#deviceTimeStamp < :device_time_stamp',
            'UpdateExpression': 'set #deviceTimeStamp = :device_time_stamp, #state = :state',
            'ExpressionAttributeNames': {
                '#deviceTimeStamp': 'deviceTimeStamp',
                '#state': 'state'
            },
            'ExpressionAttributeValues': {
                ':device_time_stamp': device_time_stamp,
                ':state': state
            }
        }
        table.update_item(**option)
        return

    except ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        print("old or not_exist data!!!")
  • update_item()は、キーがあれば更新し無ければ新規作成をする「アップサート」の処理を行ってくれます。
  • update_item()に渡すオプションでConditionExpressionを指定すれば「条件付き書き込み」を行うことができます。値に指定した条件式を満たせば更新し、満たさなければClientErrorConditionalCheckFailedExceptionを発生させます。
  • 上記スクリプトではConditionExpressionで「新規データのdeviceTimaStamp属性の値が既存データより大きい」ことを書き込み条件にしています。

上記スクリプトを用いて、3パターンのステートデータで動作を確認したところ、下記のような結果となりました。

No. 既存キー 既存タイムスタンプとの比較 書き込み結果
1 あり 大きい 成功(DynamoDB上のデータが更新された)
2 小さい 失敗(ConditionalCheckFailedException発生)
3 なし - 失敗(ConditionalCheckFailedException発生)

No.1と2は想定どおりの結果となりましたが、今回の確認対象のパターンのNo.3では、update_item()ConditionExpressionを指定した上で、既存キーのないデータの新規作成を行おうとすると、ConditionalCheckFailedExceptionにより書き込みが失敗するという結果となりました。

よって以上の確認により、「アップサート」と「条件付き書き込み」をupdate_item()による一度の書き込み処理で行うことはできないという結論となりました。

「アップサート」と「条件付き書き込み」のいずれも行う方法

「アップサート」と「条件付き書き込み」のいずれも行いたい場合は、update_item()に加えほかのメソッドを併用する必要があります。例として以下のような方法が考えられます。

  • 1. get_item()で事前にキーの存在チェックを行い、存在する場合のみupdate_item()ConditionExpressionを指定する
  • 2. update_item()ConditionalCheckFailedExceptionにより書き込みが失敗した場合は、既存キーがない場合に限りput_item()でデータを新規作成する
  • など

2つ目の方法の場合は以下のようなスクリプトとなります。put_item()ConditionExpressionattribute_not_exists(deviceId)と指定して、キーが存在しない場合にのみデータを新規作成するようにすれば、当初の要求を満たす処理とすることができました。

import boto3
from boto3.resources.base import ServiceResource
from botocore.exceptions import ClientError

dynamodb_resource = boto3.resource('dynamodb')

# ステートデータ
device_id = '<デバイスID>'
state = '<ステート>'
device_time_stamp = '<タイムスタンプ>'

def upsate_state(device_id: str, state: bool, device_time_stamp: str, dynamodb_resource: ServiceResource) -> None:
    table = dynamodb_resource.Table('device_state')
    try:
        option = {
            'Key': {'deviceId': device_id},
            'ConditionExpression': '#deviceTimeStamp < :device_time_stamp',
            'UpdateExpression': 'set #deviceTimeStamp = :device_time_stamp, #state = :state',
            'ExpressionAttributeNames': {
                '#deviceTimeStamp': 'deviceTimeStamp',
                '#state': 'state'
            },
            'ExpressionAttributeValues': {
                ':device_time_stamp': device_time_stamp,
                ':state': state
            }
        }
        table.update_item(**option)
        return

    except ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        print('old or not_exist data.')

    try:
        item = {
            'deviceId': device_id,
            'state': state,
            'deviceTimeStamp': device_time_stamp
        }
        table.put_item(
            Item=item,
            ConditionExpression='attribute_not_exists(deviceId)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        print('old data.')

おわりに

DynamoDBのデータ更新で「アップサート」と「条件付き書き込み」を一度にできるのか確認してみたところ、結果として複数のメソッドを組み合わせる必要があったという話でした。単一のメソッドで出来た方が冗長にならないので良いのですが、現在のところはこの方法が(おそらく)一番有用そうです。

以上