この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
こんにちは、平野です。
あるファイルが処理済みであるかを管理するような仕組みにDynamoDBを使うことがあります。 そして同時に複数走るLambdaでの処理を管理する場合、 このDynamoDBには同一レコードについて、(ほぼ)同時にステータスが確認されるということが起き得ます。 同一ファイルに対する処理が同時に2つ以上のプロセスで実行されてはまずい (そのためにわざわざDynamoDBを使っている)ので、 これが問題なく行われるのかどうかを確認しました。
DynamoDBには条件付き更新という機能があり、 それを使うことで期待した動作が得られそうでしたので、実際に試してみました。
準備
テーブルの作成
検証用テーブルを作成します。
ハッシュキーをLockID
、ステータスを表すカラムをLockStatus
1としました。
同時実行のやり方
同時実行する方法は色々あるかと思いますが、
今回はtmuxのsynchronize-panes
機能を使って複数のペインから同時にテーブルを参照&書き込みに行きます。
tmuxのコマンドラインで下記を実行すると、synchronize-panes
モードをON/OFFできます。
もちろんキーをバインドしても良いです。
set-window-option synchronize-panes
今回はとりあえず8つのペインから一斉にアクセスしてみます。
検証するシチュエーション
ある処理対象のファイルに対して、
処理を開始する場合にはステータスをRUNNING
にし、
処理が正常終了したらDONE
に、
異常終了した場合はFAILED
にするという場面を想定しています。
再実行をする際には、ステータスがFAILED
である場合にだけ、
1つのプロセスが再実行の処理を実行することが望まれます。
複数プロセスが同時に処理を行うことは避けたいという状況です。
本当は初めてファイルを処理する際にも、
そのレコードをDynamoDBに作成するという操作が必要になりますが、
今回はすでにFAILED
になったレコードを再処理するという場面を取り上げています。
実際には初回にレコードを作成する場合にも、下記と同じように
「まだレコードがない」という条件をつけてレコードを作成する必要があります。
検証その1(失敗する例)
検証コード
以下のようなコードで実行します。
このコードでは、目的のIDのステータスをget_status
で取得し、
その文字列を見ることでFAILED
であるか確認し、FAILED
であれば目的の処理を実行します2。
import sys
import boto3
import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("lock_table_01")
def get_status(task_id):
key = {"LockID": task_id}
item = table.get_item(Key=key)
return item['Item']['LockStatus']
def update_status(task_id, status):
key = {"LockID": task_id}
update = 'SET LockStatus = :status'
attrib = {":status": status}
table.update_item(
Key=key,
UpdateExpression=update,
ExpressionAttributeValues=attrib)
def need_to_retry(task_id):
status = get_status(task_id)
if status == "DONE":
print("LockStatus = " + status)
return False
elif status == "RUNNING":
print("LockStatus = " + status)
return False
elif status == "FAILED":
update_status(task_id, "RUNNING")
print("LockStatus = " + status + " => \"RUNNING\"")
return True
task_id = sys.argv[1]
if need_to_retry(task_id):
# 目的の処理
time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("output.txt", 'a') as f:
f.write(task_id + "\t" + time + "\n")
# 完了したらステータスを"DONE"に変更
update_status(task_id, "DONE")
上記コードをtmuxの8つのペインから同時に実行します。
予測と結果
このコードではステータスを取得してから書き換えるまでにしばらく時間がかかるため、
複数のプロセスが同時にステータスを取得しに行くと、
一部or全部のプロセスがFAILED
を取得してしまうはずです。
そのため、1プロセスだけに目的の処理を行わせるという要望は叶わなくなりそうです。
実際、これを実行してみると、各ペインの結果は以下のような感じでした。
pane1$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane2$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane3$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane4$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane5$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane6$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane7$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
──────────────────────────────────────────
pane8$ python test1.py test.txt
LockStatus = FAILED => "RUNNING"
8つのプロセスすべてがFAILED
をRUNNING
に書きかえたと言っていますので、
8プロセス分の処理が走ってしまったことになります。
目的の処理の出力先であるoutput.txt
を見ると、
以下のように8プロセス分の処理結果が格納されてしまっています。
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
test.txt 2019-04-18 17:29:33
何度か実行してみましたが、やはり全プロセスがFAILED
を取得してしまいました。
(少しぐらいRUNNINGが取れた方が結果としては面白かったのですが・・・)
検証その2(成功する例)
次にコードを修正して、条件付き更新を行うようにしてみます。
検証コード2
import sys
import boto3
import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table("lock_table_01")
def update_status_with_condition(task_id, status_to, status_from):
key = {"LockID": task_id}
update = 'SET LockStatus = :status_to'
condition = "LockStatus = :condition"
attrib = {":status_to": status_to, ":condition": status_from}
table.update_item(
Key=key,
UpdateExpression=update,
ConditionExpression=condition,
ExpressionAttributeValues=attrib)
task_id = sys.argv[1]
try:
update_status_with_condition(task_id, "RUNNING", "FAILED")
print("LockStatus = \"FAILED\" => \"RUNNING\"")
# 目的の処理
time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("output.txt", 'a') as f:
f.write(task_id + "\t" + time + "\n")
# 完了したらステータスを"DONE"に変更
update_status_with_condition(task_id, "DONE", "RUNNING")
except Exception as error:
if error.response['Error']['Code'] == "ConditionalCheckFailedException":
print("LockStatus is not \"FAILED\"")
else:
print("Unknown exception")
sys.exit(1)
ConditionExpression
に条件を渡すことで、
その条件に合致した場合のみレコードの更新が行われるようにしました。
条件に合致しなかった場合はConditionalCheckFailedException
となります。
こちらを実行した際の各ペインの出力は以下のような感じでした。
pane1$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane2$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane3$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane4$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane5$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane6$ python test2.py test.txt
LockStatus = "FAILED" => "RUNNING"
──────────────────────────────────────────
pane7$ python test2.py test.txt
LockStatus is not "FAILED"
──────────────────────────────────────────
pane8$ python test2.py test.txt
LockStatus is not "FAILED"
今回はpane6が一番乗りで、FAILED
をRUNNING
に書き換えました。
その他のペインはRUNNING
に書き換え後のアクセスになり、条件を満たしません。
output.txt
の出力も1行だけであることが確認できます。
test.txt 2019-04-18 17:56:02
条件付き更新をしようすることで、 セマフォとして想定通りの動きをしていることが確認できました。
まとめ
DynamoDBのレコードをセマフォとしてロックをする際に、 同時アクセスされてもセマフォが想定通りに働くかどうかの確認を行いました。 条件つき更新を使用することで、同時アクセスに対しても(少なくとも1つのレコードに対しては) 原子性が保たれていることが確認できました。
ドキュメントを見れば、実現できるということをおよそ読み取ることはできたのですが、 実際に試してみたことでDynamoDBと少し仲良くなれたような気がします。
誰かの参考になれば幸いです。