[Apache Kafka] Kafka StreamsでStream処理をしてみる [node.js]

Apache Kafkaとは

Apache Kafkaとは、Linkedinが開発した分散メッセージキューで、
データストリーミング用のプラットフォームです。

メッセージキューとは、システム間でデータのハブととして機能し、
対象データを一時的に保持してくコンポーネントです。
キューをはさむことでシステム間を疎結合にし、通信を非同期化することができます。
AWSでいえば、sqskinesisが同様のサービスにあたります。

Apache Kafkaはスケーラビリティや大量データの扱いに長けており、耐障害性もあるスグレモノです。
本稿ではApache Kakfaとnode.js用モジュールのkafka-nodeとkafka-streamsをつかって
node.jsからkafkaへアクセスしてみます。

Amazon Kinesisとの比較

以前はAWSフルマネージド(kinesis)かどうか、という決定的な違いがあったのですが、
2018年11月にAmazon Managed Streaming for Apache Kafka (Amazon MSK)
リリースされたのでその違いもほぼなくなりました。
※ Amazon MSK : Apache Kafka をストリーミングデータ処理に使用する際のフルマネージド型サービス

ここではkafkaとkinesisの比較をしていますが、
主な違いは

  • データ保持期間 : kafka = 無制限 , kinesis = 1日〜7日
  • データサイズmax : kafka = default 1MB(設定可能) , kinesis = 1MB
  • 依存サービス : kafka = ZooKeeper , kinesis = DynamoDB

というような感じです。
絶対にどちらかしか実現できない機能はありませんが、
最初から大量のストリームデータ配信を想定している場合はkafka、
そうでない場合はkinesisを選択することが多いようです。

参考:
・https://www.ossnews.jp/compare/Apache_Kafka/Amazon_Kinesis
・http://www.itcheerup.net/2019/01/kafka-vs-kinesis/

Apache Kafkaのサンプル

まずはKafkaを動かしてみましょう。
今回はdockerをつかってkafkaを起動し、node.jsでproducer/consumerサンプルの確認をしてみます。

環境

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.12.6
  • Docker : 18.09.2
  • node.js : v11.14.0

Dockerイメージからkafka用コンテナを作成してコンテナにログインします。

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost  --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka

$ docker exec -it test_kafka bash

kafkaディレクトリに移動し、topicをtestという名前で作成します。

root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

topicを作成したらnode.jsでProducer側のプログラム(producer.js)を作成します。
本稿では、node.jsからkafkaにアクセスするためのkafka-nodeモジュールを使用します。
事前にkafka-nodeをnpmでインストールしておきましょう。

% cd path/yuor/node-app
% npm install kafka-node
// publisher.js

'use strict';
var kafka = require("kafka-node");

const Producer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({
    kafkaHost: "localhost:9092"
});
const producer = new Producer(client, {
    partitionerType: 1
});

producer.on("ready", () => {
    //プログラム引数で名前と年齢を受け取る
    const name = process.argv[2];
    const age = process.argv[3];
    const message = [
        {
            topic: "test",
            messages: JSON.stringify({name: name, age: age})
        }
    ];

    producer.send(message, (err, data) => {
        if (err) console.log(err);
        else console.log('send messages');
        process.exit();
    });
});

次にConsumer(consumer.js)を作成します。

'use strict';
var kafka = require('kafka-node');

const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({kafkaHost: "localhost:9092"});
const consumer = new Consumer(
    client,
    [{topic: "test", partision:0}],
    {
        groupId: "my-consumer",
        autoCommit: true,
        fromOffset: true
    }
);

consumer.on("message", (message, err) => {
    if (err) console.log("error : " + err);

    const json = JSON.parse(message.value);
    console.log("JSON:" + JSON.stringify(json));
    console.log("Name:" + json.name);
    console.log("Age:" + json.age);
});

consumer.on('error', function (err) {
    console.log('error', err);
});

producer.jsを実行してkafkaにメッセージを送ります。

$ node producer.js taro 30
send messages

consumerを起動すると、topicに対してメッセージを取得しに行きます。
(先にconsumer.jsを起動していてもメッセージが送られたときに取得する)

$ node consumer.js
JSON:{"name":"taro","age":"30"}
Name:taro
Age:30

これでkafkaの動作確認は終了です。

Kafka Streamsとは

Kafkaの動作確認もできたので、次はKafka Streamsを動かしてみましょう。
Kafka Streamsとは、Apache Kafka v0.10から同梱されているライブラリで、
これを使えばStream処理をある程度簡単に実装できるようになります。
例えば、
「サンプルAのtopicにデータが送られたら、それに対して処理を実行してサンプルBのtopicへ送る」
といった処理が可能になります。

KStreamとKTable

Kafkaに流れてくるStream(key-value形式)には2つの種類があるという考えのもと、
「KStream」「KTable」という2つのStreamタイプを使い分けることができます。

1つ目はKStream(record stream)と呼ばれる、
Kafkaからうけとったデータがそのまま追加されるタイプです。
ここにある例でいうと、↓にあるような2件のStreamデータが流れてきたとします。

// { user(key) : count }
{"alice" : 1}
{"alice" : 3}

このデータはユーザーごとのカウント数を表しており、KStreamを使用しているならcountは4となります。

