DynamoDBでセマフォを実現するための同時アクセスを検証してみた

はじめに

こんにちは、平野です。

あるファイルが処理済みであるかを管理するような仕組みにDynamoDBを使うことがあります。 そして同時に複数走るLambdaでの処理を管理する場合、 このDynamoDBには同一レコードについて、(ほぼ)同時にステータスが確認されるということが起き得ます。 同一ファイルに対する処理が同時に2つ以上のプロセスで実行されてはまずい (そのためにわざわざDynamoDBを使っている)ので、 これが問題なく行われるのかどうかを確認しました。

DynamoDBには条件付き更新という機能があり、 それを使うことで期待した動作が得られそうでしたので、実際に試してみました。

準備

テーブルの作成

検証用テーブルを作成します。

ハッシュキーをLockID、ステータスを表すカラムをLockStatus1としました。

同時実行のやり方

同時実行する方法は色々あるかと思いますが、 今回はtmuxのsynchronize-panes機能を使って複数のペインから同時にテーブルを参照&書き込みに行きます。 tmuxのコマンドラインで下記を実行すると、synchronize-panesモードをON/OFFできます。 もちろんキーをバインドしても良いです。

set-window-option synchronize-panes

今回はとりあえず8つのペインから一斉にアクセスしてみます。

検証するシチュエーション

ある処理対象のファイルに対して、 処理を開始する場合にはステータスをRUNNINGにし、 処理が正常終了したらDONEに、 異常終了した場合はFAILEDにするという場面を想定しています。

再実行をする際には、ステータスがFAILEDである場合にだけ、 1つのプロセスが再実行の処理を実行することが望まれます。 複数プロセスが同時に処理を行うことは避けたいという状況です。

本当は初めてファイルを処理する際にも、 そのレコードをDynamoDBに作成するという操作が必要になりますが、 今回はすでにFAILEDになったレコードを再処理するという場面を取り上げています。 実際には初回にレコードを作成する場合にも、下記と同じように 「まだレコードがない」という条件をつけてレコードを作成する必要があります。

検証その1(失敗する例)

検証コード

以下のようなコードで実行します。 このコードでは、目的のIDのステータスをget_statusで取得し、 その文字列を見ることでFAILEDであるか確認し、FAILEDであれば目的の処理を実行します2

import sys
import boto3
import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("lock_table_01")

def get_status(task_id):
    key = {"LockID": task_id}
    item = table.get_item(Key=key)
    return item['Item']['LockStatus']

def update_status(task_id, status):
    key = {"LockID": task_id}
    update = 'SET LockStatus = :status'
    attrib = {":status": status}
    table.update_item(
            Key=key,
            UpdateExpression=update,
            ExpressionAttributeValues=attrib)

def need_to_retry(task_id):
    status = get_status(task_id)
    if status == "DONE":
        print("LockStatus = " + status)
        return False
    elif status == "RUNNING":
        print("LockStatus = " + status)
        return False
    elif status == "FAILED":
        update_status(task_id, "RUNNING")
        print("LockStatus = " + status + " => \"RUNNING\"")
        return True


task_id = sys.argv[1]
if need_to_retry(task_id):
    # 目的の処理
    time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("output.txt", 'a') as f:
        f.write(task_id + "\t" + time + "\n")
    # 完了したらステータスを"DONE"に変更
    update_status(task_id, "DONE")

上記コードをtmuxの8つのペインから同時に実行します。

予測と結果

このコードではステータスを取得してから書き換えるまでにしばらく時間がかかるため、 複数のプロセスが同時にステータスを取得しに行くと、 一部or全部のプロセスがFAILEDを取得してしまうはずです。 そのため、1プロセスだけに目的の処理を行わせるという要望は叶わなくなりそうです。

実際、これを実行してみると、各ペインの結果は以下のような感じでした。

pane1$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane2$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane3$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane4$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane5$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane6$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane7$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane8$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"

8つのプロセスすべてがFAILEDRUNNINGに書きかえたと言っていますので、 8プロセス分の処理が走ってしまったことになります。

目的の処理の出力先であるoutput.txtを見ると、 以下のように8プロセス分の処理結果が格納されてしまっています。

test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33
test.txt     2019-04-18 17:29:33

何度か実行してみましたが、やはり全プロセスがFAILEDを取得してしまいました。 (少しぐらいRUNNINGが取れた方が結果としては面白かったのですが・・・)

検証その2(成功する例)

次にコードを修正して、条件付き更新を行うようにしてみます。

検証コード2

import sys
import boto3
import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("lock_table_01")

def update_status_with_condition(task_id, status_to, status_from):
    key = {"LockID": task_id}
    update = 'SET LockStatus = :status_to'
    condition = "LockStatus = :condition"
    attrib = {":status_to": status_to, ":condition": status_from}
    table.update_item(
            Key=key,
            UpdateExpression=update,
            ConditionExpression=condition,
            ExpressionAttributeValues=attrib)


task_id = sys.argv[1]
try:
    update_status_with_condition(task_id, "RUNNING", "FAILED")
    print("LockStatus = \"FAILED\" => \"RUNNING\"")
    # 目的の処理
    time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("output.txt", 'a') as f:
        f.write(task_id + "\t" + time + "\n")
    # 完了したらステータスを"DONE"に変更
    update_status_with_condition(task_id, "DONE", "RUNNING")
except Exception as error:
    if error.response['Error']['Code'] == "ConditionalCheckFailedException":
        print("LockStatus is not \"FAILED\"")
    else:
        print("Unknown exception")
        sys.exit(1)

ConditionExpressionに条件を渡すことで、 その条件に合致した場合のみレコードの更新が行われるようにしました。 条件に合致しなかった場合はConditionalCheckFailedExceptionとなります。

こちらを実行した際の各ペインの出力は以下のような感じでした。

pane1$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane2$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane3$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane4$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane5$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane6$ python test2.py test.txt
LockStatus = "FAILED" => "RUNNING"
──────────────────────────────────────────
pane7$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane8$ python test2.py test.txt
LockStatus is not "FAILED"

今回はpane6が一番乗りで、FAILEDRUNNINGに書き換えました。 その他のペインはRUNNINGに書き換え後のアクセスになり、条件を満たしません。

output.txtの出力も1行だけであることが確認できます。

test.txt     2019-04-18 17:56:02

条件付き更新をしようすることで、 セマフォとして想定通りの動きをしていることが確認できました。

まとめ

DynamoDBのレコードをセマフォとしてロックをする際に、 同時アクセスされてもセマフォが想定通りに働くかどうかの確認を行いました。 条件つき更新を使用することで、同時アクセスに対しても(少なくとも1つのレコードに対しては) 原子性が保たれていることが確認できました。

ドキュメントを見れば、実現できるということをおよそ読み取ることはできたのですが、 実際に試してみたことでDynamoDBと少し仲良くなれたような気がします。

誰かの参考になれば幸いです。


  1. 最初、statusという名前で作成したら、データを更新しようとした際にキーワードだからダメと言われました。 
  2. 実際の場合は文字列ではなく数値などで判別する方が適切ですね。