AWS IoT Analytics を CDK で構築してみた

はじめに

テントの中から失礼します、CX事業本部のてんとタカハシです!

前回の記事の続編で、AWS IoT Analytics を CDK で構築してみました。作るものだったりのお話は下記の記事に書いていますので、そちらをご参照ください。

今回のソースコードは下記に置いています。

GitHub - iam326/aws-iot-analytics-sample-cdk

環境

$ sw_vers
ProductName:	Mac OS X
ProductVersion:	10.15.7
BuildVersion:	19H2

$ aws --version
aws-cli/2.0.28 Python/3.7.4 Darwin/19.6.0 botocore/2.0.0dev32

$ cdk --version
1.70.0 (build c145314)

スタックの実装

今回は下記サービスごとにスタックを分けています。

  • Lambda
  • AWS IoT Analytics
  • AWS IoT Core

Lambda

AWS IoT Analytics の Pipeline で Lambda と連携するために Function を作成します。AWS IoT Analytics から Invoke できるようにパーミッションの設定も行います。

lib/lambda-stack.ts

import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';

interface LambdaStackProps extends cdk.StackProps {
  projectName: string;
}

export class LambdaStack extends cdk.Stack {
  public readonly function: lambda.Function;

  constructor(scope: cdk.Construct, id: string, props: LambdaStackProps) {
    super(scope, id, props);

    const { projectName } = props;

    const lambdaExecutionRole = new iam.Role(this, 'lambdaExecutionRole', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      path: '/',
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName(
          'service-role/AWSLambdaBasicExecutionRole'
        ),
      ],
    });

    const lambdaFunction = new lambda.Function(this, 'lambdaFunction', {
      functionName: `${projectName}_pipeline_lambda_function`,
      handler: 'index.lambda_handler',
      role: lambdaExecutionRole,
      runtime: lambda.Runtime.PYTHON_3_7,
      code: lambda.Code.fromAsset('src'),
    });
    lambdaFunction.addPermission('LambdaFunctionPermission', {
      principal: new iam.ServicePrincipal('iotanalytics.amazonaws.com'),
      action: 'lambda:InvokeFunction',
    });

    this.function = lambdaFunction;
  }
}

Lambda の実装は別ファイルに置いています。Pub されたデータの属性を、別属性にコピーするだけの処理です。

src/index.py

import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))
    for e in event:
        if 'temperature' in e:
            e['temperature_copy'] = e['temperature']
    logger.info("event after processing: {}".format(event))
    return event

AWS IoT Analytics

前回の記事と同じで様々なアクティビティを試しています。データセットを5分ごとに更新するようにして、当日のデータのみ取得するようにしています。

lib/iot-analytics-stack.ts

import * as cdk from '@aws-cdk/core';
import * as iotAnalytics from '@aws-cdk/aws-iotanalytics';

interface IotAnalyticsStackProps extends cdk.StackProps {
  projectName: string;
  pipelineLambdaActivityFunctionName: string;
}

export class IotAnalyticsStack extends cdk.Stack {
  public readonly channel: iotAnalytics.CfnChannel;

  constructor(scope: cdk.Construct, id: string, props: IotAnalyticsStackProps) {
    super(scope, id, props);

    const { projectName, pipelineLambdaActivityFunctionName } = props;

    const channelName = `${projectName}_iot_analytics_channel`;
    const iotAnalyticsChannel = new iotAnalytics.CfnChannel(
      this,
      'IotAnalyticsChannel',
      {
        channelName,
        channelStorage: {
          serviceManagedS3: {},
        },
      }
    );
    this.channel = iotAnalyticsChannel;

    const datastoreName = `${projectName}_iot_analytics_datastore`;
    const iotAnalyticsDatastore = new iotAnalytics.CfnDatastore(
      this,
      'IotAnalyticsDatastore',
      {
        datastoreName,
        datastoreStorage: {
          serviceManagedS3: {},
        },
      }
    );

    const iotAnalyticsPipeline = new iotAnalytics.CfnPipeline(
      this,
      'IotAnalyticsPipeline',
      {
        pipelineName: `${projectName}_iot_analytics_pipeline`,
        pipelineActivities: [
          {
            channel: {
              name: 'pipeline_channel_activity',
              channelName,
              next: 'pipeline_add_attributes_activity',
            },
            addAttributes: {
              name: 'pipeline_add_attributes_activity',
              attributes: {
                'device.id': 'id',
                'device.name': 'name',
              },
              next: 'pipeline_remove_attributes_activity',
            },
            removeAttributes: {
              name: 'pipeline_remove_attributes_activity',
              attributes: ['device'],
              next: 'pipeline_filter_activity',
            },
            filter: {
              name: 'pipeline_filter_activity',
              filter: 'temperature >= 10 AND temperature <= 40',
              next: 'pipeline_math_activity',
            },
            math: {
              name: 'pipeline_math_activity',
              attribute: 'temperature_f',
              math: 'temperature * 1.8 + 32',
              next: 'pipeline_lambda_activity',
            },
            lambda: {
              name: 'pipeline_lambda_activity',
              batchSize: 1,
              lambdaName: pipelineLambdaActivityFunctionName,
              next: 'pipeline_datastore_activity',
            },
            datastore: {
              name: 'pipeline_datastore_activity',
              datastoreName,
            },
          },
        ],
      }
    );

    const iotAnalyticsDataset = new iotAnalytics.CfnDataset(
      this,
      'IotAnalyticsDataset',
      {
        datasetName: `${projectName}_iot_analytics_dataset`,
        actions: [
          {
            actionName: 'SqlAction',
            queryAction: {
              sqlQuery: `SELECT * FROM ${datastoreName} WHERE __dt > current_date - interval '1' day`,
            },
          },
        ],
        retentionPeriod: {
          numberOfDays: 1,
          unlimited: false,
        },
        triggers: [
          {
            schedule: {
              scheduleExpression: 'rate(5 minute)',
            },
          },
        ],
      }
    );
    iotAnalyticsDataset.addDependsOn(iotAnalyticsDatastore);
  }
}

