AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする

2018.05.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Glueが自動生成するETLコード(PySpark)では、ソースデータをDynamicFrame内部のRDDに読み込み、後続の条件に基づいてデータのフィルタ・変換を繰り返し、最終的にターゲットデータに出力します。一般的なユースケースではすべてのデータが対象で構いませんが、この動作は特定のパーティションのデータのみが対象の場合でも全てのデータの読み込みが発生しまうことを意味します。

例えば、過去1年間のデータを日毎にパーティションしている外部テーブルあるとします。必要なデータがこのテーブルの最新の日付のデータのみであっても、すべてを読み込んだ後、後続の条件に基づいてデータをフィルタすることが必要になります。今回ご紹介するPushdown Predicates引数には、必要なパーティションのデータのみを読み込む指定ができます。不要なデータの読み込みやRDDオブジェクトの生成・破棄のコストを削減できます。つまり、最新の日付のデータのみであった場合、1/365の読み取りで済みます。

Pushdown Predicates とは

AWS Gule の Pushdown Predicates とは、データ(例.S3上のファイル)に対してAWS Glueの各ワーカーが必要なパーティションのデータのみを読み込んでRDDを生成し、後続のフィルタ・変換処理に引渡す、といったプロセスをとります。不要なデータを読まないことでデータの生成・破棄のコストが下がり、結果的にパフォーマンスが向上、、コスト削減が期待できます。

指定方法

push_down_predicate引数には、その述語式を指定します。述語式は、Spark SQLでサポートされている任意のブール式やSpark SQLクエリでWHERE句に入れることができるものはすべて動作します。詳細は、Apache Spark SQL documentation、特にScala SQL functions referenceを参照してください。

(変更前)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "default",
table_name = "elb_parquet",
transformation_ctx = "datasource0")

(変更後)

partition_predicate="year='2015' and month='01' and day='01'"
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database = "default",
table_name = "elb_parquet",
push_down_predicate = partition_predicate
transformation_ctx = "datasource0")

例.1ヶ月のELBのログから一日のデータをフィルタする

では、実際にフィルタ動作を確認してみましょう。

1ヶ月のELBのログテーブルの作成

圧縮済みファイル(snappy)11 GBのデータファイルに対してテーブル定義します。

$ aws s3 ls s3://athena-examples-us-east-1/elb/parquet/year=2015/month=1/ --recursive --sum --human
2017-02-16 09:42:23 364.8 MiB elb/parquet/year=2015/month=1/day=1/part-r-00156-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet
2017-02-16 09:42:23 367.6 MiB elb/parquet/year=2015/month=1/day=10/part-r-00104-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet
:
:
2017-02-16 09:43:34 359.4 MiB elb/parquet/year=2015/month=1/day=9/part-r-00013-e764ec48-9e47-4c2c-8d00-68b3a8534cc2.snappy.parquet

Total Objects: 31
Total Size: 11.0 GiB
CREATE EXTERNAL TABLE `elb_parquet`(
`request_timestamp` string,
`elb_name` string,
`request_ip` string,
`request_port` int,
`backend_ip` string,
`backend_port` int,
`request_processing_time` double,
`backend_processing_time` double,
`client_response_time` double,
`elb_response_code` string,
`backend_response_code` string,
`received_bytes` bigint,
`sent_bytes` bigint,
`request_verb` string,
`url` string,
`protocol` string,
`user_agent` string,
`ssl_cipher` string,
`ssl_protocol` string)
PARTITIONED BY (
`day` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://athena-examples-ap-northeast-1/elb/parquet/year=2015/month=1/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='elb_parquet',
'classification'='parquet',
'compressionType'='none',
'typeOfData'='file');

MSCK REPAIR TABLE elb_parquet;

方式1:Filterによるパーティションのフィルタリング

ビルトインの Filter Transformを用いて、1日のデータを取得します。

