この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
前回の記事では基本的なFleet Provisioningの仕組みを確認してみました。
今回はせっかく学んだFleet Provisioningを活用してみます。具体的には、preProvisioningHook(事前プロビジョニングフック)でAmazon Elasticsearch Service(以下、ES)にインデックスが存在するかをチェックし(なければ作成しておく)、存在する場合はFleet Provisioningを失敗させてみます。
※ DynamoDB等のデータストアにデバイス管理情報を保存しておいてFleet Provisioningを許可するかチェックする、という手段もあると思いますが、今回はIoT CoreとESとの連携を色々試してみたかったのでESのインデックス存在チェックを採用しています。
環境
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.15.7
BuildVersion: 19H15
$ node -v
v14.15.4
$ npm -v
6.14.10
$ yarn -v
1.22.10
本記事に掲載しているコードは以下に格納してあります。
https://github.com/urawa72/aws-iot-samples/tree/main/es-rule
リソースの準備
以下の構成図に必要なリソースをマネジメントコンソールとAWS CDK(以下、CDK)で構築します。
Amazon Elasticsearch Service
マネジメントコンソールで以下の設定で新規ドメインを作成します。ドメインアクセスポリシーは後で変更するため、一旦適当に作成しておきます。
項目 | 内容 |
---|---|
デプロイタイプ | 開発およびテスト |
Elasticsearchのバージョン | 7.10 |
Elasticsearchドメイン名 | iot-es-rule-test |
自動調整 | 有効化 |
インスタンスタイプ | t2.small.elasticsearch |
ネットワーク構成 | パブリックアクセス |
ドメインアクセスポリシー | カスタムアクセスポリシーでダミーIPアドレスを許可しておく |
AWS Lambda
関数はTypeScriptで実装し、webpackを使ってトランスパイルとバンドルを行います。
webpack.config.js
の内容は本記事では割愛します。詳細は以下を参照ください。
https://github.com/urawa72/aws-iot-samples/blob/main/es-rule/lambda/webpack.config.js
LambdaからESへ接続するためにAWS SDK以外のライブラリを使用しています。詳細は以下の記事を参照ください。
lambda/index.ts
import { Client } from '@elastic/elasticsearch';
import {
createAWSConnection,
awsGetCredentials,
} from '@acuris/aws-es-connection';
const ES_DOMAIN = process.env.ES_DOMAIN ?? '';
interface Event {
claimCertificateId: 'string';
certificateId: 'string';
certificatePem: 'string';
templateArn: 'string';
clientId: 'string';
parameters: {
SerialNumber: 'string';
};
}
interface Result {
allowProvisioning: boolean;
}
export const handler = async (event: Event): Promise<Result> => {
console.log(event);
const awsCredentials = await awsGetCredentials();
const AWSConnection = createAWSConnection(awsCredentials);
const client = new Client({
...AWSConnection,
node: `https://${ES_DOMAIN}`,
});
// indexの存在チェック・なければ作成
try {
const res = await client.indices.exists({
index: event.parameters.SerialNumber,
});
console.log(res);
if (res.body) {
return {
allowProvisioning: false,
};
} else {
await client.indices.create({
index: event.parameters.SerialNumber,
});
}
} catch (e) {
console.error(e);
return {
allowProvisioning: false,
};
}
return {
allowProvisioning: true,
};
};
cdk.json
に上記で作成したESドメイン名を記載しておきます。
cdk.json
{
"app": "npx ts-node --prefer-ts-exts bin/es-rule.ts",
"context": {
// 省略
"esDomain": "<es domain name>.ap-northeast-1.es.amazonaws.com"
}
}
あとは以下のコードでデプロイするだけです。
lib/es-test-lambda-stack.ts
import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as es from '@aws-cdk/aws-elasticsearch';
import * as lambda from '@aws-cdk/aws-lambda';
export class LambdaStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// cdk.jsonからESドメインを取得
const esDomain = this.node.tryGetContext('esDomain');
// 環境変数にESドメインを設定しておく
const func = new lambda.Function(this, 'test-pre-hook-function', {
code: lambda.Code.fromAsset('lambda/dist/preHookFunction'),
functionName: 'test-es-index-create-pre-hook',
handler: 'index.handler',
runtime: lambda.Runtime.NODEJS_14_X,
timeout: cdk.Duration.seconds(10),
memorySize: 256,
environment: { ES_DOMAIN: esDomain },
});
// IoT Coreにlambda:InvokeFunctionを許可する
func.addPermission('test-permission', {
principal: new iam.ServicePrincipal('iot.amazonaws.com'),
});
// 関数にESドメインへのアクセスを許可する
const domain = es.Domain.fromDomainEndpoint(
this,
'test-es-domain',
`https://${esDomain}`,
);
domain.grantReadWrite(func);
}
}
デプロイ完了後、ESのアクセスポリシーにLambda関数の実行ロールArnのアクセス許可設定を追加します。マネジメントコンソールで対象ESドメイン > アクション > アクセスポリシーの変更 を選択します。
Provisioning Template
以下はProvisioning Templateを作成するコード部分のみを記載しています。
lib/es-test-iot-core-stack.ts
import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
const timestamp = new Date().getTime();
export class IoTCoreStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const region = props?.env?.region;
const accountId = props?.env?.account;
// Fleet Provisioningに必要なポリシーをアタッチしたロール
const provisioningRole = new iam.Role(this, 'test-role-for-provisioning', {
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
roleName: `es-test-role-for-provisioning-${timestamp}`,
managedPolicies: [
iam.ManagedPolicy.fromManagedPolicyArn(
this,
'AWSIoTThingsRegistration',
'arn:aws:iam::aws:policy/service-role/AWSIoTThingsRegistration',
),
],
});
// Fleet Provisioningで作成される証明書にアタッチされるポリシー
const thingPolicy = new iot.CfnPolicy(this, 'test-thing-policy', {
policyName: `es-test-thing-policy-${timestamp}`,
policyDocument: {
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Action: 'iot:*',
Resource: '*',
},
],
},
});
// Template本体
// preProvisioningHookで前述のLambda関数Arnを指定する
new iot.CfnProvisioningTemplate(this, 'test-provisioning-template', {
templateName: `es-test-template-${timestamp}`,
enabled: true,
provisioningRoleArn: provisioningRole.roleArn,
preProvisioningHook: {
targetArn: `arn:aws:lambda:${region}:${accountId}:function:test-es-index-create-pre-hook`,
},
templateBody: `
{
"Parameters": {
"SerialNumber": {
"Type": "String"
},
"AWS::IoT::Certificate::Id": {
"Type": "String"
}
},
"Resources": {
"certificate": {
"Properties": {
"CertificateId": {
"Ref": "AWS::IoT::Certificate::Id"
},
"Status": "Active"
},
"Type": "AWS::IoT::Certificate"
},
"policy": {
"Properties": {
"PolicyName": "${thingPolicy.policyName}"
},
"Type": "AWS::IoT::Policy"
},
"thing": {
"OverrideSettings": {
"AttributePayload": "MERGE",
"ThingGroups": "DO_NOTHING",
"ThingTypeName": "REPLACE"
},
"Properties": {
"AttributePayload": {},
"ThingGroups": [],
"ThingName": {
"Fn::Join": [
"",
[
"Temp_",
{
"Ref": "SerialNumber"
}
]
]
}
},
"Type": "AWS::IoT::Thing"
}
}
}
`,
});
// 省略(以下、後述するTopic Ruleを作成するコード
Topic Rule
ESへデータを送信するための構成です。
lib/es-test-iot-core-stack.ts
// ESへデータをPutするためのIAMロール・ポリシー
const ruleRole = new iam.Role(this, 'test-role-for-iot-rule-topic', {
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
});
ruleRole.addToPolicy(
new iam.PolicyStatement({ resources: ['*'], actions: ['es:ESHttpPut'] }),
);
// cdk.jsonからドメインを取得してTopic Ruleに使用する
const esDomain = this.node.tryGetContext('esDomain');
new iot.CfnTopicRule(this, 'test-topic-rule', {
topicRulePayload: {
actions: [
{
elasticsearch: {
endpoint: `https://${esDomain}`,
id: '${newuuid()}',
index: '${topic(3)}',
roleArn: ruleRole.roleArn,
type: '_doc',
},
},
],
sql: 'SELECT * FROM "test/topic/+"',
},
});
試す
Fleet Provisioningで認証情報とデバイスの登録を実施し、MQTT Publishを試します。この手順は以下の記事を参照ください。今回はCreateKeysAndCertificate
で認証情報とモノの新規登録を行います。
以下のコードでFleet Provisioningを実施します。--template_parameters
のSerialNumber
はモノやESのインデックスを一意に特定する値の想定です。
npx ts-node index.ts \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert 証明書ファイル \
--key 秘密鍵ファイル \
--template_name es-test-template-1620304521632 \
--template_parameters '{"SerialNumber": "12345"}'
Subscribing to CreateKeysAndCertificate Accepted and Rejected topics..
Publishing to CreateKeysAndCertificate topic..
// 省略
RegisterThingResponse for thingName=Temp_12345
証明書と秘密鍵を入手したら、MQTT Publishを実行します。--topic
の最後の3つ目の値はFleet Provisioning実行時に指定した--template_parameters
の "SerialNumber"
の値です。
$ npx ts-node index.ts \
--topic test/topic/12345 \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert Fleet Provisioningで作成された証明書ファイル \
--key Fleet Provisioningで作成された秘密鍵ファイル
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":1}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":2}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":3}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":4}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":5}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":6}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":7}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":8}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":9}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":10}
ESの対象ドメインの「インデックス」タブでSerialNumber
名でインデックスが新規作成され、対象インデックスに保存されたドキュメントの件数を確認できます。
KibanaにアクセスしてDev ToolでAPIをたたけば、どんなデータが保存されているか確認できます(Kibanaにアクセスするには自身のグローバルIPアドレスの許可設定をドメインアクセスポリシーに追加しておく必要がある)。
もう一度同じSerialNumber
でFleet Provisioningを試してみると、意図通り失敗しました。
npx ts-node index.ts \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert 証明書ファイル \
--key 秘密鍵ファイル \
--template_name es-test-template-1620304521632 \
--template_parameters '{"SerialNumber": "12345"}'
Subscribing to CreateKeysAndCertificate Accepted and Rejected topics..
Publishing to CreateKeysAndCertificate topic..
// 省略
RegisterThing ErrorResponse for statusCode=:403errorCode=:AccessDeniederrorMessage=:Access Denied
おわりに
実はIoT Coreへの初回MQTT Publish時にTopic RuleのActionに設定したindex
の値で自動でESにインデックスを作成してくれるのですが、今回はFleet ProvisioningのpreProvisioningHookのお試しとしてESへアクセスして何かしらの前処理をする、という実装をしてみました。