[アップデート] Lambda のイベントソースに Amazon MSK が設定できるようになりました

Lambda 連携がめちゃ簡単になりました!
2020.08.15

先日のアップデートで Amazon MSK(Managed Streaming for Apache Kafka) を Lambda のソースイベントとして利用できるようになりました!

アップデートされて直ぐに触ってたのですが、まだ MSK まわりは情報も少なく、うまく検証が進まなかったのですが、本日リリースされていた公式ブログでようやく理解が深まりました。(Thank you, James!!)

何がうれしいのか

Amazon MSK はフルマネージドな Apache Kafka のサービスで、最大のメリットは「完全互換」があることでしょう。現在、オンプレ環境で利用中の Apache Kafka を利用したデータパイプラインを大きな改修することなく AWS に移行することが容易です。

しかし、Lambda などの AWS サービスと連携させるためには Kafka Connector などのフレームワークを EC2 で準備し、コンシューマー側の Sink Connector からトピックを pull し Lambda を呼び出す、といった仕組みが必要でした。

そのため、従来だと AWS サービス連携させるようなユースケースでは、やはり Amazon Kinesis Data Streams が推奨とされてきました。

それが今回のアップデートによって、コンシューマ側の Connector なしに Lambda を呼び出すことができるようになりました。Lambda を起点にその他の AWS サービスとの連携も柔軟にできますので、 Sink Connector を Lambda に置き換えることで可用性は Lambda 基盤に任せられますし、EC2 のコスト負担もなくなるので嬉しいかぎりではないでしょうか。

注意点

MSK ブローカーから Lambda を呼び出す

Amazon MSK はイベントソースマッピングを介して Lambda を呼び出します。そのため、MSK ブローカーが AWS のエンドポイントにアクセスできる経路が必要となります。多くの場合、ブローカーはプライベートサブネットに配置されていると思いますので、NAT ゲートウェイが必要となります。

(引用元:Using Amazon MSK as an event source for AWS Lambda

また、Lambda も MSK クラスターが配置されいている VPC へのアクセスを必要とするため、VPC Lambda として作成します。

レコードの Key-Value セットは base64 エンコードされる

Amazon MSK レコードの Key-Value セットは base64 でエンコードされていますので、必要に応じてデコードが必要となります。

Amazon language=MSKレコードイベントの例

Received event:{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/Exampgit add lesMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
  "records": {
    "AWSKafkaTopic-0": [
      {
        "topic": "AWSKafkaTopic",
        "partition": 0,
        "offset": 0,
        "timestamp": 1595035749700,
        "timestampType": "CREATE_TIME",
        "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
        "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
      }
    ]
  }
}

サポートされない Kafka の機能

以下の機能は Lambda のイベントソースではサポートされません。

  • 認証 : SSL および SASL ベースの認証はサポートされません
  • Schema Registry

やってみる

はじめに

今回の検証はバージニアリージョンで実施しています。東京リージョンでも機能はサポートされているのですが、執筆時点においては何故か MSK のトピックにメッセージを投入しても Lambda を呼び出すことが出来ませんでした。

イベントソースマッピングのステータスを確認すると PROBLEM: Connection error, Please check your event source connection configuration. と表示されます。

ずっと悩んでいたのですが、同樣の構成をバージニアで作ったところ、サクッと確認できましたのでリージョン固有の問題なのかもしれません。(いずれ解決されると思います)

事前準備

以下の環境は事前に作成済みとします。

  • VPC
  • サブネット
  • NAT Gateway
  • セキュリティグループ
  • MSK クライアント(EC2)
  • Amazon MSK クラスター

セキュリティグループ

MSK クラスターとの接続には以下のポートが必要となります。

ポート 用途
9092 ブローカーがプレーンテキストでプロヂューサーおよびコンシューマと通信する
9094 ブローカーがTLSでプロヂューサーおよびコンシューマと通信する
2181 Apache ZooKeeperノードと通信する

当セキュリティグループ自身からの通信のみを許可するように設定しておき、MSK クラスター、MSK クライアント、VPC Lambda にアタッチしておくとシンプルにまとめることが出来ます。

MSK クライアント

Amazon MSK はブローカーノードや、ZooKeeper ノードなどの基盤はマネージドサービスとして提供されていますが、Kafka そのものの設定等は AWS API では提供されておらず、従来の Kafka API を利用します。

そのため、Kafka API を発行するためのクライアントが必要となります。チュートリアルを参考にすると容易にセットアップできます。

Amazon MSK クラスターの作成

こちらも公式ガイドのチュートリアル に従い MSK クラスターを作成し、¥AWSKafkaTutorialTopic トピックまで作成しておきます。

Lambda トリガーの設定

IAM ロールの作成

Amazon MKS 向けの AWS 管理ポリシー として AWSLambdaMSKExecutionRole が提供されていますので、当該ポリシーをアタッチした IAM ロールを作成します。信頼ポリシーも忘れずに lambda.amazonaws.com を設定してください。

AWSLambdaMSKExecutionRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka:DescribeCluster",
                "kafka:GetBootstrapBrokers",
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeVpcs",
                "ec2:DeleteNetworkInterface",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

関数の作成

今回は公式ブログよりコードを拝借いたしましたので、Node.js 12.x で作成しています。コードは以下のとおり。

exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}

MSK クラスターの所属する VPC にアクセスする必要があるため、VPC Lambda として作成しています。また、VPC Lambda にアタッチするセキュリティグループは事前に作成しておいた MSK クラスターまわりで共通のものを設定しました。

トリガーの追加(イベントソースマッピング設定)

トリガーの設定を確認すると [MSK] が選択可能になっていますので、これを選択します。事前に作成した MSK クラスターおよびトピック名を指定します。(チュートリアルに従って作成した場合、トピック名は AWSKafkaTutorialTopic です)。開始位置は 水平トリム(Trim Horizon)または最新(Latest)のいずれかを選択し、追加します。

しばらくして以下のように Creating から 有効 に変わっていれば設定完了です。

コンシューマグループはこのイベントソースマッピング UUID と同じ ID で作成されます。

$ aws lambda list-event-source-mappings --region us-east-1
EventSourceMappings:
- BatchSize: 100
  EventSourceArn: arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/test-msk/2df3da3b-e692-489c-b076-026dafb88ecd-1
  FunctionArn: arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:msk-test
  LastModified: '2020-08-15T10:50:37.358000+09:00'
  LastProcessingResult: No records processed
  State: Enabled
  StateTransitionReason: USER_INITIATED
  Topics:
  - AWSKafkaTutorialTopic
  UUID: 870e908b-4d5a-4877-a287-1461d0eaef96

確認

準備ができたので、プロデューサーからメッセージを送信してみます。

$ ./kafka-console-producer.sh \
   --broker-list "b-2.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094,b-1.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094" \
   --producer.config client.properties \
   --topic AWSKafkaTutorialTopic
>Classmethod!

Lambda のログを確認すると・・・

メッセージを受信できていますね!検証は以上です。

さいごに

これまで Lambda と連携するには、ひと手間が必要だった Amazon MSK ですが、今回のアップデートにより非常に簡単に MSK クラスターから Lambda 連携できるようになったことか確認できました。

Amazon MSK においても後続のサーバレスアーキテクチャと統合しやすくなる、良いアップデートではないでしょうか!

以上!大阪オフィスの丸毛(@marumo1981)でした!