Apache Arrow(PyArrow)を使って簡単かつ高速にParquetファイルに変換する

2019.06.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

インメモリの列指向データフォーマットを持つApache Arrow(pyarrow)を用いて簡単かつ高速にParquetに変換できることを「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました のレポートで以前ご紹介しました。今回は最新のpyarrow バージョン0.13.0にてCSVファイルをParquetファイルに変換する方法と、Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているかも検証します。

「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました #dbts2018 #dbasSPR

Parquetファイルに変換する方法

一般にCSVファイルをParquetに変換するは、Apache Spark(AWS GlueやEMR)を用いることが一般的ですが、過去のブログでApache DrillでParquetに変換したり、Amazon AthenaのCTASでParquetに変換する方法についても紹介してきました。

Parquet用のPythonのライブラリとして以下の3つが有名です。

  • fastparquet: PyDataの分散処理フレームワークのDaskプロジェクトで開発
  • pyarrow: pandasの開発者、Wes McKinneyさんたちが開発
  • Pandas(0.22以降):DataFrameをto_parquetすると直接Parquetファイルとして保存

今回は、Hadoopエコシステムとの親和性(分散ファイルシステムに置かれたデータセットの読み取り)が優れており、進化のスピードが早い、Wes McKinneyさんを応援したいという気持ち「大」のため、PyArrowをご紹介します。

Apache Arrow(PyArrow)とは

Apache Arrowは、インメモリの列指向データフォーマットです。Hadoop/Spark(Hadoopのエコシステム)やPandas(PyDataのエコシステム)をはじめとするツール間でゼロコピーデータアクセスを目指し、Apache Arrowがその共通データフォーマットになるように設計されています。つまり、Apache Arrowに読み込むと他のツールとデータが共有できたり、様々なデータファイルフォーマットに出力できるようになります。

詳しくは、(翻訳)Apache Arrowと「pandasの10項目の課題」をお読みください。

PyArrowによるParquetファイルに変換

Parquetファイルに変換する方法は、「方法1:PyArrowから直接CSVファイルを読み込んでParquet出力」と「方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力」の2つあります。それぞれに対して、サポートしているデータ型をそれぞれ検証します。

サポートしているデータ型

Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているか、以下のデータ型について検証します。それぞれのParquetに変換したときのデータ型の対応は以下のとおりです。Apache Arrowでサポートしているデータ型のすべてが、PyArrowでサポートしているわけではないことにご注意ください。

※ Apache Arrowがサポートしている全てのデータ型は、Data Types and Schemasをご覧ください。

Python3 Pandas PyArrow Amazon Athena Amazon Redshift Spectrum 対応状況
int int64 int16 smallint smallint
int int64 int32 int integer
int int64 int64 bigint bigint
float float64 float32 float real
float float64 float64 double double precision
decimal - - decimal decimal
bool bool bool_ boolean boolean
str object string string varchar(65535)
str object string char(n) char(n)
str object string varchar(n) varchar(n)
date - - date date
datetime datetime64 timestamp timestamp timestamp

今回は、この組み合わせを試すために以下のカラムのデータ(sales.tsv)を用意しました。

No カラム名 PyArroのデータ型の定義 実際のデータ 備考
1 id int16() 1
2 price int32() 20000
3 total int64() 300000000
4 price_profit float32() 4.56
5 total_profit float64() 67.89
6 discount decimal(10, 2) 789012.34 方法1はfloat64で作成しました
7 visible bool_() True
8 name string() QuietComfort 35
9 created timestamp('s') 2019-06-14 方法1と方法2ともにdate32をサポートしていないので、timestampで作成しました
10 updated timestamp('s') 2019-06-14 23:59:59
id  price   total   price_profit    total_profit    discount    visible name    created updated
1   20000   300000000   4.56    67.89   789012.34   True    QuietComfort 35 2019-06-14  2019-06-14 23:59:59

方法1:PyArrowから直接CSVファイルを読み込んでParquet出力

まずは最もシンプルなPyArrowで変換する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型定義の3つを指定するのみです。

処理の流れ

PyArrowの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。

制限事項

PyArrowのread_csv()では、decimalとdate32をサポートしていません。そのため、データ型はdecimalはfloat64、date32はtimestampで作成しました。PyArrowの将来的なアップデートで、decimalやdate32のサポートを期待したいところです。

decimalとdate32を指定すると、それぞれ以下のエラーが発生します。

Traceback (most recent call last):
  File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module>
    column_types
  File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table
    convert_options=convertoptions
  File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: CSV conversion to date32[day] is not supported
