[レポート] AWS Lambdaストリーミングイベントソースのマスター #SVS323-R1 #reinvent

本記事はre:Invent 2019のセッション「SVS323-R1 - Mastering AWS Lambda streaming event sources」のレポートです。ストリーミングシステムのアーキテクチャの方針、運用、トラブルシューティング、ベストプラクティスなどが詰まった素晴らしいセッションですのでぜひご一読ください。
2019.12.31

Mastering AWS Lambda streaming event sources

本記事はre:Invent 2019のセッション「SVS323-R1 - Mastering AWS Lambda streaming event sources」のレポートです。

スピーカー

  • Adam Wagner [Senior Developer Advocate – Solutions Architect, Amazon Web Services]

概要

このセッションでは、LambdaでAmazon DynamoDBストリームとAmazon Kinesis Data Streamsを使用する詳細について説明します。 一般的なアーキテクチャパターンについて説明し、DynamoDBのさまざまなスケーリングオプションとそれらがストリームに与える影響について説明します。 次に、Kinesisスケーリングについて説明し、ファンアウトの強化とそれがいつ役立つかを説明します。 最後に、信頼性、管理、監視、および問題が発生した場合の対処方法について説明します。

スライド

レポート

イベントソースのストリーミング

Amazon Kinesis

  • ビデオまたはデータのストリームをリアルタイムで簡単に収集、処理、分析できるサービス
  • 本セッションではAmazon Kinesis Data Streamsにフォーカス

Amazon DynamoDB

  • フルマネージドなNoSQLサービス
  • ドキュメントまたはキーバリュー値を格納
  • いかなるワークロードにもスケール可能
  • 高速かつ一貫性
  • きめ細やかなアクセスコントロール
  • イベントドリブンプログラミングに最適

DynamoDB Streams

  • Itemの変更のストリーム
  • Exactly Onceの配信保証
  • キーによる厳密な順序付け
  • 耐久性、拡張性
  • フルマネージド
  • 24時間のデータ保持
  • 1秒以内のレイテンシ
  • Lambdaのイベントソースのサポート

本セッションのテーマ

Kinesis Data Streamsで構成した場合、およびDynamoDB Streamsで構成した場合についての解説になります。

Kinesis Data Streamsのポイント

左から順に解説します。まずKinesis Data StreamsではData Producerが新しいレコードをストリームに追加します。Lambdaサービスは、1秒に1回、新しいレコードのストリームをポーリングします。新しいレコードが到着したら、ストリームをSubscribeしているLambda Functionが新しいレコードのデータとともに発火します。5つまでの複数のLambda FunctionがSubscribeできます。

Lambda Functionが呼ばれた際、Lambda Functionが成功レスポンスを返却した後にLambdaサービスは次のレコードをポーリングを開始します。Lambda Functionでエラーが発生した場合は、成功するまで、またはレコードの有効期限が切れるまで同じLambda Functionを呼びます。

DynamoDB Streamsのポイント

DynamoDB Tableに対する操作が全てキャプチャされます。Lambdaサービスは、1秒あたりに4回、新しいレコードのストリームをポーリングします。新しいレコードが到着したら、ストリームをSubscribeしているLambda Functionが新しいレコードのデータとともに発火します。2つまでのLambda FunctionがSubscribeできます。

スケーリング

Kinesis Data Streamsのスケーリング

シャードの追加
  • シャードを追加するとスケールする
  • UpdateShardCount APIを呼ぶだけで増やすことが可能(簡単)
    • ターゲットのシャード数を設定すると、Kinesis Data Streamsがシャードの分割とマージを処理する
  • あるいはSplitShardとMergeShardsにより、よりターゲットを絞ったスケーリングが可能になる

例えば次のようにシャード数を増やします。

$ aws kinesis update-shard-count \
  --stream-name reinvent19-01 \
  --target-shard-count 8 \
  --scaling-type UNIFORM_SCALING
{
"StreamName": "reinvent19-01", "CurrentShardCount": 4, "TargetShardCount": 8
}
UpdateShardCount APIの制限(できないこと)
  • ストリームごとに24時間ごとに2回以上スケーリング
  • ストリームの現在のシャード数を2倍以上に拡大
  • ストリームの現在のシャード数の半分以下に縮小する
  • ストリームで500を超えるシャードまでスケールアップする
  • 結果が500シャード未満でない限り、500シャードを超えるストリームを縮小する
  • アカウントのシャード制限を超えるスケールアップ
シャード分割
  • ストリームは、シャードを分割することにより拡大する
  • シャードを分割すると、親シャードのパーティションキースペースを分割する2つの新しい子シャードが作成される
  • Lambdaは、親シャードからのすべてのレコードを処理するまで、子シャードからのレコードの受信を開始しない
