AthenaとStep Functionsで特定の条件に該当するユーザーを抽出してLINE公式アカウントのオーディエンスを一括作成してみた

AthenaとStep Functionsで特定の条件に該当するユーザーを抽出してLINE公式アカウントのオーディエンスを一括作成してみた

LINE外のシステムで保持ししている顧客情報を元にオーディエンスを作成したいユースケースに対応する構成について検証してみました
Clock Icon2024.08.09

リテールアプリ共創部@大阪の岩田です。

LINE公式アカウントのオーディエンス作成について検証する機会があったので検証した内容について共有させて頂きます。

やること

こんな構成を作ります。

line-audience-statemachine-arc

何かしらのシステムが顧客の属性値とLINEユーザーIDの情報を定期的にS3バケットに出力していると想定し、出力されたファイルから条件に該当するユーザーIDを抽出し、LINEのMessaging APIを利用してユーザーIDアップロード用のオーディエンスを作成する一連の処理をStepFunctionsで構築します。

このブログではシンプルに以下のようなJSONファイルが出力されている前提とし、prefが兵庫県のユーザーIDを抽出してオーディエンスを作ってみます。

{"id":"123", "pref": "大阪府"}
{"id":"456", "pref": "兵庫県"}

サクっと検証するのが目的なのでオーディエンス作成のAPIエンドポイントはJSON指定のエンドポイントを利用します。JSON指定のエンドポイントはファイル指定のエンドポイントと比較すると一括登録可能な件数が少ないのですが、Lambdaを使わなくてもStep FunctionsのHTTPタスクだけで処理が完結するので採用しています。

環境構築

必要な環境は基本的にCDKで構築していきます。Step FunctionsのHTTPタスクでLINEのアクセストークンを利用するためにEvent BridgeのConnectionが必要になりますが、作成時にLINEのクライアントシークレットが必要になるためここだけ事前にAWS CLIで作成します。

Event BridgeのConnection作成

AWS CLIの入力用として以下のJSONファイルを用意します。<LINE公式アカウントのChannel ID><LINE公式アカウントのChannel secret>の部分は自分の環境に合わせて変更してください。

{
    "Name": "line-messaging-api-token",
    "Description": "LINEのアクセストークン取得",
    "AuthorizationType": "OAUTH_CLIENT_CREDENTIALS",
    "AuthParameters": {
        "OAuthParameters": {
            "ClientParameters": {
                "ClientID": "<LINE公式アカウントのChannel ID>",
                "ClientSecret": "<LINE公式アカウントのChannel secret>"
            },
            "AuthorizationEndpoint": "https://api.line.me/v2/oauth/accessToken",
            "HttpMethod": "POST",
            "OAuthHttpParameters": {
                "HeaderParameters": [
                    {
                        "Key": "Content-Type",
                        "Value": "application/x-www-form-urlencoded",
                        "IsValueSecret": false
                    }
                ],
                "BodyParameters": [
                    {
                        "Key": "grant_type",
                        "Value": "client_credentials",
                        "IsValueSecret": false
                    }
                ]
            }
        }
    }
}

JSONファイルが準備できたらAWS CLIでConnectionの作成を行います。

aws events create-connection --cli-input-json file://<先ほど作成したJSONファイル>

作成が完了すると自動的にSecretsManagerのSecretが作成されます。SecretのARNが必要になるので以下のコマンドで取得します

aws events describe-connection --name line-messaging-api-token

以下のようなレスポンスが返却されるのでConnectionArnSecretArnを控えておきましょう

{
    "ConnectionArn": "arn:aws:events:ap-northeast-1:123456789012:connection/line-messaging-api-token/15a76b06-ca2b-4a78-87ad-19f3546a986c",
    "Name": "line-messaging-api-token",
    "Description": "LINEのアクセストークン取得",
    "ConnectionState": "AUTHORIZED",
    "AuthorizationType": "OAUTH_CLIENT_CREDENTIALS",
    "SecretArn": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:events!connection/line-messaging-api-token/09ae6d99-59fc-4686-a5a8-15574dd21ad1-oZB4wm",
    ...略
}

