ちょっと話題の記事

AWS IoT にみる サーバーレスアーキテクチャ Messaging パターン実装例 – Lambda 同時実行数を制御する

2018.06.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

まずはこれを見てください。

messaging_pattern.png

これは、書籍AWSによるサーバーレスアーキテクチャで紹介されている Messaging パターン です。今回、こちらのパターンを、AWS IoT のバックエンドに適用してみる機会がありましたので、実装例として参考にしてください。パターンの要点としては、

  • 複数のデータソースがひとつのストリームに対してメッセージを送る
  • ディスパッチャーとなる Lambda Function が、ストリームからデータを取り出す
  • ディスパッチャーは、データに応じて実際の仕事を行う別の Lambda Function を Invoke する

という流れになります。

AWS IoT のメッセージ処理に適用する

なぜ AWS IoT のバックエンドに使ってみようと思ったかという話をします。もともと Messaging パターンは、以下のように紹介されていました。

Messaging パターンは、関数やサービスを直接的な相互依存関係から切り離し、イベント、レコード、リクエストをキューに格納するパターンです。(中略)コンシューマサービスがオフラインになっても、メッセージはキューに残り、あとで処理できる状態を保つことから、高い信頼性が得られます。

つまり 一旦データやメッセージを溜めておく場所として使う 目的だそうです。もちろん AWS IoT のバックエンドに適用することによって、このメリットも得られます。が、今回目的として一番重要視したのが Kinesis Streams を挟むことで流量の制御ができ、Lambda Function の同時実行数を調整できる という点です。AWSによるサーバーレスアプリケーションで常に考えておかなければならないのが Lambda Function の同時実行数です。負荷をみながら必要に応じて上限緩和を申請するというのももちろん手段としてアリですが、別の手として、遅延してもよいから同時実行数を制限する というのも一考する価値はあります。AWS IoT のように、

  • メッセージを送りっぱなしにして、結果についてはさほど気にしない
  • 多少遅延してもビジネス要件に影響がない
  • 時間帯によって大量のメッセージが発生する可能性がある(=>暇な時間帯で残った処理を片付けられる)

という状況で、後者の手段も使えるかと思います。

ヒーロー活動を登録する AWS IoT

今回の例として、ヒーロー活動を登録する AWS IoT を考えます。ヒーロー、事務所、スポンサーが登場し、彼らはいろいろ活動し、それをたくさん記録します。Kinesis Streams を導入する前は、AWS IoT Rule Action により直接 Lambda Function が起動する方式です。

before.png

これはこれでシンプルですね。ところが、アプリケーションが盛り上がり、活動も活発になると、Lambda Function の同時実行制限数にさしかかる可能性が浮上しました。そこで、Messaging パターン を適用してみます。

after.png

やってみて嬉しかったのは、DynamoDBにイベントを登録する それぞれのLambda Function には手を入れなくてよい という点です。ディスパッチャーとなる Lambda Function を挟むことで、ビジネスロジックを処理する Lambda Fucntion はシステム結合のやり方を気にせずに自分の処理に集中できます。では実際につくってみましょう。

Kinesis Streams を作成する

特に難しいことはありません。検証目的なので シャード数は1 あれば十分です。プロダクションでは登録イベント数が増えてきたときに並列数を上げる要素として調整する候補になります。

kinesis.png

AWS IoT Rule Action を設定する

これも簡単です。ディスパッチャーとなる Lambda Function では トピック名 をもとに振り分けを行うので、関数を使ってトピック名がメッセージに含まれるようにします。

iot-rule-action.png

  • topic() as topic とすることで、 メッセージに "topic":"heroes/activity/heroes" のように組み込まれます
  • 先程作成した Kinesis Streams にデータを渡すよう Actions を設定します
  • 事務所活動イベントのトピック: heroes/activity/offices の Rule Action も同様に作成します。メッセージ送り先は同じ Kinesis Streams です
  • スポンサー活動イベントのトピック: heroes/activity/sponsors の Rule Action も同様に作成します。メッセージ送り先は同じ Kinesis Streams です

