AWS Glue と SQLのみで、サクッとETL(Extract、Transform、Load)するJobを作成する

2018.05.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Glueは、指定した条件に基づいてPySparkのETL(Extract、Transform、Load)の雛形コードが自動生成されますが、それ以上の高度な変換は、PySparkのコードを作成、デバックする必要があります。これには AWS Glue の DynamicFrame や Apache Spark のコード作成、デバックのノウハウが必要になります。今回はAWS Glueを利用するハードルを一気に下げられるように、SQLのみでサクッとETL(Extract、Transform、Load)する実現する方法、Jobを作成する方法をご紹介します。

AWS GlueのETLによるSQL中心アーキテクチャの実現

AWS Glueは、指定した条件に基づいてPySparkのETLの雛形コードが自動生成されます。ソースデータのカラムの選択や型変換はGUIで指定が可能ですが、カラムの関数適用、条件(CASE〜WHEN)による編集、データのフィルタ、データの集計などは、AWS Glueが標準で提供しているTransformApache SparkのDataFrameの関数を用いて、コードを書かなければなりません。

DWHでは、プログラムを作成せずにデータファイルを変換する方法として、ELTがあります。ELTはExtract、Load、Transformの略で、ソースデータをテーブルにロードした後、SQLによってデータを変換する方法のことを表します。この方法でデータを変換することを「SQL中心アーキテクチャ」などと呼びます。AWS Glue や Apache Sparkに比べて、SQLを使いこなせるエンジニアは遥かに多いはずです。AWS Glueでは主にスキーマオンリードなので、ロードせずに変換するので、ETLになります。「SQL中心アーキテクチャ」なのにETLというのはAWS Glueらしい、これまでにないソリューションです。

SQLのみでETLを実現する方法

今回は必要最小限の情報である、ソースデータ(データベース名、テーブル名)とターゲットデータ(出力先のS3のURL、出力ファイルフォーマット)、実行するSQL文をGlueのジョブに情報として渡します。

ETLする流れは以下のとおりです。

  1. ソースデータのテーブル定義
  2. ETL Jobの作成
  3. 設定ファイルのアップロード
  4. 設定ファイルを引数に指定して実行

ETL Jobは、一度作成すると何度も利用できます。

ソースデータのテーブル定義

データソースのテーブル定義は以下のとおりです。

CREATE EXTERNAL TABLE `elb_parquet_20150101`(
`request_timestamp` string,
`elb_name` string,
`request_ip` string,
`request_port` int,
`backend_ip` string,
`backend_port` int,
`request_processing_time` double,
`backend_processing_time` double,
`client_response_time` double,
`elb_response_code` string,
`backend_response_code` string,
`received_bytes` bigint,
`sent_bytes` bigint,
`request_verb` string,
`url` string,
`protocol` string,
`user_agent` string,
`ssl_cipher` string,
`ssl_protocol` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://athena-examples-ap-northeast-1/elb/parquet/year=2015/month=1/day=1'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='elb_parquet',
'classification'='parquet',
'compressionType'='none',
'transient_lastDdlTime'='1527140798',
'typeOfData'='file')

Job Parametersに設定ファイルのS3のURLを指定する

AWS Glue の Job は実行時にJob Parametersを渡すことが可能ですが、この引数にSQLのような空白を含む文字列は引数に指定できません。そのため、必要なパラメタをキーバリュー形式のjsonの設定ファイルを作成、S3にアップロードしておいて、ジョブには設定ファイルのS3のURLを指定しています。結果として、引数に渡すS3のURLを切り替えるだけで様々なETLジョブに対応できる汎用的なジョブとして利用できるようになります。

パラメタファイル(elb_request_count.json)は以下のような形式です。

{
"source_database":"default",
"source_table":"elb_parquet_20150101",
"target_s3_url":"s3:///tmp/",
"target_format":"parquet",
"sql":"SELECT elb_name, count(*) as request_count FROM stagingtable group by elb_name order by elb_name"
}

ETL Jobの詳細

設定ファイルのバケット(extra_job_parameters_bucket)とキー(extra_job_parameters_key)から、S3のjsonファイルを読み取り、パラメタを取得します。

SQLを実行するには、AWS Glue の DynamicFrame から Apache Spark のDataFrameを取得します。このDataFrameをcreateOrReplaceTempView()関数を用いて、stagingtableというテーブルにマッピングした後、このテーブル名をSQLに指定します。このDataFrameから実行するSQLは、Spark SQL と呼ばれるものです。SQLの詳細は、Apache Spark SQL documentationを参照してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
import boto3
from boto3.session import Session
import json

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,
['JOB_NAME',
'extra_job_parameters_bucket',
'extra_job_parameters_key'])
print "JOB_NAME : ", args['JOB_NAME']
print "extra_job_parameters_bucket: ", args['extra_job_parameters_bucket']
print "extra_job_parameters_key : ", args['extra_job_parameters_key']

