[Glue4.0] シングルノードで動くpandas処理を1行書き換えて並列分散処理にする!

タイトルは「○○ができる場合がある」をあたかも一般に「○○できる」かのように書いている悪い例です
2023.01.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日Glue4.0がリリースされました。

[アップデート] 最新バージョンGlue 4.0がリリースされました #reinvent

Glue4.0に対応するSparkのバージョンは3.3.0となり、 これによってGlueでもSparkのpandas APIが使えるようになりました! これはどういうことかというと、 シングルノードで動かしていたpandasの処理が コードの変更はほとんどなしにそのままGlue上での分散処理に移行できる(可能性が高い)ということです。

ちょうど、pandasが1つのノードで動いており、 データ量によってはそれがメモリ不足で処理が失敗することがあったので、 これが解決できそうかの探りを入れてみました。

検証タスク

以下2つのタスクを実行してみました。

裏を話すと、最初はタスク1だけで確かめられるかと思ったのですが、 あまり面白くない結果だったのでタスク2もやった感じになります。 結果的としてタスク1はタスク2の準備というぐらいの価値しかないので、 タスク1は読み飛ばして頂いても問題ないと思います。

  • タスク1
    • 10行のcsvを水増しして1億行にする
  • タスク2
    • (タスク1の結果の)1億行のcsvをソートする

タスク1

「10行のcsvを水増しして1億行にする」というタスクをやってみます。

入力となるファイルはこれです。

10lines.csv

id,name
x,name0
x,name1
x,name2
x,name3
x,name4
x,name5
x,name6
x,name7
x,name8
x,name9

idと言いつつ全て共通の値 x が入っています。 これに対して「idが同一のものをjoinする」という処理を行ってデータの水増しをして行こうと思います。 今回は3回やって

10 -> 100 -> 10000 -> 100000000(1億)

と行を増やします。

普通のpandas

普通のpandasを使う処理をGlueのスクリプトに書いて実行してみます。 Glueで動かしますが、分散環境を使わずマスターノードだけが仕事をすることになります。

ハイライトした行が本質的な部分で、 それ以外はGlueという枠組み上で動かすためにテンプレートをそのまま残している箇所です。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pandas as pd

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

input_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/10lines.csv"
output_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/100mlines.csv"
df = pd.read_csv(input_s3path)
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df.to_csv(output_s3path)
print("done")

job.commit()

結果

以下のように、単一のcsvファイルが出力されます。 1億行あるので5.9GB程度あります。

$ aws s3 ls s3://cm-hirano-shigetoshi-study/glue4-pandas-df/
2023-01-11 10:30:41 5888888982 100mlines.csv

中身を確認すると、想定通りの中身となっていました。 name列の数が倍々で増えていくので8カラムになっています。 一応注意点として、以下はpandasが勝手にやってくれた部分です

  • 先頭に行番号カラムが追加される
  • mergeして作成されたカラムに適当な名前がつく
,id,name_x_x_x,name_y_x_x,name_x_y_x,name_y_y_x,name_x_x_y,name_y_x_y,name_x_y_y,name_y_y_y
0,x,name0,name0,name0,name0,name0,name0,name0,name0
1,x,name0,name0,name0,name0,name0,name0,name0,name1
2,x,name0,name0,name0,name0,name0,name0,name0,name2
3,x,name0,name0,name0,name0,name0,name0,name0,name3
4,x,name0,name0,name0,name0,name0,name0,name0,name4
5,x,name0,name0,name0,name0,name0,name0,name0,name5
6,x,name0,name0,name0,name0,name0,name0,name0,name6
7,x,name0,name0,name0,name0,name0,name0,name0,name7
8,x,name0,name0,name0,name0,name0,name0,name0,name8
9,x,name0,name0,name0,name0,name0,name0,name0,name9
10,x,name0,name0,name0,name0,name0,name0,name1,name0

この処理は10分ほどで終わっていました。 1億行まで増やしているのでもっと時間がかかったり、 途中でメモリ不足になったりするかなと思いましたが、意外とすんなり終わりました。

pysparkのpandasでやってみる

次にこれをpysparkのpandasでやってみます。

やり方は簡単です。これだけです。超簡単です。

- import pandas as pd
+ import pyspark.pandas as pd

importするpandasをpyspark.pandasに変更します。 as pdで同じエイリアスを与えてあげれば、 ソースコード本体は一切書き換える必要はありません。

よって、上とほぼ全く同じスクリプトがこちらです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

#import pandas as pd
import pyspark.pandas as pd

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

input_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/10lines.csv"
output_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/100mlines.csv"
df = pd.read_csv(input_s3path)
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df = pd.merge(df, df, on="id")
print(f"df.shape: {df.shape}")
df.to_csv(output_s3path)
print("done")

job.commit()

結果

