AWS Glue 新しくサポートされたScalaでETL Job作成を試してみました
はじめに
AWS Glueは、Pythonに加えてScalaプログラミング言語をサポートし、AWS Glue ETLスクリプトの作成時にPythonとScalaを選択できるようになりました。新しくサポートされたScalaでETL Jobを作成・実行して、ScalaとPythonコードの違いやScalaのユースケースについて解説します。
ScalaでETL Jobを作成して実行する
ETL Jobは、ソース、ターゲット、カラムのマッピング、ETL言語などを指定すると対応したETLコードが自動生成されます。その生成されたコードに対して、さらにテンプレートを追加したり、コードを編集します。では早速、ScalaでETL Jobを作成してみます。
Job Properties
全般的なETL Jobについての情報を設定します。Scalaプログラミング言語をサポートにつもない、ETL Language が追加され、Python
とScala
を選択できるようになりました。今回は、Scala
を選択します。
Data Sources
ETLの元となるデータの入力先を指定します。Glue Data Catalogに登録済みのデータソースの中から選択します。
Data Targets
ETLの後データを出力先を指定します。Parquetフォーマットのファイルに変換して、指定したS3に保存します。
Schema
入力(Data Source)と出力(Data Target)のカラムやデータ型の対応付けを定義します。今回はParquetフォーマットのファイルに変換するのみなのでそのまま変更しません。
Review
ETL Job設定の再確認です。ETL languageがscala
になっています。
Scalaのソースコード
定義が終わると、Scalaのソースコードが自動生成されて、エディタに表示されます。
ここでは特に変更を加えません。ScalaのETL Jobもこれまで通り、実行できました。
PythonとScalaのコードの違い
ETL JobをPythonとScala、ほぼ同じ条件で自動生成コードを掲載します。御覧頂いたとおり、実行しているステップや呼ばれているメソッドにほとんど違いがありません。AWS Glueは、SparkのRDDやDataFrameでデータ操作するのではなく、DataFrameをラップしたDynamicFrameと、その高レベルAPI(メソッド)を用いることで、プログラム言語間の違いによるメソッドの違いや制約、パフォーマンスの低下を回避しています。
自動生成されたコードでは、Data Soueceから直接DynamicFrameを取得して、変換からData Targetへの出力までDynamicFrameの操作で済むようなコードが生成されていますので、本質的にPythonとScalaのコードの違いは生じません。
Pythonのソースコード
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "default", table_name = "csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1") ## @type: ResolveChoice ## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] ## @return: resolvechoice2 ## @inputs: [frame = applymapping1] resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2") ## @type: DropNullFields ## @args: [transformation_ctx = "dropnullfields3"] ## @return: dropnullfields3 ## @inputs: [frame = resolvechoice2] dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://my-bucket/flights_in_python"}, format = "parquet", transformation_ctx = "datasink4"] ## @return: datasink4 ## @inputs: [frame = dropnullfields3] datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://my-bucket/flights_in_python"}, format = "parquet", transformation_ctx = "datasink4") job.commit()
Scalaのソースコード
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // @type: DataSource // @args: [database = "default", table_name = "csv", transformation_ctx = "datasource0"] // @return: datasource0 // @inputs: [] val datasource0 = glueContext.getCatalogSource(database = "default", tableName = "csv", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame() // @type: ApplyMapping // @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"] // @return: applymapping1 // @inputs: [frame = datasource0] val applymapping1 = datasource0.applyMapping(mappings = Seq(("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")), caseSensitive = false, transformationContext = "applymapping1") // @type: ResolveChoice // @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] // @return: resolvechoice2 // @inputs: [frame = applymapping1] val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice2") // @type: DropNullFields // @args: [transformation_ctx = "dropnullfields3"] // @return: dropnullfields3 // @inputs: [frame = resolvechoice2] val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3") // @type: DataSink // @args: [connection_type = "s3", connection_options = {"path": "s3://my-bucket/tmp"}, format = "parquet", transformation_ctx = "datasink4"] // @return: datasink4 // @inputs: [frame = dropnullfields3] val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://my-bucket/tmp"}"""), transformationContext = "datasink4", format = "parquet").writeDynamicFrame(dropnullfields3) Job.commit() } }
Scalaのユースケース
これまでのコードを振り返ると、どちらの言語を選択してもDynamicFrameとそのAPIを使う限りどちらでも良いので、あえてScalaを使う理由がないように印象を持つかもしれません。確かにその通りなのですが、私なりのAWS GlueにおけるScalaの利点とユースケースについて述べたいと思います。
Scalaの利点
SparkのコードはScalaによって書かれていることもあり、Pythonと比較して一般的なSpark2.1ではSparkが提供する機能やライブラリの全てが利用できます。DataFrameが登場するまでRDDに対するPythonのクエリの速度はScalaの同じクエリに比べて半分以下になることもありました。なお、クエリのパフォーマンスの低下の原因は、PythonとJVM間でのコミュニケーションのオーバーヘッドによるものです。
Scalaのユースケース
既存のSparkのETLコードをAWS Glueに移行する場合、Scalaであれば利点でも述べた通り、Pythonと比較して一般的なScalaはSparkが提供する機能やライブラリのより多くが利用でき、Pythonで生じる性能低下の懸念がありません。AWSのGlueの入出力の仕組みを利用して、Data SourceからDynamicFrameの生成と、Data TargetへDynamicFrameを出力する以外は、既存のSparkのETLコードをそのまま置き換えることが可能であると考えられます。
ScalaでDataSetを利用する
(2018/02/24 当初は「Scalaを利用してもDataSetが利用できないかも」と書いていましたが、動作が確認できましたので修正しました。) AWS GlueのクエリエンジンであるSpark2.1では、DataFrameとDataSetが統合されており、DataSetはScalaから利用できます。なお、PythonでDataset APIが実装されていない理由の一つは、Pythonが型安全な言語ではないためです。
AWS Glue(Spark2.1)環境でも、ScalaであればDataSetを生成できることを動作確認しました。
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import org.apache.spark.sql.types._ case class Person(name: String, age: Long) object GlueApp { // Dataset def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession = glueContext.getSparkSession // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Create DataSet with filter. import sparkSession.implicits._ val persons = Seq(Person("foo", 25),Person("bar", 35),Person("baz", 37)).toDS() persons.filter(_.age > 30).show() Job.commit() } }
以下の通り、DataSetが利用できました。
Log Contents: +----+---+ |name|age| +----+---+ | bar| 35| | baz| 37| +----+---+ End of LogType:stdout
最後に
Sparkのサンプルコードは、Scalaで書かれていることが多く、これらをGlueで活用できることは大きなメリットでしょう。Pythonと比較して一般的なScalaは、Sparkが提供する機能やライブラリのより多くが利用でき、RDDに対するPythonのクエリの速度低下を気にせずにETLコードが作成できることで、既存のSparkのETLコードをAWS Glueに移行するが容易になります。
AWS Glueは、SparkのRDDやDataFrameでデータ操作するのではなく、RDDをラップしたDynamicFrameと、その高レベルAPI(メソッド)を用いることで、プログラム言語間の違いによるメソッドの違いや制約、パフォーマンスの低下を回避しています。ScalaがサポートされたことでDynamicFrameに依存しないコードが書けるようになりましたが、今後もAWS GlueのETLコードは、ETL目的に最適化されたDynamicFrameを活用して簡潔にコードを作成することをおすすめします。