スループットの考慮事項
  • シャード内の最大のメッセージ量 : MIN(1MiB/レコードサイズ平均, 1000)
  • Lambdaの最大メッセージ数(スタンバイ時) : MIN(1MiB/レコードサイズ平均, 1000)
  • Lambdaの最大メッセージ数(エッジケース) : MIN(6MiB/レコードサイズ平均, 10000)
Parallelization Factor (新機能)
  • シャードごとにLambda並列化を追加
  • 最大設定は10
  • パーティションキーごとのバッチ処理。パーティションキーごとに処理を維持
  • Kinesis Data StreamsとDynamoDB Streamsの両方で動作
ベストプラクティス
  • Application Auto Scalingを利用する
  • 保守的な拡張を行う(トラフィックのバーストのオーバーヘッドを残す)
  • シャードカウントをLambdaのスループットに合わせてスケーリングするか、Parallelization Factorを使用する
  • とにかくテスト。ユニットテストを測定してパフォーマンスの低下を監視し、大規模なテストも行う

DynamoDB Streamsのスケーリング

シャードについて
  • Kinesisとの違いは、シャードのコントロールがダイレクトに行えないこと
  • シャード数はテーブルのデータ量とRCU/WCUに依存する
On-demand vs Provisioned Capacity
  • Provisioned Capacity
    • RCU/WCUを設定
    • オプションとして自動スケーリングが可能
    • 定常的なワークロードに適している
  • On-demand
    • 従量課金
    • 完全な自動スケーリング
    • バーストが発生するようなワークロードに適している
    • 多くのサーバーレスワークロードにマッチする
On-demandのスケーリング
  • 大きなスパイクをサポート
  • 一般に、一定量のトラフィックを提供するテーブルの能力が倍になると、ストリーム内のシャードの数も倍になる
  • 重要なポイント
    • 迅速にスケールアップ可能
    • LambdaやDownstreamのサービスもマッチしたスケールが可能
    • 並列数が1以上の場合は注意が必要
      • Lambdaの呼び出し数を監視する
      • アカウントレベルの呼び出し数のアラームを設定する
      • 制限のあるすべてのLambda Functionについても同様

モニタリング・エラーハンドリング

Kinesis Data Streamsのモニタリング

CloudWatchで実施する。

  • Kinesis
    • GetRecords.IteratorAgeMilliseconds
    • IncomingRecords/IncomingBytes
    • ReadProvisionedThroughputExceeded
    • WriteProvisionedThroughputExceeded
  • Lambda
    • Errors
    • IteratorAge
    • Throttles

DynamoDB Streamsのモニタリング

CloudWatchで実施する。

  • DynamoDB Streams
    • ReturnedRecordsCount/RetunedBytes
    • UserErrors
  • Lambda
    • Errors
    • IteratorAge
    • Throttles
    • Duration

エラーハンドリングのオプション (新機能)

エラーについて、デフォルト設定からカスタマイズが行えるようになりました。

  • 最大リトライ数 (0 ~ 10,000)
  • 最大の秒間レコード生存期間 (60 ~ 604,800)
  • Lambda Function失敗時の分割バッチ
  • エラー時の通知先

Lambda Function失敗時の分割バッチ (新機能)

失敗したバッチを再帰的に分割し、レコードの小さなサブセットで再試行し、最終的に問題のあるレコードを分離します。

  • デフォルトでfalse設定
  • これらの再試行はMaximumRetryAttemptsにはカウントされない
  • 関数がべき等であることを確認する

エラー時の通知先 (新機能)

SNSトピックまたはSQSキューに、失敗したレコードのバッチに関するメタデータが送信されます。

  • 設定された再試行制限または最大記録期間に達した後にのみ通知される
  • 分割されたバッチ再試行は再試行制限にカウントされないことに注意
  • 実際のレコードは含まれていないが、それらを取得するために必要なすべての情報が含まれている

一般的な問題

Kinesis Data StreamsのIteratorAgeの急速な増加

  • はじめの質問
    • ストリームにサブスクライブしているLambda Functionはいくつか?
    • Lambda Functionでエラーが発生しているか?
    • Lambda Functionでスロットルが発生しているか?
    • IncomingRecordsまたはIncomingBytesに大幅な増加があるか?
  • 解決方法
    • Lambda Functionのエラー
      • SQSキューまたはSNSトピックをエラー通知先に設定
      • MaximumRetryAttempts, BisectBatchOnFunctionError, MaximumRecordAgeInSecondsをメトリクスに設定
      • Lambda Functionを修正しアップロードする
    • Lambda Functionのスロットル
      • Lambda Functionの制限を緩和する
    • IncommingRecordsまたはIncommingBytesが大幅に増加した場合
      • 一時的なものである場合は待機することで解決する場合があるため、IteratorAgeを見て、高すぎないことを確認します
      • ストリームデータの保持期間を延長する(最大7日間まで延長できる)
      • 並列化係数を増やす
      • ストリーム内のシャードの数を増やす
      • Lambda Functionに割り当てられたメモリを増やすか、関数のパフォーマンスを最適化する

