Kinesis Data Streamをソース、LambdaをターゲットにEventBridge Pipeを構築するポイントをまとめてみた(AWS SAMテンプレート付き)

ストリーム処理は面白いよ!
2023.05.09

Amazon EventBridge Pipesはプロデューサー・コンシューマー型のイベントを連携するサービスです。 Pipesでイベントを取得するソース、イベントを処理するターゲットを指定するだけで、グルーコード無しにシームレスにイベント駆動処理できます。

ソースにストリームサービスのAmazon Kinesis Data Stream、ターゲットにAWS Lambdaを指定して、ストリーム処理パイプラインをEventBridge Pipesでいい感じに構築するポイントを紹介します。

AWS SAMで検証環境構築

GitHubのレポジトリ aws-samples/serverless-patterns には、サーバーレス・パターンごとにCDK/SAMなどのテンプレートが掲載されています。

どういうわけか、Kinesis Data Stream - EventBridge - Lambda のサーバーレスパターンが存在しなかったため、AWS Serverless Application Model(AWS SAM)向けテンプレートを用意しました。

GitHub : quiver/blog-sam-eventbridge-pipes-kinesis-lambda

以下のコマンドでデプロイできます。

$ git clone https://github.com/quiver/blog-sam-eventbridge-pipes-kinesis-lambda.git
$ cd blog-sam-eventbridge-pipes-kinesis-lambda
$ sam --version
SAM CLI, version 1.81.0
$ sam deploy --guided

Producer は AWS CLI です。

Kinesis Data Stream

入力イベントに対して複数の処理をする Fan-Out 処理をやりたかったため、ストリームサービスの Kinesis Data Stream を採用しました。イベント:処理=1:1の場合、SQS のようなメッセージキューを採用ください。

ストリームのキャパシティ管理

ストリームのキャパシティ管理は、割高だけれども手離れよくオートスケールする On-Demand 方式と安価だけれども明示的にキャパシティを指定してスケールアウトする Provisioned 方式があります。

ワークロードがダイナミックに変動するシステムで運用負荷を軽減したい場合、シャード数を明示的に指定する Provisioned モードよりも、オートスケールする On-Demand モードが向いているかもしれません。

運用負荷と利用費のトレードオフを検討してください。

参考 :

データ保持期間

Kinesis ストリーム内のデータは、指定した期間だけストリームにデータが残り続け、その間は何度でもデータを取り出せます。

これにより、複数のワーカーが1つのデータを処理したり、一時的なスパイクに対応して時間をかけてゆっくり処理したり、処理に失敗したレコードを再処理するといったことができます。

このデータ保持期間は Data Retention Period として1日〜365日の間で指定可能です。

保持期間を長くすることで、データのリカバリが強化されますが、利用費も増加します。

参考 :

メッセージブローカーの処理

今回の構成では、メッセージブローカーに EventBrige Pipesを利用しています。

Amazon EventBridge Pipesはプロデューサー・コンシューマー型のイベントを連携するサービスであり、ソースとターゲット間を point-to-point に連携します。

ソースには、以下の様なメッセージキューやストリームサービスを指定できます。

  • DynamoDB stream
  • Kinesis stream
  • Amazon MSK topic(Apache KafkaのAWSマネージド・サービス)
  • Amazon MQ message broker (ActiveMQ/RabbitQMのAWSマネージド・サービス)
  • Amazon SQS queue

ターゲットには、以下の様なサービスを指定できます。

  • ウェブフック用URL
  • API Gateway
  • ECSタスク
  • Lambda

EventBrige Pipesは、従来からあるLambdaのイベントソースマッピングをより汎用的に実現するサービスとお考えください。

Kinesisはシャード単位(NOT ストリーム単位)でレコードが時系列順にソートされており、この順序はPipes経由でも維持されます。

実行ロール

イベントソースとポーリングし、取得したメッセージを引数にターゲットを呼び出すのはPipesの仕事です。 そのため、Pipesの実行ロールには以下のアクションを許可します。