出力ファイル名を前と同じにしてしまったので、ファイルが上書きされるはず、、、 と思いきや、100mlines.csv/というディレクトリができており、 その中にcsvファイルが保存されています。

$ aws s3 ls s3://cm-hirano-shigetoshi-study/glue4-pandas-df/
2023-01-11 10:30:41 5888888982 100mlines.csv
                           PRE 100mlines.csv/
$ aws s3 ls s3://cm-hirano-shigetoshi-study/glue4-pandas-df/100mlines.csv/
2023-01-11 09:57:03 5000000091 part-00000-86ef9d0d-2338-4a40-b6cc-6768971f9c75-c000.csv

これは分散処理環境ではお馴染みな感じですが、 各ワーカーノードが処理したものがpartに分かれて出力されている形になっています。 分散処理なので、処理分割数の分だけファイルが出力されているということです。

ただし今の場合出力ファイルは1つなので、1つのワーカーノードが処理を行ったようです。 これはちょっと期待した動きではないですが、 まずはimport文を変更するだけで、コードはそのままに分散環境が利用できた(っぽい)ことが確認できました。

この処理は5分くらいで終わっていました。 先ほどより早いのは、 ワーカーノードの方が処理に適したリソース構成になっているためなのかなと思います。

こちらも中を確認してみます。 本質的な部分はもちろん同じですが、 先頭の行番号カラムは付与されません。 これは分散で処理された場合、 ワーカーノードは、自分がデータ全体のどの部分を処理しているかを知らないので行番号をつけられないためです。

id,name_x_x_x,name_y_x_x,name_x_y_x,name_y_y_x,name_x_x_y,name_y_x_y,name_x_y_y,name_y_y_y
x,name0,name9,name9,name0,name0,name9,name9,name0
x,name0,name9,name9,name0,name0,name9,name9,name1
x,name0,name9,name9,name0,name0,name9,name9,name2
x,name0,name9,name9,name0,name0,name9,name9,name3
x,name0,name9,name9,name0,name0,name9,name9,name4
x,name0,name9,name9,name0,name0,name9,name9,name5
x,name0,name9,name9,name0,name0,name9,name9,name6
x,name0,name9,name9,name0,name0,name9,name9,name7
x,name0,name9,name9,name0,name0,name9,name9,name8

import文を変えるだけで、なんか違う環境で実行されてるっぽいことはわかりましたが、 やはりこの結果だけでは全然実感が湧きません。 次のタスク2へ進みます。

タスク2

タスク1の結果出力されたファイルを使って 「1億行のcsvをソートする」 をやっていきます。

通常のpandas

スクリプトを掲示します。 8個に増えたカラムを後ろから順にソートするようにしています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pandas as pd
#import pyspark.pandas as pd

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

input_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/100mlines.csv"
output_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/sorted_100mlines.csv"
df = pd.read_csv(input_s3path)
print(f"df.shape: {df.shape}")
df = df.sort_values(
    [
        "name_y_y_y",
        "name_x_y_y",
        "name_y_x_y",
        "name_x_x_y",
        "name_y_y_x",
        "name_x_y_x",
        "name_y_x_x",
        "name_x_x_x",
    ]
)
print("sort done")
df.to_csv(output_s3path)
print("done")

job.commit()

結果

こちらはGlueジョブが失敗に終わりました。 さすがに1億行のソートは1ノードには少々荷が重かったようです。

pysparkのpandasでやってみる

変更点は前と同じく、import文を変更するだけです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

#import pandas as pd
import pyspark.pandas as pd

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

input_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/100mlines.csv"
output_s3path = "s3://cm-hirano-shigetoshi-study/glue4-pandas-df/sorted_100mlines.csv"
df = pd.read_csv(input_s3path)
print(f"df.shape: {df.shape}")
df = df.sort_values(
    [
        "name_y_y_y",
        "name_x_y_y",
        "name_y_x_y",
        "name_x_x_y",
        "name_y_y_x",
        "name_x_y_x",
        "name_y_x_x",
        "name_x_x_x",
    ]
)
print("sort done")
df.to_csv(output_s3path)
print("done")

job.commit()

結果

Glueジョブは正常に成功したようです!

結果を見てみます。