1ヶ月分の全てのデータを読み込み、DynamicFramedatasource0を生成した後、Filter Transformで、dayの値が1のデータのDynamicFramefilteringを再生成します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0")
print "Full record count: ", datasource0.count()
datasource0.printSchema()
## @type: Filter
## @args: [f = lambda x: x["day"] = "1", transformation_ctx = "filtering"]
## @return: filtering
## @inputs: [frame = applymapping1]
filtering = Filter.apply(frame = datasource0, f = lambda x: x["day"] == "1", transformation_ctx = "filtering")
print "Filtered record count: ", filtering.count()
## @type: ApplyMapping
## @args: [mapping = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3:///elb_filtered"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3:///elb_filtered"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

実行時間は、DPU「10」で約28分でした。

方式2:push_down_predicateによるパーティションのプレフィルタリング

DynamicFrameを生成する際にpush_down_predicateに指定したパーティションのみを読み込み、DynamicFrameを生成します。必要なパーティションの読み込みや、DynamicFrameの再生成が不要になります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "elb_parquet", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "elb_parquet", push_down_predicate = "day='1'", transformation_ctx = "datasource0")
print "Full record count: ", datasource0.count()
datasource0.printSchema()
## @type: ApplyMapping
## @args: [mapping = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("request_timestamp", "string", "request_timestamp", "string"), ("elb_name", "string", "elb_name", "string"), ("request_ip", "string", "request_ip", "string"), ("request_port", "int", "request_port", "int"), ("backend_ip", "string", "backend_ip", "string"), ("backend_port", "int", "backend_port", "int"), ("request_processing_time", "double", "request_processing_time", "double"), ("backend_processing_time", "double", "backend_processing_time", "double"), ("client_response_time", "double", "client_response_time", "double"), ("elb_response_code", "string", "elb_response_code", "string"), ("backend_response_code", "string", "backend_response_code", "string"), ("received_bytes", "long", "received_bytes", "long"), ("sent_bytes", "long", "sent_bytes", "long"), ("request_verb", "string", "request_verb", "string"), ("url", "string", "url", "string"), ("protocol", "string", "protocol", "string"), ("user_agent", "string", "user_agent", "string"), ("ssl_cipher", "string", "ssl_cipher", "string"), ("ssl_protocol", "string", "ssl_protocol", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3:///elb_prefiltered"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3:///elb_prefiltered"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

実行時間は、DPU「10」で約2分でした。

考察

方式2は、方式1の14倍速いという結果でした。パフォーマンスの改善の理由は以下の2点と考えられます。

  • 方式2は、ソースデータの読み込みが1/31に削減できた
  • 方式2は、フィルター適用後のDynamicFrameの再作成が不要

上記の性能改善はあったが、DynamicFrameからCSVファイル出力する時間は変わらないため、31倍ではなく14倍程度になったのではないかと考えられます。

AWS Glueにおけるデータフィルタリングのベストプラクティス

方式1と2を比較しましたが、どちらかが優れているというものではなく、用途に応じて組み合わせることが効果的です。つまり、

  • パーティションしているテーブルは、方式2によってパーティションキーでフィルタリングする
  • 生成されたDynamicFrameの情報は、方式1によってフィルタリングする
  • 上記を方式2、方式1の順に適用する

ということがベストプラクティスです。

最後に

AWS Glueのデータソースは、S3上のデータファイルを直接データソースとするのではなく、データカタログの情報に基づきS3上のデータファイルをスキャンできます。この方式を採用することで不要なパーティションのデータを読み込むことなく、初期のDynamicFrame生成の際にフィルタリングできます。この機能はETLコードのデバックの際に、入力データを絞込む目的で利用することも効果的です。

テーブル設計の際には、パーティショニングを常に心がけておくことで、2つのフィルタリング方式を用途に応じて組み合わせることでパフォーマンス向上、コスト削減が期待できます。テラバイト以上のソースデータを取り扱う場合は、ぜひご検討ください。