CDKのコード準備

各種リソースを作成するCDKコードを書いていきます。

まず先程控えた情報をもとに設定値を取得する処理を作成します。実案件で利用する場合は複数環境の設定を切り替えてりようできるように拡張して下さい。

config.ts
export type Config = {
    connectionArn: string
    connectionName: string
    connectionSecretArn: string
}

export const getConfig = (): Config => {

    return {
        connectionArn: '<先ほど控えたConnectionArn>',
        connectionName: 'line-messaging-api-token',
        connectionSecretArn: '<先ほど控えたSecretArn>',
    }
}

続いてbin配下のコードです。シンプルに設定値の取得とスタックの作成のみを行います。

bin/line-audience.ts
#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { LineAudienceStack } from "../lib/line-audience-stack";
import { getConfig } from "../config";

const app = new cdk.App();
const config = getConfig();

new LineAudienceStack(app, "LineAudienceStack", {
  config,
});

スタックのコンストラクタです。メイン処理はDwhConstructStateMachineConstructに委譲しています。

lib/line-audience-stack.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import { StateMachineConstruct } from "./constructs/statemachine";
import { Config } from "../config";
import { DwhConstruct } from "./constructs/dwh";

type LineAudienceStackProps = cdk.StackProps & {
  config: Config;
};

export class LineAudienceStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: LineAudienceStackProps) {
    super(scope, id, props);

    const dwh = new DwhConstruct(this, "Dwh", {});

    new StateMachineConstruct(this, "StateMachine", {
      athenaWgName: dwh.athenaWgName,
      dbName: dwh.dbName,
      tableName: dwh.tableName,
      connectionArn: props.config.connectionArn,
      connectionName: props.config.connectionName,
      connectionSecretArn: props.config.connectionSecretArn,
    });
  }
}

DwhConstructの中身です。Athenaにクエリするための諸々のリソース作成に加え、テストに利用するためのファイルをassets/line-usersというディレクトリからS3にアップロードする処理も記述しています。

lib/constructs/dwh.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import {
  aws_athena as athena,
  aws_glue as glue,
  aws_s3 as s3,
  aws_s3_deployment as s3_deployment,
} from "aws-cdk-lib";

type DwhConstructProps = {};

export class DwhConstruct extends Construct {
  readonly athenaWgName: string;
  readonly dbName: string;
  readonly tableName: string;

  constructor(scope: Construct, id: string, props: DwhConstructProps) {
    super(scope, id);

    const athenaResultBucket = new s3.Bucket(this, "AthenaResultBucket", {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      lifecycleRules: [
        {
          expiration: cdk.Duration.days(365),
          abortIncompleteMultipartUploadAfter: cdk.Duration.days(7),
        },
      ],
    });

    this.athenaWgName = "line-wg";

    const athenaWg = new athena.CfnWorkGroup(this, "AthenaWg", {
      name: this.athenaWgName,
      workGroupConfiguration: {
        resultConfiguration: {
          outputLocation: `s3://${athenaResultBucket.bucketName}/athena-results/`,
          encryptionConfiguration: {
            encryptionOption: "SSE_S3",
          },
        },
      },
      state: "ENABLED",
      recursiveDeleteOption: true,
    });

    const lineDBBucket = new s3.Bucket(this, "LineDBBucket", {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      lifecycleRules: [
        {
          expiration: cdk.Duration.days(365),
          abortIncompleteMultipartUploadAfter: cdk.Duration.days(7),
        },
      ],
    });

    this.dbName = "line-db";
    this.tableName = "line_users";

    new glue.CfnDatabase(this, "GlueDB", {
      catalogId: cdk.Stack.of(this).account,
      databaseInput: {
        name: this.dbName,
      },
    });

    const lineUsersDir = "line_users";

    new s3_deployment.BucketDeployment(this, "DeployLineUsers", {
      destinationBucket: lineDBBucket,
      destinationKeyPrefix: lineUsersDir,
      sources: [s3_deployment.Source.asset("./assets/line-users")],
    });

    new glue.CfnTable(this, "LineUserTable", {
      databaseName: this.dbName,
      catalogId: cdk.Stack.of(this).account,
      tableInput: {
        name: this.tableName,
        parameters: {
          classification: "json",
        },
        storageDescriptor: {
          columns: [
            {
              name: "id",
              type: "string",
            },
            {
              name: "pref",
              type: "string",
            },
          ],
          location: `s3://${lineDBBucket.bucketName}/${lineUsersDir}/`,
          inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
          outputFormat:
            "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
          serdeInfo: {
            serializationLibrary: "org.openx.data.jsonserde.JsonSerDe",
          },
        },
      },
    });
  }
}

