【アップデート】Lambdaのイベントソースマッピングでイベントのフィルタが可能になりました

非同期Lambdaのコスト削減とロジック簡素化が期待できるアップデート
2021.11.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

MAD事業部@大阪の岩田です。先日のアップデートによってLambdaのイベントソースマッピングでイベントのフィルタが可能になりました。

現在フィルタがサポートされるイベントソースは

  • Kinesis Data Streams
  • DynamoDB(DynamoDB Streams)
  • SQS

の3種です。MQやMSKはサポートされません。

何がうれしいのか

これまではイベントソースに流れてきた全てのレコードがLambdaのイベントデータに引き渡されていましたが、ユースケースによっては必ずしも全てのレコードに対してロジックを実行する必要がありません。私は以前IoTシステムのバックエンドを構築した際に「インテリジェントシャドウパターン」のような設計を採用したことがあるのですが、DynamoDB StreamsからLambdaを起動して後続処理を流したいのはDynamoDBにINSERTされた場合のみでした。しかしDynamoDB StreamsにはMODIFYやREMOVEのイベントも流れてくるのでLambda側のロジックでイベント種別を判定してMODIFYやREMOVEのイベントは処理をスキップする処理を実装していました。

図は最新 IoT デザインパターン 〜AWS IoT と AWS Greengrass を用いた構築パターン〜より引用

今回のアップデートを利用することでLambda側での判定処理が不要になるので

  • Lambdaのコードの簡素化
  • 不要なLambda実行に伴うコストの削減

などが期待できます。

フィルタの概要

イベントソースマッピングに対してフィルタリング条件を指定するとLambdaに引き渡すイベントデータをフィルタできます。フィルタリングは条件は以下のような構造で定義されます。

{
   "Filters": [
        {
            "Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}"
        },
        {
            "Pattern": ...略
        }
    ]
}

Patternの部分で定義されるフィルタは最大5つまで指定でき、上限緩和申請を行うことで最大10個まで指定できるようになります。フィルタを複数指定した場合、イベントソースに流れてきたレコードがいずれかのフィルタにマッチするとLambdaのイベントデータに引き渡されます。

フィルタはJSON形式の文字列で、JSON形式に展開すると以下のような構造です

{
    "Metadata1": [ pattern1 ],
    "data": {
        "Data1": [ pattern2 ]
    }
}

フィルタは以下3つの要素で構成されます。

  • メタデータプロパティ
    • イベントデータのメタデータ
    • 上記サンプルのMetadata1部分
    • 例えばKinesis Data Streamsがイベントソースの場合はkinesisSchemaVersionpartitionKeyが、DynamoDB Streamsがイベントソースの場合はeventNameがメタデータプロパティにあたります
  • データプロパティ
    • 上記サンプルのData1部分
    • イベントデータのボディの各プロパティに相当します
    • データプロパティの親にあたるプロパティはイベントソースによって異なります
    • Kinesis Data Streamsのdata、SQSの場合はbody、DynamoDB Streamsの場合はdynamodbです
  • フィルタルール
    • メタデータプロパティもしくはデータプロパティに適用するフィルタを定義します。

フィルタルールには以下の比較演算子がサポートされています

比較演算子 比較したい内容 フィルタルールの記述
Null UserIDがNULLか? "UserID": [ null ]
Empty LastNameが空文字列か? "LastName": [""]
Equals Nameが"Alice"か? "Name": [ "Alice" ]
And Locationが "New York" かつDay が "Monday"か? "Location": [ "New York" ], "Day": ["Monday"]
Or PaymentType が "Credit" もしくは "Debit"か? "PaymentType": [ "Credit", "Debit"]
Not Weather が "Raining" でないか? "Weather": [ { "anything-but": [ "Raining" ] } ]
Numeric (equals) Price が 100か? "Price": [ { "numeric": [ "=", 100 ] } ]
Numeric (range) Price が 10より超過 ~ 20未満か? "Price": [ { "numeric": [ ">", 10, "<=", 20 ] } ]
Exists プロパティProductName が存在するか? "ProductName": [ { "exists": true } ]
Does not exist プロパティProductName が存在しないか? "ProductName": [ { "exists": false } ]
Begins with Region が us-から始まるか? "Region": [ {"prefix": "us-" } ]

イベントのフィルタは以下のようなフローで評価されます。

図はFiltering event sources for AWS Lambda functionsより引用

フィルタ条件を満たしたレコードのみがLambdaで処理するバッチに入るので、フィルタ条件を満たさないレコードがバッチサイズを無駄に消費することはありません。

やってみる

簡単にイベントソースのフィルタを試してみます。以前自前実装していた、DynamoDB Streamsに流れてきたレコードのうち、INSERTのみを処理するフィルタを試してみます。

まずは適当なDynamoDBのテーブルを作成し、ストリームを有効化します。

パーティションキー:idのみ定義したシンプルなテーブルです。テーブル名はfilter-testとしています

続いてLambdaを作成します。イベントデータの中身を1レコードづつログに出力するシンプルなLambdaです

def lambda_handler(event, context):
    for rec in event['Records']:
        print(rec)

Lambdaのトリガーとして先程のテーブルのストリームを指定します。

フィルタリング条件には

{
  "filters": [
    {
      "pattern": "{\"eventName\": [\"INSERT\"]}"
    }
  ]
}

を指定しています。patternの直下にはメタデータプロパティであるeventNameを指定し、比較演算子には["INSERT"]を指定しています。これでDynamoDBにアイテムが新規登録された場合だけLambdaが起動するようになります。

準備ができたのでDynamoDBにCRUD操作を行いつつ、CloudWatch LogsからLambdaのログを確認してみましょう

まずは新規追加

$aws dynamodb put-item --table-name filter-test --item '{"id":{"S":"1"}}'

追加したアイテムの更新

$aws dynamodb update-item  --table-name filter-test --key '{"id":{"S":"1"}}' --update-expression 'SET key1=:val1' --expression-attribute-values '{":val1":{"S":"val1"}}'

追加したアイテムの削除

$aws dynamodb delete-item --table-name filter-test --key '{"id":{"S":"1"}}'

CloudWatch Logsを確認するとアイテムの新規追加時のみLambdaが起動していることが分かります

まとめ

ユースケース次第では非常に嬉しいアップデートではないでしょうか?これもいわゆるre:Invent予選落ちというやつでしょうかね。re:Inventの直前にLambdaの小型アップデートがいくつか発表されつつ、re:Inventで大きなアップデートが発表されるというのが恒例行事なので、次週からも色々なアップデートに期待していきたいです。

参考