Glueを使ってRDSからPinpointのセグメント情報を抽出してみた

2022.07.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。たかやまです。

RDS(SQL Server)のテーブル情報をPinpointのセグメントにインポートしたいケースがあり、今回はGlueを使ってデータ抽出を行い、Pinpointのセグメントファイルのデータ形式に変換してS3に出力してみたいと思います。

今回の検証環境のサンプルコードはこちらです。(Glue ジョブは含まず)

やりたいこと

SQL ServerのUsersテーブルをPinpointにインポートできるセグメント情報に変換してS3に格納

やってみた

SQL Serverの設定

SQL Server Express Editionを用意して、以下のコマンドで検証用のテーブルを事前に作成しています。

CREATE DATABASE Test;
GO
USE Test;
GO
CREATE TABLE Users ( 
  UserID CHAR(5) PRIMARY KEY,
  FirstName VARCHAR(10) NOT NULL,
  LastName VARCHAR(10) NOT NULL,
  Telephone VARCHAR(20),
  email VARCHAR(256)
);
GO
INSERT INTO Users
    (UserID, FirstName, LastName, Telephone, email)
VALUES
    (1, 'example', 'user1', '080-xxxx-0001', 'example1@example.com'),
    (2, 'example', 'user2', '080-xxxx-0002', 'example2@example.com'),
    (3, 'example', 'user3', '' , 'example3@example.com'),
    (4, 'example', 'user4', '080-xxxx-0004', ''),
    (5, 'example', 'user5', '', '')
GO

Glue接続の設定

RDSに接続するためのGlueの接続を設定します。

VPC内のRDSに接続するために以下の設定をしている必要があります。

  • Glue用自己参照型セキュリティグループの用意
  • S3への接続経路

セキュリティグループ作成

Glueの接続を許可するために、Glueにアタッチするセキュリティグループは自己参照のインバウンドソースが必要になります。

S3エンドポイント作成

また、GlueのスクリプトファイルなどをS3に保存するためにS3へのインターネット経由の経路またはVPCエンドポイントが必要になります。
ここではGateway型のS3エンドポイントを用意しています。

Glue 接続の作成

追記:RDSのパスワードをSecrets Managerで管理している場合は、Glue接続の作成をGlue StudioコンソールのConnectorsから実施してください。


左ペインの接続 -> 接続の追加を選択します。

接続プロパティで適当な一意の接続名、接続タイプJDBCを設定します。

次にデータストアへのアクセスを設定します。

項目 内容
JDBC URL SQL ServerのJDBC URL : sqlserver://host:port;databaseName=db_name
host : RDSエンドポイント
port : RDSの接続ポート
db_name : 接続先のデータベース名
ユーザー名 データベースにアクセスできるユーザ
パスワード ユーザのパスワード
VPC 接続先のRDSを含んでいるVPC
サブネット 接続先のRDSを含んでいるサブネット
セキュリティグループ 自己参照インバウンドポートが空いているセキュリティーグループ
RDSに接続可能なセキュリティグループ

設定に問題がなければ完了を選択します。

設定した内容で接続可能か接続のテストを実施します。

RDSに接続できるGlueのロールを選択します。
作成していない場合には、リンクの手順を参考に作成してください。

ステップ 2: AWS Glue 用にIAM ロールを作成する - AWS Glue

設定に問題がなければ、正常にインスタンスに接続されました。と表示されることが確認できます。(数分かかります。)

S3まわりで問題がある場合には以下のようなエラーメッセージが出力されます。

Reason: could not find S3 endpoint or NAT gateway for subnetId subnet-xxxx in Vpc vpc-xxxx

S3のエラーはわかりやすいですが、JDBC URLなどRDS接続周りで接続できない場合には以下のようなエラーメッセージが出るので参考までに

Unable to resolve any valid connection.

Glue クローラの設定

Glue クローラの作成

SQL Serverのテーブル情報抽出のため、Glue クローラを設定します。

適当なクローラーの名前を設定します。

クローラーのソースはSQL Serverを利用するため、Data storesを選択します。

