Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみた

Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみた

2025.09.05

データ事業本部のueharaです。

今回は、Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみたいと思います。

はじめに

前回、「Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみた」という記事を出しました。

https://dev.classmethod.jp/articles/msk-serverless-lambda-eventbridge-pipes/

今回は、Amazon MSKのマネージドVPC接続とイベントソースマッピングを利用して、別アカウントのLambdaでメッセージを受信します。

イベントソースマッピングについては上記記事で仕組みを紹介していますので、必要に応じてご確認下さい。

なお、今回MSKのクラスターはServerlessでない通常のものを利用します。

Amazon MSKマネージド

今回作成するアプリケーションの構成図

今回作成するアプリケーションの構成図は以下の通りです。

0905_msk_01

ここでは、Producer側のアカウントにあるMSKクラスターは既に構築済みであるとします。

また、実際には2つのAZにサブネットが1つずつ存在し、MSKのクラスターはそれにまたがって作成されていますが、簡略化して記載しています。

なお今回のMSKクラスターの設定値は以下の通りです。

項目 設定値
KafkaVersion 3.9.x
NumberOfBrokerNodes 2
InstanceType kafka.m5.large

ファイル準備

Producer側のアカウント(MSKがあるアカウント)とConsumer側のアカウント(Lambdaがあるアカウント)それぞれで設定が必要となるため、ディレクトリを分けて作成します。

.
├── consumer
│   ├── function
│   │   └── msk_processor.py
│   ├── samconfig.toml
│   └── template.yaml
└── producer
    └── msk_cluster_policy.yml

まず、 Producerアカウントのクラスターポリシーの設定である msk_cluster_policy.yml は以下の通りです。

msk_cluster_policy.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Apply a cluster policy to MSK cluster
Parameters:
  MSKClusterArn:
    Description: Arn of the MSK Cluster
    Type: String
  LambdaRoleArn:
    Description: Arn of the IAM Role used by Lambda function
    Type: String
  LambdaAccountId:
    Description: AWS Account ID where Lambda is deployed
    Type: String

Resources:
  MSKClusterPolicy:
    Type: AWS::MSK::ClusterPolicy
    Properties:
      ClusterArn: !Ref MSKClusterArn
      Policy: 
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub arn:aws:iam::${LambdaAccountId}:root
            Action:
              - kafka:CreateVpcConnection
              - kafka:GetBootstrapBrokers
              - kafka:DescribeCluster
              - kafka:DescribeClusterV2
            Resource: !Ref MSKClusterArn
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:Connect
              - kafka-cluster:AlterCluster
              - kafka-cluster:DescribeCluster
            Resource: !Ref MSKClusterArn
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:*Topic*
              - kafka-cluster:WriteData
              - kafka-cluster:ReadData
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*
          - Effect: Allow
            Principal:
              AWS: !Ref LambdaRoleArn
            Action:
              - kafka-cluster:AlterGroup
              - kafka-cluster:DescribeGroup
            Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*

パラメーターは MSKClusterArn, LambdaRoleArn, LambdaAccountId です。

MSKClusterArn は作成済みのMSKクラスターのARNで、LambdaRoleArn, LambdaAccountID についてはそれぞれConsumerアカウントの情報を設定します。

Producerアカウントで必要な資材は以上です。

Consumerアカウントについて、今回Lambdaを AWS SAM で構築するため、その要領でファイルを用意しています。

samconfig.toml についてはスタック名やデプロイ用バケット等AWS SAMの設定ファイルとなります。

samconfig.toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "uehara-msk-test-cross-acount"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true

template.yaml はリソースを定義したテンプレートファイルです。(長いためコードは折りたたんでおきます)

template.yaml
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with MSK Managed Connection"

