PythonのStrEnumを使ってSNSトピックにPublishした場合、文字列を受け取れるのか試してみた

PythonのStrEnumを使ってSNSトピックにPublishした場合、文字列を受け取れるのか試してみた

PythonのStrEnumを使ってSNSトピックにメッセージをPublishした場合の動作を試してみました。
Clock Icon2024.09.30

Python 3.11 から、「StrEnum」が追加されました。

StrEnumを使うことで、文字列同士を比較するよりも可読性やメンテナンス性が向上しますし、単なるEnumよりもわかりやすくなります(文字列を扱う場合)。

そして、ふと気になりました。このStrEnumを利用してJSONを作成したり、SNSトピックにPublishしたとき、受け取った側ではどうなっているんだろう?と。
試してみました。

おすすめの方

  • StrEnumの雰囲気を知りたい方
  • StrEnumを利用してJSONを作成したい方
  • StrEnumを利用したJSONをSNSトピックにPublishして、受け取った場合にどうなるか知りたい方

LambdaとSNSをデプロイする

sam init

sam init \
    --runtime python3.11 \
    --name lambda-sns-strenum-sample \
    --app-template hello-world \
    --no-tracing \
    --no-application-insights \
    --structured-logging \
    --package-type Zip

SAMテンプレート

1つのSNSトピックと2つのLambdaを作成します。Lambdaは、SNSトピックをPublishするLambdaとSubscribeするLambdaを用意します。

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: lambda-sns-strenum-sample

Globals:
  Function:
    Timeout: 3
    LoggingConfig:
      LogFormat: JSON

Resources:
  SampleTopic:
    Type: AWS::SNS::Topic

  PublishTopicFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: publish.lambda_handler
      Runtime: python3.11
      Architectures:
      - x86_64
      Policies:
        - arn:aws:iam::aws:policy/AmazonSNSFullAccess
      Environment:
        Variables:
          TOPIC_ARN: !Ref SampleTopic

  PublishTopicFunctionLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
          LogGroupName: !Sub /aws/lambda/${PublishTopicFunction}

  SubscribeTopicFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: subscribe.lambda_handler
      Runtime: python3.11
      Architectures:
      - x86_64
      Events:
        HelloWorld:
          Type: SNS
          Properties:
            Topic: !Ref SampleTopic

  SubscribeTopicFunctionLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
          LogGroupName: !Sub /aws/lambda/${SubscribeTopicFunction}

Lambdaコード

StrEnumを作る

適当に作ります。

fruit.py
from enum import StrEnum

class Fruit(StrEnum):
    APPLE = "apple"
    BANANA = "banana"
    ORANGE = "orange"

SNSトピックをPublishするLambda

publish.py
import os
import json
import boto3
from fruit import Fruit

client = boto3.client("sns")

def lambda_handler(event, context):
    client.publish(
        TopicArn=os.environ["TOPIC_ARN"],
        Message=json.dumps({"fruit": Fruit.APPLE, "hello": "world"}),
    )

SNSトピックをSubscribeするLambda

受け取ったSNSトピックのメッセージをそのまま出力したり、StrEnumと比較ができるかを試してみます。

subscribe.py
import json
from fruit import Fruit

def lambda_handler(event, context):
    data = json.loads(event["Records"][0]["Sns"]["Message"])

    print(data)

    if data["fruit"] == Fruit.APPLE:
        print("Apple")
    else:
        print("Not Apple")

デプロイ

sam deploy \
    --guided \
    --region ap-northeast-1 \
    --stack-name lambda-sns-strenum-sample-stack

動作を確認する

SNSトピックをPublishするLambdaを実行して、とSubscribeするLambdaのログを確認します。

受け取ったSNSトピックのメッセージは、しっかりと文字列になっていました。また、StrEnumとの比較もできました。

01_cloudwatch_logs

参考

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.