Kinesis Data StreamsとLambdaのイベントフィルタリング機能を利用して、条件に応じたLambdaを起動する
はじめに
2021年12月にAWS Lambdaは、Amazon SQS、Amazon DynamoDB、Amazon Kinesis をイベントソースとするイベントフィルタリングをサポートしました。
弊社ブログでは以下の記事が参考になります
今回は本機能を利用して、Kinesis Data Streamsで受け取ったイベントをイベントフィルタリングし、フィルタ条件に応じたLambdaを起動する構成を試します。
前提
- CDK v2を利用
- CDKとLambdaはTypeScriptを利用
環境情報は以下の通り
ツール/パッケージ | バージョン |
---|---|
Node.js | 16.3.0 |
aws-cdk-lib | 2.8.0 |
aws-cdk | 2.8.0 |
aws-sdk | 2.1062.0 |
esbuild | 0.14.12 |
構成
以下の図のように各LambdaのイベントソースとしてKinesis Data Streamsを設定し、イベントフィルタリング設定をします。
※ 図はFiltering event sources for AWS Lambda functionsを参考
以下のJSONのeventTypeがA,B,Cのパターンがあり、Lambdaが起動される前段でイベントがフィルタリングされ、イベントに応じたLambdaを起動することを想定しています。
{ "eventType": "A", "message": 1 }
本構成は以下のメリットがあると考えています。
- フィルタ条件に合わないイベント分のLambda起動コストが削減可能
- イベントの中身に応じたLambdaを起動したい要件がある
- Lambdaからペイロードの中身を判定し、Lambdaを呼び出している場合、呼び出し元Lambdaの起動コストを削減可能
CDK
import { Stack, StackProps } from "aws-cdk-lib"; import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs"; import * as Kinesis from "aws-cdk-lib/aws-kinesis"; import * as Lambda from "aws-cdk-lib/aws-lambda"; import { Construct } from "constructs"; import { EventSourceMapping, StartingPosition } from "aws-cdk-lib/aws-lambda"; export class KinesisEventFilteringSampleStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); const eventTypeList = ["A", "B", "C"]; // Kinesis Data Streamsを作成 const dispatchKinesis = new Kinesis.Stream(this, "dispatchKinesis", { streamName: `dispatch-kinesis`, shardCount: 1 }); // EventTypeを処理するLambdaを作成 // EventType毎に1Lambdaなので、合計3Lambda作成 const eventTypeAndLambdaMap = eventTypeList.map(eventType => { const lambda = new NodejsFunction(this, `event${eventType}Lambda`, { functionName: `event-${eventType.toLowerCase()}-lambda`, entry: "./src/receive-event-lambda.ts", runtime: Lambda.Runtime.NODEJS_14_X, environment: {} }); // Kinesis Data StreamsにLambdaへ読み込み/書き込み権限を付与 dispatchKinesis.grantReadWrite(lambda); return { eventType: eventType, lambda: lambda }; }); eventTypeAndLambdaMap.map(eventTypeAndLambda => { const eventSourceMapping = new EventSourceMapping( this, `event${eventTypeAndLambda.eventType}SourceMapping`, { target: eventTypeAndLambda.lambda, batchSize: 10, eventSourceArn: dispatchKinesis.streamArn, startingPosition: StartingPosition.LATEST } ); // event-a-lambdaには、eventType Aのみが流れるように設定 // event-b-lambdaには、eventType Bのみが流れるように設定 // event-c-lambdaには、eventType Cのみが流れるように設定 const cfnEventSourceMapping = eventSourceMapping.node .defaultChild as Lambda.CfnEventSourceMapping; cfnEventASourceMapping.addPropertyOverride("FilterCriteria", { // イベントフィルタ条件を指定(※1) Filters: [ { Pattern: `{ "data" : {"eventType": ["${eventTypeAndLambda.eventType}"]}}` } ] }); }); } }
※1 フィルタの条件式で利用できる文法は、以下の資料が参考になります。
Lambda
実行するLambdaは、届いたイベントが何か確認できるようにsequenceNumberとdataをロギングするようにします。
interface Event { Records: { kinesis: { sequenceNumber: string; data: string; }; eventID: string; eventSource: string; }[]; } export const handler = async (event: Event) => { event.Records.map((record, recordIndex) => { console.log(`recordIndex: ${recordIndex}`); console.log(`sequenceNumber: ${record.kinesis.sequenceNumber}`); console.log( `data: ${Buffer.from(record.kinesis.data, "base64").toString()}` ); }); };
イベントを送信する
AWS CLIを利用して、Kinesis Data Streamsへイベントを送信します
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "1"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "2"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "3"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "4"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "5"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "6"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "7"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "8"}' aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "9"}'
コマンドを実行すると、SequenceNumberが返却されます。実行結果として返却されたSequenceNumberが、各Lambdaにロギングされているか確認します。(後述)
{ "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666993390080342218544424943618", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666994599006161833173599649794", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666995807931981447871493832706", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666997016857801062569388015618", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666998225783620677267282198530", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014666999434709440291965176381442", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014667000643635259906594351087618", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014667001852561079521292245270530", "EncryptionType": "KMS" } { "ShardId": "shardId-000000000000", "SequenceNumber": "49626248226145595354231014667003061486899135990139453442", "EncryptionType": "KMS" }
CloudWatch Logsを確認する
/aws/lambda/event-a-lambda
eventType=Aのイベントのみにフィルタリングされていることが分かります
/aws/lambda/event-b-lambda
eventType=Bのイベントのみにフィルタリングされていることが分かります
/aws/lambda/event-c-lambda
さいごに
今回はLambdaのイベントフィルタリング機能を試してみました。SQSやDynamoDB Streamsでも同様の機能が利用可能です。後述したイベントソースに紐づいたLambdaで一部のイベントのみ処理する要件がある場合、活用することでコスト削減やLambdaの実装の複雑さの解消につながると考えています。