AWS Glue ジョブタイプ『Spark』が Apache Spark 2.4.3 と Python 3.6をサポートしました

2019.07.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

本日から Apache Spark 2.4 と Python 3 による『Spark』ジョブを使用してスクリプトを実行できるようになりました。今後はPython 2(Spark 2.2 又は Spark 2.4)と Python 3(Spark 2.4)のいずれかを選択可能になりました。早速確認してみたいと思います。

Spark ジョブ の作成

Configure the job properties 画面の [Type] でSparkを選択すると、[Glue version]でSparkのバージョンと言語を選べるように変わりました。今後のデフォルトは、Spark 2.4, Python 3 (Glue Version 1.0)になります。

組み合わせを見ると、従来の Spark 2.2は、Python 2 とScalaのどちらかを選択できます。今日から 新しい Spark 2.4は、Python 2 と Python 3 と Scala のいずれかを選択できます。これまでの Spark 2.2は、Glue version 0.9新しい Spark 2.4は、Glue version 1.0とGlueのバージョンも異なります。

Glue versionは、ジョブを追加または更新するときに設定します。Glueのバージョンは、AWS Glueのバージョンによって Apache Spark と Pythonのバージョンが決定します。

| Glue のバージョン | Spark と Python バージョン | | :---------------- | :------------------------------------------ | | Glue 0.9 | Spark 2.2.1 Python 2.7 | | Glue 1.0 | Spark 2.4.3 Python 2.7 Python 3.6 |

引用:Adding Jobs in AWS Glue

Spark ジョブ の実行

Glue versionに新しいSpark 2.4, Python3 (Glue version 1.0)を指定してCSVファイルからParquetに変換するジョブを作成してみます。しかし、生成されたスクリプトは他のGlue versionを指定した場合と変わりませんでした。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "orders", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "orders", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("row id", "long", "row id", "long"), ("order id", "string", "order id", "string"), ("order date", "string", "order date", "string"), ("ship date", "string", "ship date", "string"), ("ship mode", "string", "ship mode", "string"), ("customer id", "string", "customer id", "string"), ("customer name", "string", "customer name", "string"), ("segment", "string", "segment", "string"), ("city", "string", "city", "string"), ("state", "string", "state", "string"), ("country", "string", "country", "string"), ("region", "string", "region", "string"), ("product id", "string", "product id", "string"), ("category", "string", "category", "string"), ("sub-category", "string", "sub-category", "string"), ("product name", "string", "product name", "string"), ("sales", "double", "sales", "double"), ("quantity", "long", "quantity", "long"), ("discount", "double", "discount", "double"), ("profit", "double", "profit", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("row id", "long", "row id", "long"), ("order id", "string", "order id", "string"), ("order date", "string", "order date", "string"), ("ship date", "string", "ship date", "string"), ("ship mode", "string", "ship mode", "string"), ("customer id", "string", "customer id", "string"), ("customer name", "string", "customer name", "string"), ("segment", "string", "segment", "string"), ("city", "string", "city", "string"), ("state", "string", "state", "string"), ("country", "string", "country", "string"), ("region", "string", "region", "string"), ("product id", "string", "product id", "string"), ("category", "string", "category", "string"), ("sub-category", "string", "sub-category", "string"), ("product name", "string", "product name", "string"), ("sales", "double", "sales", "double"), ("quantity", "long", "quantity", "long"), ("discount", "double", "discount", "double"), ("profit", "double", "profit", "double")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-superstore/target_spark24_py3/orders"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://cm-superstore/target_spark24_py3/orders"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

上記のPythonスクリプトを実行しました。Spark 2.4.3 であることが確認できます。

Python 3によるマルチバイトのエラーが解決

デバック目的でDataframeの内容を表示することがよくありますが、従来は、マルチバイトを含むDataframeの内容をshow()で確認すると、エラーでDriverが落ちてジョブが終了しました。そのため、データの中にマルチバイトを含む場合はshow()が使えないという制限がありました。

例えば、Spark 2.2, Python2 (Glue version 0.9)以下のコードを実行すると

## @type: DataSource
## @args: [database = "default", table_name = "orders", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "orders", transformation_ctx = "datasource0")

df = datasource0.toDF()
df.show()

以下のようにUnicodeEncoderErrorが発生してジョブが終了します。

新しい Spark 2.4, Python3 (Glue version 1.0)を用いることで、マルチバイト文字も問題なくログ出力できるようになります。

Spark 2.4による機能やパフォーマンスの改善に期待!

AWS GlueにおいてApache Spark 2.4の全ての機能が利用できるとは限りませんが、Avroの読み込みやArrayやMapなどの関数サポート、Optimizerの改善によるパフォーマンスの向上などが期待できます。新機能やパフォーマンスについては又の機会にご報告します。

まとめ

2020年1月1日にPython 2 EOLを迎えます。Apache Spark 2.4は、Python 2をサポートしていますが、Python 2 EOL以降は、Python 2に固有のパッチは使用されない可能性があります。新規のジョブは新しいSpark 2.4, Python3 (Glue version 1.0)で作成すること、既存のジョブも順次移行することをご検討ください。