平均3分?AWS IoTポリシーを証明書にアタッチしてから反映されるまでのタイムラグについて調べてみた

AWS IoTポリシーを証明書にアタッチした際、確立済みのMQTT接続に設定が反映されるまでには数分のタイムラグが発生するという話です
2019.06.23

サーバーレス開発部@大阪の岩田です。 先日AWS IoTと連携したシステムのテストを行なっていたのですが、AWS IoTポリシーが想定していた時間内に反映されず、意図した通信が行えませんでした。 そこで、AWS IoTポリシーが反映されるまでにどの程度の時間がかかるのか、実際に計測を行いました。

計測の結果、ポリシーアタッチ前に確立済みMQTT接続を使い続けるのか、ポリシーアタッチ後にMQTT接続を再確立するかによって、ポリシー反映までの時間が変わることが分かりました。

概要

今回やろうとしたことですが、ざっくり下記のシーケンスです。

※トピックの設計等、実際よりもかなり簡略化しています。

シーケンス
  1. MQTTクライアントからAWS IoTにConnect
  2. MQTTクライアントがトピックhoge/resをSubscribe
  3. MQTTクライアントがトピックhoge/reqにPublish
  4. hoge/reqへのPublishをトリガーに、AWS IoTのルールエンジンでLambdaが起動
  5. LambdaがMQTTクライアントの使用している証明書のポリシーを差し替え、ここでhoge/resへのReceive権限が付与される
  6. Lambdaがhoge/resへPublish
  7. AWS IoTがMQTTクライアントへPublish
  8. MQTTクライアントがメッセージを受信する

ポリシー差し替えによる権限の変更が即座に反映されないことを考慮して、メッセージを受信するまで3以後の処理は定期的に繰り返します。

事前準備

それでは、早速検証の準備をしていきます。

モノと証明書の作成

AWS IoTの1-Click 証明書作成を使ってテスト用のモノと証明書を作成しておきます。 詳細な手順は割愛します。

AWS IoTポリシーの作成

まず、最初に利用するポリシーを作成します。以下のポリシードキュメントを準備します。 hoge/reqへのPublishとhoge/resへのSubscribeを許可しています。なお、検証用なのでConnect周りの権限は緩くしています。

allow_hoge_pub_sub.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:client/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/req"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topicfilter/hoge/res"
      ],
      "Effect": "Allow"
    }
  ]
}

AWS CLIからポリシーを作成します。

aws iot create-policy --policy-name allow_hoge_pub_sub --policy-document file://allow_hoge_pub_sub.json

作成したポリシーをテスト用の証明書にアタッチしておきます

 aws iot attach-policy --policy-name allow_hoge_pub_sub --target <作成した証明書のARN>

次にLambdaを使って差し替えるポリシーを準備します。 先ほどのポリシーに加えてhoge/resへのReceive権限を追加しています。

allow_hoge_pub_sub_res.json

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:client/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/req"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "iot:Receive"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topic/hoge/res"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:<AWSアカウントID>:topicfilter/hoge/res"
      ],
      "Effect": "Allow"
    }
  ]
}

AWS CLIからポリシーを作成します。

aws iot create-policy --policy-name allow_hoge_pub_sub_res --policy-document file://allow_hoge_pub_sub_res.json

ルールエンジンとLambdaの準備

Lambdaのコードです。Python3.6で記述しています。

import boto3
import json

iot = boto3.client('iot')
iot_data = boto3.client('iot-data')

def handler(event, context):

    # 証明書の情報取得
    cert_id = event['cert_id']
    res = iot.describe_certificate(certificateId=cert_id)
    cert_arn = res['certificateDescription']['certificateArn']

    # 証明書に新しいポリシーをアタッチ
    iot.attach_policy(policyName='allow_hoge_pub_sub_res', target=cert_arn)    

    # 証明書から古いポリシーをデタッチ
    iot.detach_policy(policyName='allow_hoge_pub_sub', target=cert_arn)
    
    # レスポンスをPublish
    iot_data.publish(topic='hoge/res', qos=1, payload=json.dumps({'key': 'val'}) )

hoge/reqへのPublishをトリガーに上記のコードが実行されるようにルールエンジンを設定します。 ルールクエリステートメントは以下のSQLを設定します。

SELECT
    principal() as cert_id,
    *
FROM
    'hoge/req'
ルールクエリステートメント

検証1

準備が整ったので計測してみます。

検証用のプログラム

以下のコードで検証を行いました。こちらもPython3.6で記述しています。

やっていることは単純で、AWS IoTに接続後、hoge/reqをSubscribeしつつ、別スレッドで10秒に1回hoge/reqにPublishしています。 MQTTクライアントを準備する際のhoge.crtはクライアント証明書、hoge.keyはクライアント証明書に紐づく秘密鍵、rotca.pemはAWS IoTのCA証明書です。 boto3とpaho-mqttが必要になるので、事前にインストールしておいて下さい。

import boto3
from datetime import datetime
import json
import paho.mqtt.client as mqtt
import ssl
import threading
import time

iot = boto3.client('iot-data')
client = mqtt.Client(client_id='hoge-client',protocol=mqtt.MQTTv311)


