「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました #dbts2018 #dbasSPR

2018.06.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

2018年6月23日、札幌で開催されている「db analytics showcase Sapporo 2018」の玉川竜司さんのセッション「Parquet あるいは - まだCSVで消耗しているの? -」へ参加してきましたのでそのレポートとなります。

玉川さんのセッションは、単なるParquetの技術解説で終わらず、Apache ArrowによってHadoopエコシステムとPythonエコシステムの連携や、分散処理ではなくむしろ分散せずにローカルでうまくやるというパラダイムシフトを予感させる内容です。

スピーカー

あの玉川竜司さんです。「あの」って、、、そうです、下記の技術翻訳書をはじめ、他にも多数発刊している方です。みなさんも語りだすと止まらなくなるような印象深い本ばかりです。

最近のおすすめは、ヘルシープログラマという本で、プログラマが直面する様々な健康問題を回避し克服するアイデアとテクニックが真面目に紹介されている本です。昨年、SRE本が日本でもかなり話題になり、従来の運用チームと呼んでいた組織をSREチームと改め、これまでのやり方を見直すきっかけを提供してくれました。

その他、玉川さんの個人的なブログ(tamagawa-ryuji - Qiita)の中で、Python のデータシステムの中で重要な位置を占める Pandasというデータ解析を支援する機能を提供するライブラリの開発の中心人物である、Wes McKinneyさんの主要なブログの日本語翻訳もPyDataコミュニティに提供しています。PyDataやApache Arrowが今後どうなっていくのか把握するためにも役立つはずです。

そんな、現役エンジニアである玉川さんが、あえて「Parquet」一本勝負でお話してくれました。

お品書き

  • Parquet?なにそれ?
  • 閑話休題:分散処理必要ですか?
  • Pythonから利用するParquet
  • 性能計測してみました
  • Apache Arrowについて少し説明

Parquet?なにそれ?

Parquetの良さを理解してもらうことが目標

Parquetは重要かつお得なので、よく話しているのだけれど海外はともかく日本ではいっこうに普及している印象を受けない。ということもあり、Parquetの良さを理解してもらうことが目標です。

データの話をするときに、「データサイエンティスト」というと言葉が使われますが、厳密な定義はなく、現実に即して言うと以下の2つに分類されます。

  • データサイエンス:数学バリバリで、データそのものを統計的な手法によって知見を導き出す人達
  • データエンジニアリング:ストレージとか、メモリとか、CPUとか、コードを書いてデータを作成する、データサイエンスの下支えするインフラ系の人達

当然、データサイエンスとデータエンジニアリングは重なり合う部分もあります。

私(玉川さん)はエンジニアリングの人であり、「データエンジニアリング」とは余り聞かないので、もっと広がればいいな思っています。今日お話するのは、「データエンジニアリング」の方です。なので、データをいかにに早く、軽く動かすかという話をします。

そもそも何? - CERNの事例

Hadoopで有名なClouderaのShimauchiさん(@shiumachi)が最近つぶやいていた事例です。

CERNというヨーロッパの素粒子の研究で、無圧縮の1.2TBのCSVファイルをParquetに変換すると288GB、約1/4になり、分散SQLエンジンのImpalaといういわゆるHDFS上のデータにSQLを実行したら、16台のマシンでスキャンできたという事例。Impalaの性能の部分によるものも当然ありますが、この規模のデータを2分でスキャンするにはCSVでは厳しいはず。Parquetフォーマットがデータ基盤にないと達成できない。

CSVをデータの基盤に用いていることが多く、汎用性はありますが効率は良くないので、それをParquetに変換するだけで、(ワークロードの特性によりますが、)読み書きについては効率が上がると思います。

Parquet: 列指向データフォーマット

Parquetのフォーマットは、列指向のデータフォーマットです。

  • Twitterが開発して、その後 Apache Foundation に寄贈したデータフォーマット
  • スキーマ付きの列指向フォーマット
    • RDBのテーブルのように列名とデータ型を含むバイナリフォーマット
  • SparkのDataFrameの保存にピッタリ、SparkのDataFrame APIではほぼ標準フォーマット扱い
  • PandasのDataFrameの保存にピッタリ
    • Pandas0.22以降ではDataFrameをto_parquetすると直接Parquetファイルとして保存されます
  • Python(fastparquetやpyarrow)からも直接読み書きできるようになってきた
  • 分析的なタスクのデータソースとして便利、かつ効率的
    • 逆にトランザクション処理や細かい更新が必要な処理には向かない
    • データセットがあり、保存しておいて後でまとめて分析する用途が向いている

