Step FunctionsでPinpointのセグメント作成とキャンペーン配信を自動化してみた

2022.08.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。たかやまです。

今回はS3にPinpointのセグメント情報が配置されたら、自動でインポートからキャンペーンを配信するStep Functionsを作っていこうと思います。

今回の検証環境のサンプルコードはこちらです。(Pinpointのプロジェクトは含まず)

構成図

今回の構成は以下の通りです。

やってみた

Lambda

Lambdaはセグメントインポート用とキャンペーン作成用の2つ作成します。

セグメントインポート用ではS3のEventBridge通知からS3ファイルパス抽出と日付プレフィックスを付与したセグメントを作成しています。

EventBridge通知例
{
  "version": "0",
  "id": "05902619-81be-71bb-4029-f9629997a5ef",
  "detail-type": "Object Created",
  "source": "aws.s3",
  "account": "YOUR_ACCOUNT",
  "time": "2022-08-08T13:46:16Z",
  "region": "ap-northeast-1",
  "resources": [
    "arn:aws:s3:::pinpoint-stepfunctions-dev-s3"
  ],
  "detail": {
    "version": "0",
    "bucket": {
      "name": "pinpoint-stepfunctions-dev-s3"
    },
    "object": {
      "key": "pinpoint_example_import_3.csv",
      "size": 11344,
      "etag": "6abfade02d8367a3fab8e0bf494aec4c",
      "sequencer": "0062F113A853D8F276"
    },
    "request-id": "H38YC5YNWZXXP3N2",
    "requester": "YOUR_ACCOUNT",
    "source-ip-address": "8.37.43.7",
    "reason": "PutObject"
  }
}
import datetime
import logging
import os

import boto3
from botocore.exceptions import ClientError

# Lambda environment variable
application_id = os.environ["APPLICATION_ID"]
pinpoinr_role = os.environ["PINPOINT_ROLE"]
# Data setting
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, "JST")
now = datetime.datetime.now(JST)
date = now.strftime("%m%d")
# Log setting
logger = logging.getLogger(__name__)

client = boto3.client("pinpoint")


def lambda_handler(event, context):

    # Extracted S3 file path
    bucket_name = event["detail"]["bucket"]["name"]
    object_key = event["detail"]["object"]["key"]

    try:
        response = client.create_import_job(
            ApplicationId=application_id,
            ImportJobRequest={
                "DefineSegment": True,
                "Format": "CSV",
                "RegisterEndpoints": True,
                "RoleArn": pinpoinr_role,
                "S3Url": "s3://{}/{}".format(bucket_name, object_key),
                "SegmentName": "{}_segment".format(date),
            },
        )
    except ClientError:
        logger.exception("Could not import segment")
        raise
    else:
        return response

Pinpoint - create_import_job — Boto3 Docs 1.24.46 documentation

以下の部分でS3のEventBridge通知からS3のファイルパス抽出を行っています。

    # Extracted S3 file path
    bucket_name = event["detail"]["bucket"]["name"]
    object_key = event["detail"]["object"]["key"]

キャンペーン作成用ではセグメントインポート処理から連携されるセグメントIDの抽出と日付プレフィックスを付与したキャンペーンを作成しています。

import datetime
import logging
import os

import boto3
from botocore.exceptions import ClientError

# Lambda environment variable
application_id = os.environ["APPLICATION_ID"]
template = os.environ["TEMPLATE"]
# Data setting
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, "JST")
now = datetime.datetime.now(JST)
date = now.strftime("%m%d")
# Log setting
logger = logging.getLogger(__name__)

client = boto3.client("pinpoint")


def lambda_handler(event, context):

    # Extracted Segment Id
    segmetn_id = event["ImportJobResponse"]["Definition"]["SegmentId"]

    try:
        response = client.create_campaign(
            ApplicationId=application_id,
            WriteCampaignRequest={
                "Name": "{}_email_campaign".format(date),
                "Schedule": {
                    "StartTime": "IMMEDIATE",
                },
                "SegmentId": segmetn_id,
                "SegmentVersion": 1,
                "TemplateConfiguration": {
                    "EmailTemplate": {
                        "Name": template,
                    },
                },
            },
        )
    except ClientError:
        logger.exception("Could not create campaign")
        raise
    else:
        return response

Pinpoint - create_campaign — Boto3 Docs 1.24.46 documentation

以下の部分でセグメントインポート処理からセグメントIDの抽出を行っています。

    # Extracted Segment Id
    segmetn_id = event["ImportJobResponse"]["Definition"]["SegmentId"]

ここではEmail向けのキャンペーンを作成していますが、テンプレートの指定を書き換えることでSMS配信を指定することができます。以下の例ではSMS向けの設定をしています。

                "TemplateConfiguration": {
                    "SMSTemplate": {
                        "Name": template,
                    },

Step Functions

ワークフローの全体イメージはこちらです。

Step Functions定義
{
  "StartAt": "import-segment",
  "States": {
    "import-segment": {
      "Next": "parallel",
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Type": "Task",
      "OutputPath": "$.Payload",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-import-segment-function",
        "Payload.$": "$"
      }
    },
    "parallel": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "create-email-campagin",
          "States": {
            "create-email-campagin": {
              "End": true,
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 5,
                  "BackoffRate": 2
                }
              ],
              "Type": "Task",
              "OutputPath": "$.Payload",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-create-email-campagin-fucntion",
                "Payload.$": "$"
              }
            }
          }
        },
        {
          "StartAt": "create-sms-campagin",
          "States": {
            "create-sms-campagin": {
              "End": true,
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 5,
                  "BackoffRate": 2
                }
              ],
              "Type": "Task",
              "OutputPath": "$.Payload",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "arn:aws:lambda:ap-northeast-1:YOUR_AWSACCOUNT_ID:function:pinpoint-stepfunctions-dev-create-sms-campagin-fucntion",
                "Payload.$": "$"
              }
            }
          }
        }
      ]
    }
  }
}

