この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
テントの中から失礼します、CX事業本部のてんとタカハシです!
前回の記事の続編で、AWS IoT Analytics を CDK で構築してみました。作るものだったりのお話は下記の記事に書いていますので、そちらをご参照ください。
今回のソースコードは下記に置いています。
GitHub - iam326/aws-iot-analytics-sample-cdk
環境
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.15.7
BuildVersion: 19H2
$ aws --version
aws-cli/2.0.28 Python/3.7.4 Darwin/19.6.0 botocore/2.0.0dev32
$ cdk --version
1.70.0 (build c145314)
スタックの実装
今回は下記サービスごとにスタックを分けています。
- Lambda
- AWS IoT Analytics
- AWS IoT Core
Lambda
AWS IoT Analytics の Pipeline で Lambda と連携するために Function を作成します。AWS IoT Analytics から Invoke できるようにパーミッションの設定も行います。
lib/lambda-stack.ts
import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
interface LambdaStackProps extends cdk.StackProps {
projectName: string;
}
export class LambdaStack extends cdk.Stack {
public readonly function: lambda.Function;
constructor(scope: cdk.Construct, id: string, props: LambdaStackProps) {
super(scope, id, props);
const { projectName } = props;
const lambdaExecutionRole = new iam.Role(this, 'lambdaExecutionRole', {
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
path: '/',
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
'service-role/AWSLambdaBasicExecutionRole'
),
],
});
const lambdaFunction = new lambda.Function(this, 'lambdaFunction', {
functionName: `${projectName}_pipeline_lambda_function`,
handler: 'index.lambda_handler',
role: lambdaExecutionRole,
runtime: lambda.Runtime.PYTHON_3_7,
code: lambda.Code.fromAsset('src'),
});
lambdaFunction.addPermission('LambdaFunctionPermission', {
principal: new iam.ServicePrincipal('iotanalytics.amazonaws.com'),
action: 'lambda:InvokeFunction',
});
this.function = lambdaFunction;
}
}
Lambda の実装は別ファイルに置いています。Pub されたデータの属性を、別属性にコピーするだけの処理です。
src/index.py
import logging
import sys
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
def lambda_handler(event, context):
logger.info("event before processing: {}".format(event))
for e in event:
if 'temperature' in e:
e['temperature_copy'] = e['temperature']
logger.info("event after processing: {}".format(event))
return event
AWS IoT Analytics
前回の記事と同じで様々なアクティビティを試しています。データセットを5分ごとに更新するようにして、当日のデータのみ取得するようにしています。
lib/iot-analytics-stack.ts
import * as cdk from '@aws-cdk/core';
import * as iotAnalytics from '@aws-cdk/aws-iotanalytics';
interface IotAnalyticsStackProps extends cdk.StackProps {
projectName: string;
pipelineLambdaActivityFunctionName: string;
}
export class IotAnalyticsStack extends cdk.Stack {
public readonly channel: iotAnalytics.CfnChannel;
constructor(scope: cdk.Construct, id: string, props: IotAnalyticsStackProps) {
super(scope, id, props);
const { projectName, pipelineLambdaActivityFunctionName } = props;
const channelName = `${projectName}_iot_analytics_channel`;
const iotAnalyticsChannel = new iotAnalytics.CfnChannel(
this,
'IotAnalyticsChannel',
{
channelName,
channelStorage: {
serviceManagedS3: {},
},
}
);
this.channel = iotAnalyticsChannel;
const datastoreName = `${projectName}_iot_analytics_datastore`;
const iotAnalyticsDatastore = new iotAnalytics.CfnDatastore(
this,
'IotAnalyticsDatastore',
{
datastoreName,
datastoreStorage: {
serviceManagedS3: {},
},
}
);
const iotAnalyticsPipeline = new iotAnalytics.CfnPipeline(
this,
'IotAnalyticsPipeline',
{
pipelineName: `${projectName}_iot_analytics_pipeline`,
pipelineActivities: [
{
channel: {
name: 'pipeline_channel_activity',
channelName,
next: 'pipeline_add_attributes_activity',
},
addAttributes: {
name: 'pipeline_add_attributes_activity',
attributes: {
'device.id': 'id',
'device.name': 'name',
},
next: 'pipeline_remove_attributes_activity',
},
removeAttributes: {
name: 'pipeline_remove_attributes_activity',
attributes: ['device'],
next: 'pipeline_filter_activity',
},
filter: {
name: 'pipeline_filter_activity',
filter: 'temperature >= 10 AND temperature <= 40',
next: 'pipeline_math_activity',
},
math: {
name: 'pipeline_math_activity',
attribute: 'temperature_f',
math: 'temperature * 1.8 + 32',
next: 'pipeline_lambda_activity',
},
lambda: {
name: 'pipeline_lambda_activity',
batchSize: 1,
lambdaName: pipelineLambdaActivityFunctionName,
next: 'pipeline_datastore_activity',
},
datastore: {
name: 'pipeline_datastore_activity',
datastoreName,
},
},
],
}
);
const iotAnalyticsDataset = new iotAnalytics.CfnDataset(
this,
'IotAnalyticsDataset',
{
datasetName: `${projectName}_iot_analytics_dataset`,
actions: [
{
actionName: 'SqlAction',
queryAction: {
sqlQuery: `SELECT * FROM ${datastoreName} WHERE __dt > current_date - interval '1' day`,
},
},
],
retentionPeriod: {
numberOfDays: 1,
unlimited: false,
},
triggers: [
{
schedule: {
scheduleExpression: 'rate(5 minute)',
},
},
],
}
);
iotAnalyticsDataset.addDependsOn(iotAnalyticsDatastore);
}
}
AWS IoT Core
トピックiot/topic
に Pub されたデータを AWS IoT Analytics に受け渡すようにしています。
lib/iot-core-stack.ts
import * as cdk from '@aws-cdk/core';
import * as iot from '@aws-cdk/aws-iot';
import * as iam from '@aws-cdk/aws-iam';
interface IotCoreStackProps extends cdk.StackProps {
projectName: string;
ioTCertificateName: string;
iotAnalyticsChannelName: string;
}
export class IotCoreStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props: IotCoreStackProps) {
super(scope, id, props);
const { accountId, region } = new cdk.ScopedAws(this);
const { projectName, ioTCertificateName, iotAnalyticsChannelName } = props;
const ioTCertificateArn = `arn:aws:iot:${region}:${accountId}:cert/${ioTCertificateName}`;
const policyName = `${projectName}_iot_policy`;
const iotPolicy = new iot.CfnPolicy(this, 'IotPolicy', {
policyDocument: {
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Action: 'iot:*',
Resource: '*',
},
],
},
policyName,
});
const thingName = `${projectName}_iot_thing`;
const iotThing = new iot.CfnThing(this, 'IotThing', { thingName });
const iotPolicyPrincipalAttachment = new iot.CfnPolicyPrincipalAttachment(
this,
'IotPolicyPrincipalAttachment',
{
policyName,
principal: ioTCertificateArn,
}
);
iotPolicyPrincipalAttachment.addDependsOn(iotPolicy);
const iotThingPrincipalAttachment = new iot.CfnThingPrincipalAttachment(
this,
'IotThingPrincipalAttachment',
{
thingName,
principal: ioTCertificateArn,
}
);
iotThingPrincipalAttachment.addDependsOn(iotThing);
const iotBatchPutMessageRole = new iam.Role(
this,
'IotBatchPutMessageRole',
{
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
path: '/',
}
);
iotBatchPutMessageRole.addToPolicy(
new iam.PolicyStatement({
actions: ['iotanalytics:BatchPutMessage'],
resources: [
`arn:aws:iotanalytics:${region}:${accountId}:channel/${iotAnalyticsChannelName}`,
],
})
);
const IoTTopicRule = new iot.CfnTopicRule(this, 'IotTopicRule', {
ruleName: `${projectName}_iot_topic_rule`,
topicRulePayload: {
actions: [
{
iotAnalytics: {
channelName: iotAnalyticsChannelName,
roleArn: iotBatchPutMessageRole.roleArn,
},
},
],
awsIotSqlVersion: '2016-03-23',
ruleDisabled: false,
sql: "SELECT * FROM 'iot/topic'",
},
});
}
}
スタックの依存関係
Lambda の Function 名や、AWS IoT Analytics の Channel 名を依存するスタックに渡してあげます。
bin/aws-iot-analytics-sample-cdk.ts
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from '@aws-cdk/core';
import { LambdaStack } from '../lib/lambda-stack';
import { IotAnalyticsStack } from '../lib/iot-analytics-stack';
import { IotCoreStack } from '../lib/iot-core-stack';
export type Environment = {
projectName: string;
ioTCertificateName: string;
};
const app = new cdk.App();
const projectName: string = app.node.tryGetContext('projectName');
const ioTCertificateName: string = app.node.tryGetContext('ioTCertificateName');
const env: Environment = {
projectName,
ioTCertificateName,
};
const lambda = new LambdaStack(app, 'LambdaStack', env);
const iotAnalytics = new IotAnalyticsStack(app, 'IotAnalyticsStack', {
...env,
pipelineLambdaActivityFunctionName: lambda.function.functionName,
});
const iotCoreStack = new IotCoreStack(app, 'IotCoreStack', {
...env,
iotAnalyticsChannelName: iotAnalytics.channel.channelName as string,
});
iotCoreStack.addDependency(iotAnalytics);
デプロイ
下記でデプロイします。
$ yarn install
$ yarn build
$ cdk bootstrap
$ export AWS_IOT_CERTIFICATE_NAME="<証明書の名前>"
$ cdk deploy --context ioTCertificateName=${AWS_IOT_CERTIFICATE_NAME} IotCoreStack
AWS_IOT_CERTIFICATE_NAME
は AWS IoT Core で作った証明書の名前です。
データを Pub したり 可視化 したり
前回の記事で、データを Pub して、可視化するまでの流れを記載しているので、そちらをご参照ください。
おわりに
前回、CloudFormation で頑張った分、今回はあまり時間をかけずに構築することができました。
AWS IoT Core、AWS IoT Analytics 両方とも high-level construct に対応していなかったので、CloudFormation と同じ構造で設定していく必要があり、コード量としてはあまり変わりませんでした。CDK の今後のアップデートに期待したいですね。
今回は以上になります。最後まで読んで頂きありがとうございました!