Amazon MSKでSASL/SCRAM認証を試してみた

Amazon MSKでSASL/SCRAM認証を試してみた

2025.09.09

データ事業本部のueharaです。

今回は、Amazon MSKで SASL/SCRAM認証 を試してみたいと思います。

はじめに

前回、前々回とAmazon MSKに関する記事を出しましたが、その際認証は「IAM認証」を使用していました。

https://dev.classmethod.jp/articles/msk-serverless-lambda-eventbridge-pipes/

https://dev.classmethod.jp/articles/msk-cross-account-lambda/

別の認証方式も試してみたいため、IAM認証ではなく「SASL/SCRAM認証」を利用してAmazon MSKへ接続してみたいと思います。

Amazon MSKの認証方式

Amazon MSKの認証には、以下の3つが存在します。

  • IAM認証
  • SASL/SCRAM認証
  • 相互TLS認証

※ただし、Amazon MSK ServerlessはIAM認証のみ

IAM認証についてはその名の通りですが、SASL/SCRAM認証と相互TLS認証についての概要は以下の通りです。

項目 SASL/SCRAM認証 相互TLS認証
認証方式 ユーザー名・パスワード クライアント証明書
管理サービス AWS Secrets Manager AWS Certificate Manager
セキュリティレベル 標準
設定の複雑さ 比較的簡単 やや複雑
運用負荷 パスワード管理 証明書管理

したがって、一般的なユーザー名・パスワード認証で認証を行いたい場合は SASL/SCRAM認証 が選択肢となります。

SASL/SCRAMは、RFC 5802で定義されており、SCRAMはセキュリティで保護されたハッシュアルゴリズムを使用するため、クライアントとサーバー間でプレーンテキストの認証情報を送信しない形となります。

SASL/SCRAM認証を利用する場合の制限事項

Amazon MSKでSASL/SCRAM認証を利用する場合、以下のような制限事項があります。

  • Amazon MSKは SCRAM-SHA-512 認証のみをサポート
  • MSKクラスターに紐づけられるのは最大1000ユーザーまで
  • 1度のAPI実行で紐づけられるのは最大10ユーザーまで
  • シークレットはKMSで暗号化する必要があるが、Secrets Managerのデフォルトの暗号化キーを利用することはできない
  • シークレットの暗号化にKMSの公開鍵暗号方式を利用することはできない
  • MSKクラスターに関連付けるシークレットの名前には、 Amazon MSK_ というPrefixが必要
  • MSKクラスターに関連付けられたシークレットは、クラスターと同じAWSアカウントおよびリージョンに存在する必要がある

ファイル準備

今回作成するリソースは以下の通りです。

			
			.
├── cfn
│   ├── msk_infra.yml
│   └── secrets_manager.yml
└── sam
    ├── function
    │   └── msk_processor.py
    ├── samconfig.toml
    └── template.yaml

		

まず、 secrets_manager.yml は今回作成するユーザーのユーザー名とパスワードをSecrets ManagerにデプロイするためのCloudFormationテンプレートとなります。

secrets_manager.yml
			
			AWSTemplateFormatVersion: 2010-09-09
Description: MSK User Credentials for SASL/SCRAM

Parameters:
  Username:
    Description: Username for MSK Cluster
    Default: test-user
    Type: String
  Password:
    Description: Password for MSK Cluster
    Type: String
    NoEcho: true

Resources:
  # KMSキーの作成
  MSKSecretsKMSKey:
    Type: AWS::KMS::Key
    Properties:
      Description: "KMS Key for MSK Secrets Manager"

  # KMSキーエイリアス
  MSKSecretsKMSKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub "alias/uehara-msk-secrets"
      TargetKeyId: !Ref MSKSecretsKMSKey

  # Secrets Managerシークレット
  MSKUserSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: "AmazonMSK_test_user"
      Description: "MSK SASL/SCRAM user credentials"
      KmsKeyId: !Ref MSKSecretsKMSKey
      SecretString: !Sub |
        {
          "username": "${Username}",
          "password": "${Password}"
        }

		

前述の制限事項のようにKMSのマネージドキーが利用できなかったり、シークレット名が AmazonMSK_ で始まらないといけないなどの制限があるので、そちらに沿うように作成しています。

msk_infra.yml はAmazon MSKをデプロイするためのインフラ関連の定義をしたCloudFormationテンプレートです。

具体的には以下のような構成でデプロイを行います。

0909_msk_01

テンプレートファイルは以下の通りです。(少し長いため折りたたんでおきます)

msk_infra.yml
msk_infra.yml
			
			AWSTemplateFormatVersion: 2010-09-09
Description: Infrastructure for MSK Cluster

