Sparkで入力ファイル名を取得する

こんにちは、小澤です。

Hadoopを使うとデータの入出力はディレクトリ単位になります。 また、個々のファイル名は通常、取得することができません。 日常においてこれで困ることはあまりないのですが、Hiveなどではパーティションがディレクトリ単位になっていたりします。 今回はデータをその形式に基づいて配置していたがテーブルにパーティションを追加する前に前処理が必要なったなどで、メタストアから情報は取得できない複数のディレクトリにまとめて処理がしたくなった時などに便利なSparkの機能を紹介します。

なお、今回はPySparkを利用しています。

wholeTextFiles

SparkContextにはwholeTextFilesというメソッドがあります。 これは、ファイル名とファイル内のデータのtupleを返します。

例えば、S3のに以下の2つのファイルがあるとします。

  • s3://bucket-name/data/data1.csv
id,name,value
1,ほげ,10
2,ふが,20

* s3://bucket-name/data/data2.csv

id,name,value
3,ぴよ,30

このデータをwholeTextFilesで読み込むと以下のような2件のデータになります

data = sc.wholeTextFiles('s3a://bucket-name/data/')
data.collect()

[
  ['s3://bucket-name/data/data1.csv', 'id,name,value\n1,ほげ,10\n2,ふが,20'],
  ['s3://bucket-name/data/data2.csv', 'id,name,value\n3,ふが,30']
]

少々見づらいですが、ファイルの中身がそのまま値として入っています。 あとは、データを分割して整形していけば必要な形式でデータが取得できます。 一例としては以下のようになります

data = data.map(lambda x : (x[0], x[1].split('\n')[1:]) \  # データを区切り文字で分割してヘッダ行以外を配列として取得
    .toDF(['file_name', 'values'])  # DataFrame化
    .withColumn(explode('values'))  # 配列を縦持ちに変換することで各行のデータに対応

# CSVの区切り文字で分割した各列の値をDataFrameに追加していく
columns = split(data['values'], ',')
for i, column in enamurate(['id', 'name', 'value']):
    data = data.withColumn(column, columns.getItem(i)
# 元のデータはいらないので削除
data = data.drop('values')

input_file_name

さて、wholeTextFilesを使うことで目的を達成することができました。 めでたし、めでたし。

とはいかない場面も多々あります。

  • そもそもデータがテキストファイルじゃなくてParquetやORCとかを利用してんだけど...
  • というかJSONなんだけど、全部自分でパースすんの?
  • うちの子文字コードがShift-JISだったからそれを変換したいんだよね!
  • 1ファイル10GBくらいあるんだけどそれをRDDの1レコードに入れるの?
  • というかDataFrameに変換するんなら最初からDataFrameで読みたいよね

さて、そんな時に便利なのがinput_file_name関数です。 というかこちらの方が手軽でエレガントかつセクシーに解決できます。

利用するデータは先ほどと同じもので、文字コードがShift-JISで保存されているものとします。 コードを見ていただけると非常に簡単なことがわかります。

data = spark.read.csv('s3a://bucket-name/data/', header=True, encoding='shift-jis')
  .withColumn('file_name', input_file_name())
data.show()

+-----+-------+-------+-------------------------------+
|   id|   name|  value|                      file_name|
+-----+-------+-------+-------------------------------+
|    1|    ほげ|     10|s3://bucket-name/data/data1.csv|
|    2|    ふが|     20|s3://bucket-name/data/data1.csv|
|    3|    ぴよ|     30|s3://bucket-name/data/data3.csv|
+-----+-------+-------+-------------------------------+

すっきりと綺麗に書くことができました。

おわりに

今回はSparkで入力ファイルのファイル名を取得する方法を解説しました。

Sparkは最近登場したAWS Glueなどでも利用されており、ETL処理においても今後利用シーンが一層増えていくのではないかと思います。 そういった前処理やETLまで含めた処理を行う際の参考にしていただければと思います。