Amazon MSK(フルマネージドのKafkaクラスタ)のパブリックプレビューが東京リージョンに来たので試してみました

はじめに

昨年のre:Invent2018で発表されたフルマネージドのKafkaクラスタのサービス、Amazon MSKのパブリックプレビューが東京リージョンに来ました。まだ、パブリックプレビューですが記念に試してみました。

Kafka用クライアントの準備

MSK/kafkaクラスタは、VPC内に構築するためパブリックアクセスできません。そのため、Kafka用クライアントはVPC内にEC2インスタンスを作成する必要があります。Kafka用クライアントには、java-1.8.0とkafka_2.12-2.1.0のインストールは必要です。

Javaのインストール

[ec2-user@ip-10-0-0-168 ~]$sudo yum -y install java-1.8.0
読み込んだプラグイン:priorities, update-motd, upgrade-helper
依存性の解決をしています
--> トランザクションの確認を実行しています。
---> パッケージ java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール
--> 依存性の処理をしています: java-1.8.0-openjdk-headless(x86-64) = 1:1.8.0.201.b09-0.43.amzn1 のパッケージ: 1:java-1.8.0-openjdk-1.8.0.201.b09-0.43.amzn1.x86_64
--> トランザクションの確認を実行しています。
---> パッケージ java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール
:
:
インストール:
  java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1

依存性関連をインストールしました:
  avahi-libs.x86_64 0:0.6.25-12.17.amzn1              cups-libs.x86_64 1:1.4.2-67.21.amzn1
  gnutls.x86_64 0:2.12.23-21.18.amzn1                 java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1
  jbigkit-libs.x86_64 0:2.0-11.4.amzn1                libtiff.x86_64 0:4.0.3-27.29.amzn1
  lksctp-tools.x86_64 0:1.0.10-7.7.amzn1

完了しました!

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo alternatives --config java

2 プログラムがあり 'java' を提供します。

  選択       コマンド
-----------------------------------------------
*+ 1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java

Enter を押して現在の選択 [+] を保持するか、選択番号を入力します:2

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (build 1.8.0_201-b09)
OpenJDK 64-Bit Server VM (build 25.201-b09, mixed mode)
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$

kafka_2.12-2.1.0インストール

このインストールは、Topicの作成やメッセージの送受信用プログラムを利用するためにインストールしました。プロダクションでは必ずしも必要ではありません。

[ec2-user@ip-10-0-0-168 ~]$ wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
--2019-04-30 09:58:10--  https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
archive.apache.org (archive.apache.org) をDNSに問いあわせています... 163.172.17.199
archive.apache.org (archive.apache.org)|163.172.17.199|:443 に接続しています... 接続しました。
HTTP による接続要求を送信しました、応答を待っています... 200 OK
長さ: 55201623 (53M) [application/x-gzip]
`kafka_2.12-2.1.0.tgz' に保存中

kafka_2.12-2.1.0.tgz             100%[=======================================================>]  52.64M  6.52MB/s    in 9.5s

2019-04-30 09:58:21 (5.56 MB/s) - `kafka_2.12-2.1.0.tgz' へ保存完了 [55201623/55201623]

[ec2-user@ip-10-0-0-168 ~]$ ll
合計 53908
-rw-rw-r-- 1 ec2-user ec2-user 55201623 11月 20 19:16 kafka_2.12-2.1.0.tgz
[ec2-user@ip-10-0-0-168 ~]$ tar -xzf kafka_2.12-2.1.0.tgz
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ ll
合計 52
-rw-r--r-- 1 ec2-user ec2-user 32216 11月  9 19:50 LICENSE
-rw-r--r-- 1 ec2-user ec2-user   336 11月  9 19:50 NOTICE
drwxr-xr-x 3 ec2-user ec2-user  4096 11月  9 19:54 bin
drwxr-xr-x 2 ec2-user ec2-user  4096 11月  9 19:54 config
drwxr-xr-x 2 ec2-user ec2-user  4096  4月 30 13:23 libs
drwxr-xr-x 2 ec2-user ec2-user  4096 11月  9 19:54 site-docs

AWSCLIのアップデート

執筆時点のAMI(ami-00a5245b4816c38e6)にインストールされているAWSCLIは古いため、MSKのサブコマンドがインストールされていませんでしたので、別途AWSCLIをアップデートします。

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo pip install -U awscli
Collecting awscli
  Downloading https://files.pythonhosted.org/packages/ec/78/01f03354bf02d4c3d1a0de01cbe77529b79df4ea078284f1172690834586/awscli-1.16.148-py2.py3-none-any.whl (1.5MB)
    100% |████████████████████████████████| 1.5MB 802kB/s
Collecting colorama<=0.3.9,>=0.2.5 (from awscli)
  Downloading https://files.pythonhosted.org/packages/db/c8/7dcf9dbcb22429512708fe3a547f8b6101c0d02137acbd892505aee57adf/colorama-0.3.9-py2.py3-none-any.whl
Collecting rsa<=3.5.0,>=3.1.2 (from awscli)
  Downloading 
