[AWS IoT Core] Fleet Provisioningの事前プロビジョニングフックでElasticsearchのインデックス存在確認をしてみた

2021.05.07

前回の記事では基本的なFleet Provisioningの仕組みを確認してみました。

今回はせっかく学んだFleet Provisioningを活用してみます。具体的には、preProvisioningHook(事前プロビジョニングフック)でAmazon Elasticsearch Service(以下、ES)にインデックスが存在するかをチェックし(なければ作成しておく)、存在する場合はFleet Provisioningを失敗させてみます。

※ DynamoDB等のデータストアにデバイス管理情報を保存しておいてFleet Provisioningを許可するかチェックする、という手段もあると思いますが、今回はIoT CoreとESとの連携を色々試してみたかったのでESのインデックス存在チェックを採用しています。

環境

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H15

$ node -v
v14.15.4

$ npm -v
6.14.10

$ yarn -v
1.22.10

本記事に掲載しているコードは以下に格納してあります。

https://github.com/urawa72/aws-iot-samples/tree/main/es-rule

リソースの準備

以下の構成図に必要なリソースをマネジメントコンソールとAWS CDK(以下、CDK)で構築します。

Amazon Elasticsearch Service

マネジメントコンソールで以下の設定で新規ドメインを作成します。ドメインアクセスポリシーは後で変更するため、一旦適当に作成しておきます。

項目 内容
デプロイタイプ 開発およびテスト
Elasticsearchのバージョン 7.10
Elasticsearchドメイン名 iot-es-rule-test
自動調整 有効化
インスタンスタイプ t2.small.elasticsearch
ネットワーク構成 パブリックアクセス
ドメインアクセスポリシー カスタムアクセスポリシーでダミーIPアドレスを許可しておく

AWS Lambda

関数はTypeScriptで実装し、webpackを使ってトランスパイルとバンドルを行います。

webpack.config.jsの内容は本記事では割愛します。詳細は以下を参照ください。

https://github.com/urawa72/aws-iot-samples/blob/main/es-rule/lambda/webpack.config.js

LambdaからESへ接続するためにAWS SDK以外のライブラリを使用しています。詳細は以下の記事を参照ください。

lambda/index.ts

import { Client } from '@elastic/elasticsearch';
import {
  createAWSConnection,
  awsGetCredentials,
} from '@acuris/aws-es-connection';

const ES_DOMAIN = process.env.ES_DOMAIN ?? '';

interface Event {
  claimCertificateId: 'string';
  certificateId: 'string';
  certificatePem: 'string';
  templateArn: 'string';
  clientId: 'string';
  parameters: {
    SerialNumber: 'string';
  };
}

interface Result {
  allowProvisioning: boolean;
}

export const handler = async (event: Event): Promise<Result> => {
  console.log(event);

  const awsCredentials = await awsGetCredentials();
  const AWSConnection = createAWSConnection(awsCredentials);
  const client = new Client({
    ...AWSConnection,
    node: `https://${ES_DOMAIN}`,
  });

  // indexの存在チェック・なければ作成
  try {
    const res = await client.indices.exists({
      index: event.parameters.SerialNumber,
    });
    console.log(res);

    if (res.body) {
      return {
        allowProvisioning: false,
      };
    } else {
      await client.indices.create({
        index: event.parameters.SerialNumber,
      });
    }
  } catch (e) {
    console.error(e);
    return {
      allowProvisioning: false,
    };
  }

  return {
    allowProvisioning: true,
  };
};

cdk.jsonに上記で作成したESドメイン名を記載しておきます。

cdk.json

{
  "app": "npx ts-node --prefer-ts-exts bin/es-rule.ts",
  "context": {
    // 省略
    "esDomain": "<es domain name>.ap-northeast-1.es.amazonaws.com"
  }
}

あとは以下のコードでデプロイするだけです。

lib/es-test-lambda-stack.ts