Traceback (most recent call last):
  File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module>
    column_types
  File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table
    convert_options=convertoptions
  File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: CSV conversion to decimal(10, 2) is not supported

サンプルスクリプト

  • PyArrowのread_csv()では、入力ファイルの他に読み込みや変換、パースに関するオプション指定が可能です。(下記の10〜25行)
  • カラムの定義は、カラム名とデータ型をPythonのディクショナリで定義します。(下記の58〜69行)
  • PyArrowのwrite_table()では、出力ファイルの他に圧縮タイプやApache SparkのParquetフォーマット互換を指定します。(下記の42〜47行)
from pyarrow.csv import read_csv, ReadOptions, ParseOptions, ConvertOptions
from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, parquet as pq
from boto3.session import Session


#
# Functions
#
def get_pyarrow_table(input_files: str, column_types: dict) -> Table:
    readoptions = ReadOptions(
        use_threads=True,  # 複数の読み取りスレッドの利用
        block_size=1000  # 読み取りブロック数
    )
    #
    convertoptions = ConvertOptions(
        check_utf8=True,  # 文字列カラムのUTF-8妥当性をチェック
        column_types=column_types,  # 列のデータ型を辞書型で渡す
        null_values=['']  # データ内のNULLを表す文字列
    )
    parseoptions = ParseOptions(
        delimiter='\t',  # タブ区切り指定
        double_quote=False,  # ダブルクオートで括らない
        escape_char='\'',  # エスケープ文字の指定
        header_rows=1  # 先頭がヘッダ行
    )
    pyarrow_table = read_csv(
        input_file=input_files,
        read_options=readoptions,
        parse_options=parseoptions,
        convert_options=convertoptions
    )
    return pyarrow_table


def tsv2parquet(input_file: str, output_file: str, column_types: dict):

    arrow_table = get_pyarrow_table(
        input_file,
        column_types
    )

    pq.write_table(
        arrow_table,
        output_file,
        compression='snappy',  # snappyで圧縮
        flavor=['spark'],  # spark互換の設定
    )


#
# main
#
def main():
    pyarrow_home = '/Users/cmuser/Desktop'
    tablename = 'sales'
    input_files = pyarrow_home + '/' + tablename + '.tsv'
    output_file = pyarrow_home + '/' + tablename + '.parquet'
    column_types = {
        'id': int16(),
        'price': int32(),
        'total': int64(),
        'price_profit': float32(),
        'total_profit': float64(),
        'discount': float64(),
        'visible': bool_(),
        'name': string(),
        'created': timestamp('s'),
        'updated': timestamp('s')
    }

    tsv2parquet(input_files, output_file, column_types)

    # Parquet -> S3
    s3bucket = 'cm-user'
    s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet'
    session = Session(profile_name='cm-user')
    s3 = session.resource('s3', region_name='ap-northeast-1')
    bucket = s3.Bucket(s3bucket)
    bucket.upload_file(output_file, s3key)


if __name__ == "__main__":
    main()

テーブル定義

Glueのクローラで自動生成したDDLは以下のとおりです。createdカラムは日付のみですがdate型ではなくtimestamp型として定義されました。

CREATE EXTERNAL TABLE `sales`(
  `id` smallint, 
  `price` int, 
  `total` bigint, 
  `price_profit` float, 
  `total_profit` double, 
  `discount` double, 
  `visible` boolean, 
  `name` string, 
  `created` timestamp, 
  `updated` timestamp)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://cm-user/tmp/sales/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='sales', 
  'averageRecordSize'='673', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='1', 
  'recordCount'='1', 
  'sizeKey'='2214', 
  'typeOfData'='file')

方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力

PandasでCSVファイルを読み込んでPyArrowでParquet出力する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型や変換の定義などを指定する。

処理の流れ

Pandasの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。現状、decimalを定義するにはこの方式でParquetに変換する必要があります。入力ファイルにPandasを利用するメリットは、Dataframeに読み込んだデータならParquet出力できる点です。例えば、JSON、XML/HTML、ExcelファイルなんかもParquetに変換できる柔軟さがあります。

制限事項

データをPandasとPyArrowの2つのツールでメモリに持つことになるので、大きなファイルの変換には不向きです。

注意

  • int型でNaNが存在する場合は、float型で定義 -> parquet作成時にint型に指定するように上記のソースに反映。
  • float型でNaNが存在する場合は、回避方法が見つからない

サンプルコード

サンプルコードでは、データ構造に依存する部分とそれ以外(読み込み・変換・出力部分)の分離を試みましたが、引数が多くなってしまいました。

import pandas as pd
from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, schema, parquet as pq
import decimal as dc
from boto3.session import Session