$ aws s3 ls s3://cm-hirano-shigetoshi-study/glue4-pandas-df/sorted_100mlines.csv/
2023-01-11 13:38:51  160383040 part-00000-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  153955415 part-00001-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  158863452 part-00002-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  152652354 part-00003-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  147394430 part-00004-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  167418108 part-00005-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:55  177139944 part-00006-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  177911941 part-00007-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  143179536 part-00008-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  168306084 part-00009-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  166107194 part-00010-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  156108389 part-00011-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:55  171882923 part-00012-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  180935244 part-00013-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:50  153601551 part-00014-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  161682313 part-00015-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  166301064 part-00016-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  159469593 part-00017-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  171061629 part-00018-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  166603227 part-00019-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  160448628 part-00020-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  155015272 part-00021-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:54  178729494 part-00022-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  168120274 part-00023-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  165678378 part-00024-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:54  173178849 part-00025-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  157630558 part-00026-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  161276468 part-00027-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  178819438 part-00028-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:56  193073095 part-00029-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  158531662 part-00030-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  169073806 part-00031-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:51  144867957 part-00032-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  149622240 part-00033-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:52  154258985 part-00034-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv
2023-01-11 13:38:53  159609775 part-00035-ef25667f-8d33-4ce4-9e90-ab4e37439014-c000.csv

36個のファイルに分割されて出力されていますので、 きちんと分散環境で処理されたことがわかります。 もちろん名前の順番に結合すればソートされた単一のデータとなります。 処理時間も3分程度と、思ったよりかなり速い印象です。

1億行のソートというのは1つのマシン上ではかなり厳しい処理ですが、 import文を変えるだけで分散処理環境上に載せ替えることができ、 実用的な時間で処理が完了していることが確認できました!!

次に出力ファイルの中身を確認してみます。

_c0,id,name_x_x_x,name_y_x_x,name_x_y_x,name_y_y_x,name_x_x_y,name_y_x_y,name_x_y_y,name_y_y_y
0,x,name0,name0,name0,name0,name0,name0,name0,name0
10000000,x,name1,name0,name0,name0,name0,name0,name0,name0
20000000,x,name2,name0,name0,name0,name0,name0,name0,name0
30000000,x,name3,name0,name0,name0,name0,name0,name0,name0
40000000,x,name4,name0,name0,name0,name0,name0,name0,name0
50000000,x,name5,name0,name0,name0,name0,name0,name0,name0
60000000,x,name6,name0,name0,name0,name0,name0,name0,name0
70000000,x,name7,name0,name0,name0,name0,name0,name0,name0
80000000,x,name8,name0,name0,name0,name0,name0,name0,name0
90000000,x,name9,name0,name0,name0,name0,name0,name0,name0
1000000,x,name0,name1,name0,name0,name0,name0,name0,name0
11000000,x,name1,name1,name0,name0,name0,name0,name0,name0
21000000,x,name2,name1,name0,name0,name0,name0,name0,name0
31000000,x,name3,name1,name0,name0,name0,name0,name0,name0
41000000,x,name4,name1,name0,name0,name0,name0,name0,name0
51000000,x,name5,name1,name0,name0,name0,name0,name0,name0
61000000,x,name6,name1,name0,name0,name0,name0,name0,name0
71000000,x,name7,name1,name0,name0,name0,name0,name0,name0
81000000,x,name8,name1,name0,name0,name0,name0,name0,name0
91000000,x,name9,name1,name0,name0,name0,name0,name0,name0
2000000,x,name0,name2,name0,name0,name0,name0,name0,name0

元のデータから左右をひっくり返したようなデータになっています(言葉で伝わりにくいですが、伝われ!)。 また元の行番号も表示されているので、ソートされたことがよくわかります。

今度は、元々1ノードでは処理しきれなかったものが分散環境で処理されたという結果になりました! これなら「おぉ!」って感じになりますね!

まとめ

Glue4.0がリリースされたことで、 Sparkのpandas APIがGlueでも使えるようになりました! これによって従来シングルノードで動いていたpandasの処理が、 ほぼコードの変更なしに分散環境で動かせることが確認できました。

pandasでデータ整形のスクリプトを書く時、 最初はそれほど大きなデータが対象になることを想定していないような場合も多いと思いますが、 そのプログラムを使い続けているうちにいつしかデータが大きくなってしまうこともあります。 しかし大きなデータを対象とする際にプログラムを大きく変更しなくてはならない事態は避けたいです。 従来のpadasと同じインタフェースを持つpysparkのpandasを使えば、 プログラムの本体にはほとんど何も手を加えずに分散処理環境に載せ替えることが可能になりそうです。

ただし出力結果において多少の違いが出てくる部分もあります。 分散処理の結果ファイルは通常複数ファイルに分割されますので、 後続の処理が単一ファイルだけを待ち受ける形になっている場合はその修正も必要となります。1 この辺の対処はどうしても必要になってしまいますが、 分散環境でなくては捌けない量のデータを相手にする以上はそういったスケールしない部分を除いていく作業は必須ですので真面目に対処していくしかないですね。

以上、どなたかの参考になれば幸いです。

参考情報


  1. この場合、複数ファイルを結合して1ファイルにするということをやりたくなってしまいますが、それは思ったよりも簡単ではなく、またその後の拡張性も悪くなってしまうことがほとんどなので、一般論として、後続側を複数ファイル対応させることが望ましいです。