Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみた
データ事業本部のueharaです。
今回は、Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみたいと思います。
はじめに
Amazon MSK Serverlessについて
Amazon MSK ServerlessはAmazon MSK(Amazon Managed Streaming for Apache Kafka)のクラスタータイプで、クラスターの容量を管理およびスケールすることなくApache Kafkaを実行することができるサービスです。
Kafkaの基本的な概念については以下の記事などが参考になると思います。
Kafkaの基本的な構成要素としては以下の通りです。
- Topic:メッセージを分類するカテゴリ。データベースのテーブルのような概念
- Partition:Topicを分割した単位。並列処理とスケーラビリティを実現
- Message:Kafkaで送受信される個々のデータ
- Offset:パーティション内でのメッセージの位置を示す連番
上記の構成要素をベースとし、Kafkaを利用する環境には以下の4つの役割が存在します。
- Producer:メッセージを送信する側
- Consumer:メッセージを受信・処理する側
- Consumer Group:複数のConsumerをグループ化し、負荷分散を実現
- Broker:Kafkaクラスターを構成するサーバー
MSKが担う役割としては「Broker」となります。
認証について
KafkaもDBへの接続のように「認証」という概念があるのですが、いくつかある認証方式の中でもMSK Serverlessが対応しているのは「IAM認証」のみになります。
認証について詳しく知りたい方は公式ドキュメントをご確認下さい。
Lambdaのイベントソースマッピングについて
Kafkaは「Pull型のアーキテクチャ」となります。
すなわち、Producerが送信したデータに対し、Consumerが任意のタイミングで自分でデータを取りに行く必要があります。(Consumerはポーリングする必要がある)
ただ、Lambdaのイベントソースマッピングを利用するとProducerのデータ送信を検知し、Lambdaに即座にデータを連携することができます。
これはイベントソースマッピングにおいて、内部的な処理としては1秒ごとにKafkaブローカーにポーリングを行うことで実現されています。
ただ、上記資料の通り、イベントソースマッピングによるLambdaの起動は "同期的に(Synchronous)" 行われます。
これは仕様になるのですが、同期実行の場合Lambdaでレコードの処理に失敗した際Lambdaは同じレコードに対し関数を再度呼び出し、処理が成功するか、ストレージまたは時間の制限でレコードがストリームから消えるまで処理を継続します。
つまり、「無限ループ」とまでは言いませんが、処理に問題がある場合はストリームにレコードが存在する限りLambdaが起動し続けることになります。
例えば一時的なネットワークの問題等であれば再起動を続けるでも良いのですが、処理ロジック自体に問題があり失敗する場合は再起動しても解決する問題ではないので、あまり好ましい挙動とは言えません。
上記事象を回避したい場合は、Lambda内の処理ロジックとしてエラーが発生するメッセージに対応するか(エラーが発生したメッセージは別で取っておいてLambdaは正常終了する等)、障害発生時の送信先(Failure Destination)の設定、またはその他の類似する対応を実施する必要があります。
EventBridge Pipeについて
EventBridge Pipesを利用することでも、Kafkaへのメッセージ送信を検知することができます。
EventBridge Pipesについても、実際としては内部でLambdaのイベントソースマッピングリソースを使用しているようです。
今回はEventBridge PipeでKafkaのメッセージを検知し、Lambdaを「非同期で」実行したいと思います。
※上記の場合、EventBridgeがLambdaの起動に成功すればLambda自体の実行成否に関わらずKafka側へは正常終了を通知する(Offsetを進める)。
今回作成するアプリケーションの構成図
今回作成するアプリケーションの構成図は以下の通りです。
イベントソースマッピングのポーラーはVPC内で動作するため、AWSのサービスAPIを呼び出すためにPrivateLinkまたはNAT GWが必要となります。今回はNAT GWを設定しています。
メッセージを送信するProducerに関してはVPC内で動作させるCloudShellで実行しようと思います。
また、EventBridge PipeからLambdaを非同期実行し、失敗したメッセージについてはDLQに転送するようにします。
リソース作成
ファイル準備
今回作成するリソースのディレクトリ構成は以下の通りです。リソースについては AWS SAM を利用して作成することにします。
.
├── function
│ └── msk_processor.py
├── samconfig.toml
└── template.yaml
samconfig.toml
はSAMでデプロイするための設定ファイルです。今回は以下のようにしてみました。
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "uehara-msk-test"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
template.yaml
はリソースを定義したテンプレートファイルです。(長いためコードは折りたたんでおきます)
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application with Serverless Kafka"
Resources:
# VPC
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.10.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: uehara-msk-vpc
# パブリックサブネット (AZ: ap-northeast-1a)
PublicSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.128/26
AvailabilityZone: ap-northeast-1a
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: msk-public-subnet-1a
# プライベートサブネット1 (AZ: ap-northeast-1a)
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.0/26
AvailabilityZone: ap-northeast-1a
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: msk-private-subnet-1a
# プライベートサブネット2 (AZ: ap-northeast-1c)
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.64/26
AvailabilityZone: ap-northeast-1c
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: msk-private-subnet-1c
# インターネットゲートウェイ
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: msk-internet-gateway
# インターネットゲートウェイをVPCにアタッチ
AttachGateway:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
# NAT Gateway用のElastic IP
NATGatewayEIP:
Type: AWS::EC2::EIP
DependsOn: AttachGateway
Properties:
Domain: vpc
Tags:
- Key: Name
Value: msk-nat-gateway-eip
# NAT Gateway
NATGateway:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NATGatewayEIP.AllocationId
SubnetId: !Ref PublicSubnet1
Tags:
- Key: Name
Value: msk-nat-gateway
# パブリックルートテーブル
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: msk-public-route-table
# パブリックルート(インターネットゲートウェイへ)
PublicRoute:
Type: AWS::EC2::Route
DependsOn: AttachGateway
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway
# パブリックサブネットとルートテーブルの関連付け
PublicSubnetRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet1
RouteTableId: !Ref PublicRouteTable
# プライベートルートテーブル
PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: msk-private-route-table
# プライベートルート(NAT Gatewayへ)
PrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NATGateway
# プライベートサブネット1とルートテーブルの関連付け
PrivateSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref PrivateRouteTable
# プライベートサブネット2とルートテーブルの関連付け
PrivateSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable
# セキュリティグループ
MSKSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for MSK Serverless cluster
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 9098
ToPort: 9098
CidrIp: 10.10.0.0/16
Description: MSK Serverless bootstrap servers (VPC access)
- IpProtocol: tcp
FromPort: 9092
ToPort: 9092
CidrIp: 10.10.0.0/16
Description: MSK Kafka protocol (VPC access)
Tags:
- Key: Name
Value: msk-serverless-sg
# MSK Serverlessクラスター
MSKServerlessCluster:
Type: AWS::MSK::ServerlessCluster
Properties:
ClusterName: uehara-msk-serverless-cluster
VpcConfigs:
- SubnetIds:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
SecurityGroups:
- !Ref MSKSecurityGroup
ClientAuthentication:
Sasl:
Iam:
Enabled: true
# Lambda実行ロール
MSKProcessorExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: msk-processor-execution-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AmazonSQSFullAccess
# MSKイベントソースLambda関数
MSKProcessorFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: msk-processor-function
CodeUri: function/
Handler: msk_processor.lambda_handler
Runtime: python3.11
Architectures:
- x86_64
Timeout: 300
MemorySize: 512
Role: !GetAtt MSKProcessorExecutionRole.Arn
Layers:
- "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
Environment:
Variables:
LOG_LEVEL: INFO
EventInvokeConfig:
MaximumRetryAttempts: 2
MaximumEventAgeInSeconds: 300
DeadLetterQueue:
Type: SQS
TargetArn: !GetAtt DeadLetterQueue.Arn
# DLQ
DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: msk-dlq
MessageRetentionPeriod: 1209600 # 14日
# Pipe実行ロール
PipeExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: pipes.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: MSKAccessPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kafka:DescribeCluster
- kafka:DescribeClusterV2
- kafka:GetBootstrapBrokers
- ec2:CreateNetworkInterface
- ec2:DescribeNetworkInterfaces
- ec2:DescribeVpcs
- ec2:DeleteNetworkInterface
- ec2:DescribeSubnets
- ec2:DescribeSecurityGroups
Resource: "*"
- Effect: Allow
Action:
- kafka-cluster:Connect
- kafka-cluster:AlterCluster
- kafka-cluster:DescribeCluster
Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/uehara-msk-serverless-cluster/*"
- Effect: Allow
Action:
- kafka-cluster:*Topic*
- kafka-cluster:WriteData
- kafka-cluster:ReadData
Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/uehara-msk-serverless-cluster/*"
- Effect: Allow
Action:
- kafka-cluster:AlterGroup
- kafka-cluster:DescribeGroup
Resource: !Sub "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/uehara-msk-serverless-cluster/*"
- PolicyName: LambdaInvoke
PolicyDocument:
Statement:
- Effect: Allow
Action: lambda:InvokeFunction
Resource: !GetAtt MSKProcessorFunction.Arn
# EventBridge Pipe
MSKPipe:
Type: AWS::Pipes::Pipe
Properties:
Name: msk-to-lambda-pipe
Source: !GetAtt MSKServerlessCluster.Arn
Target: !GetAtt MSKProcessorFunction.Arn
RoleArn: !GetAtt PipeExecutionRole.Arn
SourceParameters:
ManagedStreamingKafkaParameters:
TopicName: test-topic
StartingPosition: LATEST
BatchSize: 10
TargetParameters:
LambdaFunctionParameters:
InvocationType: FIRE_AND_FORGET
Outputs:
VpcId:
Description: ID of the created VPC
Value: !Ref VPC
Export:
Name: !Sub "${AWS::StackName}-VpcId"
MSKClusterArn:
Description: ARN of the MSK Serverless cluster
Value: !GetAtt MSKServerlessCluster.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKClusterArn"
PrivateSubnet1Id:
Description: ID of Private Subnet 1
Value: !Ref PrivateSubnet1
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet1"
PrivateSubnet2Id:
Description: ID of Private Subnet 2
Value: !Ref PrivateSubnet2
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet2"
SecurityGroupId:
Description: ID of MSK Security Group
Value: !Ref MSKSecurityGroup
Export:
Name: !Sub "${AWS::StackName}-MSKSecurityGroup"
MSKProcessorFunctionArn:
Description: ARN of the MSK Processor Lambda function
Value: !GetAtt MSKProcessorFunction.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
MSKProcessorExecutionRoleArn:
Description: ARN of the MSK Processor Lambda execution role
Value: !GetAtt MSKProcessorExecutionRole.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"
PublicSubnet1Id:
Description: ID of Public Subnet 1
Value: !Ref PublicSubnet1
Export:
Name: !Sub "${AWS::StackName}-PublicSubnet1"
InternetGatewayId:
Description: ID of Internet Gateway
Value: !Ref InternetGateway
Export:
Name: !Sub "${AWS::StackName}-InternetGateway"
NATGatewayId:
Description: ID of NAT Gateway
Value: !Ref NATGateway
Export:
Name: !Sub "${AWS::StackName}-NATGateway"
NATGatewayEIPId:
Description: ID of NAT Gateway Elastic IP
Value: !Ref NATGatewayEIP
Export:
Name: !Sub "${AWS::StackName}-NATGatewayEIP"
PublicRouteTableId:
Description: ID of Public Route Table
Value: !Ref PublicRouteTable
Export:
Name: !Sub "${AWS::StackName}-PublicRouteTable"
PrivateRouteTableId:
Description: ID of Private Route Table
Value: !Ref PrivateRouteTable
Export:
Name: !Sub "${AWS::StackName}-PrivateRouteTable"
EventBridge Pipeで設定している ManagedStreamingKafkaParameters
の部分に関して、具体的にどのようなパラメーターが設定できるのかは以下のドキュメントをご確認下さい。
上記では test-topic
という名前のトピックを設定し、バッチサイズ(1度に処理するメッセージの最大数)は10にしました。
開始ポジションは LATEST
に設定していますが、これはどのような位置からメッセージを読むかを指定するものです。(EventBridge Pipeでは LATEST
または TRIM_HORIZON
を設定可能です)
Lambdaを非同期実行するための設定は TargetParameters
の InvocationType
を FIRE_AND_FORGET
としている部分です。
デフォルトでは REQUEST_RESPONSE
(同期実行)となっているため、特に障害時の処理を明示的に入れてない場合は先に述べたようにエラー発生時にLambdaが繰り返し起動してしまいます。(参考)
Lambda自体には EventInvokeConfig
という非同期でLambdaを呼び出す場合の設定をしており、最大試行数は2に設定しています。
これを超えて失敗すると設定したDLQにメッセージが転送される形となります。
EventBridge Pipeが利用するIAMロールには、MSKのIAM認証に必要なポリシーを記述しています。
最後に、Lambda関数のスクリプトファイルである msk_processor.py
は以下の通りです。
import base64
import json
import logging
from typing import Any, Dict, List
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def process_kafka_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""
個別のKafkaレコードを処理する
Args:
record (Dict[str, Any]): EventBridge Pipeから受信したKafkaレコード
Returns:
Dict[str, Any]: 処理されたレコードデータ
"""
try:
topic_name = record.get("topic", "unknown")
# Kafkaメッセージのデコード
if "value" in record:
# Base64でエンコードされたメッセージをデコード
message_bytes = base64.b64decode(record["value"])
message_str = message_bytes.decode("utf-8")
try:
# JSONとしてパース試行
message_data = json.loads(message_str)
except json.JSONDecodeError:
# JSONでない場合は文字列として扱う
message_data = message_str
else:
message_data = None
# EventBridge Pipeのメタデータの抽出
partition = record.get("partition", "unknown")
offset = record.get("offset", "unknown")
timestamp = record.get("timestamp", "unknown")
event_source = record.get("eventSource", "unknown")
event_source_arn = record.get("eventSourceArn", "unknown")
logger.info(
f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
)
logger.info(f"Message content: {message_data}")
# ここでビジネスロジックを実装
processed_data = {
"topic": topic_name,
"partition": partition,
"offset": offset,
"timestamp": timestamp,
"event_source": event_source,
"event_source_arn": event_source_arn,
"original_message": message_data,
"processing_status": "success",
}
return processed_data
except Exception as e:
logger.error(f"Error processing record: {str(e)}")
return {
"topic": record.get("topic", "unknown"),
"partition": record.get("partition", "unknown"),
"offset": record.get("offset", "unknown"),
"error": str(e),
"processing_status": "error",
}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
EventBridge PipeからのKafkaメッセージを処理するLambda関数
Args:
event (Dict[str, Any] | List[Dict[str, Any]]): イベントデータ
通常はKafkaレコードの配列形式で受信される
Kafkaを直接イベントソースとして指定した場合は辞書形式で "records" キーを含む
context (Any): Lambda実行コンテキスト
Returns:
Dict[str, Any]: Lambda関数の実行結果
"""
logger.info(f"Received event: {json.dumps(event, default=str)}")
processed_records = []
try:
# EventBridge Pipeのイベント構造: eventは直接レコードの配列
if isinstance(event, list):
logger.info(f"Processing {len(event)} records from EventBridge Pipe")
for record in event:
processed_record = process_kafka_record(record)
processed_records.append(processed_record)
else:
# MSKと直接接続された時の互換性のための処理
logger.warning("Received non-array event, checking for legacy MSK format")
if "records" in event:
for topic_name, topic_records in event["records"].items():
logger.info(f"Processing topic (legacy format): {topic_name}")
for record in topic_records:
# レコードにトピック名を追加
record["topic"] = topic_name
processed_record = process_kafka_record(record)
processed_records.append(processed_record)
logger.info(f"Successfully processed {len(processed_records)} records")
return {
"statusCode": 200,
"body": json.dumps(
{
"message": f"Successfully processed {len(processed_records)} records",
"processed_records": len(processed_records),
}
),
}
except Exception as e:
logger.error(f"Error processing: {str(e)}")
raise
基本的にはEventBridge Pipeからのイベントを想定していますが、イベントソースマッピングでLambdaのソースとしてMSKが直接指定された場合でも動くようにしております。
サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。
デプロイ
以下コマンドでデプロイ可能です。
$ sam deploy
デプロイが完了すると、指定したスタック名でスタックが作成されます。
動作確認
まず、今回作成したVPC内でCloudShellを起動します。
CloudShellが起動したら、Kafka CLIやMSKのIAM認証のためのライブラリ等のDLを行います。
# CloudShellでKafka CLIツールをダウンロード
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz
# MSK IAM認証ライブラリのダウンロード
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.0/aws-msk-iam-auth-2.3.0-all.jar
$ cp aws-msk-iam-auth-2.3.0-all.jar ./kafka_2.13-3.9.1/libs/
# クライアントの設定ファイルを作成
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
ここまで完了すると、直下のディレクトリファイルは以下のようになっていると思います。
準備ができたら、Bootstrapサーバーの取得を行い、トピックの作成を行います。
# Bootstrap serverの取得
$ aws kafka get-bootstrap-brokers \
--cluster-arn "<作成したKafkaクラスターのARN>"
# トピックの作成
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
--bootstrap-server <取得したホスト名>:9098 \
--command-config client.properties \
--create \
--topic test-topic \
--partitions 3
トピックの作成が完了したら、メッセージの送信を行います。
$ echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
--bootstrap-server <取得したホスト名>:9098 \
--producer.config client.properties \
--topic test-topic
Lambdaのログを確認すると、無事メッセージが取得できていることが確認できました。
あえてLambdaを失敗するよう raise Exception()
を設定して動作させると、2回の再試行の後以下の通りDLQに設定したSQSにメッセージが送信されていることも確認できました。
最後に
今回は、Amazon MSK Serverless + Lambda + EventBridge PipesでサーバーレスなKafka環境を構築してみました。
参考になりましたら幸いです。
参考文献
- Processing self-managed Apache Kafka messages with Lambda
- AWS::Lambda::EventSourceMapping
- AWS::Serverless::Function
- Create a client machine to access MSK Serverless cluster
- Create authorization policies for the IAM role
- Capturing discarded batches for an Amazon MSK event source
- re:Invent 2024: AWSによるKafkaとLambdaを用いたリアルタイムデータ処理