[AWS IoT Core] ルールのエラーアクションでKinesis Data Streamsのシャードから溢れたデータを再投入してみました

[AWS IoT Core] ルールのエラーアクションでKinesis Data Streamsのシャードから溢れたデータを再投入してみました

Clock Icon2021.06.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

AWS IoT のルールで、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)をアクションとして指定した場合、シャードの入力制限を超えたデータは、ロストすることになります。

今回は、シャードの制限を超えて溢れてしまったデータを、ルールのエラーアクションで指定したLambdaによって、再び、IoT CoreへPublishする事で、ロストをリカバリする要領を確認してみました。

溢れたデータを再投入する仕組みを作ったとしても、常時、入力制限を超えてしまっている様な場面では、対応できません。あくまでも、想定していたデータ量が一時的に超えてしまった場合のリカバリーというイメージです。

ここで紹介する仕組みは、ルールのアクションが、常にエラーになったりすると無限ループとなってしまうため、Payloadにマークを入れるなどして、想定外のループになってしまわないようにする注意が必要です。

2 正常な状態

最初に、シャードの制限内で想定通り動作している状況を作ってみます。

処理するLambdaでは、集計のために、受信したRecord数をログ出力しています。

import time

def lambda_handler(event, context):
    print({
        "name": "load_test",
        "records_len": len(event["Records"])
    })

確認作業のための、IoT CoreへのMQTTの送信は、下記のブログで作成したものを使用しました。

上記を調整し、秒間900件のデータを10秒間送信しています。

AWS IoT Coreのログを確認すると、綺麗に毎秒900件到着していることが確認できます。

また、Lambdaの方も、概ね毎秒900件が処理されています。

3 データのロスト

続いて、データがロストする状況を作為してみます。

Kinesis Data Streamsでは、シャードを1に設定しているので、秒間1,000件を超えると、制限オーバーとなるはずです。

上記と同じ様に、毎秒900件送信し、3秒目と4秒目だけ倍の1800件としてみました。(総数、10,800件)

data_size:128
CLIENT_ID_PREFIX:AX CLIENT_MAX:100
RPS:900 PERIOD:10 [sec]
0 2021-06-13 00:32:40.400338
counter: 900 elapsed_time: 0.018278837203979492 sec
1 2021-06-13 00:32:41.401362
counter: 900 elapsed_time: 0.016971588134765625 sec
2 2021-06-13 00:32:42.402381
counter: 900 elapsed_time: 0.01862025260925293 sec
3 2021-06-13 00:32:43.403397
counter: 1800 elapsed_time: 0.03636598587036133 sec
4 2021-06-13 00:32:44.404393
counter: 1800 elapsed_time: 0.04102015495300293 sec
5 2021-06-13 00:32:45.405385
counter: 900 elapsed_time: 0.016520261764526367 sec
6 2021-06-13 00:32:46.406401
counter: 900 elapsed_time: 0.01676487922668457 sec
7 2021-06-13 00:32:47.407414
counter: 900 elapsed_time: 0.016756534576416016 sec
8 2021-06-13 00:32:48.408428
counter: 900 elapsed_time: 0.016646146774291992 sec
9 2021-06-13 00:32:49.409441
counter: 900 elapsed_time: 0.016554594039916992 sec
wait...
finish!

また、ルールのエラーアクションに、Lambdaを設定しました。

def lambda_handler(event, context):

    print(event)

以下が、結果です。

正常に処理された場合Lambdaの処理数は、10,800件となるはずですが、結果は、全部で10,564件となっており、236件がロストしているようです。

また、設定したエラーアクションもコールされています。

エラーアクションの方を集計してみると、ロストされた236件と一致していました。

4 ロストのリカバリ

エラーアクションに指定したLambdaのeventの内容は以下の様になっています。

{
    "ruleName": "load_test",
    "topic": "topic/load_test",
    "cloudwatchTraceId": "0d3d5279-9381-eaa0-66cf-2ec3b4cba326",
    "clientId": "AX_0045",
    "base64OriginalPayload": "eyJwcm9kdWNlcl90aW1lc3RhbXAiOiAiMjAyMS0wNi0xMiAyMjo1MDowNS4xNTgiLCAiY291bnRlciI6IDg0NSwgImRhdGEiOiAiWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFgifQ==",
    "failures": [
        {
            "failedAction": "KinesisAction",
            "failedResource": "load_test_stream",
            "errorMessage": "Failed to publish Kinesis message. The error received was Rate exceeded for shard shardId-000000000000 in stream load_test_stream under account 439028474478. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: f861ee0e-0b85-322a-a15e-d25015cb7515; Proxy: null). Message arrived on: topic/load_test, Action: kinesis, Name: load_test_stream, PartitionKey: 84317fbf-e5bc-4448-a70c-b5480d206931"
        }
    ]
}

エラーアクションに設定したLambdaで、base64OriginalPayloadをデコードすると、Payloadの内容がそのまま入っています。 そこで、それを再び IoT Core へPublishするように変更しました。(IoT Coreへのアクセスポリシーも追加)

import time
import base64
import json
import boto3
import ast

iot = boto3.client('iot-data', region_name='ap-northeast-1')

def lambda_handler(event, context):

    print(event)
    
    time.sleep(0.1) # 少し、ウエイトを置いてから、再度処理する
    
    payload = base64.b64decode(event["base64OriginalPayload"]).decode("utf-8")
    payload = ast.literal_eval(payload)

    topic = 'load_test'
    
    iot.publish(
        topic = topic,
        qos = 1,
        payload = json.dumps(payload)
    )

上記のLambdaをエラーアクションに設定していると、到着はやや乱れますが、総数で、10,800件処置されたことが確認できました。

この時、エラーアクションの方は、310回起動されています。

10,800件送ったPublishの到着は、11,110件となっています。

クライアントIDで区別してみると 当初のクライアントから到着しているのが、10,800件

それ以外(LambdaからのPublish)が、310件です。

5 最後に

今回は、一時的にシャードの制限を超えてしまって溢れたデータを、ルールのエラーアクションで、再び、IoT Coreへ送り直す事で、データのロストをリカバリーする要領を確認してみました。

なお、溢れたデータがリカバリーできても、結局、シャードの制限が増加する訳ではないので、エラーアクションがInvokeされた時点で、シャードの数の見直しは必要だと思います。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.