[PySpark]*.csvファイルを再帰的にヘッダーを除去しつつ読み込む

2018.10.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Sparkはファイルシステムから直接ファイルを読むことができますが、ログとして出力されたファイルなどはいろいろなディレクトリに散らばっていることが多いです。 今回はPySparkでそれらのファイルを親ディレクトリから再帰的に走査して、目的のファイルだけをRDDに読み込んでみます。 ついでに、それらのファイルには先頭に1行ヘッダがあるという前提で、その除去も行います。

ファイルを読み込むメソッド

RDDにテキストファイルを読み込むメソッドは2つあります。

  • textFile
  • wholeTextFiles

textFileは単独ファイル、wholeTextFilesは複数ファイル、と思ってしまいがちですが、実際のところはそういう使い分けをすることはないです。 これらのメソッドの本質的な違いは、ファイルを読み込んだ後のRDDの形式が異なることです。

下のような2つのファイルがあるとします。

$ head aaa xxx
==> aaa <==
aaa
bbb
ccc

==> xxx <==
xxx
yyy
zzz
print sc.textFile("aaa,xxx").collect()
==> [u'aaa', u'bbb', u'ccc', u'xxx', u'yyy', u'zzz']

textFileはファイルを読み込み、改行で区切った各要素を並べたRDDが作成されます。

print sc.wholeTextFile("aaa,xxx").collect()
[(u'file:/Users/hirano.shigetoshi/Spark/spark-2.3.2-bin-hadoop2.7/aaa', u'aaa\nbbb\nccc\n'),
 (u'file:/Users/hirano.shigetoshi/Spark/spark-2.3.2-bin-hadoop2.7/xxx', u'xxx\nyyy\nzzz\n')]

wholeTextFilesはファイルを読み込み、[<ファイルパス>, <ファイル内容全文>]というtupleの集合としてRDDが作成されます。

指定できるパスについて

上記の例で、ファイルパスとしてaaa,xxxと指定していますが、textFilewholeTextFilesも、複数のパスをカンマ区切りで指定できます。 また、指定したパスがディレクトリであれば、その中の全てのファイルを読み込みますし、ある程度ワイルドカードを使うこともできます。1

このような仕様であるため、名前の印象とは裏腹に、この2つのメソッドを「どのファイルを読み込むか」という視点で使い分けも特に意味はないです。 どういう形式のRDDを作成するか、という視点で使い分けるのが良いようです。

wholeTextFilesを使って読み込み

では本題の読み込みを行いたいと思います。 2つの読み込みメソッドを紹介しましたが、今回使うのはwholeTextFilesの方です。 使い方と合わせてその理由もご紹介します。

ファイル構成

以下のようにファイルが並んでいるとします。 今回はこの中のcsvだけが読み込み対象であるとします。

.
├── aaa
│   ├── 000.txt
│   ├── 000.csv
│   ├── 001.txt
│   └── 001.csv
├── bbb
│   └── ccc
│       ├── 003.txt
│       └── 003.csv
・・・

PySparkのコード

まずヘッダを除去する関数を定義しておきます。

def cut_header(text):
        # 最初の1行を除外する
    pos = text.find("\n")
    if pos < 0:
        return ""
    else
      return text[pos+1]`

次にwholeTextFilesで読み込み、ヘッダを除去して、改行区切りデータのRDDを作成します。

rdd = sc.wholeTextFiles("*/*.csv,*/*/*.csv")\
  .map(lambda x: x[1])\
  .map(lambda x: cut_header(x))\
  .flatMap(lambda x: x.split("\n"))

csvファイルだけを指定するために、*/*.csv,*/*/*.csvと指定しています。カッコ悪いですね。
しかし残念ながらこれよりも良い書き方は見つかりませんでした。 zshなどで使える **/*.csv という書き方はできないようです。
データがある程度動的に吐き出されるものだとしても、ディレクトリ階層の深さが全く読めないという事態は滅多にないと思うので、現実的には、ある程度の深さまでのパスを書いておけば良いかと思います。 もっとスマートに書けるよ、という方は是非教えていただければと思います。

以下、ヘッダを除去してフラットなRDDを得る手順です。

  • map(lambda x: x[1])
    • 今回はファイル名は不要なので、ファイルの中身(tupleの[1])だけにしています。
  • map(lambda x: cur_header(x))
    • xには1ファイルの中身全てが(改行コード込みで)格納されていますので、各々をcut_headerで処理しています。
  • flatMap(lambda x: x.split("\n"))`
    • flatMapは、各処理から配列を受け取って、その要素を全て一つのRDD(配列)に入れる、という処理です。
    • flatMapによってwholeTextFilesで読んだ後にtextFilesで読む形式に変換しています。