AWS IoT Core

トピックiot/topicに Pub されたデータを AWS IoT Analytics に受け渡すようにしています。

lib/iot-core-stack.ts

import * as cdk from '@aws-cdk/core';
import * as iot from '@aws-cdk/aws-iot';
import * as iam from '@aws-cdk/aws-iam';

interface IotCoreStackProps extends cdk.StackProps {
  projectName: string;
  ioTCertificateName: string;
  iotAnalyticsChannelName: string;
}

export class IotCoreStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props: IotCoreStackProps) {
    super(scope, id, props);

    const { accountId, region } = new cdk.ScopedAws(this);
    const { projectName, ioTCertificateName, iotAnalyticsChannelName } = props;
    const ioTCertificateArn = `arn:aws:iot:${region}:${accountId}:cert/${ioTCertificateName}`;

    const policyName = `${projectName}_iot_policy`;
    const iotPolicy = new iot.CfnPolicy(this, 'IotPolicy', {
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: 'iot:*',
            Resource: '*',
          },
        ],
      },
      policyName,
    });

    const thingName = `${projectName}_iot_thing`;
    const iotThing = new iot.CfnThing(this, 'IotThing', { thingName });

    const iotPolicyPrincipalAttachment = new iot.CfnPolicyPrincipalAttachment(
      this,
      'IotPolicyPrincipalAttachment',
      {
        policyName,
        principal: ioTCertificateArn,
      }
    );
    iotPolicyPrincipalAttachment.addDependsOn(iotPolicy);

    const iotThingPrincipalAttachment = new iot.CfnThingPrincipalAttachment(
      this,
      'IotThingPrincipalAttachment',
      {
        thingName,
        principal: ioTCertificateArn,
      }
    );
    iotThingPrincipalAttachment.addDependsOn(iotThing);

    const iotBatchPutMessageRole = new iam.Role(
      this,
      'IotBatchPutMessageRole',
      {
        assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
        path: '/',
      }
    );
    iotBatchPutMessageRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ['iotanalytics:BatchPutMessage'],
        resources: [
          `arn:aws:iotanalytics:${region}:${accountId}:channel/${iotAnalyticsChannelName}`,
        ],
      })
    );

    const IoTTopicRule = new iot.CfnTopicRule(this, 'IotTopicRule', {
      ruleName: `${projectName}_iot_topic_rule`,
      topicRulePayload: {
        actions: [
          {
            iotAnalytics: {
              channelName: iotAnalyticsChannelName,
              roleArn: iotBatchPutMessageRole.roleArn,
            },
          },
        ],
        awsIotSqlVersion: '2016-03-23',
        ruleDisabled: false,
        sql: "SELECT * FROM 'iot/topic'",
      },
    });
  }
}

スタックの依存関係

Lambda の Function 名や、AWS IoT Analytics の Channel 名を依存するスタックに渡してあげます。

bin/aws-iot-analytics-sample-cdk.ts

#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from '@aws-cdk/core';

import { LambdaStack } from '../lib/lambda-stack';
import { IotAnalyticsStack } from '../lib/iot-analytics-stack';
import { IotCoreStack } from '../lib/iot-core-stack';

export type Environment = {
  projectName: string;
  ioTCertificateName: string;
};

const app = new cdk.App();
const projectName: string = app.node.tryGetContext('projectName');
const ioTCertificateName: string = app.node.tryGetContext('ioTCertificateName');
const env: Environment = {
  projectName,
  ioTCertificateName,
};

const lambda = new LambdaStack(app, 'LambdaStack', env);
const iotAnalytics = new IotAnalyticsStack(app, 'IotAnalyticsStack', {
  ...env,
  pipelineLambdaActivityFunctionName: lambda.function.functionName,
});
const iotCoreStack = new IotCoreStack(app, 'IotCoreStack', {
  ...env,
  iotAnalyticsChannelName: iotAnalytics.channel.channelName as string,
});
iotCoreStack.addDependency(iotAnalytics);

デプロイ

下記でデプロイします。

$ yarn install
$ yarn build

$ cdk bootstrap

$ export AWS_IOT_CERTIFICATE_NAME="<証明書の名前>"
$ cdk deploy --context ioTCertificateName=${AWS_IOT_CERTIFICATE_NAME} IotCoreStack

AWS_IOT_CERTIFICATE_NAME は AWS IoT Core で作った証明書の名前です。

データを Pub したり 可視化 したり

前回の記事で、データを Pub して、可視化するまでの流れを記載しているので、そちらをご参照ください。

おわりに

前回、CloudFormation で頑張った分、今回はあまり時間をかけずに構築することができました。

AWS IoT Core、AWS IoT Analytics 両方とも high-level construct に対応していなかったので、CloudFormation と同じ構造で設定していく必要があり、コード量としてはあまり変わりませんでした。CDK の今後のアップデートに期待したいですね。

今回は以上になります。最後まで読んで頂きありがとうございました!