この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT のルールで、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)をアクションとして指定した場合、シャードの入力制限を超えたデータは、ロストすることになります。
今回は、シャードの制限を超えて溢れてしまったデータを、ルールのエラーアクションで指定したLambdaによって、再び、IoT CoreへPublishする事で、ロストをリカバリする要領を確認してみました。
溢れたデータを再投入する仕組みを作ったとしても、常時、入力制限を超えてしまっている様な場面では、対応できません。あくまでも、想定していたデータ量が一時的に超えてしまった場合のリカバリーというイメージです。
ここで紹介する仕組みは、ルールのアクションが、常にエラーになったりすると無限ループとなってしまうため、Payloadにマークを入れるなどして、想定外のループになってしまわないようにする注意が必要です。
2 正常な状態
最初に、シャードの制限内で想定通り動作している状況を作ってみます。
処理するLambdaでは、集計のために、受信したRecord数をログ出力しています。
import time
def lambda_handler(event, context):
print({
"name": "load_test",
"records_len": len(event["Records"])
})
確認作業のための、IoT CoreへのMQTTの送信は、下記のブログで作成したものを使用しました。
上記を調整し、秒間900件のデータを10秒間送信しています。
AWS IoT Coreのログを確認すると、綺麗に毎秒900件到着していることが確認できます。
また、Lambdaの方も、概ね毎秒900件が処理されています。
3 データのロスト
続いて、データがロストする状況を作為してみます。
Kinesis Data Streamsでは、シャードを1に設定しているので、秒間1,000件を超えると、制限オーバーとなるはずです。
上記と同じ様に、毎秒900件送信し、3秒目と4秒目だけ倍の1800件としてみました。(総数、10,800件)
data_size:128
CLIENT_ID_PREFIX:AX CLIENT_MAX:100
RPS:900 PERIOD:10 [sec]
0 2021-06-13 00:32:40.400338
counter: 900 elapsed_time: 0.018278837203979492 sec
1 2021-06-13 00:32:41.401362
counter: 900 elapsed_time: 0.016971588134765625 sec
2 2021-06-13 00:32:42.402381
counter: 900 elapsed_time: 0.01862025260925293 sec
3 2021-06-13 00:32:43.403397
counter: 1800 elapsed_time: 0.03636598587036133 sec
4 2021-06-13 00:32:44.404393
counter: 1800 elapsed_time: 0.04102015495300293 sec
5 2021-06-13 00:32:45.405385
counter: 900 elapsed_time: 0.016520261764526367 sec
6 2021-06-13 00:32:46.406401
counter: 900 elapsed_time: 0.01676487922668457 sec
7 2021-06-13 00:32:47.407414
counter: 900 elapsed_time: 0.016756534576416016 sec
8 2021-06-13 00:32:48.408428
counter: 900 elapsed_time: 0.016646146774291992 sec
9 2021-06-13 00:32:49.409441
counter: 900 elapsed_time: 0.016554594039916992 sec
wait...
finish!
また、ルールのエラーアクションに、Lambdaを設定しました。
def lambda_handler(event, context):
print(event)
以下が、結果です。
正常に処理された場合Lambdaの処理数は、10,800件となるはずですが、結果は、全部で10,564件となっており、236件がロストしているようです。
また、設定したエラーアクションもコールされています。
エラーアクションの方を集計してみると、ロストされた236件と一致していました。
4 ロストのリカバリ
エラーアクションに指定したLambdaのeventの内容は以下の様になっています。
{
"ruleName": "load_test",
"topic": "topic/load_test",
"cloudwatchTraceId": "0d3d5279-9381-eaa0-66cf-2ec3b4cba326",
"clientId": "AX_0045",
"base64OriginalPayload": "eyJwcm9kdWNlcl90aW1lc3RhbXAiOiAiMjAyMS0wNi0xMiAyMjo1MDowNS4xNTgiLCAiY291bnRlciI6IDg0NSwgImRhdGEiOiAiWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFgifQ==",
"failures": [
{
"failedAction": "KinesisAction",
"failedResource": "load_test_stream",
"errorMessage": "Failed to publish Kinesis message. The error received was Rate exceeded for shard shardId-000000000000 in stream load_test_stream under account 439028474478. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: f861ee0e-0b85-322a-a15e-d25015cb7515; Proxy: null). Message arrived on: topic/load_test, Action: kinesis, Name: load_test_stream, PartitionKey: 84317fbf-e5bc-4448-a70c-b5480d206931"
}
]
}
エラーアクションに設定したLambdaで、base64OriginalPayloadをデコードすると、Payloadの内容がそのまま入っています。 そこで、それを再び IoT Core へPublishするように変更しました。(IoT Coreへのアクセスポリシーも追加)
import time
import base64
import json
import boto3
import ast
iot = boto3.client('iot-data', region_name='ap-northeast-1')
def lambda_handler(event, context):
print(event)
time.sleep(0.1) # 少し、ウエイトを置いてから、再度処理する
payload = base64.b64decode(event["base64OriginalPayload"]).decode("utf-8")
payload = ast.literal_eval(payload)
topic = 'load_test'
iot.publish(
topic = topic,
qos = 1,
payload = json.dumps(payload)
)
上記のLambdaをエラーアクションに設定していると、到着はやや乱れますが、総数で、10,800件処置されたことが確認できました。
この時、エラーアクションの方は、310回起動されています。
10,800件送ったPublishの到着は、11,110件となっています。
クライアントIDで区別してみると 当初のクライアントから到着しているのが、10,800件
それ以外(LambdaからのPublish)が、310件です。
5 最後に
今回は、一時的にシャードの制限を超えてしまって溢れたデータを、ルールのエラーアクションで、再び、IoT Coreへ送り直す事で、データのロストをリカバリーする要領を確認してみました。
なお、溢れたデータがリカバリーできても、結局、シャードの制限が増加する訳ではないので、エラーアクションがInvokeされた時点で、シャードの数の見直しは必要だと思います。