wholeTextFilesで読み込んだことにより、ファイルという単位のRDDレコードが作成されたので、その中で先頭行の除去を行っています。
これはtextFileだとうまくいきません。 textFileをやった時点ですでにflatMap相当の処理が起きているので、rddには元のファイルの区切りという情報は残っていません。 ですので、「ファイルの何行目」などの操作が必要な場合にはwholeTextFilesを使用します。

textFileとwholeTextFilesの使い分け

繰り返しになりますが、両者の使い分けはファイルの数などではなく、どうファイルを読み込むかです。 wholeTextFilesはファイル名の情報も使えるし、ファイルという単位での整形ができますから、データの整形をしつつ読み込む場合にはこちらを使うのが良いです。

ただし、wholeTextFilesはファイルの中身を1レコードとして全て読み込みますので、あまり大きなファイルを読むことは避けたいです。 その場合はtextFileで読み込んで、ファイル単位出なくても整形できないかを考えた方が良さそうです。 元データを加工できるのであれば、ヘッダには必ず先頭に"#"をつけるなどの処置をするのがベストです。 そうすればflatになったデータでも先頭だけをみてヘッダを除去できます。

Sparkに向かない処理はNG

もちろん私の失敗談ですが、ファイルの指定の仕方にクセがあるので、それに触りたくないと思って最初は以下のようなコードを書いてしまいがちです。

rdd = sc.parallelize([])
for x in glob.glob("."):
  tmp_rdd = sc.textFile(x)
  rdd = rdd.union(tmp_rdd)

ファイルごとにRDDを作成して結合していく、という配列ではよくあるやり方です。 しかしこれはSpark的には全くパフォーマンスが出ないやり方なので、やってはいけません。 というか、ファイルの量が多くなるとStackOverflowでエラーになってしまうようでした。

再帰的に読み込む別解

2018-10-31追記 wholeTextFiles("*")としても、再帰的に全てのファイルを読んでいる訳ではないようです。

wholeTextFiles("*") とすると、"カレントディレクトリにあるディレクトリの直下のファイル"が全て読み込まれるようです。

`*.csv,*/*.csv,・・・` と書くのはやっぱり嫌なので、一応別解もあります。 `wholeTextFiles`に`*`を渡すと、再帰的にすべてのファイルが読み込まれるようです。 そしてファイル名でフィルターしてcsvにしています。 処理の仕方も"Spark的"ですし、一見完璧に見えるのですが、これだと全てのファイルを一度メモリ上に読み込む必要がありますので、目的のファイル以外にどんなファイルがあるのか確認してからやった方が良いでしょう。 大きいバイナリファイルなんかも問答無用で読み込んでしまいます。 逆に、ある程度整備されていて小さいファイルだけであれば、この方法の方がスマートであるのは確かです。 状況に応じて使い分けましょう。

まとめ

PySparkで散らばったログファイルなどを読み込む際のノウハウについてご紹介しました。 ファイルパスの指定の仕方が厄介なので、そこさえクリアできれば特に難しいことはないのですが、いざやってみるとハマります。 素直でない読み込み方だと非常に遅かったりエラーになってしまうので試行錯誤してみました。

誰かの参考になれば嬉しいです。


  1. ソースコード的にはHadoopのFileInputFormatsetInputPathsに依存しているようです。