Tried receiving messages in Lambda from another account using Amazon MSK's managed VPC connection


2025.09.05

This is uehara from the Data Business Division.

In this article, I'll explore how to receive messages in Lambda from a different AWS account using Amazon MSK's managed VPC connection.

## Introduction

Previously, I published an article titled "Building a Serverless Kafka Environment with Amazon MSK Serverless + Lambda + EventBridge Pipes."

https://dev.classmethod.jp/articles/msk-serverless-lambda-eventbridge-pipes/

This time, I will use Amazon MSK's managed VPC connection and event source mapping to receive messages in a Lambda function in a separate AWS account.

For information about event source mapping, please refer to the above article as needed.

Note that in this case, I'll be using a standard MSK cluster rather than a Serverless one.

## About Amazon MSK Managed VPC Connection

Amazon MSK's managed VPC connection is a feature that allows you to connect to Amazon MSK clusters in different VPCs and AWS accounts using AWS PrivateLink.

Compared to other VPC connectivity methods like VPC peering, managed VPC connections offer the following advantages:

  • Automates the operation and management of AWS PrivateLink
  • Allows IP overlap between connected VPCs, eliminating the need for non-overlapping IP design or routing table management required by other VPC connectivity methods

However, there are also some constraints:

  • Supported only with Apache Kafka 2.7.1 or later
  • Cross-region connections are not supported; must be in the same region
  • Setting up a managed VPC connection requires the same number of client subnets as cluster subnets, and the client subnets must be in the same AZs as the cluster subnets (compare AZ IDs such as `apne1-az1`, because AZ names like `ap-northeast-1a` can map to different physical AZs in different accounts)
  • Does not support the t3.small instance type
  • Does not support connections to ZooKeeper nodes
  • Supports IAM, TLS, and SASL/SCRAM authentication (when using SASL/SCRAM or TLS, Apache Kafka ACLs must be configured on the cluster)

If these constraints are acceptable, receiving MSK messages in a different AWS account becomes very straightforward.
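The constraints above lend themselves to a quick pre-flight check. Below is a minimal Python sketch, not an AWS tool: `ConnectionPlan` and `check_plan` are hypothetical names, and the inputs (Kafka version, broker instance type, regions, authentication type, and the AZ IDs of the cluster and client subnets) are assumed to be gathered by hand from the console. It compares AZ IDs rather than AZ names for the reason given in the list. The ZooKeeper constraint cannot be expressed as a plan property, so it is not checked.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Instance type that managed VPC connections do not support (per the list above)
UNSUPPORTED_INSTANCE_TYPES = {"kafka.t3.small"}
# Authentication types that managed VPC connections support (per the list above)
SUPPORTED_AUTH_TYPES = {"IAM", "TLS", "SASL/SCRAM"}


@dataclass
class ConnectionPlan:
    """Hypothetical description of a planned managed VPC connection."""
    kafka_version: str          # e.g. "3.9.x"
    instance_type: str          # e.g. "kafka.m5.large"
    cluster_region: str
    client_region: str
    auth_type: str              # "IAM", "TLS" or "SASL/SCRAM"
    cluster_az_ids: List[str]   # AZ IDs of the MSK cluster subnets
    client_az_ids: List[str]    # AZ IDs of the client (Consumer) subnets


def version_tuple(version: str) -> Tuple[int, ...]:
    """Turn "2.7.1" into (2, 7, 1); stops at the first non-numeric part ("3.9.x" -> (3, 9))."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def check_plan(plan: ConnectionPlan) -> List[str]:
    """Return a list of constraint violations; an empty list means none were found."""
    problems = []
    if version_tuple(plan.kafka_version) < (2, 7, 1):
        problems.append(f"Kafka {plan.kafka_version} is older than 2.7.1")
    if plan.instance_type in UNSUPPORTED_INSTANCE_TYPES:
        problems.append(f"{plan.instance_type} is not supported")
    if plan.cluster_region != plan.client_region:
        problems.append("cluster and client must be in the same region")
    if plan.auth_type not in SUPPORTED_AUTH_TYPES:
        problems.append(f"authentication type {plan.auth_type} is not supported")
    if len(plan.client_az_ids) != len(plan.cluster_az_ids):
        problems.append("number of client subnets must equal number of cluster subnets")
    elif sorted(plan.client_az_ids) != sorted(plan.cluster_az_ids):
        problems.append("client subnet AZ IDs must match cluster subnet AZ IDs")
    return problems
```

An empty list only means that none of the listed constraints are violated; it does not guarantee that the connection will succeed.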

## Architecture Diagram of the Application

Here's the architecture diagram for the application we'll be building:

[Figure: architecture diagram (0905_msk_01)]

For this demonstration, we'll assume the MSK cluster in the Producer account is already built.

The diagram is simplified: in reality, there is one subnet in each of two AZs, and the MSK cluster spans both of them.

The configuration values for the MSK cluster used in this demonstration are:

| Item | Value |
| --- | --- |
| KafkaVersion | 3.9.x |
| NumberOfBrokerNodes | 2 |
| InstanceType | kafka.m5.large |

## File Preparation

Since configuration is required in both the Producer account (where MSK runs) and the Consumer account (where Lambda runs), I use a separate directory for each.

```
.
├── consumer
│   ├── function
│   │   └── msk_processor.py
│   ├── samconfig.toml
│   └── template.yaml
└── producer
    └── msk_cluster_policy.yml
```

First, here is msk_cluster_policy.yml, which applies the cluster policy to the MSK cluster in the Producer account.

```yaml:msk_cluster_policy.yml
AWSTemplateFormatVersion: '2010-09-09'
Description: Apply a cluster policy to MSK cluster
Parameters:
  MSKClusterArn:
    Description: Arn of the MSK Cluster
    Type: String
  LambdaRoleArn:
    Description: Arn of the IAM Role used by Lambda function
    Type: String
  LambdaAccountId:
    Description: AWS Account ID where Lambda is deployed
    Type: String

Resources:
  MSKClusterPolicy:
    Type: AWS::MSK::ClusterPolicy
    Properties:
      ClusterArn: !Ref MSKClusterArn
      Policy:
        Version: '2012-10-17'
        Statement:
          # Consumer account: create the managed VPC connection and look up the cluster
          - Effect: Allow
            Principal:
              AWS: !Sub arn:aws:iam::${LambdaAccountId}:root
            Action:
              - kafka:CreateVpcConnection
              - kafka:GetBootstrapBrokers
              - kafka:DescribeCluster
              - kafka:DescribeClusterV2
            Resource: !Ref MSKClusterArn
          # Lambda execution role: connect to the cluster (IAM access control)
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:Connect
              - kafka-cluster:AlterCluster
              - kafka-cluster:DescribeCluster
            Resource: !Ref MSKClusterArn
          # Lambda execution role: topic-level access
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:*Topic*
              - kafka-cluster:WriteData
              - kafka-cluster:ReadData
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*
          # Lambda execution role: consumer group access
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:DescribeGroup
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*
```

The parameters are `MSKClusterArn`, `LambdaRoleArn`, and `LambdaAccountId`.

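`MSKClusterArn` is the ARN of the MSK cluster you created. Set `LambdaRoleArn` and `LambdaAccountId` to the ARN of the Lambda execution role and the account ID of the Consumer account, respectively (in the Consumer template below, the role is named `msk-processor-execution-role`).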
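To make the structure of the policy easier to see, here is a Python sketch that builds the same document from the three parameters. `build_cluster_policy` is a hypothetical helper, not part of the original setup. It derives the region and account from the cluster ARN, which matches the `${AWS::Region}` and `${AWS::AccountId}` substitutions in the template as long as the stack is deployed in the cluster's own account and region.

```python
import json
from typing import Any, Dict


def build_cluster_policy(cluster_arn: str, lambda_role_arn: str, lambda_account_id: str) -> Dict[str, Any]:
    """Build the same policy document as msk_cluster_policy.yml (sketch)."""
    # Cluster ARN format: arn:<partition>:kafka:<region>:<account>:cluster/<name>/<uuid>
    prefix, partition, service, region, account, resource = cluster_arn.split(":", 5)
    if prefix != "arn" or service != "kafka" or not resource.startswith("cluster/"):
        raise ValueError(f"not an MSK cluster ARN: {cluster_arn}")

    kafka_arn = f"arn:{partition}:kafka:{region}:{account}"
    role = {"AWS": lambda_role_arn}
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Consumer account: create the VPC connection and look up the cluster
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:{partition}:iam::{lambda_account_id}:root"},
                "Action": [
                    "kafka:CreateVpcConnection",
                    "kafka:GetBootstrapBrokers",
                    "kafka:DescribeCluster",
                    "kafka:DescribeClusterV2",
                ],
                "Resource": cluster_arn,
            },
            {   # Lambda execution role: connect to the cluster
                "Effect": "Allow",
                "Principal": role,
                "Action": ["kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster"],
                "Resource": cluster_arn,
            },
            {   # Lambda execution role: topic-level access
                "Effect": "Allow",
                "Principal": role,
                "Action": ["kafka-cluster:*Topic*", "kafka-cluster:WriteData", "kafka-cluster:ReadData"],
                "Resource": f"{kafka_arn}:topic/*",
            },
            {   # Lambda execution role: consumer group access
                "Effect": "Allow",
                "Principal": role,
                "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
                "Resource": f"{kafka_arn}:group/*",
            },
        ],
    }
```

If you prefer not to use CloudFormation, the output of `json.dumps(...)` could be passed to `aws kafka put-cluster-policy --cluster-arn <ARN> --policy <JSON>`; that route is not covered in this article.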

The materials needed for the Producer account are as described above.

For the Consumer account, I build the Lambda function with **AWS SAM**, so the files are laid out as a SAM project.

`samconfig.toml` is the AWS SAM configuration file for stack name, deployment bucket, etc.

```toml:samconfig.toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "uehara-msk-test-cross-acount"
s3_bucket = "<S3 bucket name for deployment>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
```

template.yaml is the template file that defines the resources.

```yaml:template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with MSK Managed Connection"

Parameters:
  MSKAccountId:
    Description: AWS Account Id where MSK cluster is deployed
    Type: String
  MSKConnectionArn:
    Description: ARN of the MSK Managed Connection
    Default: ""
    Type: String

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-test-vpc

  # Private subnet 1 (AZ: ap-northeast-1a)
  # Its AZ ID must match the AZ ID of one of the MSK cluster subnets
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1a

  # Private subnet 2 (AZ: ap-northeast-1c)
  # Its AZ ID must match the AZ ID of the other MSK cluster subnet
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1c

  # Security group for VPC endpoints (HTTPS from within the VPC)
  VpcEndpointSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for VPC endpoints
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !GetAtt VPC.CidrBlock
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-vpc-endpoint-sg"

  # Lambda VPC endpoint (for event source mapping)
  LambdaVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.lambda'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # STS VPC endpoint (for event source mapping)
  STSVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sts'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # Security group for managed connection
  # Allows the broker port range (14001-14100) used over the managed VPC connection
  ManagedConnectionSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Managed Connection
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 14001
          ToPort: 14100
          CidrIp: !GetAtt VPC.CidrBlock
      SecurityGroupEgress:
        - Description: Allow all outbound traffic
          IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-managed-connection-sg"

  # Self-referencing configuration
  ManagedConnectionSecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref ManagedConnectionSecurityGroup
      IpProtocol: tcp
      FromPort: 14001
      ToPort: 14100
      SourceSecurityGroupId: !Ref ManagedConnectionSecurityGroup

  # Lambda execution role
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "MSKAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeVpcConnection
                Resource: '*'
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:DescribeClusterDynamicConfiguration
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:cluster/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:DescribeTopic
                  - kafka-cluster:ReadData
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:topic/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:group/*/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole

  # MSK event source Lambda function
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO

  # # MSK event source mapping
  # MSKEventSourceMapping:
  #   Type: AWS::Lambda::EventSourceMapping
  #   Properties:
  #     EventSourceArn: !Ref MSKConnectionArn
  #     FunctionName: !Ref MSKProcessorFunction
  #     StartingPosition: LATEST
  #     Topics:
  #       - test-topic
  #     BatchSize: 10

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"

  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"

  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"

  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
```

One note: I have intentionally commented out `MSKEventSourceMapping`.

The event source mapping requires the managed VPC connection, which is created later in the deployment procedure, so it must not be included in the initial deployment.

I could have put it in a separate file, but for simplicity I kept everything in one file.

The resources created are described in the template comments. One addition: because the event source mapping runs in the private subnets, which have no internet access, I created VPC endpoints for Lambda and STS so that the event source mapping can invoke the Lambda function.

Alternatively, you could also set up a public subnet with a NAT Gateway and configure routing from the private subnet.
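The endpoint requirement can be summarized in a small Python sketch. `required_interface_endpoints` and `subnets_outside_vpc` are hypothetical helpers. The first lists the interface endpoint service names used in the template (Lambda and STS); adding Secrets Manager is an assumption for clusters whose SASL/SCRAM credentials live in Secrets Manager, which this article does not use. The second checks that every subnet lies inside the CIDR admitted by the endpoint security group. Note that `!GetAtt VPC.CidrBlock` returns only the primary CIDR, so subnets added later from a secondary CIDR block would not be covered by that rule.

```python
import ipaddress
from typing import List


def required_interface_endpoints(region: str, uses_secrets_manager: bool = False) -> List[str]:
    """Interface endpoint service names for an event source mapping in private subnets (sketch)."""
    services = ["lambda", "sts"]
    if uses_secrets_manager:
        # Assumption: only needed when SASL/SCRAM credentials are stored in Secrets Manager
        services.append("secretsmanager")
    return [f"com.amazonaws.{region}.{service}" for service in services]


def subnets_outside_vpc(vpc_cidr: str, subnet_cidrs: List[str]) -> List[str]:
    """Return subnets that an ingress rule with source = vpc_cidr would not cover."""
    vpc = ipaddress.ip_network(vpc_cidr)
    return [cidr for cidr in subnet_cidrs if not ipaddress.ip_network(cidr).subnet_of(vpc)]
```

With the values from template.yaml, `subnets_outside_vpc("10.10.0.0/16", ["10.10.0.0/26", "10.10.0.64/26"])` returns an empty list.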

Finally, `msk_processor.py` is the script file for the Lambda function that will be triggered by the event source mapping.

```python:msk_processor.py
import base64
import json
import logging
from typing import Any, Dict, List

# Log configuration
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
    try:
        # Decode Kafka message
        if "value" in record:
            # Decode Base64 encoded message
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")

            try:
                # Try to parse as JSON
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # If not JSON, treat as string
                message_data = message_str
        else:
            message_data = None

        # Extract metadata
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # Implement business logic here

        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "original_message": message_data,
            "processing_status": "success",
        }

        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": topic_name,
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []

    try:
        # MSK event structure: event['records'] maps "<topic>-<partition>" keys to lists of records
        if "records" in event:
            for topic_partition, topic_records in event["records"].items():
                logger.info(f"Processing topic-partition: {topic_partition}")

                for record in topic_records:
                    # Each record carries its own topic name; fall back to the map key
                    topic_name = record.get("topic", topic_partition)
                    processed_record = process_kafka_record(record, topic_name)
                    processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing MSK event: {str(e)}")
        raise
```

The script assumes that events are delivered by an event source mapping that uses MSK directly as the Lambda event source.

As this is a sample script, no specific business logic is implemented, and it only outputs received messages to the log.
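To exercise `lambda_handler` locally before wiring up the event source mapping, you can hand it a synthetic event. This sketch mimics the shape of the MSK event that Lambda delivers (records grouped under `<topic>-<partition>` keys, each with a Base64-encoded `value`). `build_msk_test_event` is a hypothetical helper, and fields such as `eventSourceArn` and `bootstrapServers` are omitted because the handler does not read them.

```python
import base64
import json
from typing import Any, Dict, List, Union


def build_msk_test_event(
    topic: str,
    partition: int,
    messages: List[Union[str, Dict[str, Any]]],
    start_offset: int = 0,
) -> Dict[str, Any]:
    """Build a minimal MSK-style event for local testing (hypothetical helper)."""
    records = []
    for i, message in enumerate(messages):
        # Dicts are sent as JSON; strings are sent as-is
        payload = message if isinstance(message, str) else json.dumps(message)
        records.append(
            {
                "topic": topic,
                "partition": partition,
                "offset": start_offset + i,
                "timestamp": 0,
                "timestampType": "CREATE_TIME",
                # Lambda delivers Kafka values Base64-encoded
                "value": base64.b64encode(payload.encode("utf-8")).decode("ascii"),
            }
        )
    # Records are grouped under "<topic>-<partition>" keys
    return {"eventSource": "aws:kafka", "records": {f"{topic}-{partition}": records}}
```

With `msk_processor.py` on the import path, calling `lambda_handler(build_msk_test_event("test-topic", 0, [{"message": "Hello Kafka"}]), None)` should log the decoded message and report one processed record.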

## Deployment Procedure
Once the files are ready, deploy in the following order.

### 1. (Producer side) Configure multi-VPC connection for the MSK cluster

In the Producer account, open the network settings of the target MSK cluster and choose "Turn on multi-VPC connectivity".

![0905_msk_02](https://devio2024-media.developers.io/image/upload/v1757065942/2025/09/05/vqpnrrubzxia9pyfpnfa.png)

Select IAM role-based authentication as the authentication type.

![0905_msk_03](https://devio2024-media.developers.io/image/upload/v1757066060/2025/09/05/lwnxbalvkgab3w93n7pa.png)

The change takes a while to complete (about 40 to 50 minutes in my environment), so please be patient.

Once AWS PrivateLink is activated, the configuration is complete.

![0905_msk_04](https://devio2024-media.developers.io/image/upload/v1757066348/2025/09/05/cnrdvaj2dgahtyynkwor.png)
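The console steps above can also be scripted. The following is a hedged sketch, not the method used in this article: it assumes `kafka` is a boto3 Kafka client (`boto3.client("kafka")`) and uses `describe_cluster_v2` and `update_connectivity` with the request shape as I understand it from the MSK API; please check it against the boto3 reference before relying on it. `iam_vpc_connectivity`, `wait_for_cluster_active`, and `enable_multi_vpc_iam` are hypothetical names. Given how long the update takes, the wait loop polls slowly.

```python
import time
from typing import Any, Callable, Dict


def iam_vpc_connectivity() -> Dict[str, Any]:
    """ConnectivityInfo that turns on multi-VPC connectivity with IAM authentication (assumed shape)."""
    return {"VpcConnectivity": {"ClientAuthentication": {"Sasl": {"Iam": {"Enabled": True}}}}}


def wait_for_cluster_active(
    describe: Callable[[], Dict[str, Any]],
    interval_sec: float = 60,
    timeout_sec: float = 90 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll the cluster state until it returns to ACTIVE."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        # Sleep first: right after the update call the cluster may still report ACTIVE
        sleep(interval_sec)
        state = describe()["ClusterInfo"]["State"]
        if state == "ACTIVE":
            return state
        if state == "FAILED":
            raise RuntimeError("cluster entered FAILED state")
    raise TimeoutError("cluster did not return to ACTIVE in time")


def enable_multi_vpc_iam(kafka: Any, cluster_arn: str, **wait_kwargs: Any) -> str:
    """`kafka` is assumed to be a boto3 Kafka client, e.g. boto3.client("kafka")."""
    # UpdateConnectivity needs the cluster's current version for optimistic locking
    current_version = kafka.describe_cluster_v2(ClusterArn=cluster_arn)["ClusterInfo"]["CurrentVersion"]
    kafka.update_connectivity(
        ClusterArn=cluster_arn,
        ConnectivityInfo=iam_vpc_connectivity(),
        CurrentVersion=current_version,
    )
    return wait_for_cluster_active(
        lambda: kafka.describe_cluster_v2(ClusterArn=cluster_arn), **wait_kwargs
    )
```

Passing the client in as a parameter keeps the sketch free of a hard boto3 dependency and makes it easy to test with a stub.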

### 2. (Consumer side) Application deployment (excluding event source mapping)

After AWS PrivateLink is enabled, deploy the Lambda function to the Consumer account.

At this point, do not create the event source mapping yet.

Deploy by setting the Producer account ID parameter as follows:

```bash
$ sam deploy --parameter-overrides "MSKAccountId=<Producer account ID>"
```

### 3. (Producer side) Deploy the cluster policy

In the Producer account, deploy the prepared msk_cluster_policy.yml as a CloudFormation stack.

This policy configures permissions for resources in the Consumer account to access MSK in the Producer account.

The parameters to be configured are as described in "File Preparation".

[Figure: deploying the cluster policy stack (0905_msk_05)]

### 4. (Consumer side) Create the managed VPC connection

Create an Amazon MSK managed VPC connection to access MSK in the Producer account from the Consumer account.

[Figure: creating the managed VPC connection (0906_msk_06)]

Specify the VPC, subnets, and security group deployed with AWS SAM in step 2. For the security group, select the one created for the managed VPC connection (`ManagedConnectionSecurityGroup`), not the one for the VPC endpoints.
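If you prefer the API to the console, the connection can be created programmatically as well. This is a hedged sketch: it assumes `kafka` is a boto3 Kafka client in the Consumer account and uses `create_vpc_connection` and `describe_vpc_connection` with parameter names and state values as I understand the MSK API (`SASL_IAM`, `AVAILABLE`), so verify them against the boto3 reference. `create_iam_vpc_connection` and `FAILED_STATES` are hypothetical names. The returned ARN is the value later passed as `MSKConnectionArn`.

```python
import time
from typing import Any, Callable, List

# Assumed terminal failure states for a VPC connection
FAILED_STATES = {"FAILED", "REJECTED"}


def create_iam_vpc_connection(
    kafka: Any,
    cluster_arn: str,
    vpc_id: str,
    subnet_ids: List[str],
    security_group_ids: List[str],
    interval_sec: float = 30,
    timeout_sec: float = 30 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Create a managed VPC connection and wait until it is AVAILABLE; returns its ARN."""
    response = kafka.create_vpc_connection(
        TargetClusterArn=cluster_arn,
        Authentication="SASL_IAM",           # IAM role-based authentication
        VpcId=vpc_id,
        ClientSubnets=subnet_ids,            # one subnet per cluster AZ
        SecurityGroups=security_group_ids,   # ManagedConnectionSecurityGroup, not the endpoint SG
    )
    connection_arn = response["VpcConnectionArn"]
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        state = kafka.describe_vpc_connection(Arn=connection_arn)["State"]
        if state == "AVAILABLE":
            return connection_arn
        if state in FAILED_STATES:
            raise RuntimeError(f"VPC connection ended in state {state}")
        sleep(interval_sec)
    raise TimeoutError("VPC connection did not become AVAILABLE in time")
```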

When the status becomes "Available" as shown below, the setup is complete.

[Figure: managed VPC connection with status Available (0905_msk_07)]

### 5. (Consumer side) Deploy the event source mapping

Uncomment the following section in template.yaml.

```yaml
# MSK event source mapping
MSKEventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !Ref MSKConnectionArn
    FunctionName: !Ref MSKProcessorFunction
    StartingPosition: LATEST
    Topics:
      - test-topic
    BatchSize: 10
```

Pass the ARN of the managed VPC connection created in step 4 as a parameter and redeploy with AWS SAM.

```bash
$ sam deploy --parameter-overrides "MSKAccountId=<Producer account ID>" "MSKConnectionArn=<Managed VPC connection ARN>"
```

After successful deployment, you should see MSK added as an event source as shown below.

[Figure: MSK shown as an event source of the Lambda function (0905_msk_09)]

This completes the deployment.
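Steps 2 and 5 run the same `sam deploy` command and differ only in the parameter overrides. If you script the deployment, a small helper like the one below could build the argument list; `sam_deploy_args` is a hypothetical name, and the sketch only reproduces the two commands shown above.

```python
from typing import List, Optional


def sam_deploy_args(msk_account_id: str, msk_connection_arn: Optional[str] = None) -> List[str]:
    """Build the `sam deploy` argument list used in steps 2 and 5 (sketch)."""
    overrides = [f"MSKAccountId={msk_account_id}"]
    if msk_connection_arn:
        # Step 5 only: the ARN of the managed VPC connection created in step 4
        overrides.append(f"MSKConnectionArn={msk_connection_arn}")
    return ["sam", "deploy", "--parameter-overrides", *overrides]
```

The list can be passed to `subprocess.run(..., check=True)` without any shell quoting. Because `confirm_changeset = true` is set in samconfig.toml, `sam deploy` will still ask you to confirm the change set.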

## Verification

In the Producer account, send a message to MSK. (In my environment, I send it from CloudShell launched in the same private subnet as MSK; for details, see the article linked in the introduction.)

```bash
echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <MSK Bootstrap server hostname>:9098 \
  --producer.config client.properties \
  --topic test-topic
```

By checking the Lambda logs in the Consumer account, we can confirm that the message was successfully received.

[Figure: Lambda log output in the Consumer account (0905_msk_08)]

## Conclusion

In this article, we demonstrated how to receive messages in a Lambda function in a different account using Amazon MSK's managed VPC connection.

I hope you found this helpful.