Parameters:
  MSKAccountId:
    Description: AWS Account Id where MSK cluster is deployed
    Type:  String
  MSKConnectionArn:
    Description: ARN of the MSK Managed Connection
    Default: ""
    Type: String

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-test-vpc

  # プライベートサブネット1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1a

  # プライベートサブネット2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: uehara-test-private-subnet-1c

  # VPCエンドポイント用のセキュリティグループ
  VpcEndpointSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for VPC endpoints
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !GetAtt VPC.CidrBlock
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-vpc-endpoint-sg"

  # Lambda VPCエンドポイント(イベントソースマッピング用)
  LambdaVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.lambda'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # STS VPCエンドポイント
  STSVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sts'
      VpcEndpointType: Interface
      SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
      SecurityGroupIds:
        - !Ref VpcEndpointSecurityGroup
      PrivateDnsEnabled: true

  # マネージドコネクション用のセキュリティグループ
  ManagedConnectionSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Managed Connection
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 14001
          ToPort: 14100
          CidrIp: !GetAtt VPC.CidrBlock
      SecurityGroupEgress:
        - Description: Allow all outbound traffic
          IpProtocol: "-1"
          CidrIp: 0.0.0.0/0
      Tags:
        - Key: "Name"
          Value: !Sub "uehara-test-managed-connection-sg"

  # 自己参照設定
  ManagedConnectionSecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref ManagedConnectionSecurityGroup
      IpProtocol: tcp
      FromPort: 14001
      ToPort: 14100
      SourceSecurityGroupId: !Ref ManagedConnectionSecurityGroup

  # Lambda実行ロール
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: "MSKAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kafka:DescribeVpcConnection
                Resource: '*'
              - Effect: Allow
                Action:
                  - kafka-cluster:Connect
                  - kafka-cluster:DescribeClusterDynamicConfiguration
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:cluster/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:DescribeTopic
                  - kafka-cluster:ReadData
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:topic/*/*'
              - Effect: Allow
                Action:
                  - kafka-cluster:AlterGroup
                  - kafka-cluster:DescribeGroup
                Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:group/*/*'
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole

  # MSKイベントソースLambda関数
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO

  # # MSKイベントソースマッピング
  # MSKEventSourceMapping:
  #   Type: AWS::Lambda::EventSourceMapping
  #   Properties:
  #     EventSourceArn: !Ref MSKConnectionArn
  #     FunctionName: !Ref MSKProcessorFunction
  #     StartingPosition: LATEST
  #     Topics:
  #       - test-topic
  #     BatchSize: 10

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"

  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"

  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"

  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"

ここで1点補足ですが、 MSKEventSourceMapping は敢えてコメントアウトしています。

これは、後段のデプロイ手順で説明するMSKのマネージドVPC接続を設定した後にデプロイするためのもので、初回デプロイ時にはデプロイしないためコメントアウトしています。

別ファイルにしても良かったのですが、今回は簡単化のため1ファイルにまとめています。

基本的に作成リソースはコメントに記載している通りですが、補足として、イベントソースマッピングはプライベートサブネット上で動作するため、イベントソースマッピングがLambda関数を起動するためにLambdaおよびSTS用のVPCエンドポイントを用意しています。

こちらに関してはパブリックサブネットを別途用意してそこにNAT Gatewayを配置し、プライベートサブネットからのルーティングを設定するでも問題ありません。

最後に、 msk_processor.py はイベントソースマッピングから起動するLambda関数のスクリプトファイルです。

msk_processor.py
import base64
import json
import logging
from typing import Any, Dict, List

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
    try:
        # Kafkaメッセージのデコード
        if "value" in record:
            # Base64でエンコードされたメッセージをデコード
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")

            try:
                # JSONとしてパース試行
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # JSONでない場合は文字列として扱う
                message_data = message_str
        else:
            message_data = None

        # メタデータの抽出
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # ここでビジネスロジックを実装

        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "original_message": message_data,
            "processing_status": "success",
        }

        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": topic_name,
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []

    try:
        # MSKイベントの構造: event['records']にトピック別のレコードが含まれる
        if "records" in event:
            for topic_name, topic_records in event["records"].items():
                logger.info(f"Processing topic: {topic_name}")

                for record in topic_records:
                    processed_record = process_kafka_record(record, topic_name)
                    processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing MSK event: {str(e)}")
        raise