Parquetの特徴

  • スキーマ付きテーブルデータの保存
  • 列指向フォーマット(Apache Parquet)
    • 高圧縮率(経験的にはzipでCSVを固めたた場合と同程度)
    • 指定したフィールドだけを読み取ることによるI/O削減
    • 更新には向かない
    • (基本的には)ランダムアクセスにも向かない

普通のCSVファイルは行ごとにデータを格納しますが、列指向フォーマットは列ごとにデータを格納します。Parquetでは1万行程度のROWBLOCK に分割して、更にカラム毎に順にまとめて格納します。下の図の色の違いはカラムの違いを表します。詳しいフォーマットはApache Parquetのドキュメントを参照してください。

列ごとにデータを格納することで、参照したいカラムのみをスキャンしたり、圧縮率を高めることが可能になります。例えば、gzip圧縮したCSVファイルを読み込むにはファイル全体を展開する必要がありますが、Parquetの場合は必要なブロックごとに展開できます。

Parquet用のPythonのライブラリ

  • 主に2つ、fastparquetpyarrow
    • fastparquet: PyDataの分散処理フレームワークのDaskプロジェクトで開発
    • pyarrow: pandasの開発者、Wes McKinneyさんたちが開発
  • どちらも活発に開発が進んでおり機能がリッチになっているが、Parquetのすべての機能をサポートしているわけではない
  • Hadoopエコシステムとの親和性(分散ファイルシステムに置かれたデータセットの読み取り)ではPyArrowのほうが優れているかも
  • pandasではこの両者をエンジンとして利用可能(現時点ではデフォルトはPyArrow)
  • 現時点では共にAnaconda Pythonには含まれていないので、pipでインストールする

閑話休題:分散処理必要ですか?

なぜ分散しなければならなかったのか?

日本で分散処理が話題になったのが2005年くらい、Hadoopの像本(Hadoop 第3版の初版のこと)が出たのが2009年でしたが、その頃は分散しないと処理しきれなかった規模のデータが現在ではある程度処理できるようになっています。(もちろんデータのスケールによります)皆さんが日常的に扱う規模のデータを分散処理しなくても、工夫すれば処理できるのではないかと考えています。例えば、一日20〜30GB程度のデータであれば分散しなくてもいけるのではないかというのが最近の実感です。

元々、Hadoopが出たときに分散しなければならなかった一番の理由はハードディスクが遅いからでした。ハードディスクのシークだけは物理的に動いているので技術が進歩してもランダムアクセスが改善しない傾向がありました。その解決策として、ストレージを並べる、つまり分散処理するという考えが生まれ、Hadoopで分散処理するという方法が広がりました。

ハードウェアの変化とその影響

2009年からこの10年間で、以下の技術革新により、大々的に分散処理するまでもないケースが増加しています

  • メモリの低価格化
    • キャッシュ増加によって一気に読んでランダムアクセスできるようになった
  • SSDの普及と大容量化
    • ランダムアクセスの遅さが克服され、ディスクアクセスのボトルネックが緩和された
  • CPUのコア数の増大
    • 1ソケットあたりのコア数が増加することにより、10年前に8台必要な処理が1台でできるようになった
  • ネットワークの高速化

要は、工夫をちゃんとすれば分散しなくて済むのではないかというケースが増えています。Hadoopがこれだけ普及して、ブレイクスルーだったのは何かというと、分散ファイルシステムを用意して大量のハードディスクに負荷を分けることによってスピードを稼げることでしたが、現在は高速なエンタープライズSSDを使えば同じようなパフォーマンスが出るのでわざわざ分散する必要がないというケースが増えています。

その一翼を担っているのが、Parquetのようなデータフォーマットです。Parquetは圧縮が効くフォーマットですが、圧縮が効くということはストレージの負担を避けて、その分をCPUに負担を回しているという側面があります。ハードウェアの進化の影響は大きく、CPUのコア数が増えたのでストレージの負担をCPUに回すというバランスが取れるようになりました。

データストレージの重要な動きとして、クラウドサービスが利用できるようになった点です。従来のHadoopは生のハードディスクを利用する前提で設計されていたものが、今ではクラウドで運用されることが多く、直接ローカルのハードディスクを使うというのではなく、一時データではなく、本当のデータセットを置くところはS3のようなクラウドのオブジェクトストレージを使って、データを読み書きするという使い方をすること事が増えています。要はストレージのスピードを稼ぐために分散するという必要性がどんどん少なくなっています。

従来であればログを単純に処理するワークロードが中心でしたが、最近では機械学習やディープラーニングのようなワークロードが増えて、CPUがボトルネックになるケースが徐々に増加しています。なので、いかに効率よくCPUを使うかということも重要になるという傾向があります。

