【レポート】Amazon ECSとKafka Streamsを活用した低コストで高機能なストリーミングアプリケーションの構築 #AWSSummit

DA事業本部の春田です。

AWS Summit Online絶賛開催中!ということで、本記事では「CUS-47: Building Semi-Realtime Processing System with Kafka and Kafka Streams on Amazon ECS」の内容についてまとめていきます。

セッション情報

  • Repro 株式会社 CTO 橋立 友宏 氏

より短かいスパンでより大規模な情報を分析可能にすることが求められる現代のデータ基盤を実現するため、Amazon ECS と Kafka Streams フレームワークを活用して低コストで高機能なストリーミングアプリケーションを構築するノウハウについて解説します。

※セッション動画は以下リンク

アジェンダ

  1. ReproのリアルタイムデータフローとKafkaの活用について
  2. Apache Kafkaとは
  3. Kafka Streamsをより詳しく理解する
  4. EC2とECSにKafkaとKafka Streams Workersをデプロイする
  5. Kafka Streamsに向いたインスタンスやボリューム選択
  6. (時間があれば)パフォーマンスチューニングについて
  7. まとめ

ReproのリアルタイムデータフローとKafkaの活用について

  • Reproについて
    • Marketing Solution CompanyとしてCustomer Engagement Platformを提供
    • 大規模なデータをすぐに活用可能にするためのデータフローが非常に重要なサービス

  • Reproのイベントストリーム概要
    • リアルタイムのイベントの集計、ユーザープロフィールの保存等を行い、ストレージレイヤーに保存
    • ユーザーの行動をAPIやS3で受け取る
    • データをKafkaに集めて、リアルタイム集計をしたり、ユーザープロフィールの保存などを行う
    • 加工したデータはストレージレイヤーに保存される
      • 最近はCassandraを活用
    • 集計後、再度Kafkaに流して、ユーザー分類を行う
    • 今後もKafkaを用いて拡張していく予定

Apache Kafkaとは

  • Apache Kafkaとは
    • Kafka公式ドキュメント → Distributed Streaming Platform
    • レコードのストリームをpub/subする
      • キューとは似て非なるもの
    • 対障害性を持った方法でレコードを永続化
    • パーティショニングされたトピックという単位でレコードを保持
      • パーティション毎にデータに時系列は保証される
      • パーテションは複数ノードに渡って分散可能 → スケーラビリティ
  • 参照: Apache Kafka

  • キューとの相違点
    • Subscribers(Consumers)はKafka内のレコードを削除(消費)しない
    • SubscribersはKafka Brokerに保持されているoffsetの値を基に、取得するレコードの位置を決める
    • Publishers(Producers)がパーティショニングの責任を持つ
      • Kafka Brokerは到着したレコードをパーティショニングするロジックを持っていない
      • パーティショニングのアルゴリズムはクライアントライブラリに実装 → 各々のクライアント実装によって異なる
    • 複数のワーカーが同じレコードを処理しないようにするために、ConsumerGroupという概念を用いる
      • パーテション単位&グループ単位でレコードを処理

  • Kafkaの設計方針
    • Kafkaはデータストレージでもある
    • Kafkaは受け取ったデータを即時ディスクに書き込み他のノードにレプリケーション
      • → この時、OSのファイルシステムをそのまま利用して、単純にファイルとして書き込みを行う
      • → Kafka Brokerのパフォーマンスは、ファイルシステムのスループットやOSのページキャッシュサイズに大きく依存
      • → 高いディスク性能や大容量のメモリが必要
    • もしKafka BrokerをEC2にデプロイするなら、r5 seriesと複数EBSを利用することをオススメする

Kafka Streamsをより詳しく理解する

  • JavaとScalaを利用してリアルタイムストリーム処理を行うアプリケーションを構築するためのフレームワーク
  • Apache Flinkに似ているが、Kafkaのエコシステムだけで完結
  • ストリーム処理を簡単に記述するためのDSLを多く提供していて、JavaのStream APIのようにアプリケーションを記述できる
  • 複雑なアプリケーションを記述するための低レベルAPIとしてProcessor APIも提供