ディスパッチャーとなる Lambda Function を実装する

Lambda Function の設定

まず、先ほど作成した Kinesis Streams をイベント発生元として指定します。Kinesis Streams からデータを取り出す Batch Size は ひとまず5にしておきます。

lambda-settings-kinesis.png

環境変数は以下。振り分け先の関数名が入るようにしています。このようにしておくと、振り分け先の関数名が変わった場合でもコードを修正せずに済みます。

lambda-settings-env.png

Lambda Function のコード

以下のようなことをやっています。

  • event には Kinesis Streams から送られているメッセージがリストで入っているのでそれぞれについてループ処理する
    • AWS IoT のメッセージは Kinesis Streams によりBase64エンコードされているためデコードする
    • AWS IoT のメッセージにトピック名が入ってるので取り出す
    • トピック名を元に起動するべき Lambda Function の名前を判断
    • 起動するべき Lambda Function を Event(非同期)モードで 起動

Python 3.6 で実装しました。

import os
import boto3
import base64
import json

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


PUT_HEROES_FUNCTION_NAME = os.getenv('PUT_HEROES_FUNCTION_NAME')
PUT_OFFICES_FUNCTION_NAME = os.getenv('PUT_OFFICES_FUNCTION_NAME')
PUT_SPONSORS_FUNCTION_NAME = os.getenv('PUT_SPONSORS_FUNCTION_NAME')

LAMBDA = boto3.client('lambda')


def lambda_handler(event, context):
    for record in event['Records']:
        binary_message = base64.b64decode(record["kinesis"]["data"])
        json_message = json.loads(binary_message.decode('utf-8'))
        topic_name = json_message['topic']

        if 'heroes/activity/heroes' in topic_name:
            function_name = PUT_HEROES_FUNCTION_NAME
        elif 'heroes/activity/offices' in topic_name:
            function_name = PUT_OFFICES_FUNCTION_NAME
        elif 'heroes/activity/sponsors' in topic_name:
            function_name = PUT_SPONSORS_FUNCTION_NAME
        else:
            raise Exception(f'UnknownTopicName:{topic_name}')

        LAMBDA.invoke(
            FunctionName=function_name,
            InvocationType='Event',
            Payload=binary_message
        )
    message = f'Finished: dispatch message to function_name:{function_name}. '
    logger.info(message)
    return message

メッセージを送ってみる

AWS IoT のコンソールからヒーロー活動イベントのメッセージを送ってみます。

publish-activity.png

すると、AWS IoT => Kinesis Streams => ディスパッチャー Lambda => 実際に処理する Lambda と渡り、DynamoDB に無事データが保存されていることを確認できました。これで実装は終わりです。

dyanamo.png

同時実行数を調整してみる

実装できることがわかりましたが、肝心の Lambda Function 同時実行数を調整できるどうかについてはハッキリしていません。これも実際に試してみましょう。以下の前提を設定します。

設定項目 内容
発行するイベント 101個
ディスパッチャーの Batch Size 5
確認するメトリクス CloudWatch の Lambda ConcurrentExecutions

AWS IoT から直接 Lambda Function を起動した場合、ConcurrentExcecutions が100に近くなり、Kinesis Streams を挟んだ場合は ディスパッチャー自身 + Batch Size 5 を加味すると ConcurrentExcecutions 6 くらいになると期待します。なお、ドキュメントによると、「ある期間で集計された場合、平均メトリクスで表示する必要があります」とのことだったので、Period:1secondStatistic:Average のデータを使うことにしました。

イベントは、手元のマシンのターミナルからループ処理で101個発行します。

