![[AWS IoT Core] ルールのエラーアクションでKinesis Data Streamsのシャードから溢れたデータを再投入してみました](https://devio2023-media.developers.io/wp-content/uploads/2019/05/aws-iot-core.png)
[AWS IoT Core] ルールのエラーアクションでKinesis Data Streamsのシャードから溢れたデータを再投入してみました
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT のルールで、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)をアクションとして指定した場合、シャードの入力制限を超えたデータは、ロストすることになります。
今回は、シャードの制限を超えて溢れてしまったデータを、ルールのエラーアクションで指定したLambdaによって、再び、IoT CoreへPublishする事で、ロストをリカバリする要領を確認してみました。

溢れたデータを再投入する仕組みを作ったとしても、常時、入力制限を超えてしまっている様な場面では、対応できません。あくまでも、想定していたデータ量が一時的に超えてしまった場合のリカバリーというイメージです。
ここで紹介する仕組みは、ルールのアクションが、常にエラーになったりすると無限ループとなってしまうため、Payloadにマークを入れるなどして、想定外のループになってしまわないようにする注意が必要です。
2 正常な状態
最初に、シャードの制限内で想定通り動作している状況を作ってみます。
処理するLambdaでは、集計のために、受信したRecord数をログ出力しています。
import time
def lambda_handler(event, context):
print({
"name": "load_test",
"records_len": len(event["Records"])
})
確認作業のための、IoT CoreへのMQTTの送信は、下記のブログで作成したものを使用しました。
上記を調整し、秒間900件のデータを10秒間送信しています。
AWS IoT Coreのログを確認すると、綺麗に毎秒900件到着していることが確認できます。

また、Lambdaの方も、概ね毎秒900件が処理されています。

3 データのロスト
続いて、データがロストする状況を作為してみます。
Kinesis Data Streamsでは、シャードを1に設定しているので、秒間1,000件を超えると、制限オーバーとなるはずです。
上記と同じ様に、毎秒900件送信し、3秒目と4秒目だけ倍の1800件としてみました。(総数、10,800件)
data_size:128 CLIENT_ID_PREFIX:AX CLIENT_MAX:100 RPS:900 PERIOD:10 [sec] 0 2021-06-13 00:32:40.400338 counter: 900 elapsed_time: 0.018278837203979492 sec 1 2021-06-13 00:32:41.401362 counter: 900 elapsed_time: 0.016971588134765625 sec 2 2021-06-13 00:32:42.402381 counter: 900 elapsed_time: 0.01862025260925293 sec 3 2021-06-13 00:32:43.403397 counter: 1800 elapsed_time: 0.03636598587036133 sec 4 2021-06-13 00:32:44.404393 counter: 1800 elapsed_time: 0.04102015495300293 sec 5 2021-06-13 00:32:45.405385 counter: 900 elapsed_time: 0.016520261764526367 sec 6 2021-06-13 00:32:46.406401 counter: 900 elapsed_time: 0.01676487922668457 sec 7 2021-06-13 00:32:47.407414 counter: 900 elapsed_time: 0.016756534576416016 sec 8 2021-06-13 00:32:48.408428 counter: 900 elapsed_time: 0.016646146774291992 sec 9 2021-06-13 00:32:49.409441 counter: 900 elapsed_time: 0.016554594039916992 sec wait... finish!
また、ルールのエラーアクションに、Lambdaを設定しました。
def lambda_handler(event, context):
print(event)

以下が、結果です。
正常に処理された場合Lambdaの処理数は、10,800件となるはずですが、結果は、全部で10,564件となっており、236件がロストしているようです。

また、設定したエラーアクションもコールされています。

エラーアクションの方を集計してみると、ロストされた236件と一致していました。

4 ロストのリカバリ
エラーアクションに指定したLambdaのeventの内容は以下の様になっています。
{
"ruleName": "load_test",
"topic": "topic/load_test",
"cloudwatchTraceId": "0d3d5279-9381-eaa0-66cf-2ec3b4cba326",
"clientId": "AX_0045",
"base64OriginalPayload": "eyJwcm9kdWNlcl90aW1lc3RhbXAiOiAiMjAyMS0wNi0xMiAyMjo1MDowNS4xNTgiLCAiY291bnRlciI6IDg0NSwgImRhdGEiOiAiWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFgifQ==",
"failures": [
{
"failedAction": "KinesisAction",
"failedResource": "load_test_stream",
"errorMessage": "Failed to publish Kinesis message. The error received was Rate exceeded for shard shardId-000000000000 in stream load_test_stream under account 439028474478. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: f861ee0e-0b85-322a-a15e-d25015cb7515; Proxy: null). Message arrived on: topic/load_test, Action: kinesis, Name: load_test_stream, PartitionKey: 84317fbf-e5bc-4448-a70c-b5480d206931"
}
]
}
エラーアクションに設定したLambdaで、base64OriginalPayloadをデコードすると、Payloadの内容がそのまま入っています。 そこで、それを再び IoT Core へPublishするように変更しました。(IoT Coreへのアクセスポリシーも追加)
import time
import base64
import json
import boto3
import ast
iot = boto3.client('iot-data', region_name='ap-northeast-1')
def lambda_handler(event, context):
print(event)
time.sleep(0.1) # 少し、ウエイトを置いてから、再度処理する
payload = base64.b64decode(event["base64OriginalPayload"]).decode("utf-8")
payload = ast.literal_eval(payload)
topic = 'load_test'
iot.publish(
topic = topic,
qos = 1,
payload = json.dumps(payload)
)
上記のLambdaをエラーアクションに設定していると、到着はやや乱れますが、総数で、10,800件処置されたことが確認できました。

この時、エラーアクションの方は、310回起動されています。

10,800件送ったPublishの到着は、11,110件となっています。

クライアントIDで区別してみると 当初のクライアントから到着しているのが、10,800件

それ以外(LambdaからのPublish)が、310件です。

5 最後に
今回は、一時的にシャードの制限を超えてしまって溢れたデータを、ルールのエラーアクションで、再び、IoT Coreへ送り直す事で、データのロストをリカバリーする要領を確認してみました。
なお、溢れたデータがリカバリーできても、結局、シャードの制限が増加する訳ではないので、エラーアクションがInvokeされた時点で、シャードの数の見直しは必要だと思います。