# テキストファイル読込み
def get_dataframe(input_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list) -> pd.DataFrame:
    dataframe = pd.read_csv(
        filepath_or_buffer=input_file,
        header=0,  # 0:ヘッダーあり
        encoding="utf-8",  # 文字コード
        sep='\t',  # セパレータ文字
        na_values='',  # NaNを置き換える文字列
        # compression='gzip', # gzip圧縮
        dtype=pandas_dtype,
        converters=pandas_convert,
        parse_dates=pandas_parse_dates,
        infer_datetime_format=True  # 日付データを読み込む速度を向上させる
    )
    return dataframe


# TSVファイルをParquetファイルに変換
def tsv2parquet(input_file: str, output_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list, pyarrow_schema: schema):
    dataframe = get_dataframe(
        input_file=input_file,
        pandas_dtype=pandas_dtype,
        pandas_convert=pandas_convert,
        pandas_parse_dates=pandas_parse_dates
    )
    pyarrow_table = Table.from_pandas(
        dataframe,
        pyarrow_schema,
        preserve_index=False
    )
    pq.write_table(
        pyarrow_table,
        output_file,
        compression='snappy',  # 圧縮タイプ指定
        flavor=['spark'],  # 互換性のためsparkを設定
        use_deprecated_int96_timestamps=True  # int96のParquet形式で書き込み
    )


#
# main
#
def main():
    pyarrow_home = '/Users/cmsuer/Desktop'
    tablename = 'sales'
    input_files = pyarrow_home + '/' + tablename + '.tsv'
    output_file = pyarrow_home + '/' + tablename + '.parquet'

    # Pandasに読み込むデータ定義、
    pandas_dtype = {
        'id': float,  # NaN対策(int -> float)
        'price': float,  # NaN対策(int -> float)
        'total': float,  # NaN対策(int -> float)
        'price_profit': float,
        'total_profit': float,
        # 'discount': decimal(10, 2),
        'visible': bool,
        'name': str,
        # 'created': timestamp,
        # 'updated': timestamp
    }

    # Pandasに読み込んだ型変換
    pandas_convert = {'discount': dc.Decimal}

    # 日時型のカラムの指定
    pandas_parse_dates = ['created', 'updated']

    # PandasからPyArrowのテーブルに変換するテータ定義
    pyarrow_schema = schema({
        'id': int16(),
        'price': int32(),
        'total': int64(),
        'price_profit': float32(),
        'total_profit': float64(),
        'discount': decimal128(10, 2),
        'visible': bool_(),
        'name': string(),
        'created': timestamp('s'),
        'updated': timestamp('s')
    })

    # TSVファイルをParquetファイルに変換
    tsv2parquet(input_files, output_file, pandas_dtype, pandas_convert, pandas_parse_dates, pyarrow_schema)

    # ParquetファイルをS3にアップロード
    s3bucket = 'cm-user'
    s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet'
    session = Session(profile_name='cmuser')
    s3 = session.resource('s3', region_name='ap-northeast-1')
    bucket = s3.Bucket(s3bucket)
    bucket.upload_file(output_file, s3key)


if __name__ == "__main__":
    main()

テーブル定義

Glueのクローラで自動生成したDDLは以下のとおりです。discountカラムが、decimal(10,2)で定義されました。

CREATE EXTERNAL TABLE `sales`(
  `id` smallint, 
  `price` int, 
  `total` bigint, 
  `price_profit` float, 
  `total_profit` double, 
  `discount` decimal(10,2), 
  `visible` boolean, 
  `name` string, 
  `created` timestamp, 
  `updated` timestamp)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://cm-user/tmp/sales/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='sales', 
  'averageRecordSize'='648', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='1', 
  'recordCount'='1', 
  'sizeKey'='3424', 
  'typeOfData'='file')

Amazon Athenaからクエリした結果

float型やdouble型除き、カラムがNullでもエラーにならず結果が返ります。

まとめ

PyArrowでdate型をサポートできるまで、ブログ化を我慢していましたが、シビレを切らして書いてしまいました。Aapche Arrowを使うと、Parquetファイルが非常に早く、直感的に作成できます。元々HiveやSparkでデータ分析していた方であれば、bitgint型とstring型があればよいのかもしれませんが、Redshift Spectrumとの連携を考えると、decimal型やdate型のサポートは望ましいです。サポートされたらブログにてまたご報告します。

参考文献

以下のブログは、大変参考になりました。

合わせて読みたい

Amazon Athena: カラムナフォーマット『Parquet』でクエリを試してみた #reinvent

Amazon Athena が待望のCTAS(CREATE TABLE AS)をサポートしました!