:
:
  Found existing installation: botocore 1.10.82
    Uninstalling botocore-1.10.82:
      Successfully uninstalled botocore-1.10.82
  Found existing installation: awscli 1.15.83
    Uninstalling awscli-1.15.83:
      Successfully uninstalled awscli-1.15.83
Successfully installed PyYAML-3.13 awscli-1.16.148 botocore-1.12.138 colorama-0.3.9 docutils-0.14 futures-3.2.0 jmespath-0.9.4 pyasn1-0.4.5 python-dateutil-2.8.0 rsa-3.4.2 s3transfer-0.2.0 six-1.12.0 urllib3-1.24.2
You are using pip version 9.0.3, however version 19.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

Amazon MSK/kafkaクラスタの構築

構築

今回はマネジメントコンソールからAmazon MSK/kafkaクラスタの構築します。re:Invent2018発表の際にはバージョンは1.1.0のみでしたが、バージョンは2.1.0を指定しました。

3つのアベイラビリティゾーンに作成しなければなりませんので、Kafkaブローカは3つ以上になります。

1つのアベイラビリティゾーンに作成するKafkaブローカを指定できます。

「高度な設定」で、設定をカスタマイズできます。今回は「デフォルト設定の使用」のまま作成しました。

Topicの作成

ZookeeperConnectString の取得

Topic作成時に必要となるZookeeperConnectStringを取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数にarn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3を指定しました。

aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"

なお、コマンドの出力のStateCREATINGの場合は、ACTIVEになってからコマンドを再実行してください。結果は、以下のとおりです。

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
{
    "ClusterInfo": {
        "EncryptionInfo": {
            "EncryptionAtRest": {
                "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:1234567890123:key/928e86a0-8289-4eb6-8a2d-aff7afdd96b0"
            }
        },
        "BrokerNodeGroupInfo": {
            "BrokerAZDistribution": "DEFAULT",
            "ClientSubnets": [
                "subnet-ffbxxxxx",
                "subnet-8f2xxxxx",
                "subnet-0c7xxxxxxxxxxxxx"
            ],
            "StorageInfo": {
                "EbsStorageInfo": {
                    "VolumeSize": 1000
                }
            },
            "SecurityGroups": [
                "sg-5b58b23e"
            ],
            "InstanceType": "kafka.m5.large"
        },
        "ClusterName": "classmethod-cluster",
        "CurrentBrokerSoftwareInfo": {
            "KafkaVersion": "2.1.0"
        },
        "CreationTime": "2019-04-30T13:21:52.115Z",
        "NumberOfBrokerNodes": 3,
        "ZookeeperConnectString": "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181",
        "State": "ACTIVE",
        "CurrentVersion": "K1VC38T7YXB528",
        "ClusterArn": "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3",
        "EnhancedMonitoring": "DEFAULT"
    }
}

Topicの作成

kafka-topics.shのzookeeper引数にZookeeperConnectStringの値10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181を設定してコマンドを実行します。トピック名はClassmethodTopicを指定しました。

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --create --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181" --replication-factor 3 --partitions 1 --topic ClassmethodTopic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "ClassmethodTopic".

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --list --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181"
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
ClassmethodTopic
__consumer_offsets

メッセージの送受信

メッセージの送受信には、先ほどダウンロードしたkafka_2.12-2.1.0のコマンドを用いて動作確認します。ここではkafkaに対してメッセージを送信する役割を「Producer」、kafkaからメッセージを受信する役割を「Consumer」と呼びます。

※ Kafkaは、GoFのデザインパターンProducer-Consumerパターンの用語を用いられることが多いです。

BootstrapBrokerStringの取得

メッセージの送受信に必要となるBootstrapBrokerStringの取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数に"arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"を指定しました。

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka get-bootstrap-brokers --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
{
    "BootstrapBrokerString": "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092"
}

MSK(Kafka)にメッセージを送信 - Producer

bin/kafka-console-producer.sh --broker-list "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic

上記コマンド実行後、以下のメッセージをコンソールに続けて入力します。

一つ、人の世の生血をすすり
二つ、ふらちな悪行残名
三つ、醜い浮き世の鬼を退治てくれよう、桃太郎

MSK(Kafka)からメッセージを受信 - Consumer

別のターミナルを立ち上げ、以下のコマンドを実行します。すると、メッセージ送信で入力したメッセージが受信されました。マルチバイト(日本語)も問題ありません。

[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
一つ、人の世の生血をすすり
二つ、ふらちな悪行残名
三つ、醜い浮き世の鬼を退治てくれよう、桃太郎

最後に

HadoopやSparkを用いたストリームサービスをクラウドにマイグレーションする際に、EMR+セルフマネージドなKafkaをEC2上に構築するではなく、EMR+MSKにできる日が近くなりそうです。プレビューなのでGAが最終的に同じ仕様とは限りませんが、使い勝手の良いサービスになりそうで今から楽しみです。

合わせて読みたい

[新サービス]フルマネージドなApache Kafka、Amazon Managed Streaming for Kafka (MSK)が発表されました #reinvent

[レポート]ANT398 – Amazon Managed Streaming for Kafka (Amazon MSK)入門 #reinvent