GreengrassとAWS IoTルールエンジンを使って、デバイスの接続状態をDynamoDBに保存する

サーバーレス開発部@札幌の佐藤です。

最近は、AWS IoT Greengrassを使用することが多いです。その際に、Greengrass Core デバイスの接続状況をDynamoDBに保存するユースケースがあったため、紹介します。

仮想的なIoTデバイスとしてEC2を設定

今回はテスト用として、仮想的なIoTデバイスとしてEC2を立ち上げます。

下記の記事にまとまっていますので、手順通りに進めてEC2をセットアップして、Greengrassデーモンを起動します。

デバイスがなくても大丈夫!EC2でAWS IoT Greengrassを動かす #reinvent

Greengrass Coreデバイスの接続情報を検知する

AWS IoTではライフサイクルイベントでデバイスの接続/切断情報がMQTTトピックにパブリッシュされます。この機能を使って、IoTデバイスの接続/切断状態を検知します。

パブリッシュされるMQTTトピック

接続時

$aws/events/presence/connected/clientId

切断時

$aws/events/presence/disconnected/clientId

試してみる

  • AWS IoTのテストを選択し、トピックへのサブスクリプションの欄に以下を記述し、トピックへのサブスクライブ
$aws/events/presence/#

  • EC2にSSHで接続し、以下のコマンドでGreengrass Coreデーモンを開始します
cd /greengrass/ggc/core
sudo ./greengrass start
  • デーモンを開始すると、以下のように接続情報のトピックのペイロードがパブリッシュされます。
{
  "clientId": "greengrass_connected_test_Core",
  "timestamp": 1558330955711,
  "eventType": "connected",
  "sessionIdentifier": "6b77c891-576f-47dd-b44c-XXXXXXXXXXXX",
  "principalIdentifier": "89024a381377baXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "versionNumber": 2
}
  • 以下のコマンドでGreengrass Coreデーモンを停止します
cd /greengrass/ggc/core
sudo ./greengrass stop
  • デーモンを停止すると、以下のように切断情報のペイロードがパブリッシュされます
{
  "clientId": "greengrass_connected_test_Core",
  "timestamp": 1558330930654,
  "eventType": "disconnected",
  "clientInitiatedDisconnect": false,
  "sessionIdentifier": "d2dbdeca-3cce-4e83-XXXXXXXXXXX",
  "principalIdentifier": "89024a381377ba91136fbaXXXXXXXXXXXXXXXXXXXXXXXX",
  "versionNumber": 0
}

上記のeventTypeを使用して、接続情報をDynamoDBに保存していきます。

デバイスの接続情報をDynamoDBに保存する

上記のライフサイクルイベントで接続情報が検知できることがわかったので、AWS IoTのルールエンジンを使ってパブリッシュされたペイロードをLambdaを用いてDynamoDBに保存してみたいと思います。

接続情報保存用のDynamoDBとLambdaを作成

今回は、AWS SAMを使用してDynamoDB、AWS IoTルールエンジンをデプロイしていきます。以下のSAMテンプレートを作成します。今回はPython 3.7を使用します。

SAMテンプレート

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: 'Greengrass Connection Test'
Globals:
  Function:
    Runtime: python3.7
    Timeout: 10
    Environment:
      Variables:
        CONNECTED_TBL: !Ref ConnectedTable
Resources:
  ConnectedTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      AttributeDefinitions:
        -
          AttributeName: "id"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "id"
          KeyType: "HASH"
      BillingMode: PAY_PER_REQUEST
  LambdaExecuteRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - 'lambda.amazonaws.com'
          Action: sts:AssumeRole
      Policies:
        -
          PolicyName: "allow_all"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action:
                  - "*"
                Resource:
                  -  "*"
    # AWS Iotルールエンジンを作成する
  UpdateConnected:
    Type: "AWS::Serverless::Function"
    Properties:
      Handler: "handler.handler"
      Role: !GetAtt LambdaExecuteRole.Arn
      Events:
        IoT:
          Type: IoTRule
          Properties:
            AwsIotSqlVersion: 2016-03-23
            Sql: >-
              SELECT
                *
              FROM
                '$aws/events/presence/connected/#'
  UpdateDisconnected:
    Type: "AWS::Serverless::Function"
    Properties:
      Handler: "handler.handler"
      Role: !GetAtt LambdaExecuteRole.Arn
      Events:
        IoT:
          Type: IoTRule
          Properties:
            AwsIotSqlVersion: 2016-03-23
            Sql: >-
              SELECT
                *
              FROM
                '$aws/events/presence/disconnected/#'

Lambdaハンドラ

import boto3
import logging
import os

region = os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-1')
logger = logging.getLogger()
dynamodb = boto3.resource('dynamodb')

def handler(event: dict, context: dict):

    try:

        # Greengrass Core からパブリッシュされたIdと接続状態を取得
        client_id = event['clientId']
        event_type = event['eventType']

        # eventTypeの値から接続状態を取得
        if event_type == 'disconnected':
            is_connected = False
        elif event_type == 'connected':
            is_connected = True
        else:
            return

        table = dynamodb.Table(os.getenv('CONNECTED_TBL'))
        data = table.get_item(Key={ 'id': client_id })

        if 'Item' not in data:
            res = table.put_item(
                Item={
                'id': client_id,
                'is_connected': is_connected
            })
        else:
            res = table.update_item(
                Key={ 'id': client_id },
                UpdateExpression='set is_connected = :i',
                ExpressionAttributeValues={ ':i': is_connected },
                ReturnValues="UPDATED_NEW"
            )

        return

    except Exception as e:
        logger.error(e)
        raise e

上記のSAMテンプレートとLambdaハンドラを同じフォルダに起き、以下のCLIコマンドを実行してデプロイします。

sam package --template-file sam.yml --output-template-file packaged.yml --s3-bucket デプロイパッケージを置くS3バケット
sam deploy --template-file packaged.yml --stack-name 任意のスタック名 --capabilities CAPABILITY_IAM

動作確認

デプロイが完了したら実際に動作確認をしていきます。EC2にSSHで接続し、Greengrass Coreデーモンを起動・停止してみます。

デーモン起動

sudo /greengrass/ggc/core/greengrassd start

起動するとAWS IoTのルールエンジンを通してLambdaが起動して、 is_connectedtrueになっています。

デーモン停止

sudo /greengrass/ggc/core/greengrassd stop

停止するとAWS IoTのルールエンジンを通してLambdaが起動して、 is_connectedfalseになっています。

まとめ

AWS IoTのルールエンジンを使用して、デバイスの接続状態を保存することができました。 ルールエンジンは、Lambdaを実行する他にも、DynamoDBに直接データを保存したり、SQSキューにデータを送信したりと様々なユースケースに対応できます。