I tried building a serverless Kafka environment with Amazon MSK Serverless + Lambda + EventBridge Pipes
This is Uehara from the Data Business Division.
In this article, I will build a serverless Kafka environment using Amazon MSK Serverless + Lambda + EventBridge Pipes.
## Introduction

### About Amazon MSK Serverless
Amazon MSK Serverless is a cluster type of Amazon MSK (Amazon Managed Streaming for Apache Kafka) that allows you to run Apache Kafka without managing or scaling cluster capacity.
For basic concepts of Kafka, articles like the following may be helpful.
The basic components of Kafka are as follows:
- Topic: A category for classifying messages. Similar to a database table concept
- Partition: A unit that divides a topic. Enables parallel processing and scalability
- Message: Individual data that is sent and received in Kafka
- Offset: A sequential number indicating the position of a message within a partition
Based on these components, there are four roles in a Kafka environment:
- Producer: The side that sends messages
- Consumer: The side that receives and processes messages
- Consumer Group: Groups multiple consumers to achieve load balancing
- Broker: The server that makes up the Kafka cluster
The role that MSK plays is that of a "Broker".
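To make these roles concrete, here is a minimal sketch using the kafka-python library (my choice for illustration; the article itself uses the Kafka CLI later). It assumes an unauthenticated broker reachable at `localhost:9092`, which MSK Serverless does not offer; connecting to MSK requires the IAM authentication described next.

```python
# Minimal Producer/Consumer sketch (pip install kafka-python).
# Assumes a local, unauthenticated Kafka broker, for illustration only.
from kafka import KafkaConsumer, KafkaProducer

# Producer: sends messages to a topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("test-topic", b'{"message": "Hello Kafka"}')
producer.flush()

# Consumer: joins a consumer group and polls messages from the topic
consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers="localhost:9092",
    group_id="my-consumer-group",  # consumer group enables load balancing
    auto_offset_reset="earliest",
)
for message in consumer:
    # Each message carries the partition it came from and its offset
    print(message.partition, message.offset, message.value)
```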
### About Authentication
Like connecting to a database, connecting to Kafka involves the concept of "authentication". Among the several authentication methods Kafka offers, MSK Serverless supports only "IAM authentication".
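As a reference for what IAM authentication looks like from a Python client, here is a sketch combining kafka-python with AWS's aws-msk-iam-sasl-signer-python package. This is my own assumption for illustration (the article itself authenticates via the Java aws-msk-iam-auth library in the CLI steps below), and it presumes the environment has IAM credentials with the necessary `kafka-cluster:*` permissions.

```python
# Sketch: IAM authentication to MSK Serverless from kafka-python.
# pip install kafka-python aws-msk-iam-sasl-signer-python
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka import KafkaProducer


class TokenProvider:
    """Supplies short-lived SASL/OAUTHBEARER tokens signed with IAM credentials."""

    def token(self) -> str:
        token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token("ap-northeast-1")
        return token


producer = KafkaProducer(
    bootstrap_servers="<bootstrap server>:9098",  # from get-bootstrap-brokers
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=TokenProvider(),
)
```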
For more details on authentication, please check the official documentation.

### About Lambda Event Source Mapping
Kafka uses a "pull-based" architecture.
This means the Consumer must fetch the data sent by the Producer at its own timing, rather than having it pushed. (The Consumer needs to poll.)
However, by using Lambda's event source mapping, you can detect that a Producer has sent data and deliver that data to Lambda almost immediately.
In event source mapping, this is achieved through an internal process that polls the Kafka broker every second.
However, as shown in the above document, Lambda invocation by event source mapping is done "synchronously."
This is by design, but with synchronous execution, when Lambda fails to process a record, it invokes the function again with the same record and keeps retrying until processing succeeds or the record disappears from the stream due to storage or time limits.
In other words, while it is not strictly an "infinite loop," if there is a problem in the processing, Lambda will keep being invoked for as long as the record exists in the stream.
If the cause is, say, a temporary network problem, retrying may be fine; but if the processing logic itself is broken, retrying will not fix anything, which is not desirable behavior.
To avoid this situation, you need to take a measure such as the following (a sketch of the first option appears after this list):
- Handle error-causing messages within the Lambda's processing logic (e.g., set failing messages aside so the function itself completes normally)
- Configure a failure destination to which records are sent when errors occur
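Here is a rough sketch of the first option: the handler catches errors per record and sets failing records aside so the invocation itself completes normally and the offset can advance. `SIDELINE_QUEUE_URL` and `process()` are hypothetical placeholders, not part of this article's stack.

```python
# Sketch: per-record error handling so the invocation completes normally.
# SIDELINE_QUEUE_URL and process() are placeholders, not part of this stack.
import json
import os

import boto3

sqs = boto3.client("sqs")
SIDELINE_QUEUE_URL = os.environ["SIDELINE_QUEUE_URL"]


def process(record):
    ...  # business logic placeholder


def handler(event, context):
    for record in event:
        try:
            process(record)
        except Exception as exc:
            # Set the failing record aside instead of raising, which would
            # otherwise cause the same batch to be retried again and again.
            sqs.send_message(
                QueueUrl=SIDELINE_QUEUE_URL,
                MessageBody=json.dumps({"record": record, "error": str(exc)}),
            )
    # Returning normally lets the offset advance past this batch
    return {"statusCode": 200}
```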
### About EventBridge Pipes
You can also detect message delivery to Kafka by using EventBridge Pipes.
EventBridge Pipes, too, appears to use a Lambda event source mapping resource internally.
In this case, I want to detect Kafka messages with EventBridge Pipes and execute Lambda "asynchronously."
*In this configuration, if EventBridge successfully invokes Lambda, it reports successful completion to Kafka (advances the offset) regardless of whether the Lambda execution itself succeeds or fails.

## Application Architecture for This Project
The application architecture for this project is as follows:
Since the event source mapping poller operates within the VPC, PrivateLink or NAT GW is required to call AWS service APIs. In this case, we have configured a NAT GW.
As the Producer that sends messages, we will use CloudShell running within the VPC.
In addition, EventBridge Pipe will asynchronously execute Lambda, and failed messages will be forwarded to a DLQ.
## Resource Creation

### File Preparation
The directory structure for the resources we will create is as follows. We will use AWS SAM to create these resources.
```
.
├── function
│   └── msk_processor.py
├── samconfig.toml
└── template.yaml
```
`samconfig.toml` is the configuration file for SAM deployment. For this project, I've set it up as follows:

```toml
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "uehara-msk-test"
s3_bucket = "<deployment S3 bucket name>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
```

`template.yaml` is the template file that defines the resources. (The code is folded due to its length.)

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with Serverless Kafka"

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-msk-vpc

  # Public subnet (AZ: ap-northeast-1a)
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.128/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: msk-public-subnet-1a

  # Private subnet 1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1a

  # Private subnet 2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1c

  # Internet Gateway
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: msk-internet-gateway

  # Attach Internet Gateway to VPC
  AttachGateway:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  # Elastic IP for NAT Gateway
  NATGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: AttachGateway
    Properties:
      Domain: vpc
      Tags:
        - Key: Name
          Value: msk-nat-gateway-eip

  # NAT Gateway
  NATGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NATGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet1
      Tags:
        - Key: Name
          Value: msk-nat-gateway

  # Public Route Table
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-public-route-table

  # Public Route (to Internet Gateway)
  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: AttachGateway
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  # Association between Public Subnet and Route Table
  PublicSubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnet1
      RouteTableId: !Ref PublicRouteTable

  # Private Route Table
  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-private-route-table

  # Private Route (to NAT Gateway)
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NATGateway

  # Association between Private Subnet 1 and Route Table
  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet1
      RouteTableId: !Ref PrivateRouteTable

  # Association between Private Subnet 2 and Route Table
  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet2
      RouteTableId: !Ref PrivateRouteTable

  # Security Group
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for MSK Serverless cluster
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9098
          ToPort: 9098
          CidrIp: 10.10.0.0/16
          Description: MSK Serverless bootstrap servers (VPC access)
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          CidrIp: 10.10.0.0/16
          Description: MSK Kafka protocol (VPC access)
      Tags:
        - Key: Name
          Value: msk-serverless-sg

  # MSK Serverless Cluster
  MSKServerlessCluster:
    Type: AWS::MSK::ServerlessCluster
    Properties:
      ClusterName: uehara-msk-serverless-cluster
      VpcConfigs:
        - SubnetIds:
            - !Ref PrivateSubnet1
            - !Ref PrivateSubnet2
          SecurityGroups:
            - !Ref MSKSecurityGroup
      ClientAuthentication:
        Sasl:
          Iam:
            Enabled: true

  # Lambda Execution Role
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess

  # MSK Event Source Lambda Function
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO
      EventInvokeConfig:
        MaximumRetryAttempts: 2
        MaximumEventAgeInSeconds: 300
      DeadLetterQueue:
        Type: SQS
        TargetArn: !GetAtt DeadLetterQueue.Arn

  # DLQ
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: msk-dlq
      MessageRetentionPeriod: 1209600 # 14 days

  # Pipe Execution Role
  PipeExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: MSKAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeCluster
                  - kafka:DescribeClusterV2
                  - kafka:GetBootstrapBrokers
                  - ec2:CreateNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:DescribeVpcs
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Resource: "*"
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:AlterCluster
                  - kafka-cluster:DescribeCluster
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:*Topic*
                  - kafka-cluster:WriteData
                  - kafka-cluster:ReadData
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/uehara-msk-serverless-cluster/*"
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/uehara-msk-serverless-cluster/*"
        - PolicyName: LambdaInvoke
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action: lambda:InvokeFunction
                Resource: !GetAtt MSKProcessorFunction.Arn

  # EventBridge Pipe
  MSKPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: msk-to-lambda-pipe
      Source: !GetAtt MSKServerlessCluster.Arn
      Target: !GetAtt MSKProcessorFunction.Arn
      RoleArn: !GetAtt PipeExecutionRole.Arn
      SourceParameters:
        ManagedStreamingKafkaParameters:
          TopicName: test-topic
          StartingPosition: LATEST
          BatchSize: 10
      TargetParameters:
        LambdaFunctionParameters:
          InvocationType: FIRE_AND_FORGET

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"
  MSKClusterArn:
    Description: ARN of the MSK Serverless cluster
    Value: !GetAtt MSKServerlessCluster.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKClusterArn"
  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"
  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"
  SecurityGroupId:
    Description: ID of MSK Security Group
    Value: !Ref MSKSecurityGroup
    Export:
      Name: !Sub "${AWS::StackName}-MSKSecurityGroup"
  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
  MSKProcessorExecutionRoleArn:
    Description: ARN of the MSK Processor Lambda execution role
    Value: !GetAtt MSKProcessorExecutionRole.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"
  PublicSubnet1Id:
    Description: ID of Public Subnet 1
    Value: !Ref PublicSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PublicSubnet1"
  InternetGatewayId:
    Description: ID of Internet Gateway
    Value: !Ref InternetGateway
    Export:
      Name: !Sub "${AWS::StackName}-InternetGateway"
  NATGatewayId:
    Description: ID of NAT Gateway
    Value: !Ref NATGateway
    Export:
      Name: !Sub "${AWS::StackName}-NATGateway"
  NATGatewayEIPId:
    Description: ID of NAT Gateway Elastic IP
    Value: !Ref NATGatewayEIP
    Export:
      Name: !Sub "${AWS::StackName}-NATGatewayEIP"
  PublicRouteTableId:
    Description: ID of Public Route Table
    Value: !Ref PublicRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PublicRouteTable"
  PrivateRouteTableId:
    Description: ID of Private Route Table
    Value: !Ref PrivateRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PrivateRouteTable"
```

For details about the parameters that can be configured in the `ManagedStreamingKafkaParameters` section of an EventBridge Pipe, please check the following documentation:
In the above configuration, we set a topic named `test-topic` with a batch size (the maximum number of messages processed at once) of 10.

The starting position, which specifies from which position messages should be read, is set to `LATEST`. (EventBridge Pipes allows either `LATEST` or `TRIM_HORIZON`.)

The configuration for asynchronous Lambda execution is the `InvocationType: FIRE_AND_FORGET` setting in the `TargetParameters` section. By default it is `REQUEST_RESPONSE` (synchronous execution), so if no explicit error handling is implemented, Lambda will be repeatedly invoked when errors occur, as mentioned earlier. (Reference)
For the Lambda itself, we've configured `EventInvokeConfig`, the settings for asynchronous invocation, with the maximum retry attempts set to 2. If failures exceed this limit, the message is transferred to the SQS queue configured as the DLQ.
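The applied retry settings can likewise be verified with boto3. A sketch, using the function name from the template:

```python
# Sketch: verify the function's asynchronous invocation settings.
import boto3

lambda_client = boto3.client("lambda")
cfg = lambda_client.get_function_event_invoke_config(
    FunctionName="msk-processor-function"
)
print(cfg["MaximumRetryAttempts"])      # expected: 2
print(cfg["MaximumEventAgeInSeconds"])  # expected: 300
```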
The IAM role used by EventBridge Pipe includes policies necessary for MSK IAM authentication.
Finally, here is the Lambda function script file `msk_processor.py`:

```python
import base64
import json
import logging
from typing import Any, Dict, List

# Log configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process_kafka_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process an individual Kafka record

    Args:
        record (Dict[str, Any]): Kafka record received from EventBridge Pipe

    Returns:
        Dict[str, Any]: Processed record data
    """
    try:
        topic_name = record.get("topic", "unknown")

        # Decode Kafka message
        if "value" in record:
            # Decode Base64 encoded message
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")
            try:
                # Try parsing as JSON
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # If not JSON, treat as string
                message_data = message_str
        else:
            message_data = None

        # Extract EventBridge Pipe metadata
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")
        event_source = record.get("eventSource", "unknown")
        event_source_arn = record.get("eventSourceArn", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # Implement business logic here
        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "event_source": event_source,
            "event_source_arn": event_source_arn,
            "original_message": message_data,
            "processing_status": "success",
        }
        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": record.get("topic", "unknown"),
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda function to process Kafka messages from EventBridge Pipe

    Args:
        event (Dict[str, Any] | List[Dict[str, Any]]): Event data
            Typically received as an array of Kafka records
            If Kafka is directly specified as an event source, it will be
            in dictionary format with a "records" key
        context (Any): Lambda execution context

    Returns:
        Dict[str, Any]: Lambda function execution result
    """
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []
    try:
        # EventBridge Pipe event structure: event is directly an array of records
        if isinstance(event, list):
            logger.info(f"Processing {len(event)} records from EventBridge Pipe")
            for record in event:
                processed_record = process_kafka_record(record)
                processed_records.append(processed_record)
        else:
            # Processing for compatibility when directly connected with MSK
            logger.warning("Received non-array event, checking for legacy MSK format")
            if "records" in event:
                for topic_name, topic_records in event["records"].items():
                    logger.info(f"Processing topic (legacy format): {topic_name}")
                    for record in topic_records:
                        # Add topic name to record
                        record["topic"] = topic_name
                        processed_record = process_kafka_record(record)
                        processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")
        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }
    except Exception as e:
        logger.error(f"Error processing: {str(e)}")
        raise
```

Basically, it assumes events from EventBridge Pipes, but it is also designed to work when MSK is specified directly as a Lambda event source through event source mapping.
As this is a sample script, no specific business logic is implemented, and it only outputs the received messages to the log.
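Since the handler is plain Python, it can also be sanity-checked locally by feeding it a hand-built event shaped like what EventBridge Pipes delivers. The field values below, including the ARN, are dummies I made up for the test.

```python
# Sketch: local sanity check with a hand-built EventBridge Pipes-style event.
import base64
import json

from msk_processor import lambda_handler

sample_event = [
    {
        "topic": "test-topic",
        "partition": 0,
        "offset": 0,
        "timestamp": 1700000000000,
        "eventSource": "aws:kafka",
        "eventSourceArn": "arn:aws:kafka:ap-northeast-1:123456789012:cluster/dummy/xxxx",
        # Kafka message values arrive Base64-encoded
        "value": base64.b64encode(b'{"message": "Hello Kafka"}').decode("utf-8"),
    }
]

print(json.dumps(lambda_handler(sample_event, None), indent=2))
```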
### Deployment
You can deploy using the following command.
```bash
$ sam deploy
```

When the deployment is complete, a stack will be created with the specified stack name.

## Verification
First, launch CloudShell within the VPC we created.
After CloudShell starts, download the Kafka CLI and libraries needed for MSK IAM authentication.
```bash
# Download Kafka CLI tools in CloudShell
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz

# Download MSK IAM authentication library
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.0/aws-msk-iam-auth-2.3.0-all.jar
$ cp aws-msk-iam-auth-2.3.0-all.jar ./kafka_2.13-3.9.1/libs/

# Create client configuration file
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
```
After completing these steps, the files in the root directory should look like this:
Once ready, retrieve the Bootstrap servers and create a topic.
```bash
# Get Bootstrap server information
$ aws kafka get-bootstrap-brokers \
    --cluster-arn "<ARN of the created Kafka cluster>"

# Create a topic
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
    --bootstrap-server <obtained hostname>:9098 \
    --command-config client.properties \
    --create \
    --topic test-topic \
    --partitions 3
```
After creating the topic, send a message.
```bash
$ echo '{"message": "Hello Kafka"}' |
    ./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
    --bootstrap-server <obtained hostname>:9098 \
    --producer.config client.properties \
    --topic test-topic
```
When checking the Lambda logs, we can confirm that the message was successfully received.
If we deliberately make Lambda fail by adding a `raise Exception()`, we can confirm that after two retry attempts the message is sent to the SQS queue configured as the DLQ, as shown below.
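To peek at what actually landed in the DLQ, you can poll the queue with boto3. A sketch; `msk-dlq` is the queue name defined in the template.

```python
# Sketch: inspect messages that ended up in the msk-dlq queue.
import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="msk-dlq")["QueueUrl"]

resp = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=5,  # long polling
)
for message in resp.get("Messages", []):
    print(message["Body"])
```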
## Finally
This time, I built a serverless Kafka environment using Amazon MSK Serverless + Lambda + EventBridge Pipes.
I hope you found it helpful.
## References
- Processing self-managed Apache Kafka messages with Lambda
- AWS::Lambda::EventSourceMapping
- AWS::Serverless::Function
- Create a client machine to access MSK Serverless cluster
- Create authorization policies for the IAM role
- Capturing discarded batches for an Amazon MSK event source
- re:Invent 2024: Real-time data processing using Kafka and Lambda by AWS