import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as es from '@aws-cdk/aws-elasticsearch';
import * as lambda from '@aws-cdk/aws-lambda';

export class LambdaStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // cdk.jsonからESドメインを取得
    const esDomain = this.node.tryGetContext('esDomain');

    // 環境変数にESドメインを設定しておく
    const func = new lambda.Function(this, 'test-pre-hook-function', {
      code: lambda.Code.fromAsset('lambda/dist/preHookFunction'),
      functionName: 'test-es-index-create-pre-hook',
      handler: 'index.handler',
      runtime: lambda.Runtime.NODEJS_14_X,
      timeout: cdk.Duration.seconds(10),
      memorySize: 256,
      environment: { ES_DOMAIN: esDomain },
    });

    // IoT Coreにlambda:InvokeFunctionを許可する
    func.addPermission('test-permission', {
      principal: new iam.ServicePrincipal('iot.amazonaws.com'),
    });

    // 関数にESドメインへのアクセスを許可する
    const domain = es.Domain.fromDomainEndpoint(
      this,
      'test-es-domain',
      `https://${esDomain}`,
    );
    domain.grantReadWrite(func);
  }
}

デプロイ完了後、ESのアクセスポリシーにLambda関数の実行ロールArnのアクセス許可設定を追加します。マネジメントコンソールで対象ESドメイン > アクション > アクセスポリシーの変更 を選択します。

Provisioning Template

以下はProvisioning Templateを作成するコード部分のみを記載しています。

lib/es-test-iot-core-stack.ts

import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';

const timestamp = new Date().getTime();

export class IoTCoreStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const region = props?.env?.region;
    const accountId = props?.env?.account;

    // Fleet Provisioningに必要なポリシーをアタッチしたロール
    const provisioningRole = new iam.Role(this, 'test-role-for-provisioning', {
      assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
      roleName: `es-test-role-for-provisioning-${timestamp}`,
      managedPolicies: [
        iam.ManagedPolicy.fromManagedPolicyArn(
          this,
          'AWSIoTThingsRegistration',
          'arn:aws:iam::aws:policy/service-role/AWSIoTThingsRegistration',
        ),
      ],
    });

    // Fleet Provisioningで作成される証明書にアタッチされるポリシー
    const thingPolicy = new iot.CfnPolicy(this, 'test-thing-policy', {
      policyName: `es-test-thing-policy-${timestamp}`,
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: 'iot:*',
            Resource: '*',
          },
        ],
      },
    });

    // Template本体
    // preProvisioningHookで前述のLambda関数Arnを指定する
    new iot.CfnProvisioningTemplate(this, 'test-provisioning-template', {
      templateName: `es-test-template-${timestamp}`,
      enabled: true,
      provisioningRoleArn: provisioningRole.roleArn,
      preProvisioningHook: {
        targetArn: `arn:aws:lambda:${region}:${accountId}:function:test-es-index-create-pre-hook`,
      },
      templateBody: `
{
  "Parameters": {
    "SerialNumber": {
      "Type": "String"
    },
    "AWS::IoT::Certificate::Id": {
      "Type": "String"
    }
  },
  "Resources": {
    "certificate": {
      "Properties": {
        "CertificateId": {
          "Ref": "AWS::IoT::Certificate::Id"
        },
        "Status": "Active"
      },
      "Type": "AWS::IoT::Certificate"
    },
    "policy": {
      "Properties": {
        "PolicyName": "${thingPolicy.policyName}"
      },
      "Type": "AWS::IoT::Policy"
    },
    "thing": {
      "OverrideSettings": {
        "AttributePayload": "MERGE",
        "ThingGroups": "DO_NOTHING",
        "ThingTypeName": "REPLACE"
      },
      "Properties": {
        "AttributePayload": {},
        "ThingGroups": [],
        "ThingName": {
          "Fn::Join": [
            "",
            [
              "Temp_",
              {
                "Ref": "SerialNumber"
              }
            ]
          ]
        }
      },
      "Type": "AWS::IoT::Thing"
    }
  }
}
      `,
    });

