AWS IoT Rules EngineのSQLがLambda関数呼び出しをサポートしてたので使ってみた
はじめに
こんにちは、中山です。
AWS IoTにはRules Engineという仕組みがあります。この機能を使うことによって、例えばIoT機器からMQTTで送られてきたメッセージをKinesis Data StreamsなどのAWSサービスに送信することが可能になります。送信されたメッセージはSQLベースの言語によって選択/加工/条件判定といった複雑な処理が可能となっており、IoT機器 - AWS間でデータ連携を行う上で欠かせない機能の1つです。
現在私が関わっている案件ではこのRules Engineをフル活用してアプリケーションを作っています。その際、去年発表されたSQL内でのLambda関数のサポートを使う機会がありました。かなり便利かつ弊社ブログで誰も紹介してない?ようだったので、本エントリでご紹介したいと思います。
目次
これを使うと何がうれしいの?
この機能のうれしい点を一言で言うと SQL単体では難しいより複雑な条件判定/データの加工 が可能になることです。
AWS IoT Rules EngineのSQLには多くの関数がサポートされており、非常に便利に使えます。ただし、基本的にこれらの関数は送信されたメッセージに対する操作を主目的としており、SQL内で外部サービスのデータを参照するといったことはできません( get_thing_shadow
/ machinelearning_predict
関数は例外)。
SQL内でのLambda関数の実行は SELECT
/ WHERE
句内でサポートされています。
WHERE
句での利用シーンを考えてみましょう。この句の役割はRuleを評価する条件が満たされているか判定することです。Lambda関数を組み合わせることによって、例えばIoT機器から送られてきた位置情報を元に現在の場所を位置情報サービスから取得、ある条件を満たした場合にActionを実行するといったことが可能になります。
SELECT
句で使った場合は何がうれしいのでしょうか。この句はメッセージなどから必要なデータを取得、Actionで呼び出されるAWSサービスに送信することが役割です。ここでLambda関数を実行させることによって、IoT機器からは直接送信できないようなより複雑なデータを後続サービスに連携することが可能です。
一点注意点を。ドキュメントにも記載されていますが、Lambda関数の実行時間は気をつけてください。現時点で2000msまでしかサポートされていないため、これを超過する処理になってしまうと正しく処理が行われなくなる可能性があります。
Rules Engine limits the execution duration of Lambda Functions. Lambda function calls from rules should be completed within 2000ms.
という訳で、早速使ってみましょう。
使ってみた
今回は WHERE
句内でLambda関数を呼び出し、DynamoDBテーブルにあるアイテムが存在しなかったらActionを実行させる、という仕組みを作ってみます。DynamoDBテーブルをロックのように使うイメージです。Actionとして呼ぶ出すAWSサービスは実行されたことが分かりやすいようにLambda関数にしておきます。
まずSQLです。今回は以下のようにしてみました。
- AWS IoT Rules EngineのSQL
SELECT * FROM '$aws/things/+/shadow/update' WHERE aws_lambda("arn:aws:lambda:ap-northeast-1:111111111111:function:where-func-iot:live", {"deviceId": topic(3)}).isIllegal <> True
Device Shadowが更新された場合にRules Engineを発火させます( FROM
句)。
WHERE
句の aws_lambda
関数でLambda関数を実行させることが可能です。引数に渡すのはLambda関数のARN(Qualifiedでも可)とLambda関数に渡すデータ(JSON形式)です。最後の方についている isIllegal
は実行されたLambda関数からの戻り値を意味します。
つまり、全体としてDevice Shadowが更新された場合にRules Engineにマッチし、 isIllegal
が True
ではない場合にActionを実行するという意味になります。
続いて、 aws_lambda
関数で起動されるLambda関数について。以下のようにしてみました。かなり手抜きですが、DynamoDBテーブルから該当のアイテムが存在しているかチェックしているだけですね。
- SQL内で呼び出されるLambda関数
import os import boto3 from boto3.dynamodb.conditions import Key dynamodb = boto3.resource('dynamodb') def emit(flag): dict_ = { 'isIllegal': flag } print(dict_) return dict_ def item_exists(device_id): table_name = os.environ['TABLE_NAME'] params = { 'KeyConditionExpression': Key('deviceId').eq(device_id) } resp = dynamodb.Table(table_name).query(**params)['Items'] if resp: return True else: return False def handler(event, context): flag = False try: flag = item_exists(event['deviceId']) except Exception as e: print(e) finally: return emit(flag)
最後にActionで実行されるLambda関数です。サンプルなので単に起動確認してるだけにしました。
- Actionで実行されるLambda関数
def handler(event, context): print(event)
実際に試してみると分かりますが、DynamoDBにThing名と同じアイテムをPut ItemするとActionが実行されないことが分かります。
AWS SAMを利用した構成管理
動くことは分かりました。さらに、やっぱりサーバーレスアプリケーションであれば全体を構成管理したい。
という訳で、今回はAWS SAMを使った方法をご紹介します。
... Globals: Function: Handler: index.handler Runtime: python3.6 AutoPublishAlias: live ... WhereFunc: Type: AWS::Serverless::Function Properties: FunctionName: !Sub where-func-${AWS::StackName} CodeUri: src/handlers/where_func Role: Fn::ImportValue: !Sub ${InfraStackNamePrefix}-WhereFuncRoleArn Environment: Variables: TABLE_NAME: Fn::ImportValue: !Sub ${InfraStackNamePrefix}-DynamoDBTableName ... ActionFunc: Type: AWS::Serverless::Function Properties: FunctionName: !Sub action-func-${AWS::StackName} CodeUri: src/handlers/action_func Role: Fn::ImportValue: !Sub ${InfraStackNamePrefix}-ActionFuncRoleArn Events: IoTRule: Type: IoTRule Properties: AwsIotSqlVersion: 2016-03-23 Sql: !Sub | SELECT * FROM '$aws/things/+/shadow/update' WHERE aws_lambda("${WhereFunc.Alias}", {"deviceId": topic(3)}).isIllegal <> True ...
AWS::Serverless::Function
リソースで作成しているLambda関数はそれぞれ以下の通りです。
WhereFunc
: SQLで呼び出されるLambda関数ActionFunc
: Actionで実行されるLambda関数
ActionFunc
の Events
プロパティでRules Engineを定義していることが分かります。
今回のご紹介した機能をAWS SAMで定義する際に少しハマりそうな点はAWS::Lambda::Permissionリソースです。AWS IoTがLambda関数をInvokeさせる必要があるので、それ用のリソースポリシーを付与して上げる必要があります。これをAWS SAMで定義する場合は以下のように設定できます。
... WhereFuncIoTRuleInvokePermission: Type: AWS::Lambda::Permission Properties: FunctionName: !Ref WhereFunc.Alias Principal: iot.amazonaws.com SourceArn: !GetAtt ActionFuncIoTRule.Arn SourceAccount: !Ref AWS::AccountId Action: lambda:InvokeFunction ...
SourceArn
プロパティに設定している ActionFuncIoTRule
という論理リソースに注目してください。
これは自身で設定したものではなく、AWS SAMが Events
プロパティから自動で生成したRules Engineです。AWS SAMは最終的にCloudFormationに展開されるため、展開後のテンプレートで作成される論理リソースも参照できます。自動生成された物理リソース名はランダムな文字列が付与されるので、組み込み関数を使って参照するようにしましょう。
まとめ
いかがだったでしょうか。
aws_lambda
関数を利用したRules Engine内でのLambda関数呼び出しについてご紹介しました。便利な機能なのでぜひ使ってみていただければ。
本エントリがみなさんの参考になれば幸いに思います。