# extra parameters
bucket_name = args['extra_job_parameters_bucket']
json_key = args['extra_job_parameters_key']
s3 = boto3.resource('s3')

obj = s3.Object(bucket_name, json_key)
body = obj.get()['Body']
json_str = body.read()
json_dict = json.loads(json_str)
#print "debug: ", json_dict

source_database = json_dict['source_database']
source_table = json_dict['source_table']
target_s3_url = json_dict['target_s3_url']
target_format = json_dict['target_format']
sql = json_dict['sql']

print "-- Extra Job Parameters --"
print "source_database :", source_database
print "source_table :", source_table
print "target_s3_url :", target_s3_url
print "target_format :", target_format
print "sql :", sql

# Initalize
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = args['source_table'], table_name = "elb_logs", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = source_database, table_name = source_table, transformation_ctx = "datasource0")

## Transform
df = datasource0.toDF()
df.createOrReplaceTempView('stagingtable')
df_sql = spark.sql(sql)
df_sql.show()
transformed = DynamicFrame.fromDF(df_sql, glueContext, 'transformed')

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3:///tmp"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = transformed]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = transformed, connection_type = "s3", connection_options = {"path": target_s3_url}, format = target_format, transformation_ctx = "datasink2")

# Terminate
job.commit()

AWS Glue が内部的に実行している spark-submit の実行時の引数を参考に添付します。(一部変更しています。)

/usr/lib/spark/bin/spark-submit --conf spark.hadoop.yarn.resourcemanager.connect.max-wait.ms=60000 --conf spark.hadoop.fs.defaultFS=hdfs://ip-172-31-22-238.ec2.internal:8020 --conf spark.hadoop.yarn.resourcemanager.address=ip-172-31-22-238.ec2.internal:8032 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=2 --conf spark.executor.memory=5g --conf spark.executor.cores=4 --name tape --master yarn --deploy-mode cluster --jars /opt/amazon/superjar/glue-assembly.jar --files /tmp/glue-default.conf,/tmp/glue-override.conf,/opt/amazon/certs/InternalAndExternalAndAWSTrustStore.jks,/opt/amazon/certs/rds-combined-ca-bundle.pem,/opt/amazon/certs/redshift-ssl-ca-cert.pem,/tmp/g-2f0f37eb8b8d42f66efca17cb21d16d893a01082-7678943759023064999/script_2018-05-27-12-15-36.py --py-files /tmp/PyGlue.zip --driver-memory 5g --executor-memory 5g /tmp/runscript.py script_2018-05-27-12-15-36.py --JOB_NAME sqletl --JOB_ID j_25cd9d390955525173f15b345dcd9555620a813b0e9ba269e184f821c0b93628 --extra_job_parameters_bucket aws-glue-scripts-XXXXXXXXXXXX-us-east-1 --JOB_RUN_ID jr_4dedf22a2d35d7902ec94712cf35d2ac2650820c2e47fd8d4ece7ad37c07a6f1 --job-bookmark-option job-bookmark-disable --extra_job_parameters_key extra-job-parameters/elb_request_count.json --TempDir s3://aws-glue-temporary-XXXXXXXXXXXX-us-east-1/cm-user

実行結果

実行結果は以下のとおりです。ELB(CLB)ごとのリクエスト数を取得しています。

:
LogType:stdout
Log Upload Time:Sun May 27 12:18:25 +0000 2018
LogLength:1093
Log Contents:
JOB_NAME : sqletl
extra_job_parameters_bucket: aws-glue-scripts-XXXXXXXXXXXX-us-east-1
extra_job_parameters_key : extra-job-parameters/elb_request_count.json
-- Extra Job Parameters --
source_database : default
source_table : elb_parquet_20150101
target_s3_url : s3:///tmp/
target_format : parquet
sql : SELECT elb_name, count(*) as request_count FROM stagingtable group by elb_name order by elb_name
+------------+-------------+
| elb_name|request_count|
+------------+-------------+
|elb_demo_001| 1166566|
|elb_demo_002| 1165640|
|elb_demo_003| 1164292|
|elb_demo_004| 1166552|
|elb_demo_005| 1167118|
|elb_demo_006| 1167336|
|elb_demo_007| 1166289|
|elb_demo_008| 1164801|
|elb_demo_009| 1163364|
+------------+-------------+

End of LogType:stdout
:

出力ファイルフォーマットに指定した、parquet形式のファイルターゲットのS3のURLに出力されました。

最後に

今回は、AWS Glue と SQLのみでETLする方法をご紹介しました。データの変換する方法は、AWS Glueが提供するTransformと、Spark SQL/Functionの2つあります。この2つを効果的に組み合わせることが理想的ですが、Spark SQLは、Catalyst Optimizerによる最適化が期待できますので、本番環境の大量データでも十分に利用できます。また、複雑な処理もSQLのほうが簡潔に書ける場合が多いので、メンテナンスを重視するのであれば、SQLでETLができないか検討すると良いでしょう。