Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみた

Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみた

2025.09.02

データ事業本部のueharaです。

今回は、Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみたいと思います。

はじめに

Amazon MSK Serverlessについて

Amazon MSK ServerlessはAmazon MSK(Amazon Managed Streaming for Apache Kafka)のクラスタータイプで、クラスターの容量を管理およびスケールすることなくApache Kafkaを実行することができるサービスです。

Kafkaの基本的な概念については以下の記事などが参考になると思います。

https://medium.com/inspiredbrilliance/kafka-basics-and-core-concepts-5fd7a68c3193

Kafkaの基本的な構成要素としては以下の通りです。

  • Topic:メッセージを分類するカテゴリ。データベースのテーブルのような概念
  • Partition:Topicを分割した単位。並列処理とスケーラビリティを実現
  • Message:Kafkaで送受信される個々のデータ
  • Offset:パーティション内でのメッセージの位置を示す連番

0902_kafka_01
引用元

上記の構成要素をベースとし、Kafkaを利用する環境には以下の4つの役割が存在します。

  • Producer:メッセージを送信する側
  • Consumer:メッセージを受信・処理する側
  • Consumer Group:複数のConsumerをグループ化し、負荷分散を実現
  • Broker:Kafkaクラスターを構成するサーバー

0902_kafka_02
引用元

MSKが担う役割としては「Broker」となります。

認証について

KafkaもDBへの接続のように「認証」という概念があるのですが、いくつかある認証方式の中でもMSK Serverlessが対応しているのは「IAM認証」のみになります。

0902_kafka_05
引用元

認証について詳しく知りたい方は公式ドキュメントをご確認下さい。

Lambdaのイベントソースマッピングについて

Kafkaは「Pull型のアーキテクチャ」となります。

すなわち、Producerが送信したデータに対し、Consumerが任意のタイミングで自分でデータを取りに行く必要があります。(Consumerはポーリングする必要がある)

ただ、Lambdaのイベントソースマッピングを利用するとProducerのデータ送信を検知し、Lambdaに即座にデータを連携することができます。

これはイベントソースマッピングにおいて、内部的な処理としては1秒ごとにKafkaブローカーにポーリングを行うことで実現されています。

0902_kafka_03
引用元

ただ、上記資料の通り、イベントソースマッピングによるLambdaの起動は "同期的に(Synchronous)" 行われます。

これは仕様になるのですが、同期実行の場合Lambdaでレコードの処理に失敗した際Lambdaは同じレコードに対し関数を再度呼び出し、処理が成功するか、ストレージまたは時間の制限でレコードがストリームから消えるまで処理を継続します。

つまり、「無限ループ」とまでは言いませんが、処理に問題がある場合はストリームにレコードが存在する限りLambdaが起動し続けることになります。

例えば一時的なネットワークの問題等であれば再起動を続けるでも良いのですが、処理ロジック自体に問題があり失敗する場合は再起動しても解決する問題ではないので、あまり好ましい挙動とは言えません。

上記事象を回避したい場合は、Lambda内の処理ロジックとしてエラーが発生するメッセージに対応するか(エラーが発生したメッセージは別で取っておいてLambdaは正常終了する等)、障害発生時の送信先(Failure Destination)の設定、またはその他の類似する対応を実施する必要があります。

EventBridge Pipeについて

EventBridge Pipesを利用することでも、Kafkaへのメッセージ送信を検知することができます。

EventBridge Pipesについても、実際としては内部でLambdaのイベントソースマッピングリソースを使用しているようです。

0902_kafka_04
引用元

今回はEventBridge PipeでKafkaのメッセージを検知し、Lambdaを「非同期で」実行したいと思います。

※上記の場合、EventBridgeがLambdaの起動に成功すればLambda自体の実行成否に関わらずKafka側へは正常終了を通知する(Offsetを進める)。

今回作成するアプリケーションの構成図

今回作成するアプリケーションの構成図は以下の通りです。

0902_kafka_06