// 省略(以下、後述するTopic Ruleを作成するコード

Topic Rule

ESへデータを送信するための構成です。

lib/es-test-iot-core-stack.ts

    // ESへデータをPutするためのIAMロール・ポリシー
    const ruleRole = new iam.Role(this, 'test-role-for-iot-rule-topic', {
      assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
    });
    ruleRole.addToPolicy(
      new iam.PolicyStatement({ resources: ['*'], actions: ['es:ESHttpPut'] }),
    );

    // cdk.jsonからドメインを取得してTopic Ruleに使用する
    const esDomain = this.node.tryGetContext('esDomain');
    new iot.CfnTopicRule(this, 'test-topic-rule', {
      topicRulePayload: {
        actions: [
          {
            elasticsearch: {
              endpoint: `https://${esDomain}`,
              id: '${newuuid()}',
              index: '${topic(3)}',
              roleArn: ruleRole.roleArn,
              type: '_doc',
            },
          },
        ],
        sql: 'SELECT * FROM "test/topic/+"',
      },
    });

試す

Fleet Provisioningで認証情報とデバイスの登録を実施し、MQTT Publishを試します。この手順は以下の記事を参照ください。今回はCreateKeysAndCertificateで認証情報とモノの新規登録を行います。

以下のコードでFleet Provisioningを実施します。--template_parametersSerialNumberはモノやESのインデックスを一意に特定する値の想定です。

npx ts-node index.ts \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert 証明書ファイル \
--key 秘密鍵ファイル \
--template_name es-test-template-1620304521632 \
--template_parameters '{"SerialNumber": "12345"}'
Subscribing to CreateKeysAndCertificate Accepted and Rejected topics..
Publishing to CreateKeysAndCertificate topic..
// 省略
RegisterThingResponse for thingName=Temp_12345

証明書と秘密鍵を入手したら、MQTT Publishを実行します。--topicの最後の3つ目の値はFleet Provisioning実行時に指定した--template_parameters"SerialNumber"の値です。

$ npx ts-node index.ts \
--topic test/topic/12345 \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert Fleet Provisioningで作成された証明書ファイル \
--key Fleet Provisioningで作成された秘密鍵ファイル

Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":1}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":2}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":3}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":4}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":5}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":6}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":7}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":8}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":9}
Publish received. topic:"test/topic/12345" dup:false qos:1 retain:false
{"message":"Hello world!","sequence":10}

ESの対象ドメインの「インデックス」タブでSerialNumber名でインデックスが新規作成され、対象インデックスに保存されたドキュメントの件数を確認できます。

KibanaにアクセスしてDev ToolでAPIをたたけば、どんなデータが保存されているか確認できます(Kibanaにアクセスするには自身のグローバルIPアドレスの許可設定をドメインアクセスポリシーに追加しておく必要がある)。

もう一度同じSerialNumberでFleet Provisioningを試してみると、意図通り失敗しました。

npx ts-node index.ts \
--endpoint xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com \
--ca_file AmazonRootCA1.pem \
--cert 証明書ファイル \
--key 秘密鍵ファイル \
--template_name es-test-template-1620304521632 \
--template_parameters '{"SerialNumber": "12345"}'
Subscribing to CreateKeysAndCertificate Accepted and Rejected topics..
Publishing to CreateKeysAndCertificate topic..
// 省略
RegisterThing ErrorResponse for statusCode=:403errorCode=:AccessDeniederrorMessage=:Access Denied

おわりに

実はIoT Coreへの初回MQTT Publish時にTopic RuleのActionに設定したindexの値で自動でESにインデックスを作成してくれるのですが、今回はFleet ProvisioningのpreProvisioningHookのお試しとしてESへアクセスして何かしらの前処理をする、という実装をしてみました。