データベースの追加ではJDBCを選択して、接続の項目にはさきほど作成したGlue接続を選びます。
インクルードパスには%を指定し、接続先データベースのテーブルをすべて取得します。

接続のテストで利用したロールを選択します。

クローラの実行スケジュールを選択します。
ここではオンデマンドでの実行を選択します。

クローラの出力先を設定します。
出力先のGlueデータベースが無い場合はデータベースの追加からデータベースを用意してください。

最後に設定を確認し問題がなければ、完了を選択します。

Glueテーブルの作成

作成されたクローラを実行します。

クローラーが問題なく完了すれば、Glue テーブルにSQL Serverのテーブル情報が追加されます。

Glue ジョブの設定

Glue Studioでジョブを作成していきます。作成したフローの全体像はこちらです。

Job Script
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
import re

# Script generated for node Custom transform
def add_colum2(glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import col, concat, lit, regexp_replace

    df = dfc.select(list(dfc.keys())[0]).toDF()
    df = df.withColumn("ChannelType", lit("SMS"))
    df = df.withColumn("Address", regexp_replace("Address", "-", ""))
    df = df.withColumn("Address", regexp_replace("Address", "^.", ""))
    df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address")))
    output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
    return DynamicFrameCollection({"output_df1": output_df1}, glueContext)


# Script generated for node Custom transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    reprep = selected.coalesce(1)
    results = DynamicFrame.fromDF(reprep, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)


def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(
        "(select * from source1) UNION " + unionType + " (select * from source2)"
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


# Script generated for node Custom transform
def add_colum1(glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import lit

    df = dfc.select(list(dfc.keys())[0]).toDF()
    df_add_colum = df.withColumn("ChannelType", lit("EMAIL"))
    output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
    return DynamicFrameCollection({"output_df1": output_df1}, glueContext)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node SQL Server table
SQLServertable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="mssql-s3-integration-dev-db",
    table_name="test_dbo_users",
    transformation_ctx="SQLServertable_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=SQLServertable_node1,
    mappings=[("telephone", "string", "Address", "string")],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Apply Mapping
ApplyMapping_node1658129522250 = ApplyMapping.apply(
    frame=SQLServertable_node1,
    mappings=[("email", "string", "Address", "string")],
    transformation_ctx="ApplyMapping_node1658129522250",
)

# Script generated for node Filter
Filter_node1658143711633 = Filter.apply(
    frame=ApplyMapping_node2,
    f=lambda row: (bool(re.match("\S", row["Address"]))),
    transformation_ctx="Filter_node1658143711633",
)

# Script generated for node Filter
Filter_node1658143945724 = Filter.apply(
    frame=ApplyMapping_node1658129522250,
    f=lambda row: (bool(re.match("\S", row["Address"]))),
    transformation_ctx="Filter_node1658143945724",
)

# Script generated for node Custom transform
Customtransform_node1658140204230 = add_colum2(
    glueContext,
    DynamicFrameCollection(
        {"Filter_node1658143711633": Filter_node1658143711633}, glueContext
    ),
)

# Script generated for node Custom transform
Customtransform_node1658140211475 = add_colum1(
    glueContext,
    DynamicFrameCollection(
        {"Filter_node1658143945724": Filter_node1658143945724}, glueContext
    ),
)

# Script generated for node Select From Collection
SelectFromCollection_node1658140219909 = SelectFromCollection.apply(
    dfc=Customtransform_node1658140204230,
    key=list(Customtransform_node1658140204230.keys())[0],
    transformation_ctx="SelectFromCollection_node1658140219909",
)

# Script generated for node Select From Collection
SelectFromCollection_node1658140226309 = SelectFromCollection.apply(
    dfc=Customtransform_node1658140211475,
    key=list(Customtransform_node1658140211475.keys())[0],
    transformation_ctx="SelectFromCollection_node1658140226309",
)

# Script generated for node Union
Union_node1658134250028 = sparkUnion(
    glueContext,
    unionType="ALL",
    mapping={
        "source1": SelectFromCollection_node1658140226309,
        "source2": SelectFromCollection_node1658140219909,
    },
    transformation_ctx="Union_node1658134250028",
)

# Script generated for node Custom transform
Customtransform_node1658311524969 = MyTransform(
    glueContext,
    DynamicFrameCollection(
        {"Union_node1658134250028": Union_node1658134250028}, glueContext
    ),
)

# Script generated for node Select From Collection
SelectFromCollection_node1658311587979 = SelectFromCollection.apply(
    dfc=Customtransform_node1658311524969,
    key=list(Customtransform_node1658311524969.keys())[0],
    transformation_ctx="SelectFromCollection_node1658311587979",
)

# Script generated for node Amazon S3
AmazonS3_node1658134594283 = glueContext.write_dynamic_frame.from_options(
    frame=SelectFromCollection_node1658311587979,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://mssql-s3-integration-dev-mssqls3integrationdevs30-q462kdal8dd1",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1658134594283",
)

job.commit()

項目ごとに説明していきます。

Data Sourceには作成したSQL Serverのデータカタログを指定します。

Apply Mappingで不要なカラムの削除とtelephone/emailカラムの分離、リネームを行っています。

レコードには値が入っていないものもあるので、Filterを使い値の入っているレコードのみ抽出します。

Custom transformでpinpointで必須項目のChannelTypeのカラムを追加していきます。
Custom transoformの出力はSelectFromCollectionでDynamicFrameに変換する必要があります。

add_colum2の処理ではカラム追加に合わせて電話番号情報をpinpointで扱うための国際公衆電気通信番号(E.164)に変換する処理も行っています。

Amazon Pinpoint での電話番号の検証 - Amazon Pinpoint

def add_colum2 (glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import col, concat, lit, regexp_replace
    
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df = df.withColumn("ChannelType", lit("SMS"))
    df = df.withColumn("Address", regexp_replace("Address", "-", ""))
    df = df.withColumn("Address", regexp_replace("Address", "^.", ""))
    df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address")))
    output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
    return DynamicFrameCollection({"output_df1": output_df1}, glueContext)

Unionで分離したメールアドレスと電話のデータセットを結合します。

2つ目のCustom transformではGlueの出力をまとめます。

通常Glueは処理結果を複数のファイルに分割して出力します。
分割されるとpinpointへのインポートが大変になるので、coalesceを使用して出力ファイルをひとつにまとめます。

より大きなファイルを出力するように AWS Glue ETL ジョブを設定する

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    reprep = selected.coalesce(1)
    results = DynamicFrame.fromDF(reprep, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)

Data tagetでS3バケットを選択します。
pinpointに取り込むように非圧縮のCSV形式で出力します。

ジョブ実行

作成したジョブを実行していきます。
ジョブプロパティは以下の通りです。実行のたびに全件データを抽出したいので今回Job bookmarkはDisableにしておきます。

ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue

Runを選択して、ステータスがSucceededになれば実行完了です。

Data targetに指定したS3バケットにデータが格納されていることが確認できます。

ファイルにはCSV形式でフォーマットされたデータが格納されていることが確認できます。
電話番号がダブルクォーテーションに付与されてしまっていますが、pinpointへの取り込み上の問題はないので一旦このままで。。

Address,ChannelType
example1@example.com,EMAIL
example3@example.com,EMAIL
example2@example.com,EMAIL
"+8180xxxx0004",SMS
"+8180xxxx0001",SMS
"+8180xxxx0002",SMS

Pinpointへのインポート

出力したデータをPinpointのセグメント情報としてインポートしたいと思います。

無事、インポートされることが確認できました。

最後に

ほとんどコードを書かずにGlueを使ってやりたいことを実現できたと思います。

ただ、このブログを書いているときにGlue DataBrewでは電話番号のフォーマットを行う機能なども使えることを知ったので、今回やりたかったことはDataBrewの方が簡単にできそうな気がしました。

次はGlue Databrewでためしてみたいと思います。

以上、たかやまでした。