続いてStateMachineConstructの中身です

lib/constructs/statemachine.ts
import { Construct } from "constructs";
import {
  aws_iam as iam,
  aws_stepfunctions as sfn,
  aws_events as events,
  aws_secretsmanager as secretsmanager,
} from "aws-cdk-lib";
import { Pass, StateMachine } from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Config } from "../config";

type StateMachineConstructProps = {
  athenaWgName: string;
  dbName: string;
  tableName: string;
} & Pick<Config, "connectionArn" | "connectionName" | "connectionSecretArn">;

export class StateMachineConstruct extends Construct {
  constructor(scope: Construct, id: string, props: StateMachineConstructProps) {
    super(scope, id);

    const sfnRole = new iam.Role(this, "Role", {
      assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
    });
    sfnRole.addToPolicy(
      new iam.PolicyStatement({
        resources: ["*"],
        actions: [
          "glue:GetDatabase",
          "glue:GetTable",
          "states:InvokeHTTPEndpoint",
        ],
      }),
    );

    const query = `
SELECT
    audiences
FROM
    (SELECT 
    	ARRAY_AGG(JSON_PARSE(JSON_OBJECT('id': id))) AS audiences
    FROM
        "${props.dbName}"."${props.tableName}"
    WHERE
        pref = '兵庫県'
    )    
`;

    const connection = events.Connection.fromConnectionAttributes(
      this,
      "HttpConnection",
      {
        connectionArn: props.connectionArn,
        connectionName: props.connectionName,
        connectionSecretArn: props.connectionSecretArn,
      },
    );

    new sfn.StateMachine(this, "Resource", {
      stateMachineName: "CreateLineAudiencesSM",
      comment: "LINEのAudienceを作成するステートマシン",
      role: sfnRole,
      definitionBody: sfn.DefinitionBody.fromChainable(
        new tasks.AthenaStartQueryExecution(this, "StartQueryExecution", {
          queryString: query,
          integrationPattern: sfn.IntegrationPattern.RUN_JOB,
          comment: "AthenaのクエリでLINEユーザーアカウントの一覧を取得する",
          workGroup: props.athenaWgName,
          resultSelector: {
            "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId",
          },
          resultPath: "$.StartQueryExecutionResult",
        })
          .next(
            new tasks.AthenaGetQueryResults(this, "GetQueryResult", {
              queryExecutionId: sfn.JsonPath.stringAt(
                "$.StartQueryExecutionResult.QueryExecutionId",
              ),
              resultSelector: {
                "audiences.$": "$.ResultSet.Rows[1].Data[0].VarCharValue",
              },
              resultPath: "$.GetQueryResultResult",
            }),
          )
          .next(
            new tasks.HttpInvoke(this, "InvokeHTTPEndpoint", {
              apiRoot:
                "https://api.line.me",
              apiEndpoint: sfn.TaskInput.fromText("/v2/bot/audienceGroup/upload"),
              connection: connection,
              method: sfn.TaskInput.fromText("POST"),
              inputPath: "$.GetQueryResultResult.audiences",
              body: sfn.TaskInput.fromObject({
                // TODO 動的な値を設定するようにする
                description: "AthenaとStepFunctionsで自動生成",
                uploadDescription: "YYYY-MM-DD",
                "audiences.$": "States.StringToJson($)",
              }),
            }),
          ),
      ),
    });
  }
}

