こんにちは。たかやまです。
RDS(SQL Server)のテーブル情報をPinpointのセグメントにインポートしたいケースがあり、今回はGlueを使ってデータ抽出を行い、Pinpointのセグメントファイルのデータ形式に変換してS3に出力してみたいと思います。
今回の検証環境のサンプルコードはこちらです。(Glue ジョブは含まず)
やりたいこと
SQL ServerのUsersテーブルをPinpointにインポートできるセグメント情報に変換してS3に格納
やってみた
SQL Serverの設定
SQL Server Express Editionを用意して、以下のコマンドで検証用のテーブルを事前に作成しています。
CREATE DATABASE Test;
GO
USE Test;
GO
CREATE TABLE Users (
UserID CHAR(5) PRIMARY KEY,
FirstName VARCHAR(10) NOT NULL,
LastName VARCHAR(10) NOT NULL,
Telephone VARCHAR(20),
email VARCHAR(256)
);
GO
INSERT INTO Users
(UserID, FirstName, LastName, Telephone, email)
VALUES
(1, 'example', 'user1', '080-xxxx-0001', 'example1@example.com'),
(2, 'example', 'user2', '080-xxxx-0002', 'example2@example.com'),
(3, 'example', 'user3', '' , 'example3@example.com'),
(4, 'example', 'user4', '080-xxxx-0004', ''),
(5, 'example', 'user5', '', '')
GO
Glue接続の設定
RDSに接続するためのGlueの接続
を設定します。
VPC内のRDSに接続するために以下の設定をしている必要があります。
- Glue用自己参照型セキュリティグループの用意
- S3への接続経路
セキュリティグループ作成
Glueの接続を許可するために、Glueにアタッチするセキュリティグループは自己参照のインバウンドソースが必要になります。
S3エンドポイント作成
また、GlueのスクリプトファイルなどをS3に保存するためにS3へのインターネット経由の経路またはVPCエンドポイントが必要になります。
ここではGateway型のS3エンドポイントを用意しています。
Glue 接続の作成
追記:RDSのパスワードをSecrets Managerで管理している場合は、Glue接続の作成をGlue StudioコンソールのConnectorsから実施してください。
左ペインの接続
-> 接続の追加
を選択します。
接続プロパティで適当な一意の接続名、接続タイプJDBC
を設定します。
次にデータストアへのアクセスを設定します。
項目 | 内容 |
---|---|
JDBC URL | SQL ServerのJDBC URL : sqlserver://host:port;databaseName=db_name host : RDSエンドポイント port : RDSの接続ポート db_name : 接続先のデータベース名 |
ユーザー名 | データベースにアクセスできるユーザ |
パスワード | ユーザのパスワード |
VPC | 接続先のRDSを含んでいるVPC |
サブネット | 接続先のRDSを含んでいるサブネット |
セキュリティグループ | 自己参照インバウンドポートが空いているセキュリティーグループ RDSに接続可能なセキュリティグループ |
設定に問題がなければ完了
を選択します。
設定した内容で接続可能か接続のテスト
を実施します。
RDSに接続できるGlueのロールを選択します。
作成していない場合には、リンクの手順を参考に作成してください。
ステップ 2: AWS Glue 用にIAM ロールを作成する - AWS Glue
設定に問題がなければ、正常にインスタンスに接続されました。
と表示されることが確認できます。(数分かかります。)
S3まわりで問題がある場合には以下のようなエラーメッセージが出力されます。
Reason: could not find S3 endpoint or NAT gateway for subnetId subnet-xxxx in Vpc vpc-xxxx
S3のエラーはわかりやすいですが、JDBC URLなどRDS接続周りで接続できない場合には以下のようなエラーメッセージが出るので参考までに
Unable to resolve any valid connection.
Glue クローラの設定
Glue クローラの作成
SQL Serverのテーブル情報抽出のため、Glue クローラを設定します。
適当なクローラーの名前を設定します。
クローラーのソースはSQL Serverを利用するため、Data stores
を選択します。
データベースの追加ではJDBC
を選択して、接続の項目にはさきほど作成したGlue接続
を選びます。
インクルードパスには%
を指定し、接続先データベースのテーブルをすべて取得します。
接続のテストで利用したロールを選択します。
クローラの実行スケジュールを選択します。
ここではオンデマンドでの実行を選択します。
クローラの出力先を設定します。
出力先のGlueデータベースが無い場合はデータベースの追加からデータベースを用意してください。
最後に設定を確認し問題がなければ、完了を選択します。
Glueテーブルの作成
作成されたクローラを実行します。
クローラーが問題なく完了すれば、Glue テーブルにSQL Serverのテーブル情報が追加されます。
Glue ジョブの設定
Glue Studioでジョブを作成していきます。作成したフローの全体像はこちらです。
Job Script
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
import re
# Script generated for node Custom transform
def add_colum2(glueContext, dfc) -> DynamicFrameCollection:
from pyspark.sql.functions import col, concat, lit, regexp_replace
df = dfc.select(list(dfc.keys())[0]).toDF()
df = df.withColumn("ChannelType", lit("SMS"))
df = df.withColumn("Address", regexp_replace("Address", "-", ""))
df = df.withColumn("Address", regexp_replace("Address", "^.", ""))
df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address")))
output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
return DynamicFrameCollection({"output_df1": output_df1}, glueContext)
# Script generated for node Custom transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
selected = dfc.select(list(dfc.keys())[0]).toDF()
reprep = selected.coalesce(1)
results = DynamicFrame.fromDF(reprep, glueContext, "results")
return DynamicFrameCollection({"results": results}, glueContext)
def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame:
for alias, frame in mapping.items():
frame.toDF().createOrReplaceTempView(alias)
result = spark.sql(
"(select * from source1) UNION " + unionType + " (select * from source2)"
)
return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
# Script generated for node Custom transform
def add_colum1(glueContext, dfc) -> DynamicFrameCollection:
from pyspark.sql.functions import lit
df = dfc.select(list(dfc.keys())[0]).toDF()
df_add_colum = df.withColumn("ChannelType", lit("EMAIL"))
output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
return DynamicFrameCollection({"output_df1": output_df1}, glueContext)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node SQL Server table
SQLServertable_node1 = glueContext.create_dynamic_frame.from_catalog(
database="mssql-s3-integration-dev-db",
table_name="test_dbo_users",
transformation_ctx="SQLServertable_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=SQLServertable_node1,
mappings=[("telephone", "string", "Address", "string")],
transformation_ctx="ApplyMapping_node2",
)
# Script generated for node Apply Mapping
ApplyMapping_node1658129522250 = ApplyMapping.apply(
frame=SQLServertable_node1,
mappings=[("email", "string", "Address", "string")],
transformation_ctx="ApplyMapping_node1658129522250",
)
# Script generated for node Filter
Filter_node1658143711633 = Filter.apply(
frame=ApplyMapping_node2,
f=lambda row: (bool(re.match("\S", row["Address"]))),
transformation_ctx="Filter_node1658143711633",
)
# Script generated for node Filter
Filter_node1658143945724 = Filter.apply(
frame=ApplyMapping_node1658129522250,
f=lambda row: (bool(re.match("\S", row["Address"]))),
transformation_ctx="Filter_node1658143945724",
)
# Script generated for node Custom transform
Customtransform_node1658140204230 = add_colum2(
glueContext,
DynamicFrameCollection(
{"Filter_node1658143711633": Filter_node1658143711633}, glueContext
),
)
# Script generated for node Custom transform
Customtransform_node1658140211475 = add_colum1(
glueContext,
DynamicFrameCollection(
{"Filter_node1658143945724": Filter_node1658143945724}, glueContext
),
)
# Script generated for node Select From Collection
SelectFromCollection_node1658140219909 = SelectFromCollection.apply(
dfc=Customtransform_node1658140204230,
key=list(Customtransform_node1658140204230.keys())[0],
transformation_ctx="SelectFromCollection_node1658140219909",
)
# Script generated for node Select From Collection
SelectFromCollection_node1658140226309 = SelectFromCollection.apply(
dfc=Customtransform_node1658140211475,
key=list(Customtransform_node1658140211475.keys())[0],
transformation_ctx="SelectFromCollection_node1658140226309",
)
# Script generated for node Union
Union_node1658134250028 = sparkUnion(
glueContext,
unionType="ALL",
mapping={
"source1": SelectFromCollection_node1658140226309,
"source2": SelectFromCollection_node1658140219909,
},
transformation_ctx="Union_node1658134250028",
)
# Script generated for node Custom transform
Customtransform_node1658311524969 = MyTransform(
glueContext,
DynamicFrameCollection(
{"Union_node1658134250028": Union_node1658134250028}, glueContext
),
)
# Script generated for node Select From Collection
SelectFromCollection_node1658311587979 = SelectFromCollection.apply(
dfc=Customtransform_node1658311524969,
key=list(Customtransform_node1658311524969.keys())[0],
transformation_ctx="SelectFromCollection_node1658311587979",
)
# Script generated for node Amazon S3
AmazonS3_node1658134594283 = glueContext.write_dynamic_frame.from_options(
frame=SelectFromCollection_node1658311587979,
connection_type="s3",
format="csv",
connection_options={
"path": "s3://mssql-s3-integration-dev-mssqls3integrationdevs30-q462kdal8dd1",
"partitionKeys": [],
},
transformation_ctx="AmazonS3_node1658134594283",
)
job.commit()
項目ごとに説明していきます。
Data Source
には作成したSQL Serverのデータカタログを指定します。
Apply Mapping
で不要なカラムの削除とtelephone/emailカラムの分離、リネームを行っています。
レコードには値が入っていないものもあるので、Filter
を使い値の入っているレコードのみ抽出します。
Custom transform
でpinpointで必須項目のChannelTypeのカラムを追加していきます。
Custom transoformの出力はSelectFromCollection
でDynamicFrameに変換する必要があります。
add_colum2の処理ではカラム追加に合わせて電話番号情報をpinpointで扱うための国際公衆電気通信番号(E.164)に変換する処理も行っています。
Amazon Pinpoint での電話番号の検証 - Amazon Pinpoint
def add_colum2 (glueContext, dfc) -> DynamicFrameCollection:
from pyspark.sql.functions import col, concat, lit, regexp_replace
df = dfc.select(list(dfc.keys())[0]).toDF()
df = df.withColumn("ChannelType", lit("SMS"))
df = df.withColumn("Address", regexp_replace("Address", "-", ""))
df = df.withColumn("Address", regexp_replace("Address", "^.", ""))
df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address")))
output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df")
return DynamicFrameCollection({"output_df1": output_df1}, glueContext)
Union
で分離したメールアドレスと電話のデータセットを結合します。
2つ目のCustom transform
ではGlueの出力をまとめます。
通常Glueは処理結果を複数のファイルに分割して出力します。
分割されるとpinpointへのインポートが大変になるので、coalesce
を使用して出力ファイルをひとつにまとめます。
より大きなファイルを出力するように AWS Glue ETL ジョブを設定する
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
selected = dfc.select(list(dfc.keys())[0]).toDF()
reprep = selected.coalesce(1)
results = DynamicFrame.fromDF(reprep, glueContext, "results")
return DynamicFrameCollection({"results": results}, glueContext)
Data taget
でS3バケットを選択します。
pinpointに取り込むように非圧縮のCSV形式で出力します。
ジョブ実行
作成したジョブを実行していきます。
ジョブプロパティは以下の通りです。実行のたびに全件データを抽出したいので今回Job bookmark
はDisableにしておきます。
ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue
Run
を選択して、ステータスがSucceededになれば実行完了です。
Data targetに指定したS3バケットにデータが格納されていることが確認できます。
ファイルにはCSV形式でフォーマットされたデータが格納されていることが確認できます。
電話番号がダブルクォーテーションに付与されてしまっていますが、pinpointへの取り込み上の問題はないので一旦このままで。。
Address,ChannelType
example1@example.com,EMAIL
example3@example.com,EMAIL
example2@example.com,EMAIL
"+8180xxxx0004",SMS
"+8180xxxx0001",SMS
"+8180xxxx0002",SMS
Pinpointへのインポート
出力したデータをPinpointのセグメント情報としてインポートしたいと思います。
無事、インポートされることが確認できました。
最後に
ほとんどコードを書かずにGlueを使ってやりたいことを実現できたと思います。
ただ、このブログを書いているときにGlue DataBrewでは電話番号のフォーマットを行う機能なども使えることを知ったので、今回やりたかったことはDataBrewの方が簡単にできそうな気がしました。
次はGlue Databrewでためしてみたいと思います。
以上、たかやまでした。