Resources:
  # VPC
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.10.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: uehara-msk-vpc

  # パブリックサブネット (AZ: ap-northeast-1a)
  PublicSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.128/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: msk-public-subnet-1a

  # プライベートサブネット1 (AZ: ap-northeast-1a)
  PrivateSubnet1:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.0/26
      AvailabilityZone: ap-northeast-1a
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1a

  # プライベートサブネット2 (AZ: ap-northeast-1c)
  PrivateSubnet2:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref VPC
      CidrBlock: 10.10.0.64/26
      AvailabilityZone: ap-northeast-1c
      MapPublicIpOnLaunch: false
      Tags:
        - Key: Name
          Value: msk-private-subnet-1c

  # インターネットゲートウェイ
  InternetGateway:
    Type: AWS::EC2::InternetGateway
    Properties:
      Tags:
        - Key: Name
          Value: msk-internet-gateway

  # インターネットゲートウェイをVPCにアタッチ
  AttachGateway:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway

  # NAT Gateway用のElastic IP
  NATGatewayEIP:
    Type: AWS::EC2::EIP
    DependsOn: AttachGateway
    Properties:
      Domain: vpc
      Tags:
        - Key: Name
          Value: msk-nat-gateway-eip

  # NAT Gateway
  NATGateway:
    Type: AWS::EC2::NatGateway
    Properties:
      AllocationId: !GetAtt NATGatewayEIP.AllocationId
      SubnetId: !Ref PublicSubnet1
      Tags:
        - Key: Name
          Value: msk-nat-gateway

  # パブリックルートテーブル
  PublicRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-public-route-table

  # パブリックルート(インターネットゲートウェイへ)
  PublicRoute:
    Type: AWS::EC2::Route
    DependsOn: AttachGateway
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway

  # パブリックサブネットとルートテーブルの関連付け
  PublicSubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PublicSubnet1
      RouteTableId: !Ref PublicRouteTable

  # プライベートルートテーブル
  PrivateRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref VPC
      Tags:
        - Key: Name
          Value: msk-private-route-table

  # プライベートルート(NAT Gatewayへ)
  PrivateRoute:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      NatGatewayId: !Ref NATGateway

  # プライベートサブネット1とルートテーブルの関連付け
  PrivateSubnet1RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet1
      RouteTableId: !Ref PrivateRouteTable

  # プライベートサブネット2とルートテーブルの関連付け
  PrivateSubnet2RouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet2
      RouteTableId: !Ref PrivateRouteTable

  # セキュリティグループ
  MSKSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for MSK cluster
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 9096
          ToPort: 9096
          CidrIp: 10.10.0.0/16
          Description: MSK Kafka protocol (VPC access)
      Tags:
        - Key: Name
          Value: msk-cluster-sg

  # MSK クラスター
  MSKCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnet1
          - !Ref PrivateSubnet2
        InstanceType: kafka.t3.small
        SecurityGroups: [!GetAtt MSKSecurityGroup.GroupId]
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 100
      ClusterName: uehara-msk-cluster
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
      # SASL/SCRAM認証
      ClientAuthentication:
        Sasl:
          Scram:
            Enabled: true
      EnhancedMonitoring: DEFAULT
      KafkaVersion: 3.9.x
      NumberOfBrokerNodes: 2

Outputs:
  VpcId:
    Description: ID of the created VPC
    Value: !Ref VPC
    Export:
      Name: !Sub "${AWS::StackName}-VpcId"

  MSKClusterArn:
    Description: ARN of the MSK cluster
    Value: !GetAtt MSKCluster.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKClusterArn"

  PrivateSubnet1Id:
    Description: ID of Private Subnet 1
    Value: !Ref PrivateSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet1"

  PrivateSubnet2Id:
    Description: ID of Private Subnet 2
    Value: !Ref PrivateSubnet2
    Export:
      Name: !Sub "${AWS::StackName}-PrivateSubnet2"

  SecurityGroupId:
    Description: ID of MSK Security Group
    Value: !Ref MSKSecurityGroup
    Export:
      Name: !Sub "${AWS::StackName}-MSKSecurityGroup"

  PublicSubnet1Id:
    Description: ID of Public Subnet 1
    Value: !Ref PublicSubnet1
    Export:
      Name: !Sub "${AWS::StackName}-PublicSubnet1"

  InternetGatewayId:
    Description: ID of Internet Gateway
    Value: !Ref InternetGateway
    Export:
      Name: !Sub "${AWS::StackName}-InternetGateway"

  NATGatewayId:
    Description: ID of NAT Gateway
    Value: !Ref NATGateway
    Export:
      Name: !Sub "${AWS::StackName}-NATGateway"

  NATGatewayEIPId:
    Description: ID of NAT Gateway Elastic IP
    Value: !Ref NATGatewayEIP
    Export:
      Name: !Sub "${AWS::StackName}-NATGatewayEIP"

  PublicRouteTableId:
    Description: ID of Public Route Table
    Value: !Ref PublicRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PublicRouteTable"

  PrivateRouteTableId:
    Description: ID of Private Route Table
    Value: !Ref PrivateRouteTable
    Export:
      Name: !Sub "${AWS::StackName}-PrivateRouteTable"

		

