AWS Glueで多くの小さなファイルをまとめて読み込むgroupFiles/groupSize指定でパフォーマンスを改善する

はじめに

最近、データサイズは大きくない割に、ジョブが徐々に遅くなったり、メモリ不足が発生して処理が中断するといった相談を受けましたので、その対策の一つである小さなファイルをまとめて読み込むgroupFiles/groupSize指定について解説します。

多くの小さなファイル読み込みの問題点

普通の Apache Sparkは多くの小さなファイル(Kinesis Firehoseのファイル等)は、本来不向きです。理由は多くの小さなファイルを処理するとタスクが多すぎて、スケジューリングとメモリオーバーヘッドの増加し、最終的にはメモリ不足が発生して処理が中断します。一方、AWS Glue(のSpark)は、今回ご紹介するgroupFiles/groupSize指定することで、タスクごとにファイルを自動的にグループ化して正常にETLを終えることができます。

この機能については、昨年のre:Inventのセッションで紹介されていました。

このセッションについては、日本語で現地レポートとしてまとめています。

【レポート】AWS GlueによるサーバレスETL #reinvent #ABD315

groupFiles/groupSize指定とは

AWS GlueのETLジョブはS3データストアからファイルをまとめて読み取るグループ化を設定できます。S3データパーティション内のファイルはグループ化(groupFiles)と読み取るグループのサイズ(groupSize)を設定することで、各ETLタスクでは入力ファイルのグループを1つのメモリ内パーティションに読み取ることができます。

groupFilesには、1つのメモリ内パーティションに読み取るinPartitionを設定します。AWS Glue では、入力ファイル数が 50,000 を超える場合、自動的にグループ化を有効にします。以下に例を示します。

'--groupFiles': 'inPartition'

groupSizeには、1つのメモリ内パーティションに読み取るサイズの上限値をバイト数で指定します。以下、グループサイズを1MBに設定する方法は次のとおりです。

'--groupSize': '1024 * 1024'

Amazon S3 から直接読み取る場合

S3データストアのファイルを読み取る場合は create_dynamic_frame_from_options メソッドに以下の接続オプションを追加します。たとえば、ファイルを 1 MB のグループにグループ化する方法は次のとおりです。

dyf = glueContext.create_dynamic_frame_from_options(
        "s3", 
        {'paths': ["s3://s3path/"], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, 
        format="json")

Glueデータカタログのテーブルを読み取る場合

マニュアルに明記されているわけではないのですが、マニュアルの内容を精査して検証した結果、データソースのテーブルのTable Propertiesに2つのパラメタを設定することで動作することを確認しました。Glueデータカタログのテーブルの場合はジョブのパラメタではなく、データソースのテーブルに設定が必要になります。

実際の動作の解説

Sparkは、repartitionしない限り入力したファイルごとに出力ファイルが作成されます。つまり、出力がS3の場合は、1入力ファイルに対して1つ以上の出力ファイルが生成されます。groupFiles/groupSize指定することで、1000入力ファイルが1つのファイルにまとめられることによって出力ファイルが1つになるかどうかを確認します。ETLジョブは、jsonファイルを入力して、csvファイルにフォーマット変換する簡単なものです。

検証データ

多くの小さいファイルの例として、1ファイル1レコードのデータファイルを1000ファイル作成しました。

$ aws s3 ls cm-analytics-posdata --recursive
2018-07-06 12:19:40        153 posdata/sales_date=2018-07-01/posdata_0001
2018-07-06 12:19:40        155 posdata/sales_date=2018-07-01/posdata_0002
2018-07-06 12:19:40        155 posdata/sales_date=2018-07-01/posdata_0003
:
2018-07-06 12:20:52        162 posdata/sales_date=2018-07-01/posdata_1000

ファイルの内容は、jsonの1レコードのみのファイルです。

$ aws s3 cp s3://cm-analytics-posdata/posdata/sales_date=2018-07-01/posdata_0001 - 
{"shop_id":"1001","shop_name":"秋葉原駅前店","item_id":"35940001","item_name":"単3乾電池(1本)","price":"50","quantity":"1","total":"50"}

groupFiles/groupSize指定しないコードと実行結果

まずは、groupFiles/groupSize指定しないデータソース(L.22)を入力した場合の結果を確認してみます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame_from_options(
    "s3", 
    {'paths': ["s3://cm-analytics-posdata/posdata/sales_date=2018-07-01/"]}, 
    format="json",
    transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

以下、実行結果です。出力先のS3フォルダの下に入力ファイルと同じ数の出力ファイルが生成されています。ファイルの中にヘッダ付きのCSVファイルの中に1レコードのみ出力されているファイルが1000ファイルあります。

$ aws s3 ls s3://cm-analytics-posdata/targetdata/ 
2018-07-06 19:38:48          0
2018-07-06 19:59:40        124 run-1531047575387-part-r-00000
2018-07-06 19:59:40        126 run-1531047575387-part-r-00001
:
2018-07-06 20:00:11        133 run-1531047575387-part-r-00999

$ aws s3 cp s3://cm-analytics-posdata/targetdata/run-1531047575387-part-r-00000 -
shop_id,shop_name,item_id,item_name,price,quantity,total
1001,秋葉原駅前店,35940001,単3乾電池(1本),50,1,50

groupFiles/groupSize指定したコードと実行結果

次にgroupFiles/groupSize指定したデータソース(L.22)を入力した場合の結果を確認してみます。先程のソースコードとの変更点は'groupFiles': 'inPartition', 'groupSize': 1024 * 1024です。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame_from_options(
    "s3", 
    {'paths': ["s3://cm-analytics-posdata/posdata"], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, 
    format="json",
    transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

なお、Glueデータカタログのテーブルをデータソースを指定する場合は、AWS Glueのデータソースのままでかまいません。groupFiles/groupSizeの指定はテーブルのTable Propertiesに設定します。

## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "posdata", transformation_ctx = "datasource0")

以下の通り、出力ファイルは1つになり、中に複数レコード格納されていることが確認できました。

$ aws s3 ls s3://cm-analytics-posdata/targetdata/
2018-07-06 19:38:48          0
2018-07-06 19:48:52      74512 run-1531046838806-part-r-00000

$ aws s3 cp s3://cm-analytics-posdata/targetdata/run-1531046838806-part-r-00000 -
shop_id,shop_name,item_id,item_name,price,quantity,total
1001,秋葉原駅前店,35940001,単3乾電池(1本),50,1,50
1001,秋葉原駅前店,35940002,単3乾電池(2本),100,1,100
:
1001,秋葉原駅前店,35941000,単3乾電池(1000本),50000,1,50000

補足: groupFilesのデフォルト動作

groupFilesオプションは入力ファイルが50,000個を超える場合、ファイルのグループ化はデフォルトで有効になっています。入力ファイルが50,000個未満の場合に、グループ化を無効にするには、このパラメータをinPartitionに設定します。入力ファイルが 50,000 個を超える場合に、グループ化を無効にするには、このパラメータをnoneに設定します。

最後に

Apache Sparkが多くの小さなファイルが本来不向きな短所を克服するため、AWSの Glueでは、groupFiles/groupSize指定によって多くの小さなファイルを処理するとタスクの増加やスケジューリングとメモリオーバーヘッドを軽減しパフォーマンスを改善できるように設計されています。

多くの小さなファイルを指定したサイズ(groupSize)までまとめたファイルを入力することで、出力ファイルを相対的に調整できることを意味します。Amazon Redshift SpectrumやAmazon Athena のクエリに最適なファイルサイズに調整する一つの選択肢の一つとしても有効です。