AWS IoT CoreでRePublish先トピックを動的に変更してみる

AWS IoT CoreでRePublish先トピックを動的に変更してみる

IoTルールアクションのRepublish時に動的なルーティングをやってみました
Clock Icon2025.04.23

大阪オフィスの小倉です。

AWS IoT Coreのルールアクションでは、別のトピックに再Publish(Republish)することが可能です。

今回、再Publish先を動的に変更する方法を試してみました。

Republish

Republish - AWS IoT Core

今回の内容で大事なポイントを抜粋します。

  • 構文は ${式}
  • $で始まる予約済みトピックに再Publishする場合、代わりに$$を使用する
  • 再publish先の指定topicは、置換テンプレートをサポートしている。

また、置換テンプレートに関しての注意事項も、上記置換テンプレートのドキュメントに記載があります。

  • 置換テンプレートで参照できる情報は、元のペイロード、関数、および演算子に存在する情報のみ
  • SELECT ... AS XXXで作成されたエイリアスは参照できない

これらの条件を元に進めていきます。

やってみる

再publish先のトピック名をハードコードせず、動的に組み立てられればOKです。

今回作成するトピックは、全て基本的な取り込みのトピックを使用します。
($aws/rules/<ルール名> のトピックです)

宛先固定で再publish

まずハードコードで再publish先を指定するパターンをやってみます。

  • クライアントがpublishするトピック : $aws/rules/RepublishRule
  • 再publish先トピック : $aws/rules/V1Rule
//AWS CDKでの実装を抜粋
const republishRule = new iot.CfnTopicRule(this, 'RepublishRule', {
  ruleName: 'RepublishRule',
  topicRulePayload: {
    sql: "SELECT * FROM '$aws/rules/RepublishRule'",
    actions: [
      {
        republish: {
          topic: '$$aws/rules/V1Rule',
          qos: 1,
          roleArn: iotRepublishRole.roleArn,
        },
      },
    ],
    ruleDisabled: false,
    awsIotSqlVersion: '2016-03-23',
  },
});

republishtopicに再publish先をほぼそのまま記述していますが、基本的な取り込みのトピックは$から始まる予約済みトピックなので、代わりに$$を指定しています。

式を利用する

ハードコードとやってることは大差ないのですが、式が利用できることを確認します。

  • クライアントがpublishするトピック : $aws/rules/RepublishRule
  • 再publish先トピック : $aws/rules/V1Rule
const republishByExpressionRule = new iot.CfnTopicRule(this, 'RepublishByExpressionRule', {
  ruleName: 'RepublishByExpressionRule',
  topicRulePayload: {
    sql: "SELECT * FROM '$aws/rules/RepublishByExpressionRule'",
    actions: [
      {
        republish: {
          topic: "$$aws/rules/${concat('V', '1')}Rule",
          qos: 1,
          roleArn: iotRepublishRole.roleArn,
        },
      },
    ],
    ruleDisabled: false,
    awsIotSqlVersion: '2016-03-23',
  },
});

concat()を使って文字列を連結しています。同様に他の関数(clientid()等)も利用可能です。

以下はCloudWatch LogsのAWSIotLogsV2の様子です。 RepublishTopicが期待通り生成されていることがわかります。
vscode-drop-1745384757553-pzkc7v7ss7.png

ユーザプロパティを参照する

MQTT v5で追加されたユーザプロパティの値を参照させてみます。

[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました | DevelopersIO

  • クライアントがpublishするトピック : $aws/rules/RepublishByUserPropertiesRule
  • 再publish先トピック : $aws/rules/V1Rule
const republishByUserPropertiesRule = new iot.CfnTopicRule(this, 'RepublishByUserPropertiesRule', {
  ruleName: 'RepublishByUserPropertiesRule',
  topicRulePayload: {
    sql: "SELECT * FROM '$aws/rules/RepublishByUserPropertiesRule'",
    actions: [
      {
        republish: {
          topic: "$$aws/rules/${get(get_user_properties('mykey'),0)}Rule",
          qos: 1,
          roleArn: iotRepublishRole.roleArn,
        },
      },
    ],
    ruleDisabled: false,
    awsIotSqlVersion: '2016-03-23',
  },
});

ユーザプロパティの値はget_user_properties()関数で参照可能です。
この関数は、キーに一致する値を配列で返す為、get()関数を使って要素を1つだけ取り出しています。

MQTTクライアント(MQTTX)からは、以下のイメージで実行します。

vscode-drop-1745384771047-yik6dnf2wn.png

実行ログも期待どおりです。

vscode-drop-1745384784911-hvsi11bux2d.png

DynamoDBを参照する

AWS IoTのSQLではDynamoDBやDevice Shadowの値を参照できます。
DynamoDBに再publish先のトピック名の一部を格納し、get_dynamodb()関数を使って参照します。

  • クライアントがpublishするトピック : $aws/rules/RepublishByDynamoDBRule
  • 再publish先トピック : $aws/rules/V2Rule
const republishByDynamoDBRule = new iot.CfnTopicRule(this, 'RepublishByDynamoDBRule', {
  ruleName: 'RepublishByDynamoDBRule',
  topicRulePayload: {
    sql: "SELECT * FROM '$aws/rules/RepublishByDynamoDBRule'",
    actions: [
      {
        republish: {
          topic: "$$aws/rules/${get(get_dynamodb('" + clientTable.tableName + "', 'client_id', clientid(), '" + dynamoDBAccessRole.roleArn + "'), 'topic_name')}Rule",
          qos: 1,
          roleArn: iotRepublishRole.roleArn,
        },
      },
    ],
    ruleDisabled: false,
    awsIotSqlVersion: '2016-03-23',
  },
});

CDKのコードだと分かりづらいですが、再publish先トピック名の指定は以下になります

$$aws/rules/${get(get_dynamodb('<DynamoDB Table名>', 'client_id', clientid(), '<IAM RoleのARN>'), 'topic_name')}Rule

AWS IoTのSQLからDynamoDBを参照するために必要なロールはこちらに記載があるため、別途作成します。

DynamoDB Tableは以下のようなレイアウトで、
MQTT接続のClient IDをclient_idというPartition Keyに持ち、
topic_nameという属性を参照して再publish先トピック名を組み立てます。

client_id(PK) topic_name
RoutingTestThing(MQTT接続のClient ID) V2

RoutingTestThingというモノを作成し、同名のClient IDでMQTTクライアント(MQTTX)を用いてPublishしました。
Client IDを元にDynamoDBを参照し、再publish先のトピック名が生成できています。

vscode-drop-1745384807903-z8dko9ch0wh.png

まとめ

再publishの際に、送信先トピックを動的に変更してみました。

デバイスがAWS IoTに接続している環境で、機器またはクラウド側の実装がバージョンアップ等で変更になり、
利用するトピックを変更する必要が発生する、といった状況で使えると良いなと思います。

実装に関しては、AWS IoT SQLの関数が使えるので、結構柔軟に対応可能だな、と感じました。
今回紹介しませんでしたが、get_thing_shadow()get_mqtt_property()のような関数も恐らく利用可能だと思われます。

置換テンプレートで利用可能な元のペイロード、関数、式の組み合わせでできることは色々ありそうなので、
面白い組み合わせを思いついたらぜひ試してみてください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.