この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
最近、データサイズは大きくない割に、ジョブが徐々に遅くなったり、メモリ不足が発生して処理が中断するといった相談を受けましたので、その対策の一つである小さなファイルをまとめて読み込むgroupFiles/groupSize指定について解説します。
多くの小さなファイル読み込みの問題点
普通の Apache Sparkは多くの小さなファイル(Kinesis Firehoseのファイル等)は、本来不向きです。理由は多くの小さなファイルを処理するとタスクが多すぎて、スケジューリングとメモリオーバーヘッドの増加し、最終的にはメモリ不足が発生して処理が中断します。一方、AWS Glue(のSpark)は、今回ご紹介するgroupFiles/groupSize指定することで、タスクごとにファイルを自動的にグループ化して正常にETLを終えることができます。
この機能については、昨年のre:Inventのセッションで紹介されていました。
このセッションについては、日本語で現地レポートとしてまとめています。
groupFiles/groupSize指定とは
AWS GlueのETLジョブはS3データストアからファイルをまとめて読み取るグループ化を設定できます。S3データパーティション内のファイルはグループ化(groupFiles)と読み取るグループのサイズ(groupSize)を設定することで、各ETLタスクでは入力ファイルのグループを1つのメモリ内パーティションに読み取ることができます。
groupFilesには、1つのメモリ内パーティションに読み取るinPartition
を設定します。AWS Glue では、入力ファイル数が 50,000 を超える場合、自動的にグループ化を有効にします。以下に例を示します。
'--groupFiles': 'inPartition'
groupSizeには、1つのメモリ内パーティションに読み取るサイズの上限値をバイト数で指定します。以下、グループサイズを1MBに設定する方法は次のとおりです。
'--groupSize': '1024 * 1024'
Amazon S3 から直接読み取る場合
S3データストアのファイルを読み取る場合は create_dynamic_frame_from_options メソッドに以下の接続オプションを追加します。たとえば、ファイルを 1 MB のグループにグループ化する方法は次のとおりです。
dyf = glueContext.create_dynamic_frame_from_options(
"s3",
{'paths': ["s3://s3path/"], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024},
format="json")
Glueデータカタログのテーブルを読み取る場合
マニュアルに明記されているわけではないのですが、マニュアルの内容を精査して検証した結果、データソースのテーブルのTable Propertiesに2つのパラメタを設定することで動作することを確認しました。Glueデータカタログのテーブルの場合はジョブのパラメタではなく、データソースのテーブルに設定が必要になります。
実際の動作の解説
Sparkは、repartitionしない限り入力したファイルごとに出力ファイルが作成されます。つまり、出力がS3の場合は、1入力ファイルに対して1つ以上の出力ファイルが生成されます。groupFiles/groupSize指定することで、1000入力ファイルが1つのファイルにまとめられることによって出力ファイルが1つになるかどうかを確認します。ETLジョブは、jsonファイルを入力して、csvファイルにフォーマット変換する簡単なものです。
検証データ
多くの小さいファイルの例として、1ファイル1レコードのデータファイルを1000ファイル作成しました。
$ aws s3 ls cm-analytics-posdata --recursive
2018-07-06 12:19:40 153 posdata/sales_date=2018-07-01/posdata_0001
2018-07-06 12:19:40 155 posdata/sales_date=2018-07-01/posdata_0002
2018-07-06 12:19:40 155 posdata/sales_date=2018-07-01/posdata_0003
:
2018-07-06 12:20:52 162 posdata/sales_date=2018-07-01/posdata_1000
ファイルの内容は、jsonの1レコードのみのファイルです。
$ aws s3 cp s3://cm-analytics-posdata/posdata/sales_date=2018-07-01/posdata_0001 -
{"shop_id":"1001","shop_name":"秋葉原駅前店","item_id":"35940001","item_name":"単3乾電池(1本)","price":"50","quantity":"1","total":"50"}
groupFiles/groupSize指定しないコードと実行結果
まずは、groupFiles/groupSize指定しないデータソース(L.22)を入力した場合の結果を確認してみます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame_from_options(
"s3",
{'paths': ["s3://cm-analytics-posdata/posdata/sales_date=2018-07-01/"]},
format="json",
transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
以下、実行結果です。出力先のS3フォルダの下に入力ファイルと同じ数の出力ファイルが生成されています。ファイルの中にヘッダ付きのCSVファイルの中に1レコードのみ出力されているファイルが1000ファイルあります。
$ aws s3 ls s3://cm-analytics-posdata/targetdata/
2018-07-06 19:38:48 0
2018-07-06 19:59:40 124 run-1531047575387-part-r-00000
2018-07-06 19:59:40 126 run-1531047575387-part-r-00001
:
2018-07-06 20:00:11 133 run-1531047575387-part-r-00999
$ aws s3 cp s3://cm-analytics-posdata/targetdata/run-1531047575387-part-r-00000 -
shop_id,shop_name,item_id,item_name,price,quantity,total
1001,秋葉原駅前店,35940001,単3乾電池(1本),50,1,50
groupFiles/groupSize指定したコードと実行結果
次にgroupFiles/groupSize指定したデータソース(L.22)を入力した場合の結果を確認してみます。先程のソースコードとの変更点は'groupFiles': 'inPartition', 'groupSize': 1024 * 1024
です。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame_from_options(
"s3",
{'paths': ["s3://cm-analytics-posdata/posdata"], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024},
format="json",
transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("shop_id", "string", "shop_id", "string"), ("shop_name", "string", "shop_name", "string"), ("item_id", "string", "item_id", "string"), ("item_name", "string", "item_name", "string"), ("price", "string", "price", "string"), ("quantity", "string", "quantity", "string"), ("total", "string", "total", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-analytics-posdata/targetdata"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
なお、Glueデータカタログのテーブルをデータソースを指定する場合は、AWS Glueのデータソースのままでかまいません。groupFiles/groupSizeの指定はテーブルのTable Propertiesに設定します。
## @type: DataSource
## @args: [database = "default", table_name = "posdata", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "posdata", transformation_ctx = "datasource0")
以下の通り、出力ファイルは1つになり、中に複数レコード格納されていることが確認できました。
$ aws s3 ls s3://cm-analytics-posdata/targetdata/
2018-07-06 19:38:48 0
2018-07-06 19:48:52 74512 run-1531046838806-part-r-00000
$ aws s3 cp s3://cm-analytics-posdata/targetdata/run-1531046838806-part-r-00000 -
shop_id,shop_name,item_id,item_name,price,quantity,total
1001,秋葉原駅前店,35940001,単3乾電池(1本),50,1,50
1001,秋葉原駅前店,35940002,単3乾電池(2本),100,1,100
:
1001,秋葉原駅前店,35941000,単3乾電池(1000本),50000,1,50000
補足: groupFilesのデフォルト動作
groupFilesオプションは入力ファイルが50,000個を超える場合、ファイルのグループ化はデフォルトで有効になっています。入力ファイルが50,000個未満の場合に、グループ化を無効にするには、このパラメータをinPartition
に設定します。入力ファイルが 50,000 個を超える場合に、グループ化を無効にするには、このパラメータをnone
に設定します。
最後に
Apache Sparkが多くの小さなファイルが本来不向きな短所を克服するため、AWSの Glueでは、groupFiles/groupSize指定によって多くの小さなファイルを処理するとタスクの増加やスケジューリングとメモリオーバーヘッドを軽減しパフォーマンスを改善できるように設計されています。
多くの小さなファイルを指定したサイズ(groupSize)までまとめたファイルを入力することで、出力ファイルを相対的に調整できることを意味します。Amazon Redshift SpectrumやAmazon Athena のクエリに最適なファイルサイズに調整する一つの選択肢の一つとしても有効です。