S3をトリガーとするLambdaの冪等性をDynamoDBで実現してみた
どうも!AWS勉強中の西村祐二@大阪です。
みなさんLambdaは使ってますでしょうか。
サーバレスでいろんなことができるので個人的にかなり好きなサービスです。
Lambdaは様々な使い方ができますが
S3と連携してファイルがアップロードされたらLambdaで
何かしらの処理を実行するなどの使い方をしている方多いのではないでしょうか。
そんな時に重要になってくるのが冪等性です。
今回はLambdaの冪等性をDynamoDBの条件付き書き込みという機能を利用して
冪等性を実現してみたいと思います。
なんで冪等性が必要なのか
- Lambdaはたまに2回以上実行されるときがあるからです
例えば
Lambdaでアクセスログの集計処理などを行っている場合、
複数回処理されると複数回集計されて結果がおかしくなるときがあります。
また、複数のファイルをまとめてアップロードした際に
何らかの理由で一部のファイルのみ
処理が実行されてしまったときや、ファイルが破損していたときに
正常に処理されたファイルを人手で除外して
再アップロードしなければならないことがあります。
そんな時に、冪等性を確保する仕組みを作っておくと
何も気にせずにS3に再アップロードすれば、
正常に処理が完了していないものだけ処理する事ができます。
また、Lambdaによって複数回処理が実行されても結果に影響がでなくなります。
Lambda内で冪等性を実現できないか
いろいろ方法はあると思いますが、
Lambdaは常に同一インスタンスで実行されるわけではないため難しいと思われます。
そのため今回は外部(DynamoDB)で処理状況を管理することによって冪等性を実現しています。
DynamoDBの条件付き書き込みとは
簡単に言うと、複数人で同じテーブルの項目を更新しようとするときに
コンフリクトしないようロックする機能です。
今回はこれを利用して
- Lambdaが実行されたらロックし、複数回実行されても結果が変わらない(処理が実行されない)ようにする
- Lambdaの処理に失敗したらアンロックし、再処理できるようにする
を実装してみたいと思います。
DynamoDBの条件付き書き込みの詳細は公式ドキュメントをご確認ください。
今回実装すること
今回、S3とLambdaとDynamoDBで
下記の仕組みを実装してみたいと思います。
▶S3にファイルがアップロードされると、
Lambdaによってアップロードされたファイルのコンテンツタイプをログに出力する。
▶同じファイルを再アップロードしても、コンテンツタイプはログに表示されない。
▶処理に失敗したときに、ロックを解除し再処理可能にする。
では、さっそく作っていきましょう。
ロールの作成
Lambdaで使用するロールを作成します。
今回はCloudWatchログ、S3、DynamoDBへアクセスするので
この3つのリソースへアクセスする権限を付与しておきます。
手順
▼マネジメントコンソールのIAMの画面に移動します。
「ロール」→「新しいロールの作成」をクリックします。
▼ロールタイプの選択で、「AWS Lambda」を選択します。
▼「AmazonDynamoDBFullAccess」「AmazonS3FullAccess」「CloudWatchLogsFullAccess」に
チェックをつけて次のステップへのボタンをクリックします。
▼ロール名を記載(今回は「s3-dynamodb」としています)し「ロールの作成」をクリックすると今回利用するロールが作成されます。
DynamoDBでテーブルの作成
Lambdaで処理が完了したかどうかをファイル名をキーとして状態を記録しておくテーブルを作成します。
手順
▼マネジメントコンソールからDynamoDBにアクセスし、「テーブルの作成」をクリックします。
▼今回は下記の設定でテーブルを作成しました。
テーブル名:test-lambda
プライマリキー:filename
▼作成が完了すると下記の画面に移動するかと思います。
▼項目タブをクリックすると現在のテーブルの状況が確認できます。
はじめは何も項目はありませんが、Lambdaによって処理が完了すると
画像下の赤枠のようにファイル名とステータスが追加されていきます。
S3でバケットの作成
ファイルを保存しておくためのバケットを作成します。
手順
▼マネジメントコンソールからS3へアクセスし「バケットを作成する」をクリックします。
その後に、バケット名を記載し「作成」ボタンをクリックします。
今回、「test-dynamodb-lambda」という名前で作成しています。
Lambdaで関数の作成
やっと本題です。
トリガーをS3としてPythonでコードを書いていきます。
手順
▼マネジメントコンソールからLambdaへ行き、
「Lambda関数の作成」→「Blank Function」を選択します。
トリガーの設定を「S3」に設定し、
バケットを先程作成した「test-dynamodb-Lambda」
イベントタイプを「put」、
トリガーの有効化にチェックをつけて「次へ」をクリックします。
▼関数名を記入し、
ランタイムを「Python3.6」に設定します。
今回の名前は「test-dynamodb」としています。
▼ロールは先程作成した「s3-dynamodb」を選択し、「次へ」→「関数作成」をクリックして関数を作成します。
コード
関数が作成できたらコードを記載していきます。
以下が全文です。こちらをコード入力欄にコピペして保存したら完成です。
36行目〜39行目を行いたい処理に変更すれば、
冪等性を確保した処理ができるかと思います。
今回は簡易的にアップロードされたファイルのコンテンツタイプを
ログに出力する処理を記述しています。
import boto3 import json import urllib.parse import logging logger = logging.getLogger() logger.setLevel(logging.INFO) dynamodb = boto3.client('dynamodb') s3 = boto3.client('s3') def lock(key): try: dynamodb.put_item( TableName = 'test-lambda', Item = {'filename':{'S':key},'status':{'S':'complete!'}}, Expected = {'filename':{'Exists':False}} ) return True except Exception as e: return False def unlock(key): dynamodb.delete_item( TableName = 'test-lambda', Key = { 'filename': {'S': key} } ) def lambda_handler(event, context): bucket = event['Records'][0]['s3']['bucket']['name'] filename = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') if(lock(filename)): try: # Processing you want to do response = s3.get_object(Bucket=bucket, Key=filename) print("CONTENT TYPE: " + response['ContentType']) print("Completed! " + filename) except Exception as e: print(e) unlock(filename) print("Unlocked!") else: print(filename + " seems to be completed already.")
それぞれの関数毎に解説していきます。
ロック処理
def lock(key): try: dynamodb.put_item( TableName = 'test-lambda', Item = {'filename':{'S':key},'status':{'S':'complete!'}}, Expected = {'filename':{'Exists':False}} ) return True except Exception as e: return False
put_itemは指定したテーブルに対して項目を追加できます。
テーブルは作成したTableName=test-lambda
を指定しています。
追加する項目はItem
の箇所で「filename:key, status:complete!」としています。
追加する条件はExpected
の箇所で「filename」に何も入っていないことを条件としています。
項目が追加されたら「True」をかえし、
すでに項目が追加されていたらとエラーとなり「False」を返すようにしています。
アンロック処理
def unlock(key): dynamodb.delete_item( TableName = 'test-lambda', Key = { 'filename': {'S': key} } )
この関数はメインの処理で
正常に処理ができなかったときに、DynamoDBのテーブルから項目を削除します。
これにより、ロックされっぱなしを防ぎ、再処理が可能になります。
メイン関数
def lambda_handler(event, context): bucket = event['Records'][0]['s3']['bucket']['name'] filename = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') if(lock(filename)): try: # Processing you want to do response = s3.get_object(Bucket=bucket, Key=filename) print("CONTENT TYPE: " + response['ContentType']) print("Completed! " + filename) except Exception as e: print(e) unlock(filename) print("Unlocked!") else: print(filename + " seems to be completed already.")
まず、バケット名とファイル名を取得しています。
次にif文で、lock関数を呼び出し、ロック及びTrueなら
S3にアップロードされたファイルのコンテンツタイプをログに出力するようにしています。
エラーになったらunlock関数を呼び出しロックを解除しています。
if文でTrueではなかったらロック済なので、
ログにメッセージを出力するだけにしています。
動作確認
■ テキストファイルをアップロードしメイン機能を確認
▼今回「test-s3.txt」を作成したバケット「test-dynamodb-lambda」にアップロードしてみました。
アップロードが完了したら、CloudWatchのログを確認してみましょう。
▼想定どおりログにコンテンツタイプが出力さています。
▼DynamoDBのテーブルを確認すると項目が追加されていることがわかります。
■ 同じファイルをアップロードしロック機能を確認
▼同じファイルを再アップロードしログを確認してみます。
同じ処理はされず、メッセージのみが出力されていることがわかります。
■ テストイベントによるアンロック機能を確認
▼アンロックの機能を確認するためにテストイベントからテストを実施してみます。
バケット名、ファイル名は取得できるのですが、
バケットにアクセスできない、バケット内にファイルはないのでコンテンツタイプの取得はエラーとなります。
そのときに、ロックが解除されるか確認します。
テストイベントの設定は下記から行います。
▼「S3 Put」を選択し、下の「保存しテスト」をクリックします。
▼エラー内容とアンロックしたメッセージが出力されていることがわかります。
▼アンロックされたので、DynamoDBにも項目が追加されていないことがわかります。
さいごに
いかがだったでしょうか。
DynamoDBを利用して、Lambdaの冪等性を実現してみました。
S3をトリガーとしてLambdaで処理する際に冪等性は考慮しておいたほうが
後々余計な手間が発生せずに済むと感じたのでブログにしてみました。
だれかの参考になれば幸いです。