Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみた。

2021.12.16

Kinesis Data Streamsとは?

Kinesis Data Streams はサーバーレスデータストリーミングサービスです。Amazon Kinesis Data Streams を使用して、大規模なデータを収集し、処理することができます。Kinesis Data Streamsで、プロデューサーはKinesis データストリームにデータをプッシュし、コンシューマーはリアルタイムにデータを処理します。

この記事では、Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみました。データレコードがストリームに書き込まれると、Lambda Functionが呼び出され、データが処理されます。

 

 

やってみた

IAM ロールの作成

  • LambdaのIAMロールを作成しておきます。
  • この設定でIAMロールを作成しておきます。
    • 信頼されたエンティティの種類 : AWS のサービス
    • ユースケース : AWS Lambda
    • ポリシー  :  AWSLambdaKinesisExecutionRole
    • ロールの名前 :  Lambda-Kinesis-Role
  • このIAMロールは、Lambda Functionを作成するときに使用されます。

 

Lambda Functionの作成

  • AWS Lambdaコンソールで、Create functionを選択しておきます。
  • 関数名を入力して、RuntineでNode.jsを選択して、前の手順で作成したIAMロールを選択して、関数を作成しておきます。

 

 

  • 機能コードを次のコードに置き換えます。必要に応じてコードを変更できます。Kinesisデータはbase64でエンコードされているため、デコードしておきます。
exports.handler = function(event, context) {
    event.Records.forEach(function(record) {
        var data = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded data:', data);
    });
};

 

Kinesis Data Streamの作成

  • Amazon Kinesisコンソールで、Kinesis Data Streamsを選択して、Create data streamをクリックしておきます。

 

 

  • 次の設定でData Streamを作成しておきます。
    • Data stream capacity : Provisioned
    • Provisioned shards : 1

 

 

Lambda でイベントソースを追加する

  • CLIで、関数名とデータストリームArnを使用して次のコマンドを実行しておきます。
aws lambda create-event-source-mapping --function-name Lambda-Kinesis \
--event-source  arn:aws:kinesis:us-east-1:000000000:stream/Kinesis-Lambda-stream \
--batch-size 100 --starting-position LATEST --region us-east-1

 

  • イベントソースマッピングが作成されました。

 

データレコードを Kinesis Data Streamに追加する

  • CLIで次のコマンドを使用して、データレコードをデータストリームに追加しておきます。
aws kinesis put-record --stream-name Kinesis-Lambda-stream --partition-key 1 \
--data "Hello" --region us-east-1

 

 

  • 「Invalid base64」エラーが発生した場合は、[--cli-binary-format raw-in-base64-out ]をコマンドと一緒に使用してください。
aws kinesis put-record --stream-name Kinesis-Lambda-stream --partition-key 1 \
--data "Hello" --cli-binary-format raw-in-base64-out --region us-east-1

 

  • データレコードが追加されると、Lambda関数が呼び出されます。この関数は、レコードからデータをデコードしてログに記録します。CloudWatchログでアウトプットを見ることができます。

 

 

まとめ

Kinesis Data Streamsのイベントを処理する Lambda Functionを作成してみました。Real-time data analyticsなどの多くのシナリオでKinesisデータストリームを使用できます。Kinesis Data Streams は、さまざまなデータストリーミングの問題解決に使用できます。

Reference : Lambda function to consume events from a Kinesis stream