AWS IoT Rules EngineのSQLがLambda関数呼び出しをサポートしてたので使ってみた

2018.05.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、中山です。

AWS IoTにはRules Engineという仕組みがあります。この機能を使うことによって、例えばIoT機器からMQTTで送られてきたメッセージをKinesis Data StreamsなどのAWSサービスに送信することが可能になります。送信されたメッセージはSQLベースの言語によって選択/加工/条件判定といった複雑な処理が可能となっており、IoT機器 - AWS間でデータ連携を行う上で欠かせない機能の1つです。

現在私が関わっている案件ではこのRules Engineをフル活用してアプリケーションを作っています。その際、去年発表されたSQL内でのLambda関数のサポートを使う機会がありました。かなり便利かつ弊社ブログで誰も紹介してない?ようだったので、本エントリでご紹介したいと思います。

目次

これを使うと何がうれしいの?

この機能のうれしい点を一言で言うと SQL単体では難しいより複雑な条件判定/データの加工 が可能になることです。

AWS IoT Rules EngineのSQLには多くの関数がサポートされており、非常に便利に使えます。ただし、基本的にこれらの関数は送信されたメッセージに対する操作を主目的としており、SQL内で外部サービスのデータを参照するといったことはできません( get_thing_shadow / machinelearning_predict 関数は例外)。

SQL内でのLambda関数の実行は SELECT / WHERE 句内でサポートされています。

WHERE 句での利用シーンを考えてみましょう。この句の役割はRuleを評価する条件が満たされているか判定することです。Lambda関数を組み合わせることによって、例えばIoT機器から送られてきた位置情報を元に現在の場所を位置情報サービスから取得、ある条件を満たした場合にActionを実行するといったことが可能になります。

SELECT 句で使った場合は何がうれしいのでしょうか。この句はメッセージなどから必要なデータを取得、Actionで呼び出されるAWSサービスに送信することが役割です。ここでLambda関数を実行させることによって、IoT機器からは直接送信できないようなより複雑なデータを後続サービスに連携することが可能です。

一点注意点を。ドキュメントにも記載されていますが、Lambda関数の実行時間は気をつけてください。現時点で2000msまでしかサポートされていないため、これを超過する処理になってしまうと正しく処理が行われなくなる可能性があります。

Rules Engine limits the execution duration of Lambda Functions. Lambda function calls from rules should be completed within 2000ms.

という訳で、早速使ってみましょう。

使ってみた

今回は WHERE 句内でLambda関数を呼び出し、DynamoDBテーブルにあるアイテムが存在しなかったらActionを実行させる、という仕組みを作ってみます。DynamoDBテーブルをロックのように使うイメージです。Actionとして呼ぶ出すAWSサービスは実行されたことが分かりやすいようにLambda関数にしておきます。

まずSQLです。今回は以下のようにしてみました。

  • AWS IoT Rules EngineのSQL
SELECT *
FROM '$aws/things/+/shadow/update'
WHERE aws_lambda("arn:aws:lambda:ap-northeast-1:111111111111:function:where-func-iot:live", {"deviceId": topic(3)}).isIllegal <> True

Device Shadowが更新された場合にRules Engineを発火させます( FROM 句)。

WHERE 句の aws_lambda 関数でLambda関数を実行させることが可能です。引数に渡すのはLambda関数のARN(Qualifiedでも可)とLambda関数に渡すデータ(JSON形式)です。最後の方についている isIllegal は実行されたLambda関数からの戻り値を意味します。

つまり、全体としてDevice Shadowが更新された場合にRules Engineにマッチし、 isIllegalTrue ではない場合にActionを実行するという意味になります。

続いて、 aws_lambda 関数で起動されるLambda関数について。以下のようにしてみました。かなり手抜きですが、DynamoDBテーブルから該当のアイテムが存在しているかチェックしているだけですね。

  • SQL内で呼び出されるLambda関数
import os

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')


def emit(flag):
    dict_ = {
        'isIllegal': flag
    }
    print(dict_)

    return dict_


def item_exists(device_id):
    table_name = os.environ['TABLE_NAME']
    params = {
        'KeyConditionExpression': Key('deviceId').eq(device_id)
    }

    resp = dynamodb.Table(table_name).query(**params)['Items']

    if resp:
        return True
    else:
        return False


def handler(event, context):
    flag = False

    try:
        flag = item_exists(event['deviceId'])
    except Exception as e:
        print(e)
    finally:
        return emit(flag)

最後にActionで実行されるLambda関数です。サンプルなので単に起動確認してるだけにしました。

  • Actionで実行されるLambda関数
def handler(event, context):
    print(event)

実際に試してみると分かりますが、DynamoDBにThing名と同じアイテムをPut ItemするとActionが実行されないことが分かります。

AWS SAMを利用した構成管理

動くことは分かりました。さらに、やっぱりサーバーレスアプリケーションであれば全体を構成管理したい。

という訳で、今回はAWS SAMを使った方法をご紹介します。

...
Globals:
  Function:
    Handler: index.handler
    Runtime: python3.6
    AutoPublishAlias: live
...
  WhereFunc:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub where-func-${AWS::StackName}
      CodeUri: src/handlers/where_func
      Role:
        Fn::ImportValue: !Sub ${InfraStackNamePrefix}-WhereFuncRoleArn
      Environment:
        Variables:
          TABLE_NAME:
            Fn::ImportValue: !Sub ${InfraStackNamePrefix}-DynamoDBTableName
...
  ActionFunc:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub action-func-${AWS::StackName}
      CodeUri: src/handlers/action_func
      Role:
        Fn::ImportValue: !Sub ${InfraStackNamePrefix}-ActionFuncRoleArn
      Events:
        IoTRule:
          Type: IoTRule
          Properties:
            AwsIotSqlVersion: 2016-03-23
            Sql: !Sub |
              SELECT *
              FROM '$aws/things/+/shadow/update'
              WHERE aws_lambda("${WhereFunc.Alias}", {"deviceId": topic(3)}).isIllegal <> True
...

AWS::Serverless::Function リソースで作成しているLambda関数はそれぞれ以下の通りです。

  • WhereFunc : SQLで呼び出されるLambda関数
  • ActionFunc : Actionで実行されるLambda関数

ActionFuncEvents プロパティでRules Engineを定義していることが分かります。

今回のご紹介した機能をAWS SAMで定義する際に少しハマりそうな点はAWS::Lambda::Permissionリソースです。AWS IoTがLambda関数をInvokeさせる必要があるので、それ用のリソースポリシーを付与して上げる必要があります。これをAWS SAMで定義する場合は以下のように設定できます。

...
  WhereFuncIoTRuleInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref WhereFunc.Alias
      Principal: iot.amazonaws.com
      SourceArn: !GetAtt ActionFuncIoTRule.Arn
      SourceAccount: !Ref AWS::AccountId
      Action: lambda:InvokeFunction
...

SourceArn プロパティに設定している ActionFuncIoTRule という論理リソースに注目してください。

これは自身で設定したものではなく、AWS SAMが Events プロパティから自動で生成したRules Engineです。AWS SAMは最終的にCloudFormationに展開されるため、展開後のテンプレートで作成される論理リソースも参照できます。自動生成された物理リソース名はランダムな文字列が付与されるので、組み込み関数を使って参照するようにしましょう。

まとめ

いかがだったでしょうか。

aws_lambda 関数を利用したRules Engine内でのLambda関数呼び出しについてご紹介しました。便利な機能なのでぜひ使ってみていただければ。

本エントリがみなさんの参考になれば幸いに思います。