// Sample Code: https://kafka.apache.org/25/documentation/streams/
public static void main(final String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    // メソッドチェーンで書ける
    KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
    wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
}
  • Kafka Streamsのコアコンセプト
    • Topology
      • 一連のストリーム処理をトポロジーとして扱う
      • トポロジーはプロセッサーノードとエッジで構成されるDAG(数珠つなぎのグラフ構造)
      • Source Processor
        • 入力を表すプロセッサー
      • Stream Processor
        • 処理の内容を表すプロセッサー
      • Sink Processor
        • 出力を表すプロセッサー
    • Stream Thread
      • Kafka Streamsのワーカーの単位
      • 一台のマシンの中で複数のStream Threadを動作させることができる
    • Task
      • パーティション毎にインスタンス化(具体化)されたトポロジー
      • 一つのStream Threadは複数のTaskを担当できる
    • 参照

  • Stream DSLの基本概念
    • KStream
      • ステートレスなストリームを表すモデル
      • MapやFilterなど、あるレコードの処理が単体で完結できて、状態を保持しなくていいもの
    • KTable
      • ステートフルなストリームを表すモデル
      • CountやAggregateといった、複数のレコードが情報の出力に必要になる処理
      • StateStoreというKey-valueストアで状態を保持
      • フォールトトレランスのために接続されたKafka Topicで関連付け
    • Global KTable
      • KTableと同様にステートフルなデータを表わすが、全ワーカーから全てのデータに参照可能
      • マスタデータの参照などに使用

  • DSLを利用する際の注意点
    • 処理対象のレコードがどのパーティションに割り当てられているかを意識する必要がある
    • 各々のKafka Streamsワーカーはパーティション毎に独立していて、あるTaskは他のTaskで処理されているレコードを利用できない
    • 複数のトピックを利用してデータを結合したりする場合、パーティション総数とパーティションロジックを揃えておく必要がある
      • クライアントアプリごとにパーティションロジックが異なる可能性があるため注意
      • → いくつかのDSLはレコードのre-partitioningを発生させる
        • 例) selectKey、mapKey
    • 不要なRe-partitioningはパフォーマンスに悪影響を及ぼす可能性

  • StateStoreのバリエーション
    • StateStoreにはいくつかのタイプがある
      • KeyValue
      • KeyValue with timestamp
      • Time Windowed KeyValue
      • Time Windowed KeyValue with timestamp
    • そして、バックエンドに二種類の方式が用意されている。
      • Persistent Store (bases on RocksDB)
        • メモリに乗り切らないデータ、落ちても復旧できる
      • In-memory Store
        • 揮発していい一時データ
  • StateStoreの耐久性
    • Persistent StateStoreは、Kafka Topicと関連付けられている
    • StateStore APIがRocksDBにレコードを書き込むと、同時に関連付けられたKafkaトピックにも書き込みを行う
    • もしワーカーがダウンすると、他のワーカーがKafkaトピックからデータを取得してStateStoreを復旧する

  • Kafka Streamsのユースケース
    • ユーザーの行動履歴をリアルタイムにカウントして集計する
    • モバイルデバイスへのプッシュ通知のために、スロットリング機能を備えた柔軟なキューを実装する
    • ユーザーのアクセスに合わせて、別のノードにデータをpostbackする
    • 他のデータストアに転送するための中継処理

EC2とECSにKafkaとKafka Streams Workersをデプロイする

  • 弊社のKafkaとKafka Streamsの構成
    • Kafka Brokers: R5.2xlarge × 18
      • EBS × 4
    • Kafka Streams: i3.xlarge × 120
  • Apache Kafka運用のためのEBS活用
    • st1ストレージのコストパフォーマンスが高い
      • 冗長性はKafka自体が担保するので、ストライピングしてスループットとIOPSを稼ぐ
      • 高負荷だったりデータ量が少なくて済む場合はio1を使うのが良い
    • EBSスナップショットはKafkaのデータバックアップを簡易化してくれて非常に便利
      • もしディスクをストライピングしている場合は、インスタンス単位でスナップショットが取れるマルチボリュームスナップショットを利用
    • Data Lifecycle Managerを利用すると、自動でバックアップの世代管理やタグ付けをやってくれるので活用を推奨
  • Kafka Streamsのためのインスタンス選択
    • Kafka StreamsのワーカーはCPU、メモリ、ストレージのパフォーマンス全てがボトルネックになり得る
      • 特にStateStoreのバックエンドであるRocksDBが多くのIOPSとキャッシュメモリを必要とする
    • ステートフルな集約を多用する場合は、ローカルにNVMeストレージを持っているストレージ最適化インスタンスを使うのが良い
      • 例) i3、i3en、m5d、r5d
      • インスタンスごとストレージが失われてもデータの永続性はKafka Topicによって担保される
        • → インスタンスローカルストレージと相性が良い
    • ステートレスなアプリケーションの場合は、CPUがボトルネックになるのでコンピュート最適化インスタンスを選択する
      • 例) c5
  • スポットインスタンスの活用
    • Kafka Streamsのクラスターにスポットインスタンスを活用して60%のコストカットを実現
    • ストリーム処理のスループットとコスト感は、概ね以下の通り
      • かなり複雑な処理のスループット: 100K records/sec
        • 80 x i3.xlarge instances
        • $8500/month
      • シンプルなデータ変換処理のスループット: 1M records/sec
        • 6 x r5.xlarge instances
        • $300/month