ポイントはAmazon MSKのクラスター定義で、以下の通りSASL/SCRAM認証を設定している部分です。

			
			# SASL/SCRAM認証
ClientAuthentication:
  Sasl:
    Scram:
      Enabled: true

		

実際のユーザーの紐づけはCloudFormationでは対応しておらず、AWS CLIまたはAWSマネジメントコンソール上での操作となるため、後ほどデプロイ時に対応します。

MSKのメッセージの受信はLambdaで行いたいため、アプリケーション部分はAWS SAMで定義をしております。

samconfig.toml についてはスタック名やデプロイ用バケット等AWS SAMの設定ファイルとなります。

samconfig.toml
			
			version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "uehara-msk-test-function"
s3_bucket = "<デプロイ用のS3バケット名>"
s3_prefix = "sam-deploy"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
resolve_image_repos = true

		

template.yaml はリソースを定義したテンプレートファイルです。

template.yaml
			
			AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "SAM Application"

Parameters:
  MSKClusterArn:
    Type: String
    Description: ARN of the MSK Cluster
  MSKUserSecretArn:
    Type: String
    Description: ARN of the MSK Secrets

Resources:
  # Lambda実行ロール
  MSKProcessorExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: msk-processor-execution-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
        - arn:aws:iam::aws:policy/SecretsManagerReadWrite
      Policies:
        - PolicyName: MSKEventSourceMappingPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # SCRAMシークレット関連の権限
              - Effect: Allow
                Action:
                  - kafka:ListScramSecrets
                  - kafka:BatchAssociateScramSecret
                  - kafka:BatchDisassociateScramSecret
                Resource: "*"

  # Lambda関数
  MSKProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: msk-processor-function
      CodeUri: function/
      Handler: msk_processor.lambda_handler
      Runtime: python3.11
      Architectures:
        - x86_64
      Timeout: 300
      MemorySize: 512
      Role: !GetAtt MSKProcessorExecutionRole.Arn
      Layers:
        - "arn:aws:lambda:ap-northeast-1:336392948345:layer:AWSSDKPandas-Python311:22"
      Environment:
        Variables:
          LOG_LEVEL: INFO
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            Stream: !Ref MSKClusterArn
            StartingPosition: LATEST
            Topics:
              - test-topic
            BatchSize: 5
            # SASL/SCRAM認証設定
            SourceAccessConfigurations:
              - Type: SASL_SCRAM_512_AUTH
                URI: !Ref MSKUserSecretArn

Outputs:
  MSKProcessorFunctionArn:
    Description: ARN of the MSK Processor Lambda function
    Value: !GetAtt MSKProcessorFunction.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorFunctionArn"

  MSKProcessorExecutionRoleArn:
    Description: ARN of the MSK Processor Lambda execution role
    Value: !GetAtt MSKProcessorExecutionRole.Arn
    Export:
      Name: !Sub "${AWS::StackName}-MSKProcessorExecutionRoleArn"

		

Amazon MSKへのメッセージのポーリングについては、おなじみのイベントソースマッピングで定義をしています。

SASL/SCRAM認証を設定しているのは以下の部分です。

			
			SourceAccessConfigurations:
  - Type: SASL_SCRAM_512_AUTH
    URI: !Ref MSKUserSecretArn

		

冒頭の制限事項で示した通り、Amazon MSKは SCRAM-SHA-512 のみのサポートとなります。

最後に、 msk_processor.py はイベントソースマッピングから起動するLambda関数のスクリプトファイルです。

msk_processor.py
			
			import base64