def on_message(client, userdata, message):
        
    now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    print(f'----------subscribe received : {now}----------')

def main():
  client.tls_set('rootca.crt',
                certfile='hoge.crt',
                keyfile='hoge.key',
                tls_version=ssl.PROTOCOL_TLSv1_2)
  client.tls_insecure_set(True)
  client.connect('<AWS IoTのエンドポイント>', 8883, keepalive=60)
  time.sleep(3)
  client.subscribe(f'hoge/res')
  client.on_message = on_message
  pub_thread = threading.Thread(target=publish)
  pub_thread.start()
  client.loop_forever()

def publish():
    while True:
      time.sleep(10)
      now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
      print(f'----------start publish :{now}----------')
      client.publish(f'hoge/req', json.dumps({'key': 'val'}))

if __name__ == '__main__':
  main()

上記のプログラムを実行した出力結果です。

----------start publish :2019/06/23 15:39:03----------
----------start publish :2019/06/23 15:39:13----------
----------start publish :2019/06/23 15:39:23----------
...略
----------start publish :2019/06/23 15:42:23----------
----------subscribe received : 2019/06/23 15:42:24----------

最初のPublishが15:39:03ですが、Subscribeしたメッセージを受信しているのが15:42:24になります。ポリシーが反映されるまでに3分程度のタイムラグが発生していることになります。 該当時間帯のCloudwatch Logsのログを確認してみましょう。 10秒に1回の頻度でPublish-OutのERRORが頻発しています。

Cloudwatch Logsのエラーログ達

その後15:42:23にPublish-OutがSUCCESSしています。

Publish-Out成功時のログ1

注目したいのが、AROAJ3ERVEWLZ7YERMSCK:UpdateIotPolicyというprincipalIdからPublish-Inのイベントが発生していることです。これがLambdaから実行されたattach_policydetach_policyのログと思われます。SUCCESSしたログの直前だけでなく、ERRORになっている各ログの直前でも実行はされているのですが、検証用プログラムの方では新しいポリシーが使えずにERRORが出力され続けています。

検証2

少しパターンを変えて計測してみましょう。 先ほどのプログラムを一部改変し、メッセージの受信を試みる都度AWS IoTへのConnectとDisconnectを行うように変更します。

検証用のプログラム

import boto3
from datetime import datetime
import json
import paho.mqtt.client as mqtt
import ssl
import threading
import time

iot = boto3.client('iot-data')
client = mqtt.Client(client_id='hoge-client',protocol=mqtt.MQTTv311)


def on_message(client, userdata, message):
    now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    print(f'----------subscribe received : {now}----------')

def main():
  client.tls_set('rootca.crt',
                certfile='hoge.crt',
                keyfile='hoge.key',
                tls_version=ssl.PROTOCOL_TLSv1_2)
  client.tls_insecure_set(True)

  while True:
    client.connect('<AWS IoTのエンドポイント>', 8883, keepalive=60)
    time.sleep(3)
    client.subscribe(f'hoge/res')
    client.on_message = on_message
    client.loop_start()
    pub_thread = threading.Thread(target=publish)
    pub_thread.start()
    time.sleep(7)
    pub_thread._stop()
    client.loop_stop()
    client.disconnect()

def publish():
    now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    print(f'----------start publish :{now}----------')
    client.publish(f'hoge/req', json.dumps({'key': 'val'}))

if __name__ == '__main__':
  main()

出力結果です。

----------start publish :2019/06/23 15:55:38----------
----------start publish :2019/06/23 15:55:48----------
----------subscribe received : 2019/06/23 15:55:49----------

今度は2回目のPublish後にメッセージが受信できています。

Cloudwatch Logsのログはこんな感じでした

Publish-Out成功時のログ2

検証3

これまでの挙動を見る限り、証明書にアタッチするポリシーが変更された場合

  • 変更対象の証明書を利用して確立済みのMQTT接続についてはポリシー変更が反映されるまでに数分のタイムラグが発生する
  • 新たに確立するMQTT接続に関しては新たなポリシーが適用される

ということが言えそうです。 最後の検証として検証1で使ったプログラムをコピー、クライアントIDを変更して、検証1で使ったプログラムのPublish成功直後に実行してみます。

ターミナルの1つ目のタブで検証1のプログラムを実行します。

----------start publish :2019/06/23 16:16:01----------
----------start publish :2019/06/23 16:16:11----------
----------start publish :2019/06/23 16:16:21----------

2つ目のタブで実行した検証1のプログラム修正版の出力結果です。

----------subscribe received : 2019/06/23 16:16:11----------

確立済みのMQTT接続を使い回す1つ目のタブでは相変わらずメッセージを受信できていませんが、2つ目のタブではプログラム実行直後からメッセージの受信に成功しています。

まとめ

今回の調査結果をまとめると、AWS IoTにおいて証明書にアタッチするポリシーを変更した場合、変更対象の証明書を利用して確立済みのMQTT接続についてはポリシー変更が反映されるまでに数分のタイムラグが発生するということが言えそうです。

ルールエンジンと連動して動的に証明書のポリシーを変更するようなユースケースにおいては、MQTTクライアント側の実装として、リトライ処理の中でMQTT接続を再確立してもらうのが良さそうです。