AWS IoT にみる サーバーレスアーキテクチャ Messaging パターン実装例 – Lambda 同時実行数を制御する
まずはこれを見てください。
これは、書籍AWSによるサーバーレスアーキテクチャで紹介されている Messaging パターン です。今回、こちらのパターンを、AWS IoT のバックエンドに適用してみる機会がありましたので、実装例として参考にしてください。パターンの要点としては、
- 複数のデータソースがひとつのストリームに対してメッセージを送る
- ディスパッチャーとなる Lambda Function が、ストリームからデータを取り出す
- ディスパッチャーは、データに応じて実際の仕事を行う別の Lambda Function を Invoke する
という流れになります。
AWS IoT のメッセージ処理に適用する
なぜ AWS IoT のバックエンドに使ってみようと思ったかという話をします。もともと Messaging パターンは、以下のように紹介されていました。
Messaging パターンは、関数やサービスを直接的な相互依存関係から切り離し、イベント、レコード、リクエストをキューに格納するパターンです。(中略)コンシューマサービスがオフラインになっても、メッセージはキューに残り、あとで処理できる状態を保つことから、高い信頼性が得られます。
つまり 一旦データやメッセージを溜めておく場所として使う 目的だそうです。もちろん AWS IoT のバックエンドに適用することによって、このメリットも得られます。が、今回目的として一番重要視したのが Kinesis Streams を挟むことで流量の制御ができ、Lambda Function の同時実行数を調整できる という点です。AWSによるサーバーレスアプリケーションで常に考えておかなければならないのが Lambda Function の同時実行数です。負荷をみながら必要に応じて上限緩和を申請するというのももちろん手段としてアリですが、別の手として、遅延してもよいから同時実行数を制限する というのも一考する価値はあります。AWS IoT のように、
- メッセージを送りっぱなしにして、結果についてはさほど気にしない
- 多少遅延してもビジネス要件に影響がない
- 時間帯によって大量のメッセージが発生する可能性がある(=>暇な時間帯で残った処理を片付けられる)
という状況で、後者の手段も使えるかと思います。
ヒーロー活動を登録する AWS IoT
今回の例として、ヒーロー活動を登録する AWS IoT を考えます。ヒーロー、事務所、スポンサーが登場し、彼らはいろいろ活動し、それをたくさん記録します。Kinesis Streams を導入する前は、AWS IoT Rule Action により直接 Lambda Function が起動する方式です。
これはこれでシンプルですね。ところが、アプリケーションが盛り上がり、活動も活発になると、Lambda Function の同時実行制限数にさしかかる可能性が浮上しました。そこで、Messaging パターン を適用してみます。
やってみて嬉しかったのは、DynamoDBにイベントを登録する それぞれのLambda Function には手を入れなくてよい という点です。ディスパッチャーとなる Lambda Function を挟むことで、ビジネスロジックを処理する Lambda Fucntion はシステム結合のやり方を気にせずに自分の処理に集中できます。では実際につくってみましょう。
Kinesis Streams を作成する
特に難しいことはありません。検証目的なので シャード数は1 あれば十分です。プロダクションでは登録イベント数が増えてきたときに並列数を上げる要素として調整する候補になります。
AWS IoT Rule Action を設定する
これも簡単です。ディスパッチャーとなる Lambda Function では トピック名 をもとに振り分けを行うので、関数を使ってトピック名がメッセージに含まれるようにします。
topic() as topic
とすることで、 メッセージに"topic":"heroes/activity/heroes"
のように組み込まれます- 先程作成した Kinesis Streams にデータを渡すよう Actions を設定します
- 事務所活動イベントのトピック:
heroes/activity/offices
の Rule Action も同様に作成します。メッセージ送り先は同じ Kinesis Streams です - スポンサー活動イベントのトピック:
heroes/activity/sponsors
の Rule Action も同様に作成します。メッセージ送り先は同じ Kinesis Streams です
ディスパッチャーとなる Lambda Function を実装する
Lambda Function の設定
まず、先ほど作成した Kinesis Streams をイベント発生元として指定します。Kinesis Streams からデータを取り出す Batch Size は ひとまず5にしておきます。
環境変数は以下。振り分け先の関数名が入るようにしています。このようにしておくと、振り分け先の関数名が変わった場合でもコードを修正せずに済みます。
Lambda Function のコード
以下のようなことをやっています。
event
には Kinesis Streams から送られているメッセージがリストで入っているのでそれぞれについてループ処理する- AWS IoT のメッセージは Kinesis Streams によりBase64エンコードされているためデコードする
- AWS IoT のメッセージにトピック名が入ってるので取り出す
- トピック名を元に起動するべき Lambda Function の名前を判断
- 起動するべき Lambda Function を Event(非同期)モードで 起動
Python 3.6 で実装しました。
import os import boto3 import base64 import json import logging logger = logging.getLogger() logger.setLevel(logging.INFO) PUT_HEROES_FUNCTION_NAME = os.getenv('PUT_HEROES_FUNCTION_NAME') PUT_OFFICES_FUNCTION_NAME = os.getenv('PUT_OFFICES_FUNCTION_NAME') PUT_SPONSORS_FUNCTION_NAME = os.getenv('PUT_SPONSORS_FUNCTION_NAME') LAMBDA = boto3.client('lambda') def lambda_handler(event, context): for record in event['Records']: binary_message = base64.b64decode(record["kinesis"]["data"]) json_message = json.loads(binary_message.decode('utf-8')) topic_name = json_message['topic'] if 'heroes/activity/heroes' in topic_name: function_name = PUT_HEROES_FUNCTION_NAME elif 'heroes/activity/offices' in topic_name: function_name = PUT_OFFICES_FUNCTION_NAME elif 'heroes/activity/sponsors' in topic_name: function_name = PUT_SPONSORS_FUNCTION_NAME else: raise Exception(f'UnknownTopicName:{topic_name}') LAMBDA.invoke( FunctionName=function_name, InvocationType='Event', Payload=binary_message ) message = f'Finished: dispatch message to function_name:{function_name}. ' logger.info(message) return message
メッセージを送ってみる
AWS IoT のコンソールからヒーロー活動イベントのメッセージを送ってみます。
すると、AWS IoT => Kinesis Streams => ディスパッチャー Lambda => 実際に処理する Lambda と渡り、DynamoDB に無事データが保存されていることを確認できました。これで実装は終わりです。
同時実行数を調整してみる
実装できることがわかりましたが、肝心の Lambda Function 同時実行数を調整できるどうかについてはハッキリしていません。これも実際に試してみましょう。以下の前提を設定します。
設定項目 | 内容 |
---|---|
発行するイベント | 101個 |
ディスパッチャーの Batch Size | 5 |
確認するメトリクス | CloudWatch の Lambda ConcurrentExecutions |
AWS IoT から直接 Lambda Function を起動した場合、ConcurrentExcecutions が100に近くなり、Kinesis Streams を挟んだ場合は ディスパッチャー自身 + Batch Size 5 を加味すると ConcurrentExcecutions 6 くらいになると期待します。なお、ドキュメントによると、「ある期間で集計された場合、平均メトリクスで表示する必要があります」とのことだったので、Period:1second
、Statistic:Average
のデータを使うことにしました。
イベントは、手元のマシンのターミナルからループ処理で101個発行します。
for i in `seq 0 100` do curl --tlsv1.2 --cacert root-CA.crt \ --cert 4b7828d2e5-certificate.pem.crt \ --key 4b7828d2e5-private.pem.key \ -X POST -d '{ "name": "Serverless Man", "office": "AKihabara", "sponsor": "Classmethod", "activity":"同時実行数の検証活動" }' \ "https://a1pn10j0v8htvw.iot.ap-northeast-1.amazonaws.com:8443/topics/heroes/activity/heroes?qos=1" & done
※ ここで載せている証明書のファイル名や AWS IoT のエンドポイントは、AWSのドキュメントにあるサンプルから流用しています。実際に試す場合は、ご自身の環境の値に置き換えてください。
直接 Lambda Function を起動する場合
同時実行数はおおよそ50と判定されました。
Kinesis Streams を挟む場合
こちらはおおむね想定通りの値になりました。Kinesis Streams を挟むことで、流量を制御し、同時実行数を調整できることがわかりました。ちなみに、この場合、Kinesis Streams 内のメッセージは自分が処理されるのを待つ時間が発生するわけですが、どのくらい待たされているのかを示すメトリクス IteratorAge
は 約11秒 でした。ゆるやかに実行されるのと引き換えに、たまったメッセージを処理するのに時間を要することがわかります。
まとめ
- サーバーレスアーキテクチャの Messaging パターンは、AWS IoT のバックエンドに適用できる
- Kinesis Streams を挟むことで、IoT クライアントから送られてくるメッセージを一時的に格納できる
- AWS においては、サーバーレスアプリケーションで Lambda Function の同時実行数を制御するひとつの手段としても使える
- IoT Topic から直接 Lambda Function を起動する場合と比較し、ディスパッチャーとなる Lambda Function の Batch Size に依存した同時実行数へ調整可能
- ただし、当然その分だけ Kinesis Streams にたまったメッセージをすべて処理するのに時間がかかるようになる
注意点
今回は、実際に処理する Lambda Function がごく単純な例でした。他に実行中の Lambda Functionもなかったので、同時実行数の予想もしやすいです。本記事は、あくまで Messaging パターン を実装する場合、ディスパッチャーの Batch Size を変更することで、実際に処理する Lambda Function の同時実行数を増減できる というサンプルを示すものです。これをやっておけば安心!というわけではなく、プロダクションで運用するにあたってはやはり監視&調整が必須です。
- 慢性的に Kinesis Streams にデータが貯まるようだと、Lambda が捌けずに24時間経過してレコードが失われてしまう可能性がある
- 実際に処理する Lambda Function が例えば30秒かかるような重い処理のような場合、意図せず同時実行数が増えてしまう
というリスクも一例としてあります。
監視する候補としては以下のようなものがあります。
- Lambda Function の同時実行数
- Lambda Function のスロットル発生
- Kinesis Streams や DynamoDB Streams をソースに実行される Lambda Function の IteratorAge
調整する候補としては以下のようなものがあります。
- Lambda Function の同時実行数上限緩和申請
- Kinesis Streams のシャード数
- Kinesis Streams や DynamoDB Streams をソースに実行される Lambda Function の Batch Size
- Lambda Function のメモリサイズ
アプリケーションの要件も確認しておきましょう。
- Streamにデータがたまり、実行が遅延しても問題ないか?
- 同時実行数の上限に達し、スロットルが発生するリスクを考慮できているか?
- 最悪のケースに備えた、バックアップ、リストアの運用設計は必要か?
これらを継続的に確認、判断して、ビジネス状況に合った調整を行うことになります。サーバーレスアプリケーションを構築する方々の一助となれば幸いです。
このあたりの注意点に関しては、弊社の 大瀧、鈴木、齋藤から考慮するべきポイントとしてアドバイスをもらいました。
ソースコード
今回利用したコードは GitHub に上げています。