インポート処理ではLambda Invokeで先程作成したセグメントインポート用のLambdaファンクションを指定します。

出力はフィルタリングして$.Payloadを抽出しています。

次にEmail/SMS向けのキャンペーン作成を並列実行するために、Parallel stateを設定していきます。

最後に、キャンペーン作成用のLambdaファンクションをParallel state配下に設定していきます。
ここではEmailとSMS向けのキャンペーン作成を並列実行しています。

前段のセグメントインポート処理でインポート量が多い場合はLambda実行完了後もPinpoint側でインポート処理が続いていることがあるので、エラー処理で再試行処理を追加していきます。

リトライ時の待機時間の考え方についてはこちらのブログをご参考にしてください。

S3

Step Functionsをトリガーするために、CSVをアップロードするS3バケットのEventBridgeの通知設定を有効にします。

EventBridge

S3にファイルが置かれた場合にStep FunctionsをトリガーするためのEventBridgeを設定します。

イベントパターンにはオブジェクト作成時にトリガーされるよう設定します。

{
  "detail-type": ["Object Created"],
  "source": ["aws.s3"],
  "detail": {
    "bucket": {
      "name": ["YOUR_BUCKET_NAME"]
    }
  }
}

イベントのターゲットにはさきほど作成したStep Functionsを指定します。

実行

サンプルのセグメント情報を記載したCSVファイルをS3にアップロードします。

ChannelType,Address,User.UserAttributes.Name
SMS,+8180xxxxxxxx,たかやまSMS
EMAIL,xxxxx@gmail.com,たかやまメール1
EMAIL,xxxxx@gmail.com,たかやまメール2
EMAIL,xxxxx@classmethod.jp,たかやまメール3

ファイルがアップロードされるとStep Functionsが実行されることが確認できます。

Pinpoint側にも日付プレフィックスのついたセグメントとキャンペーンが作成されていることが確認できます。

メールとSMSの配信も無事確認できました。

Appendix : Pinpoint APIでの実装例

今回Pinpointの作成リソースに日付プレフィックスをつけるためにLambdaを利用しましたが、リソース名が固定値またはアップロードのCSVファイル名を付与するような場合はStep Functionsで直接Pinpoint APIを定義する方法も可能です。

Step Functions定義
{
  "StartAt": "import-segment",
  "States": {
    "import-segment": {
      "Next": "parallel",
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:pinpoint:createImportJob",
      "Parameters": {
        "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID",
        "ImportJobRequest": {
          "DefineSegment": true,
          "Format": "CSV",
          "RegisterEndpoints": true,
          "RoleArn": "arn:aws:iam::YOUR_AWSACCOUNT_ID:role/service-role/pinpoint-events",
          "S3Url.$": "States.Format('s3://{}/{}', $.detail.bucket.name,$.detail.object.key)",
          "SegmentName.$": "States.Format('segment-{}', $.detail.object.key)"
        }
      }
    },
    "parallel": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "create-campaign-for-email-pinpointapi",
          "States": {
            "create-campaign-for-email-pinpointapi": {
              "End": true,
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 5,
                  "BackoffRate": 2
                }
              ],
              "Type": "Task",
              "Resource": "arn:aws:states:::aws-sdk:pinpoint:createCampaign",
              "Parameters": {
                "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID",
                "WriteCampaignRequest": {
                  "Name": "campaign",
                  "Schedule": {
                    "StartTime": "IMMEDIATE"
                  },
                  "SegmentId.$": "$.ImportJobResponse.Definition.SegmentId",
                  "SegmentVersion": 1,
                  "TemplateConfiguration": {
                    "EmailTemplate": {
                      "Name": "test-template"
                    }
                  }
                }
              }
            }
          }
        },
        {
          "StartAt": "create-campaign-for-sms-pinpointapi",
          "States": {
            "create-campaign-for-sms-pinpointapi": {
              "End": true,
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 5,
                  "BackoffRate": 2
                }
              ],
              "Type": "Task",
              "Resource": "arn:aws:states:::aws-sdk:pinpoint:createCampaign",
              "Parameters": {
                "ApplicationId": "YOUR_PINPOINT_APPLICATION_ID",
                "WriteCampaignRequest": {
                  "Name": "campaign",
                  "Schedule": {
                    "StartTime": "IMMEDIATE"
                  },
                  "SegmentId.$": "$.ImportJobResponse.Definition.SegmentId",
                  "SegmentVersion": 1,
                  "TemplateConfiguration": {
                    "SmsTemplate": {
                      "Name": "test-template"
                    }
                  }
                }
              }
            }
          }
        }
      ]
    }
  }
}

最後に

Step Functionsを利用したキャンペーン自動配信の仕組みを作成しました。

今回の例では、セグメントファイルアップデート後に即座にキャンペーン配信されるような流れになりますが、キャンペーン作成時に配信時間の指定もできるのでユースケースに合わせ設定いただければと思います。

以上、たかやまでした。