I tried building a serverless Kafka environment with Amazon MSK Serverless + Lambda + EventBridge Pipes

2025.09.02

I'm uehara from the Data Business Division.

In this article, I will build a serverless Kafka environment using Amazon MSK Serverless + Lambda + EventBridge Pipes.

## Introduction

### About Amazon MSK Serverless

Amazon MSK Serverless is a cluster type of Amazon MSK (Amazon Managed Streaming for Apache Kafka) that allows you to run Apache Kafka without managing or scaling cluster capacity.

For basic concepts of Kafka, articles like the following may be helpful.

https://medium.com/inspiredbrilliance/kafka-basics-and-core-concepts-5fd7a68c3193

The basic components of Kafka are as follows:

  • Topic: A category for classifying messages, conceptually similar to a database table
  • Partition: A unit that subdivides a topic, enabling parallel processing and scalability
  • Message: An individual piece of data sent and received through Kafka
  • Offset: A sequential number indicating the position of a message within a partition

[Figure: 0902_kafka_01]

Based on these components, there are four roles in a Kafka environment:

  • Producer: The side that sends messages
  • Consumer: The side that receives and processes messages
  • Consumer Group: Groups multiple consumers to achieve load balancing
  • Broker: The server that makes up the Kafka cluster

[Figure: 0902_kafka_02]

The role that MSK plays is that of a "Broker".
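
As a rough illustration of how these pieces relate, here is a minimal in-memory sketch in Python. It is not how Kafka is implemented: the `ToyTopic` class and its key-hashing rule are hypothetical, chosen only to show that each partition keeps its own offsets and that each consumer group tracks its own position per partition.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


class ToyTopic:
    """Hypothetical in-memory stand-in for a Kafka topic (illustration only)."""

    def __init__(self, name: str, partitions: int) -> None:
        self.name = name
        # One append-only list per partition; the list index plays the role of the offset.
        self.partitions: List[List[str]] = [[] for _ in range(partitions)]
        # Committed offset per (consumer group, partition).
        self.committed: Dict[Tuple[str, int], int] = defaultdict(int)

    def send(self, key: str, message: str) -> Tuple[int, int]:
        """Producer side: choose a partition from the key, append, return (partition, offset)."""
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append(message)
        return partition, len(self.partitions[partition]) - 1

    def poll(self, group: str, partition: int) -> List[str]:
        """Consumer side: read everything after the group's committed offset, then commit."""
        start = self.committed[(group, partition)]
        messages = self.partitions[partition][start:]
        self.committed[(group, partition)] = start + len(messages)
        return messages


topic = ToyTopic("test-topic", partitions=3)
first = topic.send("user-1", "hello")
second = topic.send("user-1", "world")
```

Messages with the same key land in the same partition with increasing offsets, and two different consumer groups each read the full partition independently because their committed offsets are tracked separately.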

### About Authentication

As with connecting to a database, Kafka clients must authenticate. Kafka offers several authentication methods, but MSK Serverless supports only IAM authentication.

[Figure: 0902_kafka_05]

For more details on authentication, please check the official documentation.

### About Lambda Event Source Mapping
Kafka uses a pull-based architecture.

This means that a Consumer has to fetch, on its own schedule, the data that a Producer has sent. In other words, the Consumer needs to poll.

However, by using Lambda's event source mapping, you can detect when a Producer sends data and immediately deliver that data to Lambda.

In event source mapping, this is achieved through an internal process that polls the Kafka broker every second.

[Figure: 0902_kafka_03]

However, as the documentation shown above states, event source mapping invokes Lambda synchronously.

This is by design. With synchronous invocation, however, when the function fails to process a record, Lambda invokes it again with the same record and keeps doing so until processing succeeds or the record disappears from the stream because of storage or time limits.

In other words, while it is not exactly an infinite loop, a processing problem causes Lambda to be invoked repeatedly for as long as the record exists in the stream.

If the cause is a temporary network problem, retrying may be fine. If the processing logic itself is faulty, however, retrying will not solve anything, so this behavior is undesirable.

To avoid this situation, you need to either handle error-causing messages within the Lambda's processing logic (such as setting aside messages that cause errors while allowing Lambda to complete normally), configure a failure destination for when errors occur, or implement other similar solutions.
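
The first option, setting failing messages aside so that the invocation itself still completes normally, could look roughly like the following. This is a minimal sketch rather than the handler used later in this article: `handle_record` and the `set_aside` list are hypothetical, and a real function would persist the failed records somewhere durable instead of only returning them.

```python
import base64
import json
from typing import Any, Dict, List


def handle_record(record: Dict[str, Any]) -> Any:
    """Hypothetical business logic: the value must be Base64-encoded JSON."""
    return json.loads(base64.b64decode(record["value"]))


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Process every record; a single bad record never fails the invocation."""
    processed = 0
    set_aside: List[Dict[str, Any]] = []

    # With event source mapping, records arrive grouped under the "records" key.
    for records in event.get("records", {}).values():
        for record in records:
            try:
                handle_record(record)
                processed += 1
            except Exception as error:  # deliberately broad to isolate bad records
                # A real function would write this somewhere durable.
                set_aside.append({"record": record, "error": repr(error)})

    # Returning normally lets the batch count as processed instead of being retried.
    return {"processed": processed, "set_aside": set_aside}
```

The trade-off is that the function now owns the failure handling: anything that is set aside but not stored durably is lost once the offset has advanced.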

### About EventBridge Pipes

You can also detect message delivery to Kafka by using EventBridge Pipes.

EventBridge Pipes also appears to use Lambda's event source mapping internally.

[Figure: 0902_kafka_04]

In this case, I want to detect Kafka messages with EventBridge Pipe and execute Lambda "asynchronously."

*In this case, once EventBridge Pipes has successfully invoked Lambda, it reports success to Kafka (the offset advances) regardless of whether the Lambda execution itself succeeds or fails.

## Application Architecture for This Project
The application architecture for this project is as follows:

[Figure: 0902_kafka_06]

Since the event source mapping poller operates within the VPC, either PrivateLink or a NAT gateway is required to call AWS service APIs. Here, we have configured a NAT gateway.

For the Producer that sends messages, we will run it in CloudShell operating within the VPC.

In addition, EventBridge Pipe will asynchronously execute Lambda, and failed messages will be forwarded to a DLQ.

## Resource Creation

### File Preparation

The directory structure for the resources we will create is as follows. We will use AWS SAM to create these resources.

```
.
├── function
│   └── msk_processor.py
├── samconfig.toml
└── template.yaml
```

samconfig.toml is the configuration file for SAM deployment. For this project, I've set it up as follows:

samconfig.toml

```toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "uehara-msk-test"
s3_bucket = "<deployment S3 bucket name>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
```

template.yaml is the template file that defines the resources.

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with Serverless Kafka"

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-msk-vpc

  # Public subnet (AZ: ap-northeast-1a)
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.128/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: msk-public-subnet-1a

  # Private subnet 1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1a

  # Private subnet 2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1c

  # Internet Gateway
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: msk-internet-gateway

  # Attach Internet Gateway to VPC
  AttachGateway:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  # Elastic IP for NAT Gateway
  NATGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: AttachGateway
    Properties:
      Domain: vpc
      Tags:
        - Key: Name
          Value: msk-nat-gateway-eip

  # NAT Gateway
  NATGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NATGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet1
      Tags:
        - Key: Name
          Value: msk-nat-gateway

  # Public Route Table
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-public-route-table

  # Public Route (to Internet Gateway)
  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: AttachGateway
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  # Association between Public Subnet and Route Table
  PublicSubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnet1
      RouteTableId: !Ref PublicRouteTable

  # Private Route Table
  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-private-route-table

  # Private Route (to NAT Gateway)
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NATGateway

  # Association between Private Subnet 1 and Route Table
  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet1
      RouteTableId: !Ref PrivateRouteTable

  # Association between Private Subnet 2 and Route Table
  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet2
      RouteTableId: !Ref PrivateRouteTable

  # Security Group
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for MSK Serverless cluster
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          CidrIp: 10.10.0.0/16
          Description: MSK Serverless bootstrap servers (VPC access)
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          CidrIp: 10.10.0.0/16
          Description: MSK Kafka protocol (VPC access)
      Tags:
        - Key: Name
          Value: msk-serverless-sg

  # MSK Serverless Cluster
  MSKServerlessCluster:
    Type: AWS::MSK::ServerlessCluster
    Properties:
      ClusterName: uehara-msk-serverless-cluster
      VpcConfigs:
        - SubnetIds:
            - !Ref PrivateSubnet1
            - !Ref PrivateSubnet2
          SecurityGroups:
            - !Ref MSKSecurityGroup
      ClientAuthentication:
        Sasl:
          Iam:
            Enabled: true

  # Lambda Execution Role
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess

  # MSK Event Source Lambda Function
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO
      EventInvokeConfig:
        MaximumRetryAttempts: 2
        MaximumEventAgeInSeconds: 300
      DeadLetterQueue:
        Type: SQS
        TargetArn: !GetAtt DeadLetterQueue.Arn

  # DLQ
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: msk-dlq
      MessageRetentionPeriod: 1209600  # 14 days

  # Pipe Execution Role
  PipeExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: MSKAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeCluster
                  - kafka:DescribeClusterV2
                  - kafka:GetBootstrapBrokers
                  - ec2:CreateNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:DescribeVpcs
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Resource: "*"
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:AlterCluster
                  - kafka-cluster:DescribeCluster
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:*Topic*
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/uehara-msk-serverless-cluster/*"
        - PolicyName: LambdaInvoke
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action: lambda:InvokeFunction
                Resource: !GetAtt MSKProcessorFunction.Arn

  # EventBridge Pipe
  MSKPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: msk-to-lambda-pipe
      Source: !GetAtt MSKServerlessCluster.Arn
      Target: !GetAtt MSKProcessorFunction.Arn
      RoleArn: !GetAtt PipeExecutionRole.Arn

      SourceParameters:
        ManagedStreamingKafkaParameters:
          TopicName: test-topic
          StartingPosition: LATEST
          BatchSize: 10

      TargetParameters:
        LambdaFunctionParameters:
          InvocationType: FIRE_AND_FORGET

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"

  MSKClusterArn:
    Description: ARN of the MSK Serverless cluster
    Value: !GetAtt MSKServerlessCluster.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKClusterArn"

  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"

  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"

  SecurityGroupId:
    Description: ID of MSK Security Group
    Value: !Ref MSKSecurityGroup
    Export:
      Name: !Sub "${AWS::StackName}-MSKSecurityGroup"

  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"

  MSKProcessorExecutionRoleArn:
    Description: ARN of the MSK Processor Lambda execution role
    Value: !GetAtt MSKProcessorExecutionRole.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"

  PublicSubnet1Id:
    Description: ID of Public Subnet 1
    Value: !Ref PublicSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PublicSubnet1"

  InternetGatewayId:
    Description: ID of Internet Gateway
    Value: !Ref InternetGateway
    Export:
      Name: !Sub "${AWS::StackName}-InternetGateway"

  NATGatewayId:
    Description: ID of NAT Gateway
    Value: !Ref NATGateway
    Export:
      Name: !Sub "${AWS::StackName}-NATGateway"

  NATGatewayEIPId:
    Description: ID of NAT Gateway Elastic IP
    Value: !Ref NATGatewayEIP
    Export:
      Name: !Sub "${AWS::StackName}-NATGatewayEIP"

  PublicRouteTableId:
    Description: ID of Public Route Table
    Value: !Ref PublicRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PublicRouteTable"

  PrivateRouteTableId:
    Description: ID of Private Route Table
    Value: !Ref PrivateRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PrivateRouteTable"

		

For details about the parameters that can be configured in the ManagedStreamingKafkaParameters section of an EventBridge Pipe, see the following documentation:

https://docs.aws.amazon.com/AWSCloudFormation/latest/TemplateReference/aws-properties-pipes-pipe-pipesourcemanagedstreamingkafkaparameters.html

In the above configuration, we set a topic named test-topic with a batch size (maximum number of messages processed at once) of 10.

The starting position is set to LATEST, which specifies from which position messages should be read. (EventBridge Pipe allows setting either LATEST or TRIM_HORIZON)
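
To make the difference between the two starting positions concrete, the following toy function picks which messages of a single partition a newly started consumer would read. It is only a sketch of the semantics: `records_to_read` is a hypothetical helper, not an AWS API.

```python
from typing import List


def records_to_read(partition_log: List[str], starting_position: str,
                    arrived_later: List[str]) -> List[str]:
    """Return the messages a brand-new consumer reads from one partition.

    partition_log: messages already in the partition when the consumer starts.
    arrived_later: messages produced after the consumer has started.
    """
    if starting_position == "TRIM_HORIZON":
        # Begin at the oldest retained message, then continue with new ones.
        return partition_log + arrived_later
    if starting_position == "LATEST":
        # Skip what is already there; only newly produced messages are read.
        return list(arrived_later)
    raise ValueError(f"unsupported starting position: {starting_position}")
```

With LATEST, as used in this article, messages that were produced before the pipe started are not delivered to Lambda.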

[Figure: 0902_kafka_07]

The configuration for asynchronous Lambda execution is in the TargetParameters section where InvocationType is set to FIRE_AND_FORGET.

The default is REQUEST_RESPONSE (synchronous invocation), so unless explicit error handling is implemented, Lambda will be invoked repeatedly when errors occur, as mentioned earlier.

For the Lambda itself, we've configured EventInvokeConfig for asynchronous invocation settings, with the maximum retry attempts set to 2.

If failures exceed this limit, the message will be transferred to the configured DLQ.
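
The retry-then-DLQ behavior described here can be imitated locally with a small simulation. This is a sketch of the semantics only: `invoke_async` and the in-memory `dlq` list are hypothetical, and the real service also waits between attempts and honors MaximumEventAgeInSeconds, neither of which is modeled.

```python
from typing import Any, Callable, List


def invoke_async(handler: Callable[[Any], Any], event: Any,
                 max_retry_attempts: int, dlq: List[Any]) -> int:
    """Run handler once plus up to max_retry_attempts retries.

    Returns the number of attempts made. If every attempt raises,
    the original event is appended to dlq.
    """
    attempts = 0
    for _ in range(1 + max_retry_attempts):
        attempts += 1
        try:
            handler(event)
            return attempts
        except Exception:
            continue  # the real service would wait before retrying
    dlq.append(event)
    return attempts


def always_fails(event: Any) -> None:
    """Stand-in for a handler whose logic is broken."""
    raise Exception("simulated failure")


dlq: List[Any] = []
attempts = invoke_async(always_fails, [{"topic": "test-topic"}],
                        max_retry_attempts=2, dlq=dlq)
```

With two retries configured, a handler that always fails is attempted three times in total before the event ends up in the dead-letter list.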

The IAM role used by EventBridge Pipe includes policies necessary for MSK IAM authentication.

Finally, here is the Lambda function script file msk_processor.py:

msk_processor.py

```python
import base64
import json
import logging
import os
from typing import Any, Dict, List, Union

# Log configuration (LOG_LEVEL is set in template.yaml; defaults to INFO)
logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))


def process_kafka_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process an individual Kafka record

    Args:
        record (Dict[str, Any]): Kafka record received from EventBridge Pipe

    Returns:
        Dict[str, Any]: Processed record data
    """
    try:
        topic_name = record.get("topic", "unknown")

        # Decode Kafka message
        if "value" in record:
            # Decode Base64 encoded message
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")

            try:
                # Try parsing as JSON
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # If not JSON, treat as string
                message_data = message_str
        else:
            message_data = None

        # Extract EventBridge Pipe metadata
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")
        event_source = record.get("eventSource", "unknown")
        event_source_arn = record.get("eventSourceArn", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # Implement business logic here

        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "event_source": event_source,
            "event_source_arn": event_source_arn,
            "original_message": message_data,
            "processing_status": "success",
        }

        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": record.get("topic", "unknown"),
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }


def lambda_handler(
    event: Union[List[Dict[str, Any]], Dict[str, Any]], context: Any
) -> Dict[str, Any]:
    """
    Lambda function to process Kafka messages from EventBridge Pipe

    Args:
        event: Event data. EventBridge Pipe delivers a list of Kafka records.
            If Kafka is directly specified as an event source, the event is
            a dictionary with a "records" key instead.
        context (Any): Lambda execution context

    Returns:
        Dict[str, Any]: Lambda function execution result
    """
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []

    try:
        # EventBridge Pipe event structure: event is directly an array of records
        if isinstance(event, list):
            logger.info(f"Processing {len(event)} records from EventBridge Pipe")

            for record in event:
                processed_record = process_kafka_record(record)
                processed_records.append(processed_record)
        else:
            # Processing for compatibility when directly connected with MSK
            logger.warning("Received non-array event, checking for legacy MSK format")
            if "records" in event:
                # Keys of "records" have the form "<topic>-<partition>"
                for topic_partition, topic_records in event["records"].items():
                    logger.info(f"Processing records (legacy format): {topic_partition}")

                    for record in topic_records:
                        # Keep the record's own topic; use the key only if it is missing
                        record.setdefault("topic", topic_partition)
                        processed_record = process_kafka_record(record)
                        processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing: {str(e)}")
        raise
```

The script primarily expects events from EventBridge Pipe, but it is also written to work when MSK is specified directly as the Lambda event source through event source mapping.

As this is a sample script, no specific business logic is implemented, and it only outputs the received messages to the log.
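
To exercise this kind of handler without a cluster, a hand-made event can be fed to it. The sketch below builds a one-record batch in the shape the script reads (a list of records with a Base64-encoded `value`). The field values and the `make_pipe_record` helper are made up for illustration, and `decode_value` mirrors the decoding logic in `process_kafka_record`.

```python
import base64
import json
from typing import Any, Dict, List


def make_pipe_record(payload: Dict[str, Any], topic: str = "test-topic",
                     partition: int = 0, offset: int = 0) -> Dict[str, Any]:
    """Build a hypothetical record in the shape msk_processor.py reads."""
    raw = json.dumps(payload).encode("utf-8")
    return {
        "topic": topic,
        "partition": partition,
        "offset": offset,
        "eventSource": "aws:kafka",
        "value": base64.b64encode(raw).decode("ascii"),
    }


def decode_value(record: Dict[str, Any]) -> Any:
    """Decode the Base64 value and parse JSON, falling back to the raw string."""
    text = base64.b64decode(record["value"]).decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


event: List[Dict[str, Any]] = [make_pipe_record({"message": "Hello Kafka"})]
decoded = [decode_value(record) for record in event]
```

With msk_processor.py on the import path, the same `event` list can be passed to `msk_processor.lambda_handler(event, None)` to check the log output locally.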

### Deployment
You can deploy using the following command.

```bash
$ sam deploy
```

When the deployment is complete, a stack will be created with the specified stack name.

## Verification
First, launch CloudShell within the VPC we created.

[Figure: 0902_kafka_08]

After CloudShell starts, download the Kafka CLI and libraries needed for MSK IAM authentication.

```bash
# Download Kafka CLI tools in CloudShell
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz

# Download MSK IAM authentication library
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.0/aws-msk-iam-auth-2.3.0-all.jar
$ cp aws-msk-iam-auth-2.3.0-all.jar ./kafka_2.13-3.9.1/libs/

# Create client configuration file
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
```

After completing these steps, the files in the root directory should look like this:

[Figure: 0902_kafka_09]

Once ready, retrieve the Bootstrap servers and create a topic.

```bash
# Get Bootstrap server information
$ aws kafka get-bootstrap-brokers \
  --cluster-arn "<ARN of the created Kafka cluster>"

# Create a topic
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --bootstrap-server <obtained hostname>:9098 \
  --command-config client.properties \
  --create \
  --topic test-topic \
  --partitions 3
```

After creating the topic, send a message.

```bash
$ echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <obtained hostname>:9098 \
  --producer.config client.properties \
  --topic test-topic
```

When checking the Lambda logs, we can confirm that the message was successfully received.

[Figure: 0902_kafka_10]

If we deliberately make Lambda fail by adding raise Exception(), we can confirm that after two retries the message is sent to the SQS queue configured as the DLQ, as shown below.

[Figure: 0902_kafka_11]

## Finally
This time, I built a serverless Kafka environment using Amazon MSK Serverless + Lambda + EventBridge Pipes.

I hope you found it helpful.
