# Tried receiving messages in Lambda from another account using Amazon MSK's managed VPC connection
This is uehara from the Data Business Division.
In this article, I'll explore how to receive messages in Lambda from a different AWS account using Amazon MSK's managed VPC connection.
## Introduction
Previously, I published an article titled "Building a Serverless Kafka Environment with Amazon MSK Serverless + Lambda + EventBridge Pipes."
This time, I will use Amazon MSK's managed VPC connection and event source mapping to receive messages in a Lambda function in a separate AWS account.
For information about event source mapping, please refer to the above article as needed.
Note that in this case, I'll be using a standard MSK cluster rather than a Serverless one.
## About Amazon MSK Managed VPC Connection
Amazon MSK's managed VPC connection is a feature that allows you to connect to Amazon MSK clusters in different VPCs and AWS accounts using AWS PrivateLink.
Compared to other VPC connectivity methods like VPC peering, managed VPC connections offer the following advantages:
- Automates the operation and management of AWS PrivateLink
- Allows IP overlap between connected VPCs, eliminating the need for non-overlapping IP design or routing table management required by other VPC connectivity methods
However, there are also some constraints:
- Supported only with Apache Kafka 2.7.1 or later
- Cross-region connections are not supported; must be in the same region
- Setting up a managed VPC connection requires the same number of client subnets as cluster subnets, and the AZs of client subnets must match those of cluster subnets
- Does not support the t3.small instance type
- Does not support connections to Zookeeper nodes
- Supports IAM, TLS, and SASL/SCRAM authentication types (When using SASL/SCRAM or TLS, Apache Kafka ACLs must be configured on the cluster)
If these constraints are not an issue, it becomes very easy to receive MSK messages across different AWS accounts.
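As an aside, you can check an existing cluster's Kafka version and broker instance type against these constraints from the AWS CLI. A minimal sketch (the `--query` expression assumes the `DescribeClusterV2` response shape for provisioned clusters):

```bash
# Check the Kafka version and broker instance type of an existing cluster
# against the multi-VPC connectivity constraints listed above.
aws kafka describe-cluster-v2 \
  --cluster-arn <MSK cluster ARN> \
  --query 'ClusterInfo.Provisioned.{KafkaVersion: CurrentBrokerSoftwareInfo.KafkaVersion, InstanceType: BrokerNodeGroupInfo.InstanceType}'
```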
## Architecture Diagram of the Application
Here's the architecture diagram for the application we'll be building:
For this demonstration, we'll assume the MSK cluster in the Producer account is already built.
Although in reality there would be one subnet in each of the two AZs and the MSK cluster would span across them, the diagram is simplified.
The configuration values for the MSK cluster used in this demonstration are:
| Item | Value |
| --- | --- |
| KafkaVersion | 3.9.x |
| NumberOfBrokerNodes | 2 |
| InstanceType | kafka.m5.large |
## File Preparation

Since configuration is required for both the Producer account (where MSK exists) and the Consumer account (where Lambda exists), we will create separate directories.
```
.
├── consumer
│   ├── function
│   │   └── msk_processor.py
│   ├── samconfig.toml
│   └── template.yaml
└── producer
    └── msk_cluster_policy.yml
```
First, the `msk_cluster_policy.yml` for the Producer account's cluster policy is as follows.

```yaml:msk_cluster_policy.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Apply a cluster policy to MSK cluster

Parameters:
  MSKClusterArn:
    Description: Arn of the MSK Cluster
    Type: String
  LambdaRoleArn:
    Description: Arn of the IAM Role used by Lambda function
    Type: String
  LambdaAccountId:
    Description: AWS Account ID where Lambda is deployed
    Type: String

Resources:
  MSKClusterPolicy:
    Type: AWS::MSK::ClusterPolicy
    Properties:
      ClusterArn: !Ref MSKClusterArn
      Policy:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub arn:aws:iam::${LambdaAccountId}:root
            Action:
              - kafka:CreateVpcConnection
              - kafka:GetBootstrapBrokers
              - kafka:DescribeCluster
              - kafka:DescribeClusterV2
            Resource: !Ref MSKClusterArn
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:Connect
              - kafka-cluster:AlterCluster
              - kafka-cluster:DescribeCluster
            Resource: !Ref MSKClusterArn
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:*Topic*
              - kafka-cluster:WriteData
              - kafka-cluster:ReadData
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:DescribeGroup
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*
```

The parameters are `MSKClusterArn`, `LambdaRoleArn`, and `LambdaAccountId`.
`MSKClusterArn` is the ARN of the created MSK cluster; for `LambdaRoleArn` and `LambdaAccountId`, set the Consumer account's information respectively.
The materials needed for the Producer account are as described above.
For the Consumer account, since we're building the Lambda function with **AWS SAM** this time, we've prepared the files it requires.
`samconfig.toml` is the AWS SAM configuration file for stack name, deployment bucket, etc.
```toml:samconfig.toml
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "uehara-msk-test-cross-acount"
s3_bucket = "<S3 bucket name for deployment>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
```

`template.yaml` is the template file defining the resources.

```yaml:template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with MSK Managed Connection"

Parameters:
  MSKAccountId:
    Description: AWS Account Id where MSK cluster is deployed
    Type: String
  MSKConnectionArn:
    Description: ARN of the MSK Managed Connection
    Default: ""
    Type: String

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-test-vpc

  # Private subnet 1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1a

  # Private subnet 2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1c

  # Security group for VPC endpoints
  VpcEndpointSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for VPC endpoints
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !GetAtt VPC.CidrBlock
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-vpc-endpoint-sg"

  # Lambda VPC endpoint (for event source mapping)
  LambdaVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.lambda'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # STS VPC endpoint (for event source mapping)
  STSVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sts'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # Security group for managed connection
  ManagedConnectionSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Managed Connection
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 14001
          ToPort: 14100
          CidrIp: !GetAtt VPC.CidrBlock
      SecurityGroupEgress:
        - Description: Allow all outbound traffic
          IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-managed-connection-sg"

  # Self-referencing configuration
  ManagedConnectionSecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref ManagedConnectionSecurityGroup
      IpProtocol: tcp
      FromPort: 14001
      ToPort: 14100
      SourceSecurityGroupId: !Ref ManagedConnectionSecurityGroup

  # Lambda execution role
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "MSKAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeVpcConnection
                Resource: '*'
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:DescribeClusterDynamicConfiguration
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:cluster/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:DescribeTopic
                  - kafka-cluster:ReadData
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:topic/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:group/*/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole

  # MSK event source Lambda function
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO

  # # MSK event source mapping
  # MSKEventSourceMapping:
  #   Type: AWS::Lambda::EventSourceMapping
  #   Properties:
  #     EventSourceArn: !Ref MSKConnectionArn
  #     FunctionName: !Ref MSKProcessorFunction
  #     StartingPosition: LATEST
  #     Topics:
  #       - test-topic
  #     BatchSize: 10

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"
  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"
  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"
  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
```
Here is one supplementary point: I've intentionally commented out the `MSKEventSourceMapping`.
It is meant to be deployed only after the MSK managed VPC connection has been set up (covered in the deployment steps below), so it must stay commented out during the initial deployment.
I could have put it in a separate file, but for simplicity, I've kept everything in one file.
The created resources are as described in the comments. As additional information: since the event source mapping operates in a private subnet, I've prepared VPC endpoints for Lambda and STS so that the event source mapping can invoke the Lambda function.
Alternatively, you could also set up a public subnet with a NAT Gateway and configure routing from the private subnet.
Finally, `msk_processor.py` is the script file for the Lambda function that will be triggered by the event source mapping.
```python:msk_processor.py
import base64
import json
import logging
from typing import Any, Dict, List

# Log configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
    try:
        # Decode Kafka message
        if "value" in record:
            # Decode Base64 encoded message
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")
            try:
                # Try to parse as JSON
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # If not JSON, treat as string
                message_data = message_str
        else:
            message_data = None

        # Extract metadata
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # Implement business logic here
        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "original_message": message_data,
            "processing_status": "success",
        }
        return processed_data
    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": topic_name,
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    logger.info(f"Received event: {json.dumps(event, default=str)}")
    processed_records = []
    try:
        # MSK event structure: records by topic are contained in event['records']
        if "records" in event:
            for topic_name, topic_records in event["records"].items():
                logger.info(f"Processing topic: {topic_name}")
                for record in topic_records:
                    processed_record = process_kafka_record(record, topic_name)
                    processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")
        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }
    except Exception as e:
        logger.error(f"Error processing MSK event: {str(e)}")
        raise
```

This script is written on the assumption that events are received by specifying MSK directly as the Lambda event source through event source mapping.
As this is a sample script, no specific business logic is implemented, and it only outputs received messages to the log.
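For reference, here is a trimmed sketch of the event shape the handler parses. Note that the keys under `records` are `<topic>-<partition>` strings; the values below (ARN, offset, and the Base64 payload, which decodes to `{"message": "Hello Kafka"}`) are illustrative:

```json
{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:ap-northeast-1:111122223333:cluster/<cluster name>/<uuid>",
  "records": {
    "test-topic-0": [
      {
        "topic": "test-topic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1700000000000,
        "timestampType": "CREATE_TIME",
        "value": "eyJtZXNzYWdlIjogIkhlbGxvIEthZmthIn0="
      }
    ]
  }
}
```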
## Deployment Procedure
Once the files are ready, deploy in the following order.
### 1. (Producer side) Configure multi-VPC connection for the MSK cluster
In the Producer account, set "Turn on multi-VPC connectivity" from the network settings for the target MSK cluster.

The authentication type will be IAM role-based authentication.

It takes some time for the configuration to complete, so please wait. (In my environment, it took about 40-50 minutes)
Once AWS PrivateLink is activated, the configuration is complete.

### 2. (Consumer side) Application deployment (excluding event source mapping)
After AWS PrivateLink is enabled, deploy the Lambda function to the Consumer account.
At this point, do not create the event source mapping yet.
Deploy by setting the Producer account ID parameter as follows:
```bash
$ sam deploy --parameter-overrides "MSKAccountId=<Producer account ID>"
```

### 3. (Producer side) Deploy cluster policy
Deploy the prepared `msk_cluster_policy.yml` with CloudFormation in the Producer account.
This policy configures permissions for resources in the Consumer account to access MSK in the Producer account.
The parameters to be configured are as described in "File Preparation".
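If you deploy it from the CLI, a plain `aws cloudformation deploy` works; the stack name below is arbitrary, and the parameter values come from the Consumer side:

```bash
# Deploy the cluster policy stack in the Producer account (stack name is an example).
aws cloudformation deploy \
  --template-file msk_cluster_policy.yml \
  --stack-name msk-cluster-policy \
  --parameter-overrides \
    MSKClusterArn=<MSK cluster ARN> \
    LambdaRoleArn=<ARN of msk-processor-execution-role> \
    LambdaAccountId=<Consumer account ID>
```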
### 4. (Consumer side) Create managed VPC connection
Create an Amazon MSK managed VPC connection to access MSK in the Producer account from the Consumer account.
Configure the VPC, subnet, and security group that were deployed with AWS SAM earlier. (For the security group, select the security group created for managed VPC connection, not the security group for VPC endpoint)
When the status becomes "Available" as shown below, the setup is complete.
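Although I created the connection from the console, the same thing should be possible from the CLI along these lines (a sketch; the VPC, subnet, and security group IDs come from the SAM stack deployed earlier):

```bash
# Create the managed VPC connection from the Consumer account (sketch).
aws kafka create-vpc-connection \
  --target-cluster-arn <MSK cluster ARN in the Producer account> \
  --authentication SASL_IAM \
  --vpc-id <VPC ID> \
  --client-subnets <PrivateSubnet1 ID> <PrivateSubnet2 ID> \
  --security-groups <ManagedConnectionSecurityGroup ID>
```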
### 5. (Consumer side) Deployment of event source mapping
Uncomment the following section in `template.yaml`:
```yaml
  # MSK event source mapping
  MSKEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !Ref MSKConnectionArn
      FunctionName: !Ref MSKProcessorFunction
      StartingPosition: LATEST
      Topics:
        - test-topic
      BatchSize: 10
```
Specify the managed VPC connection created in step 4 as a parameter and deploy again with AWS SAM.
```bash
$ sam deploy --parameter-overrides "MSKAccountId=<Producer account ID>" "MSKConnectionArn=<Managed VPC connection ARN>"
```
After successful deployment, you should see MSK added as an event source as shown below.
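You can also confirm the mapping's state from the CLI; a sketch:

```bash
# List the function's event source mappings and check their state.
aws lambda list-event-source-mappings \
  --function-name msk-processor-function \
  --query 'EventSourceMappings[].{Arn: EventSourceArn, State: State}'
```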
This completes the deployment.
## Verification
In the Producer account, send a message to MSK. (In my environment, I'm doing this from CloudShell launched in the same private subnet as MSK. For more details, please refer to the blog mentioned at the beginning)
```bash
echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <MSK Bootstrap server hostname>:9098 \
  --producer.config client.properties \
  --topic test-topic
```
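For reference, a typical `client.properties` for IAM authentication looks like the following (this assumes the aws-msk-iam-auth library is on the client's classpath; see the earlier blog for the full setup):

```properties:client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```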
By checking the Lambda logs in the Consumer account, we can confirm that the message was successfully received.
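One convenient way to check is to tail the logs from the CLI (assuming the default log group name derived from the function name):

```bash
# Tail the Lambda function's CloudWatch Logs in the Consumer account.
aws logs tail /aws/lambda/msk-processor-function --follow
```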
## Conclusion
In this article, we demonstrated how to receive messages in a Lambda function in a different account using Amazon MSK's managed VPC connection.
I hope you found this helpful.