Amazon MSKでSASL/SCRAM認証を試してみた
データ事業本部のueharaです。
今回は、Amazon MSKで SASL/SCRAM認証 を試してみたいと思います。
はじめに
前回、前々回とAmazon MSKに関する記事を出しましたが、その際認証は「IAM認証」を使用していました。
別の認証方式も試してみたいため、IAM認証ではなく「SASL/SCRAM認証」を利用してAmazon MSKへ接続してみたいと思います。
Amazon MSKの認証方式
Amazon MSKの認証には、以下の3つが存在します。
- IAM認証
- SASL/SCRAM認証
- 相互TLS認証
※ただし、Amazon MSK ServerlessはIAM認証のみ
IAM認証についてはその名の通りですが、SASL/SCRAM認証と相互TLS認証についての概要は以下の通りです。
項目 | SASL/SCRAM認証 | 相互TLS認証 |
---|---|---|
認証方式 | ユーザー名・パスワード | クライアント証明書 |
管理サービス | AWS Secrets Manager | AWS Certificate Manager |
セキュリティレベル | 標準 | 高 |
設定の複雑さ | 比較的簡単 | やや複雑 |
運用負荷 | パスワード管理 | 証明書管理 |
したがって、一般的なユーザー名・パスワード認証で認証を行いたい場合は SASL/SCRAM認証 が選択肢となります。
SASL/SCRAMは、RFC 5802で定義されており、SCRAMはセキュリティで保護されたハッシュアルゴリズムを使用するため、クライアントとサーバー間でプレーンテキストの認証情報を送信しない形となります。
SASL/SCRAM認証を利用する場合の制限事項
Amazon MSKでSASL/SCRAM認証を利用する場合、以下のような制限事項があります。
- Amazon MSKは
SCRAM-SHA-512
認証のみをサポート - MSKクラスターに紐づけられるのは最大1000ユーザーまで
- 1度のAPI実行で紐づけられるのは最大10ユーザーまで
- シークレットはKMSで暗号化する必要があるが、Secrets Managerのデフォルトの暗号化キーを利用することはできない
- シークレットの暗号化にKMSの公開鍵暗号方式を利用することはできない
- MSKクラスターに関連付けるシークレットの名前には、
Amazon MSK_
というPrefixが必要 - MSKクラスターに関連付けられたシークレットは、クラスターと同じAWSアカウントおよびリージョンに存在する必要がある
ファイル準備
今回作成するリソースは以下の通りです。
.
├── cfn
│ ├── msk_infra.yml
│ └── secrets_manager.yml
└── sam
├── function
│ └── msk_processor.py
├── samconfig.toml
└── template.yaml
まず、 secrets_manager.yml
は今回作成するユーザーのユーザー名とパスワードをSecrets ManagerにデプロイするためのCloudFormationテンプレートとなります。
AWSTemplateFormatVersion: 2010-09-09
Description: MSK User Credentials for SASL/SCRAM
Parameters:
Username:
Description: Username for MSK Cluster
Default: test-user
Type: String
Password:
Description: Password for MSK Cluster
Type: String
NoEcho: true
Resources:
# KMSキーの作成
MSKSecretsKMSKey:
Type: AWS::KMS::Key
Properties:
Description: "KMS Key for MSK Secrets Manager"
# KMSキーエイリアス
MSKSecretsKMSKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: !Sub "alias/uehara-msk-secrets"
TargetKeyId: !Ref MSKSecretsKMSKey
# Secrets Managerシークレット
MSKUserSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: "AmazonMSK_test_user"
Description: "MSK SASL/SCRAM user credentials"
KmsKeyId: !Ref MSKSecretsKMSKey
SecretString: !Sub |
{
"username": "${Username}",
"password": "${Password}"
}
前述の制限事項のようにKMSのマネージドキーが利用できなかったり、シークレット名が AmazonMSK_
で始まらないといけないなどの制限があるので、そちらに沿うように作成しています。
msk_infra.yml
はAmazon MSKをデプロイするためのインフラ関連の定義をしたCloudFormationテンプレートです。
具体的には以下のような構成でデプロイを行います。
テンプレートファイルは以下の通りです。(少し長いため折りたたんでおきます)
msk_infra.yml
AWSTemplateFormatVersion: 2010-09-09
Description: Infrastructure for MSK Cluster
Resources:
# VPC
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.10.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: uehara-msk-vpc
# パブリックサブネット (AZ: ap-northeast-1a)
PublicSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.128/26
AvailabilityZone: ap-northeast-1a
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: msk-public-subnet-1a
# プライベートサブネット1 (AZ: ap-northeast-1a)
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.0/26
AvailabilityZone: ap-northeast-1a
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: msk-private-subnet-1a
# プライベートサブネット2 (AZ: ap-northeast-1c)
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.10.0.64/26
AvailabilityZone: ap-northeast-1c
MapPublicIpOnLaunch: false
Tags:
- Key: Name
Value: msk-private-subnet-1c
# インターネットゲートウェイ
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: msk-internet-gateway
# インターネットゲートウェイをVPCにアタッチ
AttachGateway:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
# NAT Gateway用のElastic IP
NATGatewayEIP:
Type: AWS::EC2::EIP
DependsOn: AttachGateway
Properties:
Domain: vpc
Tags:
- Key: Name
Value: msk-nat-gateway-eip
# NAT Gateway
NATGateway:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NATGatewayEIP.AllocationId
SubnetId: !Ref PublicSubnet1
Tags:
- Key: Name
Value: msk-nat-gateway
# パブリックルートテーブル
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: msk-public-route-table
# パブリックルート(インターネットゲートウェイへ)
PublicRoute:
Type: AWS::EC2::Route
DependsOn: AttachGateway
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway
# パブリックサブネットとルートテーブルの関連付け
PublicSubnetRouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PublicSubnet1
RouteTableId: !Ref PublicRouteTable
# プライベートルートテーブル
PrivateRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: msk-private-route-table
# プライベートルート(NAT Gatewayへ)
PrivateRoute:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NATGateway
# プライベートサブネット1とルートテーブルの関連付け
PrivateSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref PrivateRouteTable
# プライベートサブネット2とルートテーブルの関連付け
PrivateSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable
# セキュリティグループ
MSKSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security group for MSK cluster
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 9096
ToPort: 9096
CidrIp: 10.10.0.0/16
Description: MSK Kafka protocol (VPC access)
Tags:
- Key: Name
Value: msk-cluster-sg
# MSK クラスター
MSKCluster:
Type: AWS::MSK::Cluster
Properties:
BrokerNodeGroupInfo:
ClientSubnets:
- !Ref PrivateSubnet1
- !Ref PrivateSubnet2
InstanceType: kafka.t3.small
SecurityGroups: [!GetAtt MSKSecurityGroup.GroupId]
StorageInfo:
EBSStorageInfo:
VolumeSize: 100
ClusterName: uehara-msk-cluster
EncryptionInfo:
EncryptionInTransit:
ClientBroker: TLS
InCluster: true
# SASL/SCRAM認証
ClientAuthentication:
Sasl:
Scram:
Enabled: true
EnhancedMonitoring: DEFAULT
KafkaVersion: 3.9.x
NumberOfBrokerNodes: 2
Outputs:
VpcId:
Description: ID of the created VPC
Value: !Ref VPC
Export:
Name: !Sub "${AWS::StackName}-VpcId"
MSKClusterArn:
Description: ARN of the MSK cluster
Value: !GetAtt MSKCluster.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKClusterArn"
PrivateSubnet1Id:
Description: ID of Private Subnet 1
Value: !Ref PrivateSubnet1
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet1"
PrivateSubnet2Id:
Description: ID of Private Subnet 2
Value: !Ref PrivateSubnet2
Export:
Name: !Sub "${AWS::StackName}-PrivateSubnet2"
SecurityGroupId:
Description: ID of MSK Security Group
Value: !Ref MSKSecurityGroup
Export:
Name: !Sub "${AWS::StackName}-MSKSecurityGroup"
PublicSubnet1Id:
Description: ID of Public Subnet 1
Value: !Ref PublicSubnet1
Export:
Name: !Sub "${AWS::StackName}-PublicSubnet1"
InternetGatewayId:
Description: ID of Internet Gateway
Value: !Ref InternetGateway
Export:
Name: !Sub "${AWS::StackName}-InternetGateway"
NATGatewayId:
Description: ID of NAT Gateway
Value: !Ref NATGateway
Export:
Name: !Sub "${AWS::StackName}-NATGateway"
NATGatewayEIPId:
Description: ID of NAT Gateway Elastic IP
Value: !Ref NATGatewayEIP
Export:
Name: !Sub "${AWS::StackName}-NATGatewayEIP"
PublicRouteTableId:
Description: ID of Public Route Table
Value: !Ref PublicRouteTable
Export:
Name: !Sub "${AWS::StackName}-PublicRouteTable"
PrivateRouteTableId:
Description: ID of Private Route Table
Value: !Ref PrivateRouteTable
Export:
Name: !Sub "${AWS::StackName}-PrivateRouteTable"
ポイントはAmazon MSKのクラスター定義で、以下の通りSASL/SCRAM認証を設定している部分です。
# SASL/SCRAM認証
ClientAuthentication:
Sasl:
Scram:
Enabled: true
実際のユーザーの紐づけはCloudFormationでは対応しておらず、AWS CLIまたはAWSマネジメントコンソール上での操作となるため、後ほどデプロイ時に対応します。
MSKのメッセージの受信はLambdaで行いたいため、アプリケーション部分はAWS SAMで定義をしております。
samconfig.toml
についてはスタック名やデプロイ用バケット等AWS SAMの設定ファイルとなります。
version = 0.1
[default]
region = "ap-northeast-1"
[default.build.parameters]
debug = true
[default.deploy.parameters]
stack_name = "uehara-msk-test-function"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true
template.yaml
はリソースを定義したテンプレートファイルです。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application"
Parameters:
MSKClusterArn:
Type: String
Description: ARN of the MSK Cluster
MSKUserSecretArn:
Type: String
Description: ARN of the MSK Secrets
Resources:
# Lambda実行ロール
MSKProcessorExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: msk-processor-execution-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
- arn:aws:iam::aws:policy/SecretsManagerReadWrite
Policies:
- PolicyName: MSKEventSourceMappingPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
# SCRAMシークレット関連の権限
- Effect: Allow
Action:
- kafka:ListScramSecrets
- kafka:BatchAssociateScramSecret
- kafka:BatchDisassociateScramSecret
Resource: "*"
# Lambda関数
MSKProcessorFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: msk-processor-function
CodeUri: function/
Handler: msk_processor.lambda_handler
Runtime: python3.11
Architectures:
- x86_64
Timeout: 300
MemorySize: 512
Role: !GetAtt MSKProcessorExecutionRole.Arn
Layers:
- "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
Environment:
Variables:
LOG_LEVEL: INFO
Events:
MSKEvent:
Type: MSK
Properties:
Stream: !Ref MSKClusterArn
StartingPosition: LATEST
Topics:
- test-topic
BatchSize: 5
# SASL/SCRAM認証設定
SourceAccessConfigurations:
- Type: SASL_SCRAM_512_AUTH
URI: !Ref MSKUserSecretArn
Outputs:
MSKProcessorFunctionArn:
Description: ARN of the MSK Processor Lambda function
Value: !GetAtt MSKProcessorFunction.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"
MSKProcessorExecutionRoleArn:
Description: ARN of the MSK Processor Lambda execution role
Value: !GetAtt MSKProcessorExecutionRole.Arn
Export:
Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"
Amazon MSKへのメッセージのポーリングについては、おなじみのイベントソースマッピングで定義をしています。
SASL/SCRAM認証を設定しているのは以下の部分です。
SourceAccessConfigurations:
- Type: SASL_SCRAM_512_AUTH
URI: !Ref MSKUserSecretArn
冒頭の制限事項で示した通り、Amazon MSKは SCRAM-SHA-512
のみのサポートとなります。
最後に、 msk_processor.py
はイベントソースマッピングから起動するLambda関数のスクリプトファイルです。
import base64
import json
import logging
from typing import Any, Dict, List
# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
try:
# Kafkaメッセージのデコード
if "value" in record:
# Base64でエンコードされたメッセージをデコード
message_bytes = base64.b64decode(record["value"])
message_str = message_bytes.decode("utf-8")
try:
# JSONとしてパース試行
message_data = json.loads(message_str)
except json.JSONDecodeError:
# JSONでない場合は文字列として扱う
message_data = message_str
else:
message_data = None
# メタデータの抽出
partition = record.get("partition", "unknown")
offset = record.get("offset", "unknown")
timestamp = record.get("timestamp", "unknown")
logger.info(
f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
)
logger.info(f"Message content: {message_data}")
# ここでビジネスロジックを実装
processed_data = {
"topic": topic_name,
"partition": partition,
"offset": offset,
"timestamp": timestamp,
"original_message": message_data,
"processing_status": "success",
}
return processed_data
except Exception as e:
logger.error(f"Error processing record: {str(e)}")
return {
"topic": topic_name,
"partition": record.get("partition", "unknown"),
"offset": record.get("offset", "unknown"),
"error": str(e),
"processing_status": "error",
}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
logger.info(f"Received event: {json.dumps(event, default=str)}")
processed_records = []
try:
# MSKイベントの構造: event['records']にトピック別のレコードが含まれる
if "records" in event:
for topic_name, topic_records in event["records"].items():
logger.info(f"Processing topic: {topic_name}")
for record in topic_records:
processed_record = process_kafka_record(record, topic_name)
processed_records.append(processed_record)
logger.info(f"Successfully processed {len(processed_records)} records")
return {
"statusCode": 200,
"body": json.dumps(
{
"message": f"Successfully processed {len(processed_records)} records",
"processed_records": len(processed_records),
}
),
}
except Exception as e:
logger.error(f"Error processing MSK event: {str(e)}")
raise
イベントソースマッピングでLambdaのソースとしてMSKを直接指定し、イベントを受信する想定で記述をしています。
サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。
デプロイ
ファイルの用意ができたら、以下手順でデプロイを行います。
Secrets Managerのデプロイ
まず、 secrets_manager.yml
でSecrets Managerのデプロイを行います。
Username
, Password
はAmazon MSKに設定するユーザーのユーザー名とパスワードになります。
Amazon MSK関連リソースのデプロイ
次に、 msk_infra.yml
でAmazon MSK関連リソースのデプロイを行います。
こちらは特にパラメーターの設定はありません。
Amazon MSKとシークレットの紐づけ
前段の作業でMSKクラスターが作成されているので、AWSマネジメントコンソール上でアクセスしてみると、以下のようにシークレットとクラスターの紐づけに関する案内が表示されているかと思います。
『シークレットの関連付け』から遷移したページで『シークレットの選択』ボタンを押下すると、以下のように作成したシークレットが表示されるので選択します。
選択後『シークレットの関連付け』ボタンを押下すると、以下の通り関連付けられた旨のメッセージが表示されます。
クラスターのプロパティを確認すると、以下の通りSASL/SCRAM認証が有効になっており、Secrets Managerの関連付けられたシークレットが確認できます。
新しくユーザーを作成したい場合は、再度このプロパティからシークレットの紐づけが可能です。
Lambdaのデプロイ
最後に、メッセージを受信するLambdaをデプロイします。
作成した sam
ディレクトリ配下で以下のコマンドを実行します。
$ sam deploy --parameter-overrides "MSKClusterArn=<作成したMSKクラスターのARN> MSKUserSecretArn=<作成したシークレットのARN>"
デプロイが完了すると、以下の通りMSKをイベントソースとしたLambdaが作成されます。
動作確認
MSKクラスターをデプロイしたVPCおよびサブネットで、CloudShellを起動します。(冒頭で示した以前の記事と同様です)
CloudShellが起動したら、Kafka CLIのダウンロードやSASL/SCRAM認証のための設定を行います。
# CloudShellでKafka CLIツールをダウンロード
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz
# クライアントの設定ファイルを作成
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<設定したユーザー名>" \
password="<設定したパスワード>";
EOF
準備ができたら、トピックの作成とメッセージの送信を行います。
# トピックの作成
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
--bootstrap-server <MSKクラスターのBootstrap Serverのホスト名>:9096 \
--command-config client.properties \
--create \
--topic test-topic \
--partitions 3
# メッセージ送信
echo '{"message": "Hello Kafka"}' |
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
--bootstrap-server <MSKクラスターのBootstrap Serverのホスト名>:9096 \
--producer.config client.properties \
--topic test-topic
Lambdaのログを確認すると、以下の通り無事メッセージの受信ができていました。
最後に
今回は、Amazon MSKでSASL/SCRAM認証を試してみました。
参考になりましたら幸いです。