AWS Glue と SQLのみで、サクッとETL(Extract、Transform、Load)するJobを作成する
はじめに
AWS Glueは、指定した条件に基づいてPySparkのETL(Extract、Transform、Load)の雛形コードが自動生成されますが、それ以上の高度な変換は、PySparkのコードを作成、デバックする必要があります。これには AWS Glue の DynamicFrame や Apache Spark のコード作成、デバックのノウハウが必要になります。今回はAWS Glueを利用するハードルを一気に下げられるように、SQLのみでサクッとETL(Extract、Transform、Load)する実現する方法、Jobを作成する方法をご紹介します。
AWS GlueのETLによるSQL中心アーキテクチャの実現
AWS Glueは、指定した条件に基づいてPySparkのETLの雛形コードが自動生成されます。ソースデータのカラムの選択や型変換はGUIで指定が可能ですが、カラムの関数適用、条件(CASE〜WHEN)による編集、データのフィルタ、データの集計などは、AWS Glueが標準で提供しているTransform や Apache SparkのDataFrameの関数を用いて、コードを書かなければなりません。
DWHでは、プログラムを作成せずにデータファイルを変換する方法として、ELTがあります。ELTはExtract、Load、Transformの略で、ソースデータをテーブルにロードした後、SQLによってデータを変換する方法のことを表します。この方法でデータを変換することを「SQL中心アーキテクチャ」などと呼びます。AWS Glue や Apache Sparkに比べて、SQLを使いこなせるエンジニアは遥かに多いはずです。AWS Glueでは主にスキーマオンリードなので、ロードせずに変換するので、ETLになります。「SQL中心アーキテクチャ」なのにETLというのはAWS Glueらしい、これまでにないソリューションです。
SQLのみでETLを実現する方法
今回は必要最小限の情報である、ソースデータ(データベース名、テーブル名)とターゲットデータ(出力先のS3のURL、出力ファイルフォーマット)、実行するSQL文をGlueのジョブに情報として渡します。
ETLする流れは以下のとおりです。
- ソースデータのテーブル定義
- ETL Jobの作成
- 設定ファイルのアップロード
- 設定ファイルを引数に指定して実行
ETL Jobは、一度作成すると何度も利用できます。
ソースデータのテーブル定義
データソースのテーブル定義は以下のとおりです。
CREATE EXTERNAL TABLE `elb_parquet_20150101`( `request_timestamp` string, `elb_name` string, `request_ip` string, `request_port` int, `backend_ip` string, `backend_port` int, `request_processing_time` double, `backend_processing_time` double, `client_response_time` double, `elb_response_code` string, `backend_response_code` string, `received_bytes` bigint, `sent_bytes` bigint, `request_verb` string, `url` string, `protocol` string, `user_agent` string, `ssl_cipher` string, `ssl_protocol` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://athena-examples-ap-northeast-1/elb/parquet/year=2015/month=1/day=1' TBLPROPERTIES ( 'CrawlerSchemaDeserializerVersion'='1.0', 'CrawlerSchemaSerializerVersion'='1.0', 'UPDATED_BY_CRAWLER'='elb_parquet', 'classification'='parquet', 'compressionType'='none', 'transient_lastDdlTime'='1527140798', 'typeOfData'='file')
Job Parametersに設定ファイルのS3のURLを指定する
AWS Glue の Job は実行時にJob Parametersを渡すことが可能ですが、この引数にSQLのような空白を含む文字列は引数に指定できません。そのため、必要なパラメタをキーバリュー形式のjsonの設定ファイルを作成、S3にアップロードしておいて、ジョブには設定ファイルのS3のURLを指定しています。結果として、引数に渡すS3のURLを切り替えるだけで様々なETLジョブに対応できる汎用的なジョブとして利用できるようになります。
パラメタファイル(elb_request_count.json)は以下のような形式です。
{ "source_database":"default", "source_table":"elb_parquet_20150101", "target_s3_url":"s3:///tmp/", "target_format":"parquet", "sql":"SELECT elb_name, count(*) as request_count FROM stagingtable group by elb_name order by elb_name" }
ETL Jobの詳細
設定ファイルのバケット(extra_job_parameters_bucket)とキー(extra_job_parameters_key)から、S3のjsonファイルを読み取り、パラメタを取得します。
SQLを実行するには、AWS Glue の DynamicFrame から Apache Spark のDataFrameを取得します。このDataFrameをcreateOrReplaceTempView()関数を用いて、stagingtableというテーブルにマッピングした後、このテーブル名をSQLに指定します。このDataFrameから実行するSQLは、Spark SQL と呼ばれるものです。SQLの詳細は、Apache Spark SQL documentationを参照してください。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame import boto3 from boto3.session import Session import json ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME', 'extra_job_parameters_bucket', 'extra_job_parameters_key']) print "JOB_NAME : ", args['JOB_NAME'] print "extra_job_parameters_bucket: ", args['extra_job_parameters_bucket'] print "extra_job_parameters_key : ", args['extra_job_parameters_key'] # extra parameters bucket_name = args['extra_job_parameters_bucket'] json_key = args['extra_job_parameters_key'] s3 = boto3.resource('s3') obj = s3.Object(bucket_name, json_key) body = obj.get()['Body'] json_str = body.read() json_dict = json.loads(json_str) #print "debug: ", json_dict source_database = json_dict['source_database'] source_table = json_dict['source_table'] target_s3_url = json_dict['target_s3_url'] target_format = json_dict['target_format'] sql = json_dict['sql'] print "-- Extra Job Parameters --" print "source_database :", source_database print "source_table :", source_table print "target_s3_url :", target_s3_url print "target_format :", target_format print "sql :", sql # Initalize sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = args['source_table'], table_name = "elb_logs", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = source_database, table_name = source_table, transformation_ctx = "datasource0") ## Transform df = datasource0.toDF() df.createOrReplaceTempView('stagingtable') df_sql = spark.sql(sql) df_sql.show() transformed = DynamicFrame.fromDF(df_sql, glueContext, 'transformed') ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3:///tmp"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = transformed] datasink2 = glueContext.write_dynamic_frame.from_options(frame = transformed, connection_type = "s3", connection_options = {"path": target_s3_url}, format = target_format, transformation_ctx = "datasink2") # Terminate job.commit()
AWS Glue が内部的に実行している spark-submit の実行時の引数を参考に添付します。(一部変更しています。)
/usr/lib/spark/bin/spark-submit --conf spark.hadoop.yarn.resourcemanager.connect.max-wait.ms=60000 --conf spark.hadoop.fs.defaultFS=hdfs://ip-172-31-22-238.ec2.internal:8020 --conf spark.hadoop.yarn.resourcemanager.address=ip-172-31-22-238.ec2.internal:8032 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=2 --conf spark.executor.memory=5g --conf spark.executor.cores=4 --name tape --master yarn --deploy-mode cluster --jars /opt/amazon/superjar/glue-assembly.jar --files /tmp/glue-default.conf,/tmp/glue-override.conf,/opt/amazon/certs/InternalAndExternalAndAWSTrustStore.jks,/opt/amazon/certs/rds-combined-ca-bundle.pem,/opt/amazon/certs/redshift-ssl-ca-cert.pem,/tmp/g-2f0f37eb8b8d42f66efca17cb21d16d893a01082-7678943759023064999/script_2018-05-27-12-15-36.py --py-files /tmp/PyGlue.zip --driver-memory 5g --executor-memory 5g /tmp/runscript.py script_2018-05-27-12-15-36.py --JOB_NAME sqletl --JOB_ID j_25cd9d390955525173f15b345dcd9555620a813b0e9ba269e184f821c0b93628 --extra_job_parameters_bucket aws-glue-scripts-XXXXXXXXXXXX-us-east-1 --JOB_RUN_ID jr_4dedf22a2d35d7902ec94712cf35d2ac2650820c2e47fd8d4ece7ad37c07a6f1 --job-bookmark-option job-bookmark-disable --extra_job_parameters_key extra-job-parameters/elb_request_count.json --TempDir s3://aws-glue-temporary-XXXXXXXXXXXX-us-east-1/cm-user
実行結果
実行結果は以下のとおりです。ELB(CLB)ごとのリクエスト数を取得しています。
: LogType:stdout Log Upload Time:Sun May 27 12:18:25 +0000 2018 LogLength:1093 Log Contents: JOB_NAME : sqletl extra_job_parameters_bucket: aws-glue-scripts-XXXXXXXXXXXX-us-east-1 extra_job_parameters_key : extra-job-parameters/elb_request_count.json -- Extra Job Parameters -- source_database : default source_table : elb_parquet_20150101 target_s3_url : s3:///tmp/ target_format : parquet sql : SELECT elb_name, count(*) as request_count FROM stagingtable group by elb_name order by elb_name +------------+-------------+ | elb_name|request_count| +------------+-------------+ |elb_demo_001| 1166566| |elb_demo_002| 1165640| |elb_demo_003| 1164292| |elb_demo_004| 1166552| |elb_demo_005| 1167118| |elb_demo_006| 1167336| |elb_demo_007| 1166289| |elb_demo_008| 1164801| |elb_demo_009| 1163364| +------------+-------------+ End of LogType:stdout :
出力ファイルフォーマットに指定した、parquet
形式のファイルターゲットのS3のURLに出力されました。
最後に
今回は、AWS Glue と SQLのみでETLする方法をご紹介しました。データの変換する方法は、AWS Glueが提供するTransformと、Spark SQL/Functionの2つあります。この2つを効果的に組み合わせることが理想的ですが、Spark SQLは、Catalyst Optimizerによる最適化が期待できますので、本番環境の大量データでも十分に利用できます。また、複雑な処理もSQLのほうが簡潔に書ける場合が多いので、メンテナンスを重視するのであれば、SQLでETLができないか検討すると良いでしょう。