Kinesis Data Streams ReadProvisionedThroughputExceeded

  • 5読み取り/秒。または2MiB/秒の場合は限界に達している
  • 拡張ファンアウトを使用するか、1件以上のSubscriberを削除する
  • Kinesis Data FirehoseとKinesis Data AnalyticsもSubscriberであることに注意する

DynamoDB StreamsのIteratorAgeの急速な増加

  • はじめの質問
    • ストリームにサブスクライブしているLambda Functionはいくつですか?
    • Lambda Functionはエラーまたはスロットルが発生しているか?
    • Lambda FunctionはDurationの増加が発生しているか?
    • DynamoDB Tableの書き込み(WCU)メトリックに大幅な増加があるか?
    • ReturnedRecordsCountまたはReturnedBytesに大幅な増加があるか?
  • 解決方法
    • DynamoDB Tableへの書き込み数が増加していた場合
      • 一時的なものである場合は待機することで解決する場合があるため、IteratorAgeを見て、高すぎないことを確認します
      • データ保持時間を延長することはできないため、より迅速に対応する必要がある
      • Lambda Functionに割り当てられたメモリを増やすか、関数のパフォーマンスを最適化する
      • 並列化係数を増やす
    • ストリームにサブスクライブしているLambda Functionが3つ以上ある場合は、ファンアウトを増やすためにKinesis Data Streamを追加することを検討する

パフォーマンスと最適化

パフォーマンスの観点

アプリケーションにとって何が重要かを考えることが重要です。それによって、取るべき最善策が決まります。

  • E2Eの遅延
  • コスト

Kinesis Data Streamsの拡張ファンアウト(EFO)

強化されたファンアウトにより、パフォーマンスを維持しながら、ストリームから同時に読み取る関数の数をスケーリングできます。またHTTP/2のデータ取得APIを利用することでProducerからLambda Functionまでのデータ配信速度が65%向上します。

通常のConsumerと拡張ファンアウトを適用したConsumerは、次のような観点で使い分けます。

  • 通常のConsumer
    • アプリケーションの総数が少ない(3個以下)
    • 遅延が致命的な問題にはならない
    • 新しいエラー処理オプションが必要
    • コストを最小限に抑えたい
  • 拡張ファンアウトを適用したConsumer
    • アプリケーションの総数が多い
      • 5個まで可能(緩和申請で変更可能)
    • 低遅延が要件にある
      • メッセージは通常、70ミリ秒未満で配信される

Kinesis Data Streamsの小さなメッセージを最適化する

  • シャードごとの書き込み制限
    • 1MiB/秒または1,000メッセージ/秒
    • 大量の小さなメッセージを使用すると、1,000メッセージ/秒の制限に簡単に到達する
    • これにより、シャードあたりのスループットが低下し、コストが高くなってしまう
  • 解決策 : Aggregation / de-aggregation options

低スループットなストリーム

  • 非常に小さなバッチでトリガーされるLambda Function
  • メッセージあたりのコストが高くなってしまう
  • ワークロードをアーカイブするには、結果のペイロードが小さすぎる
  • 解決策 : Batch Window
    • ストリームトリガーを調整する追加のノブ
    • トリガーする前に待機する時間を設定する(最大5分、秒単位で設定可能)
    • バッチサイズは引き続き考慮され、バッチウィンドウが起動する前に完全なバッチでトリガーされる
    • Kinesis Data StreamsとDynamoDB Streamsトリガーの両方で動作

Conclusion

  • ストリーミングシステムの目的を明確にする
  • システムのスケーリング方法を理解する
  • 障害に備え、新しいエラー処理オプションを利用する
  • 個々のコンポーネントとエンドツーエンドのテストを実施する

まとめ

Kinesis Data StreamsとDynamoDB Streamsはアーキテクチャとして非常によく似たものになりますが、シャードの直接指定ができる/できないなどの微妙な違いがあるため、ベストプラクティスが異なってくる点がよく分かりました。とはいえ受け口としてはLambda Functionを選択することは多いため、Lambda Functionのエラーハンドリングも安定性のあるストリーミングシステムを構築する上では重要な観点だと思います。

機能アップデートも盛んですし、今後も頻繁に利用されるアーキテクチャだと思います。本記事がストリーミングサービスの構築や運用の参考になれば幸いです。