この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
先日のアップデートで Amazon MSK(Managed Streaming for Apache Kafka) を Lambda のソースイベントとして利用できるようになりました!
アップデートされて直ぐに触ってたのですが、まだ MSK まわりは情報も少なく、うまく検証が進まなかったのですが、本日リリースされていた公式ブログでようやく理解が深まりました。(Thank you, James!!)
何がうれしいのか
Amazon MSK はフルマネージドな Apache Kafka のサービスで、最大のメリットは「完全互換」があることでしょう。現在、オンプレ環境で利用中の Apache Kafka を利用したデータパイプラインを大きな改修することなく AWS に移行することが容易です。
しかし、Lambda などの AWS サービスと連携させるためには Kafka Connector などのフレームワークを EC2 で準備し、コンシューマー側の Sink Connector からトピックを pull し Lambda を呼び出す、といった仕組みが必要でした。
そのため、従来だと AWS サービス連携させるようなユースケースでは、やはり Amazon Kinesis Data Streams が推奨とされてきました。
それが今回のアップデートによって、コンシューマ側の Connector なしに Lambda を呼び出すことができるようになりました。Lambda を起点にその他の AWS サービスとの連携も柔軟にできますので、 Sink Connector を Lambda に置き換えることで可用性は Lambda 基盤に任せられますし、EC2 のコスト負担もなくなるので嬉しいかぎりではないでしょうか。
注意点
MSK ブローカーから Lambda を呼び出す
Amazon MSK はイベントソースマッピングを介して Lambda を呼び出します。そのため、MSK ブローカーが AWS のエンドポイントにアクセスできる経路が必要となります。多くの場合、ブローカーはプライベートサブネットに配置されていると思いますので、NAT ゲートウェイが必要となります。
(引用元:Using Amazon MSK as an event source for AWS Lambda)
また、Lambda も MSK クラスターが配置されいている VPC へのアクセスを必要とするため、VPC Lambda として作成します。
レコードの Key-Value セットは base64 エンコードされる
Amazon MSK レコードの Key-Value セットは base64 でエンコードされていますので、必要に応じてデコードが必要となります。
Amazon language=MSKレコードイベントの例
Received event:{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/Exampgit add lesMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
"records": {
"AWSKafkaTopic-0": [
{
"topic": "AWSKafkaTopic",
"partition": 0,
"offset": 0,
"timestamp": 1595035749700,
"timestampType": "CREATE_TIME",
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
}
]
}
}
サポートされない Kafka の機能
以下の機能は Lambda のイベントソースではサポートされません。
- 認証 : SSL および SASL ベースの認証はサポートされません
- Schema Registry
やってみる
はじめに
今回の検証はバージニアリージョンで実施しています。東京リージョンでも機能はサポートされているのですが、執筆時点においては何故か MSK のトピックにメッセージを投入しても Lambda を呼び出すことが出来ませんでした。
イベントソースマッピングのステータスを確認すると PROBLEM: Connection error, Please check your event source connection configuration.
と表示されます。
ずっと悩んでいたのですが、同樣の構成をバージニアで作ったところ、サクッと確認できましたのでリージョン固有の問題なのかもしれません。(いずれ解決されると思います)
事前準備
以下の環境は事前に作成済みとします。
- VPC
- サブネット
- NAT Gateway
- セキュリティグループ
- MSK クライアント(EC2)
- Amazon MSK クラスター
セキュリティグループ
MSK クラスターとの接続には以下のポートが必要となります。
ポート | 用途 |
---|---|
9092 | ブローカーがプレーンテキストでプロヂューサーおよびコンシューマと通信する |
9094 | ブローカーがTLSでプロヂューサーおよびコンシューマと通信する |
2181 | Apache ZooKeeperノードと通信する |
当セキュリティグループ自身からの通信のみを許可するように設定しておき、MSK クラスター、MSK クライアント、VPC Lambda にアタッチしておくとシンプルにまとめることが出来ます。
MSK クライアント
Amazon MSK はブローカーノードや、ZooKeeper ノードなどの基盤はマネージドサービスとして提供されていますが、Kafka そのものの設定等は AWS API では提供されておらず、従来の Kafka API を利用します。
そのため、Kafka API を発行するためのクライアントが必要となります。チュートリアルを参考にすると容易にセットアップできます。
Amazon MSK クラスターの作成
こちらも公式ガイドのチュートリアル に従い MSK クラスターを作成し、¥AWSKafkaTutorialTopic
トピックまで作成しておきます。
Lambda トリガーの設定
IAM ロールの作成
Amazon MKS 向けの AWS 管理ポリシー として AWSLambdaMSKExecutionRole
が提供されていますので、当該ポリシーをアタッチした IAM ロールを作成します。信頼ポリシーも忘れずに lambda.amazonaws.com
を設定してください。
AWSLambdaMSKExecutionRole
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka:DescribeCluster",
"kafka:GetBootstrapBrokers",
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeVpcs",
"ec2:DeleteNetworkInterface",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
関数の作成
今回は公式ブログよりコードを拝借いたしましたので、Node.js 12.x
で作成しています。コードは以下のとおり。
exports.handler = async (event) => {
// Iterate through keys
for (let key in event.records) {
console.log('Key: ', key)
// Iterate through records
event.records[key].map((record) => {
console.log('Record: ', record)
// Decode base64
const msg = Buffer.from(record.value, 'base64').toString()
console.log('Message:', msg)
})
}
}
MSK クラスターの所属する VPC にアクセスする必要があるため、VPC Lambda として作成しています。また、VPC Lambda にアタッチするセキュリティグループは事前に作成しておいた MSK クラスターまわりで共通のものを設定しました。
トリガーの追加(イベントソースマッピング設定)
トリガーの設定を確認すると [MSK] が選択可能になっていますので、これを選択します。事前に作成した MSK クラスターおよびトピック名を指定します。(チュートリアルに従って作成した場合、トピック名は AWSKafkaTutorialTopic
です)。開始位置は 水平トリム(Trim Horizon)
または最新(Latest)
のいずれかを選択し、追加します。
しばらくして以下のように Creating
から 有効
に変わっていれば設定完了です。
コンシューマグループはこのイベントソースマッピング UUID と同じ ID で作成されます。
$ aws lambda list-event-source-mappings --region us-east-1
EventSourceMappings:
- BatchSize: 100
EventSourceArn: arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/test-msk/2df3da3b-e692-489c-b076-026dafb88ecd-1
FunctionArn: arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:msk-test
LastModified: '2020-08-15T10:50:37.358000+09:00'
LastProcessingResult: No records processed
State: Enabled
StateTransitionReason: USER_INITIATED
Topics:
- AWSKafkaTutorialTopic
UUID: 870e908b-4d5a-4877-a287-1461d0eaef96
確認
準備ができたので、プロデューサーからメッセージを送信してみます。
$ ./kafka-console-producer.sh \
--broker-list "b-2.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094,b-1.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094" \
--producer.config client.properties \
--topic AWSKafkaTutorialTopic
>Classmethod!
Lambda のログを確認すると・・・
メッセージを受信できていますね!検証は以上です。
さいごに
これまで Lambda と連携するには、ひと手間が必要だった Amazon MSK ですが、今回のアップデートにより非常に簡単に MSK クラスターから Lambda 連携できるようになったことか確認できました。
Amazon MSK においても後続のサーバレスアーキテクチャと統合しやすくなる、良いアップデートではないでしょうか!
以上!大阪オフィスの丸毛(@marumo1981)でした!