Kinesis Data StreamsとLambdaのイベントフィルタリング機能を利用して、条件に応じたLambdaを起動する

2022.01.31

はじめに

2021年12月にAWS Lambdaは、Amazon SQS、Amazon DynamoDB、Amazon Kinesis をイベントソースとするイベントフィルタリングをサポートしました。

弊社ブログでは以下の記事が参考になります

今回は本機能を利用して、Kinesis Data Streamsで受け取ったイベントをイベントフィルタリングし、フィルタ条件に応じたLambdaを起動する構成を試します。

前提

  • CDK v2を利用
  • CDKとLambdaはTypeScriptを利用

環境情報は以下の通り

ツール/パッケージ バージョン
Node.js 16.3.0
aws-cdk-lib 2.8.0
aws-cdk 2.8.0
aws-sdk 2.1062.0
esbuild 0.14.12

構成

以下の図のように各LambdaのイベントソースとしてKinesis Data Streamsを設定し、イベントフィルタリング設定をします。

kinesis-event-source-mapping

※ 図はFiltering event sources for AWS Lambda functionsを参考

以下のJSONのeventTypeがA,B,Cのパターンがあり、Lambdaが起動される前段でイベントがフィルタリングされ、イベントに応じたLambdaを起動することを想定しています。

{
  "eventType": "A",
  "message": 1
}

本構成は以下のメリットがあると考えています。

  • フィルタ条件に合わないイベント分のLambda起動コストが削減可能
  • イベントの中身に応じたLambdaを起動したい要件がある
    • Lambdaからペイロードの中身を判定し、Lambdaを呼び出している場合、呼び出し元Lambdaの起動コストを削減可能

CDK

lib/event-source-mapping.ts

import { Stack, StackProps } from "aws-cdk-lib";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import * as Kinesis from "aws-cdk-lib/aws-kinesis";
import * as Lambda from "aws-cdk-lib/aws-lambda";
import { Construct } from "constructs";
import { EventSourceMapping, StartingPosition } from "aws-cdk-lib/aws-lambda";

export class KinesisEventFilteringSampleStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const eventTypeList = ["A", "B", "C"];

    // Kinesis Data Streamsを作成
    const dispatchKinesis = new Kinesis.Stream(this, "dispatchKinesis", {
      streamName: `dispatch-kinesis`,
      shardCount: 1
    });

    // EventTypeを処理するLambdaを作成
    // EventType毎に1Lambdaなので、合計3Lambda作成
    const eventTypeAndLambdaMap = eventTypeList.map(eventType => {
      const lambda = new NodejsFunction(this, `event${eventType}Lambda`, {
        functionName: `event-${eventType.toLowerCase()}-lambda`,
        entry: "./src/receive-event-lambda.ts",
        runtime: Lambda.Runtime.NODEJS_14_X,
        environment: {}
      });

      // Kinesis Data StreamsにLambdaへ読み込み/書き込み権限を付与
      dispatchKinesis.grantReadWrite(lambda);

      return {
        eventType: eventType,
        lambda: lambda
      };
    });

    eventTypeAndLambdaMap.map(eventTypeAndLambda => {
      const eventSourceMapping = new EventSourceMapping(
        this,
        `event${eventTypeAndLambda.eventType}SourceMapping`,
        {
          target: eventTypeAndLambda.lambda,
          batchSize: 10,
          eventSourceArn: dispatchKinesis.streamArn,
          startingPosition: StartingPosition.LATEST
        }
      );

      // event-a-lambdaには、eventType Aのみが流れるように設定
      // event-b-lambdaには、eventType Bのみが流れるように設定
      // event-c-lambdaには、eventType Cのみが流れるように設定
      const cfnEventSourceMapping = eventSourceMapping.node
        .defaultChild as Lambda.CfnEventSourceMapping;
      cfnEventASourceMapping.addPropertyOverride("FilterCriteria", {
        // イベントフィルタ条件を指定(※1)
        Filters: [
          {
            Pattern: `{ "data" : {"eventType": ["${eventTypeAndLambda.eventType}"]}}`
          }
        ]
      });
    });
  }
}

※1 フィルタの条件式で利用できる文法は、以下の資料が参考になります。

Lambda

実行するLambdaは、届いたイベントが何か確認できるようにsequenceNumberとdataをロギングするようにします。

src/receive-event-lambda.ts

interface Event {
  Records: {
    kinesis: {
      sequenceNumber: string;
      data: string;
    };
    eventID: string;
    eventSource: string;
  }[];
}

export const handler = async (event: Event) => {
  event.Records.map((record, recordIndex) => {
    console.log(`recordIndex: ${recordIndex}`);
    console.log(`sequenceNumber: ${record.kinesis.sequenceNumber}`);
    console.log(
      `data: ${Buffer.from(record.kinesis.data, "base64").toString()}`
    );
  });
};

イベントを送信する

AWS CLIを利用して、Kinesis Data Streamsへイベントを送信します

Kinesis Data Streamsへイベントを送信するAWS CLIコマンド

aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "1"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "2"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "3"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "4"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "5"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "6"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "7"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "8"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "9"}'

コマンドを実行すると、SequenceNumberが返却されます。実行結果として返却されたSequenceNumberが、各Lambdaにロギングされているか確認します。(後述)

実行結果

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666993390080342218544424943618",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666994599006161833173599649794",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666995807931981447871493832706",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666997016857801062569388015618",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666998225783620677267282198530",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014666999434709440291965176381442",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014667000643635259906594351087618",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014667001852561079521292245270530",
    "EncryptionType": "KMS"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49626248226145595354231014667003061486899135990139453442",
    "EncryptionType": "KMS"
}

CloudWatch Logsを確認する

/aws/lambda/event-a-lambda

eventType=Aのイベントのみにフィルタリングされていることが分かります

/aws/lambda/event-a-lambda

/aws/lambda/event-b-lambda

eventType=Bのイベントのみにフィルタリングされていることが分かります /aws/lambda/event-b-lambda

/aws/lambda/event-c-lambda

/aws/lambda/event-c-lambda

さいごに

今回はLambdaのイベントフィルタリング機能を試してみました。SQSやDynamoDB Streamsでも同様の機能が利用可能です。後述したイベントソースに紐づいたLambdaで一部のイベントのみ処理する要件がある場合、活用することでコスト削減やLambdaの実装の複雑さの解消につながると考えています。

参考

Lambda event filtering using AWS CDK