イベントソースマッピングのポーラーはVPC内で動作するため、AWSのサービスAPIを呼び出すためにPrivateLinkまたはNAT GWが必要となります。今回はNAT GWを設定しています。

メッセージを送信するProducerに関してはVPC内で動作させるCloudShellで実行しようと思います。

また、EventBridge PipeからLambdaを非同期実行し、失敗したメッセージについてはDLQに転送するようにします。

リソース作成

ファイル準備

今回作成するリソースのディレクトリ構成は以下の通りです。リソースについては AWS SAM を利用して作成することにします。

.
├── function
│   └── msk_processor.py
├── samconfig.toml
└── template.yaml

samconfig.toml はSAMでデプロイするための設定ファイルです。今回は以下のようにしてみました。

samconfig.toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "uehara-msk-test"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true

template.yaml はリソースを定義したテンプレートファイルです。(長いためコードは折りたたんでおきます)

template.yaml
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with Serverless Kafka"

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-msk-vpc

  # パブリックサブネット (AZ: ap-northeast-1a)
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.128/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: msk-public-subnet-1a

  # プライベートサブネット1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1a

  # プライベートサブネット2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1c

  # インターネットゲートウェイ
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: msk-internet-gateway

  # インターネットゲートウェイをVPCにアタッチ
  AttachGateway:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  # NAT Gateway用のElastic IP
  NATGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: AttachGateway
    Properties:
      Domain: vpc
      Tags:
        - Key: Name
          Value: msk-nat-gateway-eip

  # NAT Gateway
  NATGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NATGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet1
      Tags:
        - Key: Name
          Value: msk-nat-gateway

  # パブリックルートテーブル
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-public-route-table

  # パブリックルート(インターネットゲートウェイへ)
  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: AttachGateway
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  # パブリックサブネットとルートテーブルの関連付け
  PublicSubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnet1
      RouteTableId: !Ref PublicRouteTable

  # プライベートルートテーブル
  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-private-route-table

  # プライベートルート(NAT Gatewayへ)
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NATGateway

  # プライベートサブネット1とルートテーブルの関連付け
  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet1
      RouteTableId: !Ref PrivateRouteTable

  # プライベートサブネット2とルートテーブルの関連付け
  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet2
      RouteTableId: !Ref PrivateRouteTable

  # セキュリティグループ
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for MSK Serverless cluster
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          CidrIp: 10.10.0.0/16
          Description: MSK Serverless bootstrap servers (VPC access)
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          CidrIp: 10.10.0.0/16
          Description: MSK Kafka protocol (VPC access)
      Tags:
        - Key: Name
          Value: msk-serverless-sg

  # MSK Serverlessクラスター
  MSKServerlessCluster:
    Type: AWS::MSK::ServerlessCluster
    Properties:
      ClusterName: uehara-msk-serverless-cluster
      VpcConfigs:
        - SubnetIds:
            - !Ref PrivateSubnet1
            - !Ref PrivateSubnet2
          SecurityGroups:
            - !Ref MSKSecurityGroup
      ClientAuthentication:
        Sasl:
          Iam:
            Enabled: true

  # Lambda実行ロール
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess

  # MSKイベントソースLambda関数
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO
      EventInvokeConfig:
        MaximumRetryAttempts: 2
        MaximumEventAgeInSeconds: 300
      DeadLetterQueue:
        Type: SQS
        TargetArn: !GetAtt DeadLetterQueue.Arn

  # DLQ
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: msk-dlq
      MessageRetentionPeriod: 1209600  # 14日

  # Pipe実行ロール
  PipeExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: MSKAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeCluster
                  - kafka:DescribeClusterV2
                  - kafka:GetBootstrapBrokers
                  - ec2:CreateNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:DescribeVpcs
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Resource: "*"
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:AlterCluster
                  - kafka-cluster:DescribeCluster
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:*Topic*
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/uehara-msk-serverless-cluster/*"
        - PolicyName: LambdaInvoke
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action: lambda:InvokeFunction
                Resource: !GetAtt MSKProcessorFunction.Arn

  # EventBridge Pipe
  MSKPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: msk-to-lambda-pipe
      Source: !GetAtt MSKServerlessCluster.Arn
      Target: !GetAtt MSKProcessorFunction.Arn
      RoleArn: !GetAtt PipeExecutionRole.Arn

      SourceParameters:
        ManagedStreamingKafkaParameters:
          TopicName: test-topic
          StartingPosition: LATEST
          BatchSize: 10

      TargetParameters:
        LambdaFunctionParameters:
          InvocationType: FIRE_AND_FORGET

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"

  MSKClusterArn:
    Description: ARN of the MSK Serverless cluster
    Value: !GetAtt MSKServerlessCluster.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKClusterArn"

  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"

  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"

  SecurityGroupId:
    Description: ID of MSK Security Group
    Value: !Ref MSKSecurityGroup
    Export:
      Name: !Sub "${AWS::StackName}-MSKSecurityGroup"

  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"

  MSKProcessorExecutionRoleArn:
    Description: ARN of the MSK Processor Lambda execution role
    Value: !GetAtt MSKProcessorExecutionRole.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"

  PublicSubnet1Id:
    Description: ID of Public Subnet 1
    Value: !Ref PublicSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PublicSubnet1"

  InternetGatewayId:
    Description: ID of Internet Gateway
    Value: !Ref InternetGateway
    Export:
      Name: !Sub "${AWS::StackName}-InternetGateway"

  NATGatewayId:
    Description: ID of NAT Gateway
    Value: !Ref NATGateway
    Export:
      Name: !Sub "${AWS::StackName}-NATGateway"

  NATGatewayEIPId:
    Description: ID of NAT Gateway Elastic IP
    Value: !Ref NATGatewayEIP
    Export:
      Name: !Sub "${AWS::StackName}-NATGatewayEIP"

  PublicRouteTableId:
    Description: ID of Public Route Table
    Value: !Ref PublicRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PublicRouteTable"

  PrivateRouteTableId:
    Description: ID of Private Route Table
    Value: !Ref PrivateRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PrivateRouteTable"

EventBridge Pipeで設定している ManagedStreamingKafkaParameters の部分に関して、具体的にどのようなパラメーターが設定できるのかは以下のドキュメントをご確認下さい。

https://docs.aws.amazon.com/AWSCloudFormation/latest/TemplateReference/aws-properties-pipes-pipe-pipesourcemanagedstreamingkafkaparameters.html

上記では test-topic という名前のトピックを設定し、バッチサイズ(1度に処理するメッセージの最大数)は10にしました。

開始ポジションは LATEST に設定していますが、これはどのような位置からメッセージを読むかを指定するものです。(EventBridge Pipeでは LATEST または TRIM_HORIZON を設定可能です)

0902_kafka_07
引用元

Lambdaを非同期実行するための設定は TargetParametersInvocationTypeFIRE_AND_FORGET としている部分です。

デフォルトでは REQUEST_RESPONSE (同期実行)となっているため、特に障害時の処理を明示的に入れてない場合は先に述べたようにエラー発生時にLambdaが繰り返し起動してしまいます。(参考

Lambda自体には EventInvokeConfig という非同期でLambdaを呼び出す場合の設定をしており、最大試行数は2に設定しています。

これを超えて失敗すると設定したDLQにメッセージが転送される形となります。

EventBridge Pipeが利用するIAMロールには、MSKのIAM認証に必要なポリシーを記述しています。

最後に、Lambda関数のスクリプトファイルである msk_processor.py は以下の通りです。

msk_processor.py
import base64
import json
import logging
from typing import Any, Dict, List

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_kafka_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    個別のKafkaレコードを処理する

    Args:
        record (Dict[str, Any]): EventBridge Pipeから受信したKafkaレコード

    Returns:
        Dict[str, Any]: 処理されたレコードデータ
    """
    try:
        topic_name = record.get("topic", "unknown")

        # Kafkaメッセージのデコード
        if "value" in record:
            # Base64でエンコードされたメッセージをデコード
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")

            try:
                # JSONとしてパース試行
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # JSONでない場合は文字列として扱う
                message_data = message_str
        else:
            message_data = None

        # EventBridge Pipeのメタデータの抽出
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")
        event_source = record.get("eventSource", "unknown")
        event_source_arn = record.get("eventSourceArn", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # ここでビジネスロジックを実装

        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "event_source": event_source,
            "event_source_arn": event_source_arn,
            "original_message": message_data,
            "processing_status": "success",
        }

        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": record.get("topic", "unknown"),
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    EventBridge PipeからのKafkaメッセージを処理するLambda関数

    Args:
        event (Dict[str, Any] | List[Dict[str, Any]]): イベントデータ
            通常はKafkaレコードの配列形式で受信される
            Kafkaを直接イベントソースとして指定した場合は辞書形式で "records" キーを含む
        context (Any): Lambda実行コンテキスト

    Returns:
        Dict[str, Any]: Lambda関数の実行結果
    """
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []

    try:
        # EventBridge Pipeのイベント構造: eventは直接レコードの配列
        if isinstance(event, list):
            logger.info(f"Processing {len(event)} records from EventBridge Pipe")

            for record in event:
                processed_record = process_kafka_record(record)
                processed_records.append(processed_record)
        else:
            # MSKと直接接続された時の互換性のための処理
            logger.warning("Received non-array event, checking for legacy MSK format")
            if "records" in event:
                for topic_name, topic_records in event["records"].items():
                    logger.info(f"Processing topic (legacy format): {topic_name}")

                    for record in topic_records:
                        # レコードにトピック名を追加
                        record["topic"] = topic_name
                        processed_record = process_kafka_record(record)
                        processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing: {str(e)}")
        raise

基本的にはEventBridge Pipeからのイベントを想定していますが、イベントソースマッピングでLambdaのソースとしてMSKが直接指定された場合でも動くようにしております。

サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。

デプロイ

以下コマンドでデプロイ可能です。

$ sam deploy

デプロイが完了すると、指定したスタック名でスタックが作成されます。

動作確認

まず、今回作成したVPC内でCloudShellを起動します。

0902_kafka_08

CloudShellが起動したら、Kafka CLIやMSKのIAM認証のためのライブラリ等のDLを行います。

# CloudShellでKafka CLIツールをダウンロード
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz

# MSK IAM認証ライブラリのダウンロード
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.0/aws-msk-iam-auth-2.3.0-all.jar
$ cp aws-msk-iam-auth-2.3.0-all.jar ./kafka_2.13-3.9.1/libs/

# クライアントの設定ファイルを作成
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

ここまで完了すると、直下のディレクトリファイルは以下のようになっていると思います。

0902_kafka_09

準備ができたら、Bootstrapサーバーの取得を行い、トピックの作成を行います。

# Bootstrap serverの取得
$ aws kafka get-bootstrap-brokers \
  --cluster-arn "<作成したKafkaクラスターのARN>"

# トピックの作成
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --bootstrap-server <取得したホスト名>:9098 \
  --command-config client.properties \
  --create \
  --topic test-topic \
  --partitions 3

トピックの作成が完了したら、メッセージの送信を行います。

$ echo '{"message": "Hello Kafka"}' | 
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <取得したホスト名>:9098 \
  --producer.config client.properties \
  --topic test-topic

Lambdaのログを確認すると、無事メッセージが取得できていることが確認できました。

0902_kafka_10

あえてLambdaを失敗するよう raise Exception() を設定して動作させると、2回の再試行の後以下の通りDLQに設定したSQSにメッセージが送信されていることも確認できました。

0902_kafka_11

最後に

今回は、Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみました。

参考になりましたら幸いです。

参考文献

この記事をシェアする

facebookのロゴhatenaのロゴtwitterのロゴ

© Classmethod, Inc. All rights reserved.