この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
サーバーレス開発部@大阪の岩田です。 先日AWS IoTと連携したシステムのテストを行なっていたのですが、AWS IoTポリシーが想定していた時間内に反映されず、意図した通信が行えませんでした。 そこで、AWS IoTポリシーが反映されるまでにどの程度の時間がかかるのか、実際に計測を行いました。
計測の結果、ポリシーアタッチ前に確立済みMQTT接続を使い続けるのか、ポリシーアタッチ後にMQTT接続を再確立するかによって、ポリシー反映までの時間が変わることが分かりました。
概要
今回やろうとしたことですが、ざっくり下記のシーケンスです。
※トピックの設計等、実際よりもかなり簡略化しています。
- MQTTクライアントからAWS IoTにConnect
- MQTTクライアントがトピック
hoge/res
をSubscribe - MQTTクライアントがトピック
hoge/req
にPublish hoge/req
へのPublishをトリガーに、AWS IoTのルールエンジンでLambdaが起動- LambdaがMQTTクライアントの使用している証明書のポリシーを差し替え、ここで
hoge/res
へのReceive権限が付与される - Lambdaが
hoge/res
へPublish - AWS IoTがMQTTクライアントへPublish
- MQTTクライアントがメッセージを受信する
ポリシー差し替えによる権限の変更が即座に反映されないことを考慮して、メッセージを受信するまで3以後の処理は定期的に繰り返します。
事前準備
それでは、早速検証の準備をしていきます。
モノと証明書の作成
AWS IoTの1-Click 証明書作成を使ってテスト用のモノと証明書を作成しておきます。 詳細な手順は割愛します。
AWS IoTポリシーの作成
まず、最初に利用するポリシーを作成します。以下のポリシードキュメントを準備します。
hoge/req
へのPublishとhoge/res
へのSubscribeを許可しています。なお、検証用なのでConnect
周りの権限は緩くしています。
allow_hoge_pub_sub.json
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"iot:Connect"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:client/*"
],
"Effect": "Allow"
},
{
"Action": [
"iot:Publish"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/req"
],
"Effect": "Allow"
},
{
"Action": [
"iot:Subscribe"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topicfilter/hoge/res"
],
"Effect": "Allow"
}
]
}
AWS CLIからポリシーを作成します。
aws iot create-policy --policy-name allow_hoge_pub_sub --policy-document file://allow_hoge_pub_sub.json
作成したポリシーをテスト用の証明書にアタッチしておきます
aws iot attach-policy --policy-name allow_hoge_pub_sub --target <作成した証明書のARN>
次にLambdaを使って差し替えるポリシーを準備します。
先ほどのポリシーに加えてhoge/res
へのReceive権限を追加しています。
allow_hoge_pub_sub_res.json
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"iot:Connect"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:client/*"
],
"Effect": "Allow"
},
{
"Action": [
"iot:Publish"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/req"
],
"Effect": "Allow"
},
{
"Action": [
"iot:Receive"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/res"
],
"Effect": "Allow"
},
{
"Action": [
"iot:Subscribe"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topicfilter/hoge/res"
],
"Effect": "Allow"
}
]
}
AWS CLIからポリシーを作成します。
aws iot create-policy --policy-name allow_hoge_pub_sub_res --policy-document file://allow_hoge_pub_sub_res.json
ルールエンジンとLambdaの準備
Lambdaのコードです。Python3.6で記述しています。
import boto3
import json
iot = boto3.client('iot')
iot_data = boto3.client('iot-data')
def handler(event, context):
# 証明書の情報取得
cert_id = event['cert_id']
res = iot.describe_certificate(certificateId=cert_id)
cert_arn = res['certificateDescription']['certificateArn']
# 証明書に新しいポリシーをアタッチ
iot.attach_policy(policyName='allow_hoge_pub_sub_res', target=cert_arn)
# 証明書から古いポリシーをデタッチ
iot.detach_policy(policyName='allow_hoge_pub_sub', target=cert_arn)
# レスポンスをPublish
iot_data.publish(topic='hoge/res', qos=1, payload=json.dumps({'key': 'val'}) )
hoge/req
へのPublishをトリガーに上記のコードが実行されるようにルールエンジンを設定します。
ルールクエリステートメントは以下のSQLを設定します。
SELECT
principal() as cert_id,
*
FROM
'hoge/req'
検証1
準備が整ったので計測してみます。
検証用のプログラム
以下のコードで検証を行いました。こちらもPython3.6で記述しています。
やっていることは単純で、AWS IoTに接続後、hoge/req
をSubscribeしつつ、別スレッドで10秒に1回hoge/req
にPublishしています。
MQTTクライアントを準備する際のhoge.crt
はクライアント証明書、hoge.key
はクライアント証明書に紐づく秘密鍵、rotca.pem
はAWS IoTのCA証明書です。
boto3とpaho-mqttが必要になるので、事前にインストールしておいて下さい。
import boto3
from datetime import datetime
import json
import paho.mqtt.client as mqtt
import ssl
import threading
import time
iot = boto3.client('iot-data')
client = mqtt.Client(client_id='hoge-client',protocol=mqtt.MQTTv311)
def on_message(client, userdata, message):
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
print(f'----------subscribe received : {now}----------')
def main():
client.tls_set('rootca.crt',
certfile='hoge.crt',
keyfile='hoge.key',
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
client.connect('<AWS IoTのエンドポイント>', 8883, keepalive=60)
time.sleep(3)
client.subscribe(f'hoge/res')
client.on_message = on_message
pub_thread = threading.Thread(target=publish)
pub_thread.start()
client.loop_forever()
def publish():
while True:
time.sleep(10)
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
print(f'----------start publish :{now}----------')
client.publish(f'hoge/req', json.dumps({'key': 'val'}))
if __name__ == '__main__':
main()
上記のプログラムを実行した出力結果です。
----------start publish :2019/06/23 15:39:03----------
----------start publish :2019/06/23 15:39:13----------
----------start publish :2019/06/23 15:39:23----------
...略
----------start publish :2019/06/23 15:42:23----------
----------subscribe received : 2019/06/23 15:42:24----------
最初のPublishが15:39:03ですが、Subscribeしたメッセージを受信しているのが15:42:24になります。ポリシーが反映されるまでに3分程度のタイムラグが発生していることになります。 該当時間帯のCloudwatch Logsのログを確認してみましょう。 10秒に1回の頻度でPublish-OutのERRORが頻発しています。
その後15:42:23にPublish-OutがSUCCESSしています。
注目したいのが、AROAJ3ERVEWLZ7YERMSCK:UpdateIotPolicy
というprincipalIdからPublish-Inのイベントが発生していることです。これがLambdaから実行されたattach_policy
、detach_policy
のログと思われます。SUCCESSしたログの直前だけでなく、ERRORになっている各ログの直前でも実行はされているのですが、検証用プログラムの方では新しいポリシーが使えずにERRORが出力され続けています。
検証2
少しパターンを変えて計測してみましょう。 先ほどのプログラムを一部改変し、メッセージの受信を試みる都度AWS IoTへのConnectとDisconnectを行うように変更します。
検証用のプログラム
import boto3
from datetime import datetime
import json
import paho.mqtt.client as mqtt
import ssl
import threading
import time
iot = boto3.client('iot-data')
client = mqtt.Client(client_id='hoge-client',protocol=mqtt.MQTTv311)
def on_message(client, userdata, message):
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
print(f'----------subscribe received : {now}----------')
def main():
client.tls_set('rootca.crt',
certfile='hoge.crt',
keyfile='hoge.key',
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
while True:
client.connect('<AWS IoTのエンドポイント>', 8883, keepalive=60)
time.sleep(3)
client.subscribe(f'hoge/res')
client.on_message = on_message
client.loop_start()
pub_thread = threading.Thread(target=publish)
pub_thread.start()
time.sleep(7)
pub_thread._stop()
client.loop_stop()
client.disconnect()
def publish():
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
print(f'----------start publish :{now}----------')
client.publish(f'hoge/req', json.dumps({'key': 'val'}))
if __name__ == '__main__':
main()
出力結果です。
----------start publish :2019/06/23 15:55:38----------
----------start publish :2019/06/23 15:55:48----------
----------subscribe received : 2019/06/23 15:55:49----------
今度は2回目のPublish後にメッセージが受信できています。
Cloudwatch Logsのログはこんな感じでした
検証3
これまでの挙動を見る限り、証明書にアタッチするポリシーが変更された場合
- 変更対象の証明書を利用して確立済みのMQTT接続についてはポリシー変更が反映されるまでに数分のタイムラグが発生する
- 新たに確立するMQTT接続に関しては新たなポリシーが適用される
ということが言えそうです。 最後の検証として検証1で使ったプログラムをコピー、クライアントIDを変更して、検証1で使ったプログラムのPublish成功直後に実行してみます。
ターミナルの1つ目のタブで検証1のプログラムを実行します。
----------start publish :2019/06/23 16:16:01----------
----------start publish :2019/06/23 16:16:11----------
----------start publish :2019/06/23 16:16:21----------
2つ目のタブで実行した検証1のプログラム修正版の出力結果です。
----------subscribe received : 2019/06/23 16:16:11----------
確立済みのMQTT接続を使い回す1つ目のタブでは相変わらずメッセージを受信できていませんが、2つ目のタブではプログラム実行直後からメッセージの受信に成功しています。
まとめ
今回の調査結果をまとめると、AWS IoTにおいて証明書にアタッチするポリシーを変更した場合、変更対象の証明書を利用して確立済みのMQTT接続についてはポリシー変更が反映されるまでに数分のタイムラグが発生するということが言えそうです。
ルールエンジンと連動して動的に証明書のポリシーを変更するようなユースケースにおいては、MQTTクライアント側の実装として、リトライ処理の中でMQTT接続を再確立してもらうのが良さそうです。