→ Kafka StreamsのワーカーでSpot Instanceを利用するのは多少問題もある

  • Re-balancingによるstop the world問題
    • Kafka ConsumerがConsumerGroupにジョインしたりグループから外れたりする時、パーティション割り当てのリバランスが発生
      • パーティションの総数が大きいと、リバランスにしばしば長時間かかる
      • 最近のバージョンで、Re-balance protocolの改善により、全停止する様なことは無くなったが、それでも部分的に処理が停止することがある
      • → Kafka consumersは頻繁なAuto Scalingには向かない
      • → ピーク時に必要なパフォーマンスを基にリソースを用意しておく方が良い
    • スポットインスタンスによって頻繁にノードが増減すると、最悪リバランスが無限ループして処理が長時間停止してしまう
  • StateStoreのレストア所要時間の問題
    • Kafka StreamsアプリケーションがStateStoreのデータを失うと、Kafka topicsからデータを再取得にかかる
      • 例) データを失う = worker instanceがterminateされる
    • StateStoreに保持されていたデータが多いほど, レストアには長い時間がかかる
    • Kafka StreamsはStateStoreのレストアを実行しているスレッドでは処理を停止する
    • レストア処理はKafka Brokerのネットワークとストレージに高負荷をかける可能性がある
      • それを見越したリソースコントロールをする必要がある

Kafka Streamsに向いたインスタンスやボリューム選択

  • Processor API
    • Kafka Streamsの低レベルAPI
    • DSLもこれを利用して実装されている
    • StateStoreを直接触ることができる
    • DSLにない複雑な処理を実装できる
      • 例) rangeJoin、grouping in multi time windows
    • 繰返し実行するスケジュール処理を実行できる
      • 他のデータソースからポーリングする処理やバッファを実装できる

  • 構造化データの利用
    • Kafka Streamsは任意のSerializerとDeserializerを利用できる
    • Confluent.ioは、JsonSerializer、AvroSerializer、ProtobufSerializerを提供
    • それらのスキーマを保持するためのスキーマレジストリも合わせて提供
    • Avroを利用するコードは以下の通り

  • Kafka Streams works synchronously
    • 各々のTaskはシングルスレッドで同期的に動作する(synchronously)
      • 一台のマシンに、複数のスレッドがある
    • Kafka Streamsは、シンプルなwhile-loopをベースにしている
      • 毎回のループが終わる度に、offsetコミットの必要性を確認し、実行待ちに入っているスケジュール処理の実行を行う
      • → 長時間のプロセスは、あるStream ThreadのTask全体をブロックする
      • → 複雑な処理を記述する場合は、長時間処理がブロックされない様に気を付ける必要がある

(時間があれば)パフォーマンスチューニングについて

  • Kafka Streamsの重要なコンフィグパラメーター
    • max.poll.records: 一度に取得するレコードの最大数
      • 取得数が少な過ぎるとポーリングの回数が増えて、パフォーマンスが低下する
    • max.fetch.bytes、max.partition.fetch.bytes: 一度に取得できるレコードの最大サイズ
    • commit.interval.ms: オフセットコミットを行う間隔
      • オフセットコミットは、Kafka Streamsにおいては同期処理
      • 頻繁なコミットはメインロジックをブロックしてしまう
    • RocksDBの設定によっても、Statefulアプリケーションのパフォーマンスは大きく変化する
  • Tuning of RocksDB
    • RocksDBは、LSM-treeベースのキーバリューストア
    • 特に重要な要素は以下の通り
      • WriteBuffer (MemTable)
      • SSTable Compaction
      • Block Cache
  • RocksDBConfigSetter
    • Kafka Streamsでは、RocksDBConfigSetterを実装することで設定を変更できる
    • ちなみに、RocksDBはキャッシュとWriteBufferのためにそれなりの量のメモリを必要とする
    • デフォルトの設定値は、かなり少ないのでproduction運用時は調整した方が良い
      • WriteBufferを増やす
      • Block Cacheにメモリを割り当てる
      • Write Amplificationを避けるため、SSTableのサイズを大きくする

まとめ

  • Apache Kafkaはリアルタイムストリーミングアプリケーションを構築する上で非常に重要な基盤
    • 短期間でデータを活用可能にすることが求められる昨今、ニーズは増えている
  • Kafka StreamsはKafkaを活用したストリーミングアプリケーションの構築を大幅に簡易化してくれる
    • 一方で、良いパフォーマンスを出すためには、それなりにチューニングが必要であり、裏側の動作やRocksDBについて知っておくことが重要
  • Kafkaでストリーミングアプリケーションを設計する時には、パーティションキーとパーティショニングアルゴリズムの選択が重要
    • 無駄なリパーティショニングを発生させないことがパフォーマンス向上に繋がる