import json
import logging
from typing import Any, Dict, List

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_kafka_record(record: Dict[str, Any], topic_name: str) -> Dict[str, Any]:
    try:
        # Kafkaメッセージのデコード
        if "value" in record:
            # Base64でエンコードされたメッセージをデコード
            message_bytes = base64.b64decode(record["value"])
            message_str = message_bytes.decode("utf-8")

            try:
                # JSONとしてパース試行
                message_data = json.loads(message_str)
            except json.JSONDecodeError:
                # JSONでない場合は文字列として扱う
                message_data = message_str
        else:
            message_data = None

        # メタデータの抽出
        partition = record.get("partition", "unknown")
        offset = record.get("offset", "unknown")
        timestamp = record.get("timestamp", "unknown")

        logger.info(
            f"Processing message from topic '{topic_name}', partition {partition}, offset {offset}"
        )
        logger.info(f"Message content: {message_data}")

        # ここでビジネスロジックを実装

        processed_data = {
            "topic": topic_name,
            "partition": partition,
            "offset": offset,
            "timestamp": timestamp,
            "original_message": message_data,
            "processing_status": "success",
        }

        return processed_data

    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return {
            "topic": topic_name,
            "partition": record.get("partition", "unknown"),
            "offset": record.get("offset", "unknown"),
            "error": str(e),
            "processing_status": "error",
        }

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    logger.info(f"Received event: {json.dumps(event, default=str)}")

    processed_records = []

    try:
        # MSKイベントの構造: event['records']にトピック別のレコードが含まれる
        if "records" in event:
            for topic_name, topic_records in event["records"].items():
                logger.info(f"Processing topic: {topic_name}")

                for record in topic_records:
                    processed_record = process_kafka_record(record, topic_name)
                    processed_records.append(processed_record)

        logger.info(f"Successfully processed {len(processed_records)} records")

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "message": f"Successfully processed {len(processed_records)} records",
                    "processed_records": len(processed_records),
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing MSK event: {str(e)}")
        raise

		

イベントソースマッピングでLambdaのソースとしてMSKを直接指定し、イベントを受信する想定で記述をしています。

サンプルスクリプトのため特にビジネスロジックは実装しておらず、受け取ったメッセージをログに出力するのみとしています。

デプロイ

ファイルの用意ができたら、以下手順でデプロイを行います。

Secrets Managerのデプロイ

まず、 secrets_manager.yml でSecrets Managerのデプロイを行います。

0909_msk_02

Username , Password はAmazon MSKに設定するユーザーのユーザー名とパスワードになります。

Amazon MSK関連リソースのデプロイ

次に、 msk_infra.yml でAmazon MSK関連リソースのデプロイを行います。

こちらは特にパラメーターの設定はありません。

0909_msk_03

Amazon MSKとシークレットの紐づけ

前段の作業でMSKクラスターが作成されているので、AWSマネジメントコンソール上でアクセスしてみると、以下のようにシークレットとクラスターの紐づけに関する案内が表示されているかと思います。

0909_msk_04

『シークレットの関連付け』から遷移したページで『シークレットの選択』ボタンを押下すると、以下のように作成したシークレットが表示されるので選択します。

0909_msk_05

選択後『シークレットの関連付け』ボタンを押下すると、以下の通り関連付けられた旨のメッセージが表示されます。

0909_msk_06

クラスターのプロパティを確認すると、以下の通りSASL/SCRAM認証が有効になっており、Secrets Managerの関連付けられたシークレットが確認できます。

0909_msk_07

新しくユーザーを作成したい場合は、再度このプロパティからシークレットの紐づけが可能です。

Lambdaのデプロイ

最後に、メッセージを受信するLambdaをデプロイします。

作成した sam ディレクトリ配下で以下のコマンドを実行します。

			
			$ sam deploy --parameter-overrides "MSKClusterArn=<作成したMSKクラスターのARN> MSKUserSecretArn=<作成したシークレットのARN>"

		

デプロイが完了すると、以下の通りMSKをイベントソースとしたLambdaが作成されます。

0909_msk_08

動作確認

MSKクラスターをデプロイしたVPCおよびサブネットで、CloudShellを起動します。(冒頭で示した以前の記事と同様です)

CloudShellが起動したら、Kafka CLIのダウンロードやSASL/SCRAM認証のための設定を行います。

			
			# CloudShellでKafka CLIツールをダウンロード
$ wget https://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
$ tar -xzf kafka_2.13-3.9.1.tgz

# クライアントの設定ファイルを作成
$ cat > client.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<設定したユーザー名>" \
  password="<設定したパスワード>";
EOF

		

準備ができたら、トピックの作成とメッセージの送信を行います。

			
			# トピックの作成
$ ./kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --bootstrap-server <MSKクラスターのBootstrap Serverのホスト名>:9096 \
  --command-config client.properties \
  --create \
  --topic test-topic \
  --partitions 3

# メッセージ送信  
echo '{"message": "Hello Kafka"}' | 
./kafka_2.13-3.9.1/bin/kafka-console-producer.sh \
  --bootstrap-server <MSKクラスターのBootstrap Serverのホスト名>:9096 \
  --producer.config client.properties \
  --topic test-topic

		

Lambdaのログを確認すると、以下の通り無事メッセージの受信ができていました。

0909_msk_09

最後に

今回は、Amazon MSKでSASL/SCRAM認証を試してみました。

参考になりましたら幸いです。

参考文献

この記事をシェアする

FacebookHatena blogX

関連記事