準備できたらcdk deployでデプロイしましょう。

ステートマシンのポイント解説

作成したステートマシンの処理について簡単に解説します。

Athenaのクエリを実行(StartQueryExecution)

まずAthenaのクエリを実行します。実行しているクエリは以下の通りで、条件に合致するユーザーIDをJSONの配列として取得します。

SELECT
    audiences
FROM
    (SELECT 
    	ARRAY_AGG(JSON_PARSE(JSON_OBJECT('id': id))) AS audiences
    FROM
        "line-db"."line_users"
    WHERE
        pref = '兵庫県'
    )   

元テーブルのデータが以下の状態だとします。

Id pref
123 大阪府
456 兵庫県

この場合クエリの実行結果は以下のような形になります。

audiences
[{"id":"456"}]

Athenaのクエリ実行結果を取得(GetQueryResult)

続いてAthenaのGetQueryResultsで先程のクエリの実行結果を取得します。GetQueryResultsで取得できる生データは以下のようなデータです。

{
    "ResultSet": {
        "Rows": [
            {
                "Data": [
                    {
                        "VarCharValue": "audiences"
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "[{\"id\":\"456\"}]"
                    }
                ]
            }
        ],
        "ResultSetMetadata": {
            "ColumnInfo": [
                {
                    "CatalogName": "hive",
                    "SchemaName": "",
                    "TableName": "",
                    "Name": "audiences",
                    "Label": "audiences",
                    "Type": "array",
                    "Precision": 0,
                    "Scale": 0,
                    "Nullable": "UNKNOWN",
                    "CaseSensitive": false
                }
            ]
        }
    },
    "UpdateCount": 0
}

必要な情報はLINEユーザーIDの一覧だけなので以下ResultSelectorの記述で必要な情報だけ抽出して後続タスクに引き渡します。

{
    "audiences.$": "$.ResultSet.Rows[1].Data[0].VarCharValue",
}

LINEのMessaging APIでオーディエンスを作成(InvokeHTTPEndpoint)

前段のタスクから"[{\"id\":\"456\"}]"のような文字列が渡されてくるので、この文字列をJSONとしてパースしてLINE Messaging APIのリクエストボディを組み立ててリクエストします。HTTPタスクのパラメータRequestBodyを以下のように記述しJSONのパースを行っています。

"RequestBody": {
  "description": "AthenaとStepFunctionsで自動生成",
  "uploadDescription": "YYYY-MM-DD",
  "audiences.$": "States.StringToJson($)"
}

descriptionuploadDescriptionは固定値を埋め込んでいますが、実案件ではステートマシンの入力値から動的に生成するような形になるでしょう。

やってみる

実際にステートマシンを実行してLINE公式アカウントのオーディエンスが作成されることを確認してみましょう。

line-audience-statemachine-result

問題なく正常終了しました。このあとLINE公式アカウントの管理画面を確認すると...

line-audiences

無事にオーディエンスが作成されていました!

まとめ

AthenaとStep Functionsを組み合わせてLINE公式アカウントのオーディエンスを作成してみました。LINE公式アカウント側で保持していない情報で対象ユーザーを抽出したいようなユースケースだとETL処理とAthenaを組み合わせるといい感じに対象ユーザーが抽出できそうです。実際に案件に適用する場合は以下のような点を考慮すると良いでしょう。

  • JSON指定のオーディエンス作成は1リクエストあたり10,000件の上限がある。これ以上の件数を扱いたいならファイル指定のオーディエンス作成が必要
  • Step FunctionsのHTTPタスクはファイルアップロードに対応していないので、ファイル指定のオーディエンス作成を利用する場合は別途Lambdaの実装が必要
  • ファイル指定のオーディエンス作成を利用する場合はAthenaのCTASでアップロード用のファイルを作成してS3に出力しておくと後続処理で楽ができそう
  • ステートマシンへの入力もしくはLambdaを利用してAthenaのクエリを動的に生成すると良さそう

紹介したコードは以下のリポジトリで公開しています。

https://github.com/cm-iwata/create-line-audience-by-stepfunctions

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.