やらなくて済むならやらないほうがいいです

  • 分散処理を「ちゃんと」使えば、確かにパフォーマンスが上がることは多いです
    • ただし、落とし穴はたくさんあります
    • 分散させるがゆえに考えなければならないことがある
    • うっかりすると、かえって遅くなってしまったということもあり得る

この辺の勉強をしたいひとは、この本(Designing Data-Intensive Applications)を読むと良いです。(玉川さんがに翻訳本を出してくれる予定です。)

そんなにがんばって分散しなくて良いケースでは、分散しなくてよいのではないかと特に思っており、そのための道具として、Parquetのようなものを上手に利用していただければと思います。

Pythonから利用するParquet

PythonからParquet利用するコードの紹介します。SparkでもPandasでもDataFrameをファイル名を指定して読み書きします。

Spark(PySpark)の場合

Spark は分散処理するので、coalesce(20)を指定することで、20に分散するという指定をあえて指定しています。20のタスクで分散処理して、それぞれのタスクが分散ファイルシステムに並列に書き出しをするので、指定したフォルダに20のファイルが出力されます。

df = spark.read.csv(csvFilename, header=True, schema = theSchema).coalesce(20)
df.write.save(filename)

pandasの場合

pandasはシングルスレッドなので、1つのファイルに出力されます。

from fastparquet import write
pdf = pd.read_csv(csvFilename)
write(filename, pdf, compression='UNCOMPRESSED')

pyarrowの場合

(上の「pandasの場合」で読み込んだ、pdfオブジェクトをfrom_pandas()に渡している)

import pyarrow as pa
import pyarrow.parquet as pq
arrow_table = pa.Table.from_pandas(pdf) 
pq.write_table(arrow_table, filename, compression = 'GZIP')

性能計測してみました

どんなデータを使って計測したか

以下のようにJupiterNotebookを使ってサンプルコードを作成しています。

データのバリエージョンは、

  • カーディナリティの低い数値
  • カーディナリティの低い文字列
  • ランダムな数字
  • ランダムな文字列
  • ソートされた数字
  • ソートされた文字列

実際に動かし読み書きにかかった時間を集計して、読み取りのパフォーマンスを計測しています。

圧縮について計測

  • Parquetでは透過的に様々な圧縮アルゴリズムが利用可能
    • 現時点の0.9では snappy, gzip, brotli にて検証しました
    • 今後、0.10では lz4, zstd をサポートしそう
  • データの量やデータの性質によって、圧縮率と読み書きの速度はどう変わるか?(対CSVで計測)
  • 計測環境はMacBook 2017(メモリ16GB、SSD)、Python3.5
  • スワップが生じない範囲で計測
  • 非圧縮で数百GB、1千万行

データ作成時間の比較

表は、データの種類、圧縮のパターン、データ処理時間、parquetのサイズ、CSVと圧縮率の比較です。

snappyはあまり圧縮率は高くありませんが圧縮に要している時間が圧倒的に速い傾向がありますので、一時データのように一度しか読み書きし無いデータには適していると言えます。

brotliとgzipはあまり差がなく全体的にgzipのほうが良い結果なので、brotliをあえて選択する必要がないかもしれません。

読み取り時間の比較

データの種類、圧縮のパターン、データ読み取り時間、CSVと読み取り時間の比較です。

CSVの読み取り時間に比べて、半分から1/4に短縮できています。注目に値するのが数値の読み取りが1/4以下なっている点です。

CSVはパースする必要があります。CSVはパースでは数値を読み、数値を認識して、INTに変換するという手間がかかると考えられます。大きなデータの場合、CSVのパースはCPUにものすごく負担をかけます。一方、Parquetにするとものすごく軽くなります。文字列の場合はCPUの負荷に大きな差がありませんが、データが圧縮されるので文字データに対してもPaquetに変換する価値はあります。

読んだ列数と時間の比較

100列のデータを作成して、1列から順に100列まで取得した場合の読み取り時間を計測しました。

圧縮はデフォルトのsnappyです。列数が少ない場合は、読み取り時間が少なくて済む傾向が見られます。検証の結果、100列のデータのうち10列だけで分析できる場合はパフォーマンスが10倍になることがわかります。

圧縮について(気づいたところ)

検証の結果をまとめますと、

  • 圧縮の傾向
    • snappyは高速・低圧縮
    • gzipは低速・高圧縮
    • brotliは余り使う意味がなさそう?
  • 圧縮の用途
    • 一般的なデータセットであれば、速度重視でsnappyが有効
    • 扱うデータセットが繰り返し分析に使われるのなら、書き込み時間に多少時間を使ってもgzipでストレージを節約できる(読み取り速度はgzipもsnappyも余り変わらない)
  • その他の傾向
    • 当然ながら、データが多く、カーディナリティが小さいデータは圧縮が効きやすい
    • 数値データに対しては、特に読み取り時のCPU負荷の面で極めて圧縮の有効性が高い
    • CSVではパースが入るのでCPU負荷が多分大きいはず
  • zstdは期待されている圧縮フォーマットなのでサポートされたら再度計測したい