2つ目はKTable(changelog stream)で、
もしすでに同じキーのデータが存在するならデータが更新されます。
上記2件のStreamデータをKTableで処理した場合、countは3となります。
なお、KTableにおいてNULLを持つデータは、そのデータのキーに対する削除を表します。

その他Kafakaの特徴

他にもKafkaではいろいろな機能を持っています。
Streamとテーブルのjoinができたり、集約(max/min/avg/sum) も可能です。
また、window関数を使用して任意の期間についてStreamデータをグルーピングすることも可能です。
join/aggregation/windowingについては公式ドキュメント等をご確認ください。

node-kafka-streamsを使ったサンプル

では、kafka-streamsを使った簡単なサンプルを作成してみましょう。
ここにあるwordCountを少しかえて試してみます。
ここではinput-topicとoutput-topicの2つのtopicを使用するので、
コンテナにログインして作成しましょう。

root@xxxx:/# cd /opt/kafka_2.11-0.10.1.0/
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic
root@xxxx:/# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic

次に、Kafka Streamsを使用するためのモジュール、kafka-streamsもnpmでインストールします。

% cd path/yuor/node-app
% npm install kafka-streams

Kafka Streamsを使ったサンプルです。
ここではinput-topicにメッセージを送り、そのメッセージの数をcountしてoutput-topicに送ります。
例えば、
"ftuit banana"
"ftuit orange"
"ftuit apple"
と3つのメッセージをinput-topicにおくった場合、
"fluit 3"
というふうに、メッセージの最初の文字列(ここだとfluit)をキーとして
その数をoutpu-topicに送ります。

"use strict";

//wordCount.js

const { KafkaStreams } = require("kafka-streams");
const { nativeConfig: config } = require("./config.js");

const keyMapperEtl = (kafkaMessage) => {
    const value = kafkaMessage.value.toString("utf8");
    console.log("message : " + kafkaMessage);
    const elements = value.toLowerCase().split(" ");
    return {
        someField: elements[0],
    };
};

const kafkaStreams = new KafkaStreams(config);

kafkaStreams.on("error", (error) => {
    console.log("Error occured:", error.message);
});

const stream = kafkaStreams.getKStream();

//input-topicから取得したデータを
//キー毎にカウントしてoutput-topicに送る(count >= 3のキー)
stream
    .from("input-topic")
    .map(keyMapperEtl)
    .countByKey("someField", "count")
    .filter(kv => kv.count >= 3) 
    .map(kv => kv.someField + " " + kv.count)
    .tap(kv => console.log(kv))
    .to("output-topic");


Promise.all([
    stream.start()
]).then(() => {
    console.log("started..");
    // 50秒したらStreamをclose
    setTimeout(() => {
        kafkaStreams.closeAll();
        console.log("stopped..");
    }, 50000);
});

Streamの設定ファイルです。

"use strict";

//config.js

//dont use these settings for production, it will set your broker on fire..
const batchOptions = {
    batchSize: 5,
    commitEveryNBatch: 1,
    concurrency: 1,
    commitSync: false,
    noBatchCommits: false
};

const nativeConfig = {
    noptions: {
        "metadata.broker.list": "localhost:9092", //native client requires broker hosts to connect to
        "group.id": "kafka-streams-test-native",
        "client.id": "kafka-streams-test-name-native",
        "event_cb": true,
        "compression.codec": "snappy",
        "api.version.request": true,

        "socket.keepalive.enable": true,
        "socket.blocking.max.ms": 100,

        "enable.auto.commit": false,
        "auto.commit.interval.ms": 100,

        "heartbeat.interval.ms": 250,
        "retry.backoff.ms": 250,

        "fetch.min.bytes": 100,
        "fetch.message.max.bytes": 2 * 1024 * 1024,
        "queued.min.messages": 100,

        "fetch.error.backoff.ms": 100,
        "queued.max.messages.kbytes": 50,

        "fetch.wait.max.ms": 1000,
        "queue.buffering.max.ms": 1000,

        "batch.num.messages": 10000
    },
    tconf: {
        "auto.offset.reset": "earliest",
        "request.required.acks": 1
    },
    batchOptions
};

module.exports = {
    nativeConfig
};

wordCount.jsを起動し、input-topicに対して下記のようなメッセージをおくってみると、
"fruit banana"
"fruit orange"
"fruit apple"
のような同一キーを指定したメッセージを3つ送ると、
output-topicに対して
"fruit 3"
といったキーのカウント数を送ります。

最後に

今回はApache KafkaとKafka Streamsの動きを簡単に確認してみました。
ストリーム処理が非常に簡単に動かすことができたと思います。
他にも使えそうな機能が多くあるので、確認してみてください。

なお、動作確認がおわったら、不必要なコンテナはdocker rmで削除しておきましょう。

# コンテナを削除
$ docker rm test_kafka

参考サイト

  • https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
  • https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3
  • https://qiita.com/41semicolon/items/60f92a1db6dfce4303c5
  • https://qiita.com/mkyz08/items/a3b866c46ca49c52e647
  • http://pppurple.hatenablog.com/entry/2019/03/28/235810
  • https://www.slideshare.net/techblogyahoo/kafka-streams-kafkajp
  • https://github.com/SOHU-Co/kafka-node
  • https://github.com/tulios/kafkajs
  • https://qiita.com/minarai/items/f571db36a19806aee491
  • https://github.com/nodefluent/kafka-streams
  • https://vicki.substack.com/p/you-dont-need-kafka