イベントソースマッピングでLambdaのソースとしてMSKを直接指定し、イベントを受信する想定で記述をしています。

サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。

デプロイ手順

ファイルが用意できたら、以下手順で順番にデプロイを行います。

1. (Producer側)MSKクラスターのマルチVPC接続の設定

Producerアカウントにある今回連携対象のMSKクラスターについて、ネットワーク設定から『マルチVPC接続をオンにする』を設定します。

0905_msk_02

認証タイプはIAMロールベースでの認証となります。(SASL/SCRAMやTLS等の認証方式ではマルチVPC接続は有効化できません)

0905_msk_03

設定が完了するまで少し時間がかかるため、待ちます。(私の環境では40分〜50分程度かかりました)

AWS PrivateLinkがオンになったら設定は完了です。

0905_msk_04

2. (Consumer側)アプリケーションデプロイ(イベントソースマッピング以外)

AWS PrivateLinkがオンにできたら、ConsumerアカウントにLambda関数のデプロイを行います。

この時点ではまだイベントソースマッピングは作成しません。

デプロイは以下の通り、ProducerアカウントIDのパラメーターを設定してデプロイします。

$ sam deploy --parameter-overrides "MSKAccountId=<ProducerアカウントのアカウントID>"

3. (Producer側)クラスターポリシーのデプロイ

用意した msk_cluster_policy.yml でProducerアカウントでCloudFormationのデプロイを行います。

このポリシーで、ConsumerアカウントのリソースからProducerアカウントにあるMSKにアクセス許可をする設定を行います。

設定するパラメーターについては「ファイル準備」に記載した通りです。

0905_msk_05

4. (Consumer側)マネージドVPC接続の作成

ConsumerアカウントからProducerアカウントのMSKにアクセスするため、Amazon MSKのマネージドVPC接続を作成します。

0906_msk_06

VPCやサブネット、セキュリティグループは先ほどAWS SAMでデプロイしたものを設定します。(セキュリティグループはVPCエンドポイント用のセキュリティグループではなく、マネージドVPC接続用に作成したセキュリティグループを選択します)

以下のように状態が「使用可能」になれば設定完了です。

0905_msk_07

5. (Consumer側)イベントソースマッピングのデプロイ

template.yaml の以下のコメント部分を外します。

# MSKイベントソースマッピング
MSKEventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !Ref MSKConnectionArn
    FunctionName: !Ref MSKProcessorFunction
    StartingPosition: LATEST
    Topics:
      - test-topic
    BatchSize: 10

手順4で作成したマネージドVPC接続をパラメーターに指定し、再度AWS SAMでデプロイを行います。

$ sam deploy --parameter-overrides "MSKAccountId=<ProducerアカウントのアカウントID>" "MSKConnectionArn=<マネージドVPC接続のARN>"

以上でデプロイは完了です。

動作確認

Producerアカウントで、MSKに対しメッセージを送信します。(私の手元の環境ではMSKと同じプライベートサブネットで起動したCloudShellから実施しています。詳しくは冒頭で記載したブログをご参考下さい)

echo '{"message": "Hello Kafka"}' | 
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <MSKのBootstrapサーバーのホスト名>:9098 \
  --producer.config client.properties \
  --topic test-topic

ConsumerアカウントのLambdaのログを確認すると、無事メッセージを受信できていることが確認できました。

0905_msk_08

最後に

今回は、Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみました。

参考になりましたら幸いです。

参考文献

この記事をシェアする

facebookのロゴhatenaのロゴtwitterのロゴ

© Classmethod, Inc. All rights reserved.