for i in `seq 0 100`
do
  curl --tlsv1.2 --cacert root-CA.crt \
  --cert 4b7828d2e5-certificate.pem.crt \
  --key 4b7828d2e5-private.pem.key \
  -X POST -d '{ "name": "Serverless Man", "office": "AKihabara", "sponsor": "Classmethod", "activity":"同時実行数の検証活動" }' \
   "https://a1pn10j0v8htvw.iot.ap-northeast-1.amazonaws.com:8443/topics/heroes/activity/heroes?qos=1" &
done

※ ここで載せている証明書のファイル名や AWS IoT のエンドポイントは、AWSのドキュメントにあるサンプルから流用しています。実際に試す場合は、ご自身の環境の値に置き換えてください。

直接 Lambda Function を起動する場合

cuncurrent_direct.png

同時実行数はおおよそ50と判定されました。 

Kinesis Streams を挟む場合

cuncurrent_stream.png

こちらはおおむね想定通りの値になりました。Kinesis Streams を挟むことで、流量を制御し、同時実行数を調整できることがわかりました。ちなみに、この場合、Kinesis Streams 内のメッセージは自分が処理されるのを待つ時間が発生するわけですが、どのくらい待たされているのかを示すメトリクス IteratorAge約11秒 でした。ゆるやかに実行されるのと引き換えに、たまったメッセージを処理するのに時間を要することがわかります。

まとめ

  • サーバーレスアーキテクチャの Messaging パターンは、AWS IoT のバックエンドに適用できる
  • Kinesis Streams を挟むことで、IoT クライアントから送られてくるメッセージを一時的に格納できる
  • AWS においては、サーバーレスアプリケーションで Lambda Function の同時実行数を制御するひとつの手段としても使える
  • IoT Topic から直接 Lambda Function を起動する場合と比較し、ディスパッチャーとなる Lambda Function の Batch Size に依存した同時実行数へ調整可能
  • ただし、当然その分だけ Kinesis Streams にたまったメッセージをすべて処理するのに時間がかかるようになる

注意点

今回は、実際に処理する Lambda Function がごく単純な例でした。他に実行中の Lambda Functionもなかったので、同時実行数の予想もしやすいです。本記事は、あくまで Messaging パターン を実装する場合、ディスパッチャーの Batch Size を変更することで、実際に処理する Lambda Function の同時実行数を増減できる というサンプルを示すものです。これをやっておけば安心!というわけではなく、プロダクションで運用するにあたってはやはり監視&調整が必須です。

  • 慢性的に Kinesis Streams にデータが貯まるようだと、Lambda が捌けずに24時間経過してレコードが失われてしまう可能性がある
  • 実際に処理する Lambda Function が例えば30秒かかるような重い処理のような場合、意図せず同時実行数が増えてしまう

というリスクも一例としてあります。

監視する候補としては以下のようなものがあります。

  • Lambda Function の同時実行数
  • Lambda Function のスロットル発生
  • Kinesis Streams や DynamoDB Streams をソースに実行される Lambda Function の IteratorAge

調整する候補としては以下のようなものがあります。

  • Lambda Function の同時実行数上限緩和申請
  • Kinesis Streams のシャード数
  • Kinesis Streams や DynamoDB Streams をソースに実行される Lambda Function の Batch Size
  • Lambda Function のメモリサイズ

アプリケーションの要件も確認しておきましょう。

  • Streamにデータがたまり、実行が遅延しても問題ないか?
  • 同時実行数の上限に達し、スロットルが発生するリスクを考慮できているか?
  • 最悪のケースに備えた、バックアップ、リストアの運用設計は必要か?

これらを継続的に確認、判断して、ビジネス状況に合った調整を行うことになります。サーバーレスアプリケーションを構築する方々の一助となれば幸いです。

このあたりの注意点に関しては、弊社の 大瀧鈴木齋藤から考慮するべきポイントとしてアドバイスをもらいました。

ソースコード

今回利用したコードは GitHub に上げています。

参考