この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
概要
Kinesis Data Streams を使用して、大規模なデータを収集し、処理することができます。この記事では、AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。ここでLambda関数はKinesis Data Streamのコンシューマーとして機能します。Lambda関数は、Kinesis Data Streamsからのイベントを消費します。データレコードがストリームに書き込まれると、Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。
やってみた
IAMロールの作成
- Lambda関数にKinesis Data StreamsとCloudWatchへのアクセスを許可する実行ロールをしておきます。
- 以下のポリシーを含むjsonファイルを作成しておきます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
- 次のコマンドを使用して、上記のポリシーでIAMロールを作成しておきます。
//create an IAM role
aws iam create-role --role-name "lambda-kinesis-role" --assume-role-policy-document file://assumeRole.json
//Output
{
"Role": {
"Path": "/",
"RoleName": "lambda-kinesis-role",
"RoleId": "............",
"Arn": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
"CreateDate": "2022-04-28T07:12:13+00:00",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
}
}
- [AWSLambdaKinesisExecutionRole]ポリシーをIAMロールにアタッチしておきます。
aws iam attach-role-policy --role-name lambda-kinesis-role --policy-arn 'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole'
Lambda 関数の作成
- 次のコードでindex.jsファイルを作成しておきます。
exports.handler = function(event, context) {
event.Records.forEach(function(record) {
var data = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded data:', data);
});
};
- デプロイパッケージを作成しておきます。
zip function.zip index.js
- 次のコマンドを使用してLambda関数を作成しておきます。
//create Lambda Function
aws lambda create-function --function-name Lambda-Kinesis \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs14.x \
--role arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role \
--region us-east-1
//Output
{
"FunctionName": "Lambda-Kinesis",
"FunctionArn": "arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:Lambda-Kinesis",
"Runtime": "nodejs14.x",
"Role": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
"Handler": "index.handler",
"CodeSize": 324,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2022-04-28T07:32:51.018+0000",
"CodeSha256": "..........",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": ".............",
"State": "Pending",
"StateReason": "The function is being created.",
"StateReasonCode": "Creating",
"PackageType": "Zip",
"Architectures": [
"x86_64"
],
"EphemeralStorage": {
"Size": 512
}
}
Kinesis Data Streamの作成
- 次のコマンドを使用してData Streamを作成しておきます。
aws kinesis create-stream --stream-name lambda-kinesis-stream --shard-count 1 --region us-east-1
- [describe-stream]コマンドを実行して、ストリームの詳細を取得します。
//Describe the Data Stream
aws kinesis describe-stream --stream-name lambda-kinesis-stream --region us-east-1
//Output
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": ".............."
},
"SequenceNumberRange": {
"StartingSequenceNumber": "............."
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream",
"StreamName": "lambda-kinesis-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": "2022-04-28T13:04:46+05:30"
}
}
Event Source Mappingの作成
-
Lambda でイベントソースを追加するために、関数名とデータストリームArnを使用して次のコマンドを実行しておきます。
aws lambda create-event-source-mapping --function-name Lambda-Kinesis \
--event-source arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream \
--batch-size 100 --starting-position LATEST --region us-east-1
テストする
- イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加しておきます。次のコマンドを使用して、データレコードをデータストリームに追加しておきます。
//add data record to kinesis data stream
aws kinesis put-record --stream-name lambda-kinesis-stream --partition-key 1 \
--data "Hello, Welcome" --cli-binary-format raw-in-base64-out --region us-east-1
//Output
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "11111122222223333333344444444555555556666667899754"
}
- データレコードが追加されると、Lambda関数が呼び出されます。この関数は、レコードからデータをデコードしてログに記録します。CloudWatchログでアウトプットを見ることができます。
まとめ
AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。他のコンシューマーとKinesis Data Streamsを試すことができます。
Reference :