AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみた。

2022.04.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

Kinesis Data Streams を使用して、大規模なデータを収集し、処理することができます。この記事では、AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。ここでLambda関数はKinesis Data Streamのコンシューマーとして機能します。Lambda関数は、Kinesis Data Streamsからのイベントを消費します。データレコードがストリームに書き込まれると、Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。

 

 

やってみた

IAMロールの作成

  • Lambda関数にKinesis Data StreamsとCloudWatchへのアクセスを許可する実行ロールをしておきます。
  • 以下のポリシーを含むjsonファイルを作成しておきます。

 

{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
}

 

  • 次のコマンドを使用して、上記のポリシーでIAMロールを作成しておきます。
//create an IAM role
aws iam create-role --role-name "lambda-kinesis-role" --assume-role-policy-document file://assumeRole.json

//Output
{
    "Role": {
        "Path": "/",
        "RoleName": "lambda-kinesis-role",
        "RoleId": "............",
        "Arn": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
        "CreateDate": "2022-04-28T07:12:13+00:00",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}

 

  • [AWSLambdaKinesisExecutionRole]ポリシーをIAMロールにアタッチしておきます。
aws iam attach-role-policy --role-name lambda-kinesis-role  --policy-arn 'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole'

 

Lambda 関数の作成

  • 次のコードでindex.jsファイルを作成しておきます。

 

exports.handler = function(event, context) {
    event.Records.forEach(function(record) {
        var data = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded data:', data);
    });
};

 

  • デプロイパッケージを作成しておきます。
zip function.zip index.js

 

  • 次のコマンドを使用してLambda関数を作成しておきます。
//create Lambda Function
aws lambda create-function --function-name Lambda-Kinesis \
      --zip-file fileb://function.zip --handler index.handler --runtime nodejs14.x \
      --role arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role \
      --region us-east-1

//Output
{
    "FunctionName": "Lambda-Kinesis",
    "FunctionArn": "arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:Lambda-Kinesis",
    "Runtime": "nodejs14.x",
    "Role": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
    "Handler": "index.handler",
    "CodeSize": 324,
    "Description": "",
    "Timeout": 3,
    "MemorySize": 128,
    "LastModified": "2022-04-28T07:32:51.018+0000",
    "CodeSha256": "..........",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": ".............",
    "State": "Pending",
    "StateReason": "The function is being created.",
    "StateReasonCode": "Creating",
    "PackageType": "Zip",
    "Architectures": [
        "x86_64"
    ],
    "EphemeralStorage": {
        "Size": 512
    }
}

 

 

Kinesis Data Streamの作成

  • 次のコマンドを使用してData Streamを作成しておきます。
aws kinesis create-stream --stream-name lambda-kinesis-stream --shard-count 1 --region us-east-1

 

  • [describe-stream]コマンドを実行して、ストリームの詳細を取得します。
//Describe the Data Stream
aws kinesis describe-stream --stream-name lambda-kinesis-stream --region us-east-1

//Output
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": ".............."
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "............."
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream",
        "StreamName": "lambda-kinesis-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": "2022-04-28T13:04:46+05:30"
    }
}

 

 

Event Source Mappingの作成

  • Lambda でイベントソースを追加するために、関数名とデータストリームArnを使用して次のコマンドを実行しておきます。 

aws lambda create-event-source-mapping --function-name Lambda-Kinesis \
     --event-source  arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream \
     --batch-size 100 --starting-position LATEST --region us-east-1

 

 

テストする

  • イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加しておきます。次のコマンドを使用して、データレコードをデータストリームに追加しておきます。

 

//add data record to kinesis data stream
aws kinesis put-record --stream-name lambda-kinesis-stream --partition-key 1 \
     --data "Hello, Welcome" --cli-binary-format raw-in-base64-out --region us-east-1

//Output
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "11111122222223333333344444444555555556666667899754"
}

 

  • データレコードが追加されると、Lambda関数が呼び出されます。この関数は、レコードからデータをデコードしてログに記録します。CloudWatchログでアウトプットを見ることができます。

 

まとめ

AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。他のコンシューマーとKinesis Data Streamsを試すことができます。

Reference : 

Kinesis Data Streams with Lambda Function using AWS CLI