どーも、データアナリティクス事業本部コンサルティングチームのsutoです。
GlueのジョブをGlue Studioのビジュアルジョブエディターで作成した経験はあるのですが、もともとSparkのコードを書いたことがないこともあり、さすがにコードで読めるようにならないと、と思って今回のブログとして証跡を残しておきます。
まだまだ素人レベルなので「Pyspark強くない人が頑張って勉強しているんだな〜」と温かい目で見ていただけると幸いです。
今回の題材について
今回は以下のGithubの資料を使わせていただき、作成したGlueジョブをSparkコードで見た際の中身を理解していこうと思います。
リソース作成
※構築の詳しい手順はGithub上にあるため、ここでは省略したかたちで記述しています。
- Cloudformationスタックを実行して、Glueテーブル、S3バケット、IAMロールといった必要なリソースを自動作成します
- Glue Studioの画面から「Vusial with a blank canvas」を選択して「Create」をクリック
- エディタ画面で手順に従いジョブの内容を作成していきます。
- 「Job detail」では以下のように、本書の手順とは違い「Glue 4.0」を選択してジョブを実行しています
- 結果、Glue4.0でも問題なく完了しており、データ確認においても問題なさそうでした。
コードの中身を読み解いてみる
Glue Studioの「Script」タブからジョブの内容をSparkコードで表示できます。
各ノードごとに対応しているコード部分を抜粋しながら見ていきます。
Tickets, Trials (Data source - S3 bucket)
from pyspark.context import SparkContext
from awsglue.context import GlueContext
〜
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
〜
# Script generated for node Tickets
Tickets_node1674540573545 = glueContext.create_dynamic_frame.from_catalog(
database="yyz-tickets",
table_name="tickets",
transformation_ctx="Tickets_node1674540573545",
)
# Script generated for node Trials
Trials_node1674540653602 = glueContext.create_dynamic_frame.from_catalog(
database="yyz-tickets",
table_name="trials",
transformation_ctx="Trials_node1674540653602",
)
- GlueContextクラスはSparkContextをラップすることによりApache Spark プラットフォームとやり取りするためのメカニズムが提供されます。
- 今回はCloudformationにて事前に作成してあるGlue Data Catalogテーブル(tickets, trials)があるので、「create_data_frame_from_catalog」でデータフレームを作成する際にテーブル名などのカタログ情報を参照しています。
- transformation_ctxはジョブのブックマークを行うオプションであり、AWS Glue ではジョブの実行による状態情報を保持することで、ETL ジョブの以前の実行中にすでに処理されたデータを追跡できます。処理履歴を保持してくれるので「1度目のジョブ実行後、ソースデータの増加があって再度ジョブ実行した際、増分データのみをロードして処理する」といった動作が可能です。
Tickets_Maping, Trrial_Mapping (ApplyMapping)
# Script generated for node Ticket_Mapping
Ticket_Mapping_node1674540804473 = ApplyMapping.apply(
frame=Tickets_node1674540573545,
mappings=[
("tag_number_masked", "string", "tag_number_masked", "string"),
("date_of_infraction", "string", "date_of_infraction", "string"),
("ticket_date", "string", "ticket_date", "string"),
("ticket_number", "decimal", "ticket_number", "int"),
("officer", "decimal", "officer", "decimal"),
("infraction_code", "decimal", "infraction_code", "decimal"),
("infraction_description", "string", "infraction_description", "string"),
("set_fine_amount", "decimal", "set_fine_amount", "decimal"),
("time_of_infraction", "decimal", "time_of_infraction", "decimal"),
],
transformation_ctx="Ticket_Mapping_node1674540804473",
)
# Script generated for node Trial_Mapping
Trial_Mapping_node1674540948034 = ApplyMapping.apply(
frame=Trials_node1674540653602,
mappings=[
("court_date", "date", "court_date", "date"),
("court_location", "string", "court_location", "string"),
("court_room", "string", "court_room", "string"),
("court_time", "int", "court_time", "int"),
("parking_ticket_number", "long", "parking_ticket_number", "int"),
("infraction_date", "date", "infraction_date", "date"),
("first_3_letters_name", "string", "first_3_letters_name", "string"),
("sentence", "string", "sentence", "string"),
],
transformation_ctx="Trial_Mapping_node1674540948034",
)
- Glue Data Catalogから作成したデータフレームをもとに、ApplyMapping クラスでフィールドタイプを変更しています。
- TicketとTrialからそれぞれ一部のカラムタイプを変更してTickets_Maping, Trrial_Mappingとしてでデータフレームを保持しています。
Join_Ticket_Trial
# Script generated for node Join_Ticket_Trial
Join_Ticket_Trial_node1674541023488 = Join.apply(
frame1=Ticket_Mapping_node1674540804473,
frame2=Trial_Mapping_node1674540948034,
keys1=["ticket_number"],
keys2=["parking_ticket_number"],
transformation_ctx="Join_Ticket_Trial_node1674541023488",
)
- Joinクラスで2つのデータフレーム(Tickets_Maping, Trrial_Mapping)を結合させています。必須項目としてそれぞれのデータフレームと結合キーを指定しています。
- Inner joinということで、今回生成されたスクリプトはJoinクラスでしたが、Outer joinなどの他の結合の場合は以下のような処理になるようです。
- 2つのダイナミックフレームをDataframeクラスに変換する
- joinメソッドで結合タイプを指定しつつ、データフレームを結合する
- 結合したデータフレームをダイナミックフレームに戻す
# Outer joinを指定した場合のコード
Ticket_Mapping_node1674540804473DF = Ticket_Mapping_node1674540804473.toDF()
Trial_Mapping_node1674540948034DF = Trial_Mapping_node1674540948034.toDF()
Join_Ticket_Trial_node1674541023488 = DynamicFrame.fromDF(
Ticket_Mapping_node1674540804473DF.join(
Trial_Mapping_node1674540948034DF,
(
Ticket_Mapping_node1674540804473DF["ticket_number"]
== Trial_Mapping_node1674540948034DF["parking_ticket_number"]
),
"outer",
),
glueContext,
"Join_Ticket_Trial_node1674541023488",
)
Aggregate_Tickets (Transform - Custom code)
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
# Script generated for node Aggregate_Tickets
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
selected = dfc.select(list(dfc.keys())[0]).toDF()
selected.createOrReplaceTempView("ticketcount")
totals = spark.sql(
"select court_location as location, infraction_description as infraction, count(infraction_code) as total FROM ticketcount group by infraction_description, infraction_code, court_location order by court_location asc"
)
results = DynamicFrame.fromDF(totals, glueContext, "results")
return DynamicFrameCollection({"results": results}, glueContext)
〜
# Script generated for node Aggregate_Tickets
Aggregate_Tickets_node1674541125620 = MyTransform(
glueContext,
DynamicFrameCollection(
{"Join_Ticket_Trial_node1674541023488": Join_Ticket_Trial_node1674541023488},
glueContext,
),
)
- カスタム変換コードを指定した変換処理の部分です。
- 定義したカスタム変換処理用の関数(MyTransform)に引数(glueContext, Join_Ticket_Trial_node1674541023488)を渡して、DynamicFrameCollectionクラスとして返り値を取得しています。
- 変換内容は、結合テーブルに対して「court_locationごとのinfraction_codeの合計数を集計する」SQLを実行して得られた結果を返しています。
- 関数処理の流れは、
- 受け取ったダイナミックをtoDFでデータフレームに変換
- createOrReplaceTempViewで一時テーブル作成
- SQL実行
- 結果のデータフレームをfromDFでダイナミックフレームに戻してreturn
- といった感じです。
Select_Aggregated_Data, Ticket_Count_Dest
# Script generated for node Select_Aggregated_Data
Select_Aggregated_Data_node1674541295187 = SelectFromCollection.apply(
dfc=Aggregate_Tickets_node1674541125620,
key=list(Aggregate_Tickets_node1674541125620.keys())[0],
transformation_ctx="Select_Aggregated_Data_node1674541295187",
)
# Script generated for node Ticket_Count_Dest
Ticket_Count_Dest_node1674541336665 = glueContext.write_dynamic_frame.from_options(
frame=Select_Aggregated_Data_node1674541295187,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://glue-studio-blog-133998493178/parking_tickets_count/",
"partitionKeys": [],
},
format_options={"compression": "gzip"},
transformation_ctx="Ticket_Count_Dest_node1674541336665",
)
- SelectFromCollectionクラスで変換後のデータフレームを指定し、glueContextのwrite_dynamic_frame.from_optionsを使って出力処理を行なっています。
- オプションで出力先S3バケットパス、フォーマット、圧縮形式を指定しています。
- また、Glueジョブは上記のような処理で出力すると分割ファイルとしてS3バケットに出力されますが、これを1つの出力ファイルとして保存したい場合もあるかと思います。
- 方法としては、最低限repartition(1)を追記してファイル数(一つ)でパーティションを再設定、さらにs3への接続オプションに‘groupSize’: ‘10485760’などのように分割されるファイルサイズの上限値を設定しています。
- ただしファイル分割を1つに指定するということはGlue側で並列処理が働かない分パフォーマンスは悪化しますのでご注意ください。
Ticket_Count_Dest_node1674541336665 = glueContext.write_dynamic_frame.from_options(
frame=Select_Aggregated_Data_node1674541295187.repartition(1),
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://glue-studio-blog-133998493178/parking_tickets_count/",
"partitionKeys": [],
'groupSize': '10485760'
},
format_options={"compression": "gzip"},
transformation_ctx="Ticket_Count_Dest_node1674541336665",
)
実行結果
最後に
Glueジョブにおける基本シナリオとなる、「Glueテーブルの読み込み」「テーブル定義の変換」「テーブル結合」「カスタム変換の挿入」「S3出力」をコードで読み込めるように勉強しました。
基礎となる処理を理解できれば応用も聴きやすくなるし、何よりpySparkを覚えるとっかかりとして良かったと思っています。