列の指定(述語プッシュダウン)

  • Parquetでは、読み取り時に列の指定をすることでI/Oを削減できます
  • 上位のツール(SQL on XXXの類)での列指定が、I/O層まで降りていって反映されます
  • 100列のデータで、読み取る列数を変えて速度計測してみました
  • Amazon AthenaもS3上のファイルがParquetになっていれば、列の指定でI/Oの削減可能。このサービスはI/Oの量で課金が生じるので圧縮と合わせるとコストが1/10になったりするのでは

Apache Arrowについて少し説明

Parquetはファイルフォーマットでしたが、Apache Arrowはインメモリ列指向データフォーマットです。

Apache Arrow

  • Apache Arrowは、インメモリ列指向データフォーマット
  • PyData / ビックデータ会の強力なOSS製品のスター開発者たちが集結
  • 無駄なデータコピー/変換をすることなくツール感でデータをやり取り

データ分析の世界は、PyDataのエコシステムの世界とHadoop(というかApache Fundation)エコシステムの世界の2つのエコシステムがありますが、Apache Arrowは、両方の世界のメジャー製品のスター開発者たちが集結しているという興味深い状況です。

従来は、下の図のようにいろんなツールやサーバが間でデータ交換しようとするとそれぞれとやり取りをしなければならず、決して効率が良いとはいえませんでした。例えば、SparkとPandasの間でデータ交換しようとするとむっちゃくちゃ重たい、という問題がありました。遅い理由はデータ変化の効率が悪いことが原因ですが、そのような問題がたくさんありました。

この解決策としてArrowのメモリ上のデータを介してデータ交換することで様々なツール間で効率よくデータのやり取りをしようというのがArrowの目的です。

「ゼロデータコピー」という言葉がよく言われており、サーバーの共有メモリの空間にArrowのフォーマットのデータを置いて、そこからの読み取りはローカルのメモリ空間にデータをコピーしなくても各ツールで読めるようにすることで効率良くするという方式です。最近のデータ処理では、CPUの負担が大きいので、メモリのデータのコピーのようにCPUの負担が大きい処理をなくすというのが発想としてあります。

インメモリのデータ変換の問題

  • 例えば、pandasとSpark間のデータのやり取りしたいというユースケースが考えられます
    • pandasでややこしいCSVを読んでDataframeをSparkに渡したい
    • Sparkで大規模なデータをサンプリングしてからDataFrameをpandasに渡したい
  • Spark・Pandasの場合、シリアライズ・デシリアライズの負荷が大きかった(1MB/s程度、むっちゃ遅い)
  • 特にPandas → Sparkは絶望的に遅かったが、Spark2.2からArrow対応したことで大きく改善した
  • Spark2.3ではPySparkのUDFにpandasを使えるようになって大きく改善した
    • PySparkのUDFを使うとものすごく遅くなっていたが、Arrowが使えるようになって改善している
  • これから数年間、インメモリでデータのやり取りをするようになり、さらに速くなると思います

ますます、分散しないで、オンメモリでデータを処理することが増えて、この1年ぐらいでガラッと変わるのではないかと予想されます。分散処理はオーバーヘッドも大きいので、2台にするとかえって遅くなるということは十分考えられます。それよりはむしろうまくローカルで処理するということの有用性が高まるのではないかと考えられます。

Wesさんのblog

  • 現在はPandasの改善、Apache Arrowとその周辺の開発
  • Blogでちょくちょく受講が公開されている
  • 玉川さんが日本のPyDataコミュニティ向けにBlog翻訳するOKをもらって活動している。

その中でも、おすすめブログは、(翻訳)Apache Arrowと「pandasの10項目の課題」

最後に

データ分析の世界は、PyDataのエコシステムの世界とHadoop(というかApache Fundation)エコシステムの世界の2つのクラスタがより密接に連携できるようになりつつあります。Parquetファイル形式のデータは2つのエコシステムが相互に利用できるイミュータブルな永続化ストレージとして機能し、なおかつ、S3をはじめとするクラウド上のオブジェクトストレージに格納することで、データは分散処理のストレージとしても機能します。さらに Apache Arrow によって、2つのエコシステムがインメモリでこれまで以上に密接にデータのやり取りをするようになります。ParquetとApache Arrowによる連携が完成することで、分散しないで、オンメモリでデータを処理するパラダイムシフトを予感させられる大変興味深い内容でした。Designing Data-Intensive Applicationsの日本語版が出版されるのを楽しみにしています。