AWS IoT CoreでRePublish先トピックを動的に変更してみる
大阪オフィスの小倉です。
AWS IoT Coreのルールアクションでは、別のトピックに再Publish(Republish)することが可能です。
今回、再Publish先を動的に変更する方法を試してみました。
Republish
今回の内容で大事なポイントを抜粋します。
- 構文は
${式}
$
で始まる予約済みトピックに再Publishする場合、代わりに$$
を使用する- 再publish先の指定
topic
は、置換テンプレートをサポートしている。
また、置換テンプレートに関しての注意事項も、上記置換テンプレートのドキュメントに記載があります。
これらの条件を元に進めていきます。
やってみる
再publish先のトピック名をハードコードせず、動的に組み立てられればOKです。
今回作成するトピックは、全て基本的な取り込みのトピックを使用します。
($aws/rules/<ルール名>
のトピックです)
宛先固定で再publish
まずハードコードで再publish先を指定するパターンをやってみます。
- クライアントがpublishするトピック :
$aws/rules/RepublishRule
- 再publish先トピック :
$aws/rules/V1Rule
//AWS CDKでの実装を抜粋
const republishRule = new iot.CfnTopicRule(this, 'RepublishRule', {
ruleName: 'RepublishRule',
topicRulePayload: {
sql: "SELECT * FROM '$aws/rules/RepublishRule'",
actions: [
{
republish: {
topic: '$$aws/rules/V1Rule',
qos: 1,
roleArn: iotRepublishRole.roleArn,
},
},
],
ruleDisabled: false,
awsIotSqlVersion: '2016-03-23',
},
});
republish
のtopic
に再publish先をほぼそのまま記述していますが、基本的な取り込みのトピックは$
から始まる予約済みトピックなので、代わりに$$
を指定しています。
式を利用する
ハードコードとやってることは大差ないのですが、式が利用できることを確認します。
- クライアントがpublishするトピック :
$aws/rules/RepublishRule
- 再publish先トピック :
$aws/rules/V1Rule
const republishByExpressionRule = new iot.CfnTopicRule(this, 'RepublishByExpressionRule', {
ruleName: 'RepublishByExpressionRule',
topicRulePayload: {
sql: "SELECT * FROM '$aws/rules/RepublishByExpressionRule'",
actions: [
{
republish: {
topic: "$$aws/rules/${concat('V', '1')}Rule",
qos: 1,
roleArn: iotRepublishRole.roleArn,
},
},
],
ruleDisabled: false,
awsIotSqlVersion: '2016-03-23',
},
});
concat()
を使って文字列を連結しています。同様に他の関数(clientid()
等)も利用可能です。
以下はCloudWatch LogsのAWSIotLogsV2
の様子です。 RepublishTopicが期待通り生成されていることがわかります。
ユーザプロパティを参照する
MQTT v5で追加されたユーザプロパティの値を参照させてみます。
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました | DevelopersIO
- クライアントがpublishするトピック :
$aws/rules/RepublishByUserPropertiesRule
- 再publish先トピック :
$aws/rules/V1Rule
const republishByUserPropertiesRule = new iot.CfnTopicRule(this, 'RepublishByUserPropertiesRule', {
ruleName: 'RepublishByUserPropertiesRule',
topicRulePayload: {
sql: "SELECT * FROM '$aws/rules/RepublishByUserPropertiesRule'",
actions: [
{
republish: {
topic: "$$aws/rules/${get(get_user_properties('mykey'),0)}Rule",
qos: 1,
roleArn: iotRepublishRole.roleArn,
},
},
],
ruleDisabled: false,
awsIotSqlVersion: '2016-03-23',
},
});
ユーザプロパティの値はget_user_properties()関数で参照可能です。
この関数は、キーに一致する値を配列で返す為、get()
関数を使って要素を1つだけ取り出しています。
MQTTクライアント(MQTTX)からは、以下のイメージで実行します。
実行ログも期待どおりです。
DynamoDBを参照する
AWS IoTのSQLではDynamoDBやDevice Shadowの値を参照できます。
DynamoDBに再publish先のトピック名の一部を格納し、get_dynamodb()関数を使って参照します。
- クライアントがpublishするトピック :
$aws/rules/RepublishByDynamoDBRule
- 再publish先トピック :
$aws/rules/V2Rule
const republishByDynamoDBRule = new iot.CfnTopicRule(this, 'RepublishByDynamoDBRule', {
ruleName: 'RepublishByDynamoDBRule',
topicRulePayload: {
sql: "SELECT * FROM '$aws/rules/RepublishByDynamoDBRule'",
actions: [
{
republish: {
topic: "$$aws/rules/${get(get_dynamodb('" + clientTable.tableName + "', 'client_id', clientid(), '" + dynamoDBAccessRole.roleArn + "'), 'topic_name')}Rule",
qos: 1,
roleArn: iotRepublishRole.roleArn,
},
},
],
ruleDisabled: false,
awsIotSqlVersion: '2016-03-23',
},
});
CDKのコードだと分かりづらいですが、再publish先トピック名の指定は以下になります
$$aws/rules/${get(get_dynamodb('<DynamoDB Table名>', 'client_id', clientid(), '<IAM RoleのARN>'), 'topic_name')}Rule
AWS IoTのSQLからDynamoDBを参照するために必要なロールはこちらに記載があるため、別途作成します。
DynamoDB Tableは以下のようなレイアウトで、
MQTT接続のClient IDをclient_id
というPartition Keyに持ち、
topic_name
という属性を参照して再publish先トピック名を組み立てます。
client_id(PK) | topic_name |
---|---|
RoutingTestThing(MQTT接続のClient ID) | V2 |
RoutingTestThing
というモノを作成し、同名のClient IDでMQTTクライアント(MQTTX)を用いてPublishしました。
Client IDを元にDynamoDBを参照し、再publish先のトピック名が生成できています。
まとめ
再publishの際に、送信先トピックを動的に変更してみました。
デバイスがAWS IoTに接続している環境で、機器またはクラウド側の実装がバージョンアップ等で変更になり、
利用するトピックを変更する必要が発生する、といった状況で使えると良いなと思います。
実装に関しては、AWS IoT SQLの関数が使えるので、結構柔軟に対応可能だな、と感じました。
今回紹介しませんでしたが、get_thing_shadow()
やget_mqtt_property()
のような関数も恐らく利用可能だと思われます。
置換テンプレートで利用可能な元のペイロード、関数、式の組み合わせでできることは色々ありそうなので、
面白い組み合わせを思いついたらぜひ試してみてください。