Kafka 用クライアントの Kafdrop で Aiven for Apache Kafka に接続する

Kafka の Web UI である Kafdrop を触ってみました。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ウィスキー、シガー、パイプをこよなく愛する大栗です。

クラウド データ プラットフォームの Aiven はデータベースだけでなく、Kafka のような他のデータ処理機能も利用可能です。Kafka 用のクライアントツール である Kafdrop で接続してみます。

Kafdrop?

Kafdrop は Kafka 用の Web UI でブローカーやトピックなどの情報やメッセージの表示を行えます。

obsidiandynamics/kafdrop

やってみる

Aiven for Apache Kafka を起動する

Aiven for Apache Kafka の起動は以下のエントリを参考にしてください。

サービスが起動したら、すぐにアクセスを許可する IP アドレスを絞ります。まず以下のようなサービスでアクセス元のグローバル IP アドレスを確認します。

次に Aiven Console で Kafka のサービスのOverviewタブにあるAllowed IP Addressesを確認します。おそらくデフォルトの0.0.0.0/0になっています。何処からでもアクセス可能な状態なので自分の IP アドレスからだけ接続できるように設定しましょう。Changeをクリックします。

先に設定されている0.0.0.0/0を消して、先程調べた自分の IP アドレスを設定します。設定したらSave Changesをクリックして確定します。

これで、取り敢えずはましになりました。

Kafka の接続情報を取得する

次に Aiven Console で Kafka のサービスのOverviewタブにあるConnection informationを確認して認証情報をダウンロードします。Access Key はservice.key、Access Certificate はservice.cert、CA Certificate はca.pem というファイル名でローカルマシンに保存します。

ダウンロードしたファイルを openssl コマンドで pkcs12 形式に変換します。

$ openssl pkcs12 -export -inkey service.key -in service.cert -out kafka.keystore.p12 -name demo_kafka_key
Enter Export Password:              # パスワードを設定
Verifying - Enter Export Password:  # パスワードを再度入力

keytool コマンドで CA 証明書をキーストアにロードします。

$ keytool -import -file ca.pem -alias KafkaCA -keystore kafka-client.truststore.jks
Enter keystore password:  # 6文字以上でパスワードを設定
Re-enter new password:    # パスワードを再度入力
Owner: CN=abcd1234-abcd-1234-abcd-12345678abcd Project CA
Issuer: CN=abcd1234-abcd-1234-abcd-12345678abcd Project CA
Serial number: 1a2b3c4d5e6f7g8h1a2b3c4d5e6f7g8h1a2b3c4d
Valid from: Thu Jan 06 04:43:48 UTC 2022 until: Sun Jan 04 04:43:48 UTC 2032
Certificate fingerprints:
 SHA1: 1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A
 SHA256: 1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A
Signature algorithm name: SHA384withRSA
Subject Public Key Algorithm: 3072-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
  CA:true
  PathLen:0
]

#2: ObjectId: 2.5.29.15 Criticality=false
KeyUsage [
  Key_CertSign
  Crl_Sign
]

#3: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 1A 1A 1A 1A 1A 1A 1A 1A   1A 1A 1A 1A 1A 1A 1A 1A  RLL..N...-.k.0.3
0010: 1A 1A 1A 1A                                        ./g;
]
]

Trust this certificate? [no]:  y  # y を入力
Certificate was added to keystore

Kafka に接続するための設定ファイルkafka.propertiesを作成します

security.protocol=SSL
ssl.keystore.password=$PASSWD         # openssl コマンドで設定したパスワードを入力
ssl.keystore.type=PKCS12
ssl.truststore.password=$TRUST-PASSWD # keytool コマンドで設定したパスワードを入力

これで接続するためのファイルであるkafka.keystore.p12kafka-client.truststore.jks``kafka.propertiesを作成しました。

Kafdrop で Kafka に接続する

Kafdrop を起動しますが、ここでは Docker を使用します。

$ 
:::    :::     :::     :::::::::: :::::::::  :::::::::   ::::::::  :::::::::        ::::::::  
:+:   :+:    :+: :+:   :+:        :+:    :+: :+:    :+: :+:    :+: :+:    :+:      :+:    :+: 
+:+  +:+    +:+   +:+  +:+        +:+    +:+ +:+    +:+ +:+    +:+ +:+    +:+             +:+ 
+#++:++    +#++:++#++: :#::+::#   +#+    +:+ +#++:++#:  +#+    +:+ +#++:++#+           +#++:  
+#+  +#+   +#+     +#+ +#+        +#+    +#+ +#+    +#+ +#+    +#+ +#+                    +#+ 
#+#   #+#  #+#     #+# #+#        #+#    #+# #+#    #+# #+#    #+# #+#             #+#    #+# 
###    ### ###     ### ###        #########  ###    ###  ########  ###              ########  

Writing Kafka properties into kafka.properties
Writing Kafka truststore into kafka.truststore.jks
Writing Kafka keystore into kafka.keystore.jks
2022-01-12 08:42:26.312  INFO ${sys:PID} [           main] k.Kafdrop$EnvironmentSetupListener       : Initializing JAAS config
2022-01-12 08:42:26.325  INFO ${sys:PID} [           main] k.Kafdrop$EnvironmentSetupListener       : env: null .isSecured kafka: false
2022-01-12 08:42:26.326  INFO ${sys:PID} [           main] k.Kafdrop$EnvironmentSetupListener       : Env: null
2022-01-12 08:42:26.547  INFO 1 [kground-preinit] o.h.v.i.u.Version                        : HV000001: Hibernate Validator 6.2.0.Final
・
・
・

これで Kafdrop が起動しているのでアクセスしてみます。ポート 9000 でコンテナを起動しているので http://localhost:9000 にアクセスします。Kafdrop でトピックを新しく作成してみましょう。一番下にある+ Newをクリックします。

Topic nameにトピック名を設定します。ここではTestTopic1とします。入力したら+Createをクリックします。

成功すると一番下に Successfully create topic TestTopic1 と出てきます。TestTopic1がリンクになっているのでクリックします。

このように Kafdrop 経由で Kafka の操作可能です。

さいごに

著者は Kafka を触ったことがなくどう使ったら良いかよく分かっていないのですが、Kafdrop で Kafka の状態を確認するのが容易になりました。これを機会にもう少し Kafka を触っていこうと思います。