この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
昨年のre:Invent2018で発表されたフルマネージドのKafkaクラスタのサービス、Amazon MSKのパブリックプレビューが東京リージョンに来ました。まだ、パブリックプレビューですが記念に試してみました。
Kafka用クライアントの準備
MSK/kafkaクラスタは、VPC内に構築するためパブリックアクセスできません。そのため、Kafka用クライアントはVPC内にEC2インスタンスを作成する必要があります。Kafka用クライアントには、java-1.8.0とkafka_2.12-2.1.0のインストールは必要です。
Javaのインストール
[ec2-user@ip-10-0-0-168 ~]$sudo yum -y install java-1.8.0
読み込んだプラグイン:priorities, update-motd, upgrade-helper
依存性の解決をしています
--> トランザクションの確認を実行しています。
---> パッケージ java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール
--> 依存性の処理をしています: java-1.8.0-openjdk-headless(x86-64) = 1:1.8.0.201.b09-0.43.amzn1 のパッケージ: 1:java-1.8.0-openjdk-1.8.0.201.b09-0.43.amzn1.x86_64
--> トランザクションの確認を実行しています。
---> パッケージ java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール
:
:
インストール:
java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1
依存性関連をインストールしました:
avahi-libs.x86_64 0:0.6.25-12.17.amzn1 cups-libs.x86_64 1:1.4.2-67.21.amzn1
gnutls.x86_64 0:2.12.23-21.18.amzn1 java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1
jbigkit-libs.x86_64 0:2.0-11.4.amzn1 libtiff.x86_64 0:4.0.3-27.29.amzn1
lksctp-tools.x86_64 0:1.0.10-7.7.amzn1
完了しました!
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo alternatives --config java
2 プログラムがあり 'java' を提供します。
選択 コマンド
-----------------------------------------------
*+ 1 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
2 /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java
Enter を押して現在の選択 [+] を保持するか、選択番号を入力します:2
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (build 1.8.0_201-b09)
OpenJDK 64-Bit Server VM (build 25.201-b09, mixed mode)
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$
kafka_2.12-2.1.0インストール
このインストールは、Topicの作成やメッセージの送受信用プログラムを利用するためにインストールしました。プロダクションでは必ずしも必要ではありません。
[ec2-user@ip-10-0-0-168 ~]$ wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
--2019-04-30 09:58:10-- https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
archive.apache.org (archive.apache.org) をDNSに問いあわせています... 163.172.17.199
archive.apache.org (archive.apache.org)|163.172.17.199|:443 に接続しています... 接続しました。
HTTP による接続要求を送信しました、応答を待っています... 200 OK
長さ: 55201623 (53M) [application/x-gzip]
`kafka_2.12-2.1.0.tgz' に保存中
kafka_2.12-2.1.0.tgz 100%[=======================================================>] 52.64M 6.52MB/s in 9.5s
2019-04-30 09:58:21 (5.56 MB/s) - `kafka_2.12-2.1.0.tgz' へ保存完了 [55201623/55201623]
[ec2-user@ip-10-0-0-168 ~]$ ll
合計 53908
-rw-rw-r-- 1 ec2-user ec2-user 55201623 11月 20 19:16 kafka_2.12-2.1.0.tgz
[ec2-user@ip-10-0-0-168 ~]$ tar -xzf kafka_2.12-2.1.0.tgz
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ ll
合計 52
-rw-r--r-- 1 ec2-user ec2-user 32216 11月 9 19:50 LICENSE
-rw-r--r-- 1 ec2-user ec2-user 336 11月 9 19:50 NOTICE
drwxr-xr-x 3 ec2-user ec2-user 4096 11月 9 19:54 bin
drwxr-xr-x 2 ec2-user ec2-user 4096 11月 9 19:54 config
drwxr-xr-x 2 ec2-user ec2-user 4096 4月 30 13:23 libs
drwxr-xr-x 2 ec2-user ec2-user 4096 11月 9 19:54 site-docs
AWSCLIのアップデート
執筆時点のAMI(ami-00a5245b4816c38e6)にインストールされているAWSCLIは古いため、MSKのサブコマンドがインストールされていませんでしたので、別途AWSCLIをアップデートします。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo pip install -U awscli
Collecting awscli
Downloading https://files.pythonhosted.org/packages/ec/78/01f03354bf02d4c3d1a0de01cbe77529b79df4ea078284f1172690834586/awscli-1.16.148-py2.py3-none-any.whl (1.5MB)
100% |████████████████████████████████| 1.5MB 802kB/s
Collecting colorama<=0.3.9,>=0.2.5 (from awscli)
Downloading https://files.pythonhosted.org/packages/db/c8/7dcf9dbcb22429512708fe3a547f8b6101c0d02137acbd892505aee57adf/colorama-0.3.9-py2.py3-none-any.whl
Collecting rsa<=3.5.0,>=3.1.2 (from awscli)
Downloading
:
:
Found existing installation: botocore 1.10.82
Uninstalling botocore-1.10.82:
Successfully uninstalled botocore-1.10.82
Found existing installation: awscli 1.15.83
Uninstalling awscli-1.15.83:
Successfully uninstalled awscli-1.15.83
Successfully installed PyYAML-3.13 awscli-1.16.148 botocore-1.12.138 colorama-0.3.9 docutils-0.14 futures-3.2.0 jmespath-0.9.4 pyasn1-0.4.5 python-dateutil-2.8.0 rsa-3.4.2 s3transfer-0.2.0 six-1.12.0 urllib3-1.24.2
You are using pip version 9.0.3, however version 19.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Amazon MSK/kafkaクラスタの構築
構築
今回はマネジメントコンソールからAmazon MSK/kafkaクラスタの構築します。re:Invent2018発表の際にはバージョンは1.1.0のみでしたが、バージョンは2.1.0を指定しました。
3つのアベイラビリティゾーンに作成しなければなりませんので、Kafkaブローカは3つ以上になります。
1つのアベイラビリティゾーンに作成するKafkaブローカを指定できます。
「高度な設定」で、設定をカスタマイズできます。今回は「デフォルト設定の使用」のまま作成しました。
Topicの作成
ZookeeperConnectString の取得
Topic作成時に必要となるZookeeperConnectString
を取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数にarn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3
を指定しました。
aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
なお、コマンドの出力のState
がCREATING
の場合は、ACTIVE
になってからコマンドを再実行してください。結果は、以下のとおりです。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
{
"ClusterInfo": {
"EncryptionInfo": {
"EncryptionAtRest": {
"DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:1234567890123:key/928e86a0-8289-4eb6-8a2d-aff7afdd96b0"
}
},
"BrokerNodeGroupInfo": {
"BrokerAZDistribution": "DEFAULT",
"ClientSubnets": [
"subnet-ffbxxxxx",
"subnet-8f2xxxxx",
"subnet-0c7xxxxxxxxxxxxx"
],
"StorageInfo": {
"EbsStorageInfo": {
"VolumeSize": 1000
}
},
"SecurityGroups": [
"sg-5b58b23e"
],
"InstanceType": "kafka.m5.large"
},
"ClusterName": "classmethod-cluster",
"CurrentBrokerSoftwareInfo": {
"KafkaVersion": "2.1.0"
},
"CreationTime": "2019-04-30T13:21:52.115Z",
"NumberOfBrokerNodes": 3,
"ZookeeperConnectString": "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181",
"State": "ACTIVE",
"CurrentVersion": "K1VC38T7YXB528",
"ClusterArn": "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3",
"EnhancedMonitoring": "DEFAULT"
}
}
Topicの作成
kafka-topics.shのzookeeper
引数にZookeeperConnectString
の値10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181
を設定してコマンドを実行します。トピック名はClassmethodTopic
を指定しました。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --create --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181" --replication-factor 3 --partitions 1 --topic ClassmethodTopic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "ClassmethodTopic".
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --list --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181"
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
ClassmethodTopic
__consumer_offsets
メッセージの送受信
メッセージの送受信には、先ほどダウンロードしたkafka_2.12-2.1.0のコマンドを用いて動作確認します。ここではkafkaに対してメッセージを送信する役割を「Producer」、kafkaからメッセージを受信する役割を「Consumer」と呼びます。
※ Kafkaは、GoFのデザインパターンProducer-Consumerパターンの用語を用いられることが多いです。
BootstrapBrokerStringの取得
メッセージの送受信に必要となるBootstrapBrokerStringの取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数に"arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
を指定しました。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka get-bootstrap-brokers --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
{
"BootstrapBrokerString": "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092"
}
MSK(Kafka)にメッセージを送信 - Producer
bin/kafka-console-producer.sh --broker-list "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic
上記コマンド実行後、以下のメッセージをコンソールに続けて入力します。
一つ、人の世の生血をすすり
二つ、ふらちな悪行残名
三つ、醜い浮き世の鬼を退治てくれよう、桃太郎
MSK(Kafka)からメッセージを受信 - Consumer
別のターミナルを立ち上げ、以下のコマンドを実行します。すると、メッセージ送信で入力したメッセージが受信されました。マルチバイト(日本語)も問題ありません。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
一つ、人の世の生血をすすり
二つ、ふらちな悪行残名
三つ、醜い浮き世の鬼を退治てくれよう、桃太郎
最後に
HadoopやSparkを用いたストリームサービスをクラウドにマイグレーションする際に、EMR+セルフマネージドなKafkaをEC2上に構築するではなく、EMR+MSKにできる日が近くなりそうです。プレビューなのでGAが最終的に同じ仕様とは限りませんが、使い勝手の良いサービスになりそうで今から楽しみです。
合わせて読みたい
[新サービス]フルマネージドなApache Kafka、Amazon Managed Streaming for Kafka (MSK)が発表されました #reinvent
[レポート]ANT398 – Amazon Managed Streaming for Kafka (Amazon MSK)入門 #reinvent