この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
2021年12月にAWS Lambdaは、Amazon SQS、Amazon DynamoDB、Amazon Kinesis をイベントソースとするイベントフィルタリングをサポートしました。
弊社ブログでは以下の記事が参考になります
今回は本機能を利用して、Kinesis Data Streamsで受け取ったイベントをイベントフィルタリングし、フィルタ条件に応じたLambdaを起動する構成を試します。
前提
- CDK v2を利用
- CDKとLambdaはTypeScriptを利用
環境情報は以下の通り
ツール/パッケージ | バージョン |
---|---|
Node.js | 16.3.0 |
aws-cdk-lib | 2.8.0 |
aws-cdk | 2.8.0 |
aws-sdk | 2.1062.0 |
esbuild | 0.14.12 |
構成
以下の図のように各LambdaのイベントソースとしてKinesis Data Streamsを設定し、イベントフィルタリング設定をします。
※ 図はFiltering event sources for AWS Lambda functionsを参考
以下のJSONのeventTypeがA,B,Cのパターンがあり、Lambdaが起動される前段でイベントがフィルタリングされ、イベントに応じたLambdaを起動することを想定しています。
{
"eventType": "A",
"message": 1
}
本構成は以下のメリットがあると考えています。
- フィルタ条件に合わないイベント分のLambda起動コストが削減可能
- イベントの中身に応じたLambdaを起動したい要件がある
- Lambdaからペイロードの中身を判定し、Lambdaを呼び出している場合、呼び出し元Lambdaの起動コストを削減可能
CDK
lib/event-source-mapping.ts
import { Stack, StackProps } from "aws-cdk-lib";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import * as Kinesis from "aws-cdk-lib/aws-kinesis";
import * as Lambda from "aws-cdk-lib/aws-lambda";
import { Construct } from "constructs";
import { EventSourceMapping, StartingPosition } from "aws-cdk-lib/aws-lambda";
export class KinesisEventFilteringSampleStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const eventTypeList = ["A", "B", "C"];
// Kinesis Data Streamsを作成
const dispatchKinesis = new Kinesis.Stream(this, "dispatchKinesis", {
streamName: `dispatch-kinesis`,
shardCount: 1
});
// EventTypeを処理するLambdaを作成
// EventType毎に1Lambdaなので、合計3Lambda作成
const eventTypeAndLambdaMap = eventTypeList.map(eventType => {
const lambda = new NodejsFunction(this, `event${eventType}Lambda`, {
functionName: `event-${eventType.toLowerCase()}-lambda`,
entry: "./src/receive-event-lambda.ts",
runtime: Lambda.Runtime.NODEJS_14_X,
environment: {}
});
// Kinesis Data StreamsにLambdaへ読み込み/書き込み権限を付与
dispatchKinesis.grantReadWrite(lambda);
return {
eventType: eventType,
lambda: lambda
};
});
eventTypeAndLambdaMap.map(eventTypeAndLambda => {
const eventSourceMapping = new EventSourceMapping(
this,
`event${eventTypeAndLambda.eventType}SourceMapping`,
{
target: eventTypeAndLambda.lambda,
batchSize: 10,
eventSourceArn: dispatchKinesis.streamArn,
startingPosition: StartingPosition.LATEST
}
);
// event-a-lambdaには、eventType Aのみが流れるように設定
// event-b-lambdaには、eventType Bのみが流れるように設定
// event-c-lambdaには、eventType Cのみが流れるように設定
const cfnEventSourceMapping = eventSourceMapping.node
.defaultChild as Lambda.CfnEventSourceMapping;
cfnEventASourceMapping.addPropertyOverride("FilterCriteria", {
// イベントフィルタ条件を指定(※1)
Filters: [
{
Pattern: `{ "data" : {"eventType": ["${eventTypeAndLambda.eventType}"]}}`
}
]
});
});
}
}
※1 フィルタの条件式で利用できる文法は、以下の資料が参考になります。
Lambda
実行するLambdaは、届いたイベントが何か確認できるようにsequenceNumberとdataをロギングするようにします。
src/receive-event-lambda.ts
interface Event {
Records: {
kinesis: {
sequenceNumber: string;
data: string;
};
eventID: string;
eventSource: string;
}[];
}
export const handler = async (event: Event) => {
event.Records.map((record, recordIndex) => {
console.log(`recordIndex: ${recordIndex}`);
console.log(`sequenceNumber: ${record.kinesis.sequenceNumber}`);
console.log(
`data: ${Buffer.from(record.kinesis.data, "base64").toString()}`
);
});
};
イベントを送信する
AWS CLIを利用して、Kinesis Data Streamsへイベントを送信します
Kinesis Data Streamsへイベントを送信するAWS CLIコマンド
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "1"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "2"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "A", "message": "3"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "4"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "5"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "B", "message": "6"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "7"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "8"}'
aws kinesis put-record --stream-name dispatch-kinesis --partition-key 123 --data '{"eventType": "C", "message": "9"}'
コマンドを実行すると、SequenceNumberが返却されます。実行結果として返却されたSequenceNumberが、各Lambdaにロギングされているか確認します。(後述)
実行結果
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666993390080342218544424943618",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666994599006161833173599649794",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666995807931981447871493832706",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666997016857801062569388015618",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666998225783620677267282198530",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014666999434709440291965176381442",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014667000643635259906594351087618",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014667001852561079521292245270530",
"EncryptionType": "KMS"
}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49626248226145595354231014667003061486899135990139453442",
"EncryptionType": "KMS"
}
CloudWatch Logsを確認する
/aws/lambda/event-a-lambda
eventType=Aのイベントのみにフィルタリングされていることが分かります
/aws/lambda/event-b-lambda
eventType=Bのイベントのみにフィルタリングされていることが分かります
/aws/lambda/event-c-lambda
さいごに
今回はLambdaのイベントフィルタリング機能を試してみました。SQSやDynamoDB Streamsでも同様の機能が利用可能です。後述したイベントソースに紐づいたLambdaで一部のイベントのみ処理する要件がある場合、活用することでコスト削減やLambdaの実装の複雑さの解消につながると考えています。