DynamoDBのアトミックカウンターを利用してみた

DynamoDBで連番を管理する方法であるアトミックカウンターについて、Pythonを用いて動作含め確認してみました。
2023.12.15

DA事業本部の横山です。

連番を管理する必要があったので、DynamoDBのアトミックカウンターをPythonを用いて利用してみました。

前提条件

本記事で利用しているPythonのバージョンは以下になります。

  • Python: 3.8.13

アトミックカウンターとは

DynamoDBのUpdateItemオペレーションを利用してアトミックカウンターを実装してみます。

アトミックカウンターとは、他の書き込みリクエストに干渉することなく無条件に増分される数値属性を指します。

注意点として、エラー時のリトライ動作等によって複数回更新される場合があるためカウントの誤差が許容されない場合には条件付き更新等のアトミックカウンター以外の方法を検討する必要があります。

DynamoDBのテーブル定義及び初期値

Key: nameをハッシュキーとするテーブルを作成します。テーブル定義は以下です。

{
  "AttributeDefinitions": [
    {
      "AttributeName": "name",
      "AttributeType": "S"
    }
  ],
  "KeySchema": [
    {
      "AttributeName": "name",
      "KeyType": "HASH"
    }
  ],
  "BillingMode": "PAY_PER_REQUEST"
}

ハッシュキーをname=atomic_counterとして。初期値0のレコードを作成します。

[
    {
        "PutRequest": {
            "Item": {
                "name": "atomic_counter",
                "value": 0
            }
        }
    }
]

アトミックカウンターを増やしてみる

試しに4スレッドでDynamoDBに対するアトミックカウンターの採番(インクリメント)を100回行ってみて、重複なく連番になっているか確認してみます。

参考コードを記載しておきます。

import decimal
import threading

import boto3


def increment_atomic_counter(
    atomic_counter_table, increment_num: int, thread_idx: int
) -> None:
    """DynamoDBのatomic-counterをインクリメントして値を取得する

    Args:
        atomic_counter_table : テーブルリソース
        increment_num (int) : 加算する数
    """
    response = atomic_counter_table.update_item(
        Key={"name": "atomic_counter"},
        UpdateExpression="ADD #key :increment",
        ExpressionAttributeNames={"#key": "value"},
        ExpressionAttributeValues={":increment": decimal.Decimal(increment_num)},
        ReturnValues="ALL_NEW",
    )
    atomic_counter = response.get("Attributes").get("value")

    print(f"Thread {thread_idx}: {atomic_counter}")


dynamodb = boto3.resource("dynamodb")
atomic_counter_table = dynamodb.Table("atomic-counter")


threads = []

for num in range(100):
    for idx in range(4):
        thread = threading.Thread(
            target=increment_atomic_counter, args=([atomic_counter_table, 1, idx])
        )
        thread.start()
        threads.append(thread)

for thread in threads:
    thread.join()

実行結果は割愛しますが以下のようになります。 出力段階では順番はばらばらですが、すべてまとめてソートして確認したところ重複なく連番になっていることが確認できました。

Thread 0: 5
Thread 3: 1
Thread 1: 2
Thread 2: 3
Thread 1: 8
Thread 3: 6
Thread 3: 7
Thread 3: 10
Thread 1: 11
Thread 2: 4
Thread 0: 12
Thread 2: 9
...

今回の参考コードでは、インクリメント値は1としていますが1より大きい値で一気にインクリメントすることも可能です。更新頻度が非常に高い場合はコスト面や性能面含めてまとめてインクリメントする必要が出てくる可能性があります。

注意点として、エラー時のリトライ等でアトミックカウンターが飛び番となる可能性があるのでこのまま利用する場合はそれを許容できるかを確認してから利用する必要があります。 例えば、更新に成功して値を取得した後にそれを利用する処理が失敗した場合、またアトミックカウンターの採番(インクリメント)が走ってしまう等。

おわりに

DynamoDBのアトミックカウンターをPythonを用いて利用してみました。 テーブルを用意して、更新と値の取得ルールをまもるだけなので手軽に利用しやすいですね。 手軽に連番を管理したい場合に利用を検討してみてはいかがでしょうか?

以上になります。この記事がどなたかの助けになれば幸いです。