Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみた
データ事業本部のueharaです。
今回は、Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみたいと思います。
はじめに
前回、「Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみた」という記事を出しました。
今回は、Amazon MSKのマネージドVPC接続とイベントソースマッピングを利用して、別アカウントのLambdaでメッセージを受信します。
イベントソースマッピングについては上記記事で仕組みを紹介していますので、必要に応じてご確認下さい。
なお、今回MSKのクラスターはServerlessでない通常のものを利用します。
Amazon MSKマネージド
今回作成するアプリケーションの構成図
今回作成するアプリケーションの構成図は以下の通りです。
ここでは、Producer側のアカウントにあるMSKクラスターは既に構築済みであるとします。
また、実際には2つのAZにサブネットが1つずつ存在し、MSKのクラスターはそれにまたがって作成されていますが、簡略化して記載しています。
なお今回のMSKクラスターの設定値は以下の通りです。
項目 | 設定値 |
---|---|
KafkaVersion | 3.9.x |
NumberOfBrokerNodes | 2 |
InstanceType | kafka.m5.large |
ファイル準備
Producer側のアカウント(MSKがあるアカウント)とConsumer側のアカウント(Lambdaがあるアカウント)それぞれで設定が必要となるため、ディレクトリを分けて作成します。
.
├── consumer
│ ├── function
│ │ └── msk_processor.py
│ ├── samconfig.toml
│ └── template.yaml
└── producer
└── msk_cluster_policy.yml
まず、 Producerアカウントのクラスターポリシーの設定である msk_cluster_policy.yml
は以下の通りです。
AWSTemplateFormatVersion: 2010-09-09
Description: Apply a cluster policy to MSK cluster
Parameters:
MSKClusterArn:
Description: Arn of the MSK Cluster
Type: String
LambdaRoleArn:
Description: Arn of the IAM Role used by Lambda function
Type: String
LambdaAccountId:
Description: AWS Account ID where Lambda is deployed
Type: String
Resources:
MSKClusterPolicy:
Type: AWS::MSK::ClusterPolicy
Properties:
ClusterArn: !Ref MSKClusterArn
Policy:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
AWS: !Sub arn:aws:iam::${LambdaAccountId}:root
Action:
- kafka:CreateVpcConnection
- kafka:GetBootstrapBrokers
- kafka:DescribeCluster
- kafka:DescribeClusterV2
Resource: !Ref MSKClusterArn
- Effect: Allow
Principal:
AWS: !Ref LambdaRoleArn
Action:
- kafka-cluster:Connect
- kafka-cluster:AlterCluster
- kafka-cluster:DescribeCluster
Resource: !Ref MSKClusterArn
- Effect: Allow
Principal:
AWS: !Ref LambdaRoleArn
Action:
- kafka-cluster:*Topic*
- kafka-cluster:WriteData
- kafka-cluster:ReadData
Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/*
- Effect: Allow
Principal:
AWS: !Ref LambdaRoleArn
Action:
- kafka-cluster:AlterGroup
- kafka-cluster:DescribeGroup
Resource: !Sub arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/*
パラメーターは MSKClusterArn
, LambdaRoleArn
, LambdaAccountId
です。
MSKClusterArn
は作成済みのMSKクラスターのARNで、LambdaRoleArn
, LambdaAccountID
についてはそれぞれConsumerアカウントの情報を設定します。
Producerアカウントで必要な資材は以上です。
Consumerアカウントについて、今回Lambdaを AWS SAM で構築するため、その要領でファイルを用意しています。
samconfig.toml
についてはスタック名やデプロイ用バケット等AWS SAMの設定ファイルとなります。
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "uehara-msk-test-cross-acount"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
template.yaml
はリソースを定義したテンプレートファイルです。(長いためコードは折りたたんでおきます)
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with MSK Managed Connection"
Parameters:
MSKAccountId:
Description: AWS Account Id where MSK cluster is deployed
Type: String
MSKConnectionArn:
Description: ARN of the MSK Managed Connection
Default: ""
Type: String
Resources:
# VPC
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.10.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: uehara-test-vpc
# プライベートサブネット1 (AZ: ap-northeast-1a)
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.0/26
AvailabilityZone: ap-northeast-1a
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: uehara-test-private-subnet-1a
# プライベートサブネット2 (AZ: ap-northeast-1c)
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.64/26
AvailabilityZone: ap-northeast-1c
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: uehara-test-private-subnet-1c
# VPCエンドポイント用のセキュリティグループ
VpcEndpointSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for VPC endpoints
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: !GetAtt VPC.CidrBlock
Tags:
- Key: "Name"
Value: !Sub "uehara-test-vpc-endpoint-sg"
# Lambda VPCエンドポイント(イベントソースマッピング用)
LambdaVPCEndpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
VpcId: !Ref VPC
ServiceName: !Sub 'com.amazonaws.${AWS::Region}.lambda'
VpcEndpointType: Interface
SubnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
SecurityGroupIds:
- !Ref VpcEndpointSecurityGroup
PrivateDnsEnabled: true
# STS VPCエンドポイント
STSVPCEndpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
VpcId: !Ref VPC
ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sts'
VpcEndpointType: Interface
SubnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
SecurityGroupIds:
- !Ref VpcEndpointSecurityGroup
PrivateDnsEnabled: true
# マネージドコネクション用のセキュリティグループ
ManagedConnectionSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for Managed Connection
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 14001
ToPort: 14100
CidrIp: !GetAtt VPC.CidrBlock
SecurityGroupEgress:
- Description: Allow all outbound traffic
IpProtocol: "-1"
CidrIp: 0.0.0.0/0
Tags:
- Key: "Name"
Value: !Sub "uehara-test-managed-connection-sg"
# 自己参照設定
ManagedConnectionSecurityGroupIngress:
Type: AWS::EC2::SecurityGroupIngress
Properties:
GroupId: !Ref ManagedConnectionSecurityGroup
IpProtocol: tcp
FromPort: 14001
ToPort: 14100
SourceSecurityGroupId: !Ref ManagedConnectionSecurityGroup
# Lambda実行ロール
MSKProcessorExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: msk-processor-execution-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: "MSKAccessPolicy"
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kafka:DescribeVpcConnection
Resource: '*'
- Effect: Allow
Action:
- kafka-cluster:Connect
- kafka-cluster:DescribeClusterDynamicConfiguration
Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:cluster/*/*'
- Effect: Allow
Action:
- kafka-cluster:DescribeTopic
- kafka-cluster:ReadData
Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:topic/*/*'
- Effect: Allow
Action:
- kafka-cluster:AlterGroup
- kafka-cluster:DescribeGroup
Resource: !Sub 'arn:aws:kafka:${AWS::Region}:${MSKAccountId}:group/*/*'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
# MSKイベントソースLambda関数
MSKProcessorFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: msk-processor-function
CodeUri: function/
Handler: msk_processor.lambda_handler
Runtime: python3.11
Architectures:
- x86_64
Timeout: 300
MemorySize: 512
Role: !GetAtt MSKProcessorExecutionRole.Arn
Layers:
- "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
Environment:
Variables:
LOG_LEVEL: INFO
# # MSKイベントソースマッピング
# MSKEventSourceMapping:
# Type: AWS::Lambda::EventSourceMapping
# Properties:
# EventSourceArn: !Ref MSKConnectionArn
# FunctionName: !Ref MSKProcessorFunction
# StartingPosition: LATEST
# Topics:
# - test-topic
# BatchSize: 10
Outputs:
VpcId:
Description: ID of the created VPC
Value: !Ref VPC
Export:
Name: !Sub "${AWS::StackName}-VpcId"
PrivateSubnet1Id:
Description: ID of Private Subnet 1
Value: !Ref PrivateSubnet1
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet1"
PrivateSubnet2Id:
Description: ID of Private Subnet 2
Value: !Ref PrivateSubnet2
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet2"
MSKProcessorFunctionArn:
Description: ARN of the MSK Processor Lambda function
Value: !GetAtt MSKProcessorFunction.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
ここで1点補足ですが、 MSKEventSourceMapping
は敢えてコメントアウトしています。
これは、後段のデプロイ手順で説明するMSKのマネージドVPC接続を設定した後にデプロイするためのもので、初回デプロイ時にはデプロイしないためコメントアウトしています。
別ファイルにしても良かったのですが、今回は簡単化のため1ファイルにまとめています。
基本的に作成リソースはコメントに記載している通りですが、補足として、イベントソースマッピングはプライベートサブネット上で動作するため、イベントソースマッピングがLambda関数を起動するためにLambdaおよびSTS用のVPCエンドポイントを用意しています。
こちらに関してはパブリックサブネットを別途用意してそこにNAT Gatewayを配置し、プライベートサブネットからのルーティングを設定するでも問題ありません。
最後に、 msk_processor.py
はイベントソースマッピングから起動するLambda関数のスクリプトファイルです。
import base64
import json
import logging
from typing import Any, Dict, List
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
try:
# Kafkaメッセージのデコード
if "value" in record:
# Base64でエンコードされたメッセージをデコード
message_bytes = base64.b64decode(record["value"])
message_str = message_bytes.decode("utf-8")
try:
# JSONとしてパース試行
message_data = json.loads(message_str)
except json.JSONDecodeError:
# JSONでない場合は文字列として扱う
message_data = message_str
else:
message_data = None
# メタデータの抽出
partition = record.get("partition", "unknown")
offset = record.get("offset", "unknown")
timestamp = record.get("timestamp", "unknown")
logger.info(
f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
)
logger.info(f"Message content: {message_data}")
# ここでビジネスロジックを実装
processed_data = {
"topic": topic_name,
"partition": partition,
"offset": offset,
"timestamp": timestamp,
"original_message": message_data,
"processing_status": "success",
}
return processed_data
except Exception as e:
logger.error(f"Error processing record: {str(e)}")
return {
"topic": topic_name,
"partition": record.get("partition", "unknown"),
"offset": record.get("offset", "unknown"),
"error": str(e),
"processing_status": "error",
}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
logger.info(f"Received event: {json.dumps(event, default=str)}")
processed_records = []
try:
# MSKイベントの構造: event['records']にトピック別のレコードが含まれる
if "records" in event:
for topic_name, topic_records in event["records"].items():
logger.info(f"Processing topic: {topic_name}")
for record in topic_records:
processed_record = process_kafka_record(record, topic_name)
processed_records.append(processed_record)
logger.info(f"Successfully processed {len(processed_records)} records")
return {
"statusCode": 200,
"body": json.dumps(
{
"message": f"Successfully processed {len(processed_records)} records",
"processed_records": len(processed_records),
}
),
}
except Exception as e:
logger.error(f"Error processing MSK event: {str(e)}")
raise
イベントソースマッピングでLambdaのソースとしてMSKを直接指定し、イベントを受信する想定で記述をしています。
サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。
デプロイ手順
ファイルが用意できたら、以下手順で順番にデプロイを行います。
1. (Producer側)MSKクラスターのマルチVPC接続の設定
Producerアカウントにある今回連携対象のMSKクラスターについて、ネットワーク設定から『マルチVPC接続をオンにする』を設定します。
認証タイプはIAMロールベースでの認証となります。(SASL/SCRAMやTLS等の認証方式ではマルチVPC接続は有効化できません)
設定が完了するまで少し時間がかかるため、待ちます。(私の環境では40分〜50分程度かかりました)
AWS PrivateLinkがオンになったら設定は完了です。
2. (Consumer側)アプリケーションデプロイ(イベントソースマッピング以外)
AWS PrivateLinkがオンにできたら、ConsumerアカウントにLambda関数のデプロイを行います。
この時点ではまだイベントソースマッピングは作成しません。
デプロイは以下の通り、ProducerアカウントIDのパラメーターを設定してデプロイします。
$ sam deploy --parameter-overrides "MSKAccountId=<ProducerアカウントのアカウントID>"
3. (Producer側)クラスターポリシーのデプロイ
用意した msk_cluster_policy.yml
でProducerアカウントでCloudFormationのデプロイを行います。
このポリシーで、ConsumerアカウントのリソースからProducerアカウントにあるMSKにアクセス許可をする設定を行います。
設定するパラメーターについては「ファイル準備」に記載した通りです。
4. (Consumer側)マネージドVPC接続の作成
ConsumerアカウントからProducerアカウントのMSKにアクセスするため、Amazon MSKのマネージドVPC接続を作成します。
VPCやサブネット、セキュリティグループは先ほどAWS SAMでデプロイしたものを設定します。(セキュリティグループはVPCエンドポイント用のセキュリティグループではなく、マネージドVPC接続用に作成したセキュリティグループを選択します)
以下のように状態が「使用可能」になれば設定完了です。
5. (Consumer側)イベントソースマッピングのデプロイ
template.yaml
の以下のコメント部分を外します。
# MSKイベントソースマッピング
MSKEventSourceMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn: !Ref MSKConnectionArn
FunctionName: !Ref MSKProcessorFunction
StartingPosition: LATEST
Topics:
- test-topic
BatchSize: 10
手順4で作成したマネージドVPC接続をパラメーターに指定し、再度AWS SAMでデプロイを行います。
$ sam deploy --parameter-overrides "MSKAccountId=<ProducerアカウントのアカウントID>" "MSKConnectionArn=<マネージドVPC接続のARN>"
以上でデプロイは完了です。
動作確認
Producerアカウントで、MSKに対しメッセージを送信します。(私の手元の環境ではMSKと同じプライベートサブネットで起動したCloudShellから実施しています。詳しくは冒頭で記載したブログをご参考下さい)
echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
--bootstrap-server <MSKのBootstrapサーバーのホスト名>:9098 \
--producer.config client.properties \
--topic test-topic
ConsumerアカウントのLambdaのログを確認すると、無事メッセージを受信できていることが確認できました。
最後に
今回は、Amazon MSKのマネージドVPC接続を利用して別アカウントのLambdaでメッセージを受信してみました。
参考になりましたら幸いです。