ソース操作用

  • kinesis:DescribeStream
  • kinesis:DescribeStreamSummary
  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards
  • kinesis:ListStreams
  • kinesis:SubscribeToShard

ターゲット操作用

  • lambda:InvokeFunction

Consumerの振る舞い

バッチサイズ、Consumerの並列度(ParallelizationFactor)、Consumer失敗時の処理など、Consumerの振る舞いは Pipes で設定します。

※ Pipesの編集画面のキャプチャ

Kinesis Data Streamがソース、Lambda がConsumerの場合のエラー処理は次の記事でまとめています。

Producerの処理

まずはプロデューサー側の処理です。

Kinesis Data Streamに対して、レコードを base64 エンコードして送信します。

このエンコードにより、バイナリデータも送信可能です。

$ aws kinesis put-record \
  --stream-arn YOUR-STREAM-NAME-ARN \
  --cli-binary-format raw-in-base64-out \
  --partition-key bar \
  --data '{ "reqId": "1-2-3-4", "status": 200, "client": "abc"}'

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49640567222783951011097960734367787140450389581107298306"
}

Kinesisはストリームをシャーディングしてスケールアウトする戦略をとっています。 シャードの振り分けには、パラメーターの --partition-key が利用されます。ランダムな値を設定しましょう。 SDK を利用していれば、裏で良しなに設定してくれるはずです。

レスポンスから、送信したレコードがどのシャード(ShardId)のどの位置(SequenceNumber)に保存されたかわかります。

Consumerの処理

次にコンシューマー側の処理です(Python Lambdaで実装)。

import base64
import json

def lambda_handler(event, context):
    for record in event:
        data = json.loads(base64.b64decode(record['data']))
        print(record)
        print(data)

ポーリングにEventBridge Pipesを利用すると、Consumerにはレコードがリストで渡ってきます。 Producerのレコード送信時とは逆の手順、つまり、base64デコードして実データを取得します。

CloudWatch Logsへのログ出力結果から、base64エンコードされた data がデコードされていることや、Producerが登録したレコードの ShardIdSequenceNumber と Lambda Consumerが受け取ったレコードの 'eventID': 'shardId-000000000000:49640567222783951011097960734367787140450389581107298306' が一致していることも確認できます。

リトライ処理に備え、Consumerは冪等に実装しましょう。

スループットを向上させるには、シャード分割したり、バッチサイズを増やしたり、シャードあたりのConsumerの並列度(ParallelizationFactor)を増やします。

参考 :

なお、Producerはシャードを束ねたストリームに対して操作しますが、Consumerは特定のシャードに対して操作します。

Consumerの並列度を指定する ParallelizationFactor の説明が "Concurrent batches per shard - Process multiple batches from the same shard concurrently"シャード観点であることを思い出しましょう。

とはいえ、EventBridge PipesやLambdaイベントソースマッピング等を使って処理している場合、ディベロッパーはどのシャードを操作するか、意識する必要はありません。これらポーリング系サービスが、各シャードに対して、チェックポイント管理しながら、シーケンシャルに処理してくれます。

例外的に、次のブログのようにConsumerにデッドレターキュー(DLQ)を設定し、DLQメッセージ内のシャードID、シーケンス番号から元のレコードを復元するような場合は、意識する必要があります。

最後に

Amazon Kinesis Data StreamのレコードをEventBridge Pipes経由でLambda Consumerで処理するポイントを紹介しました。

まとめながら、Consumerを中心に主要オプションを説明するだけでも、盛りだくさんなことに気が付きました。Kinesis拡張ファンアウトなど、説明できていない機能がまだまだたくさん有ります。

サイジングのことは一旦忘れ、ワーカーを冪等に実装し、エラーに堅牢にするところから始めるのが良いと思います。

余力があれば、監視についても追記したいと思います(5月下旬予定)。

それでは。