Apache Arrow(PyArrow)を使って簡単かつ高速にParquetファイルに変換する
インメモリの列指向データフォーマットを持つApache Arrow(pyarrow)を用いて簡単かつ高速にParquetに変換できることを「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました のレポートで以前ご紹介しました。今回は最新のpyarrow バージョン0.13.0にてCSVファイルをParquetファイルに変換する方法と、Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているかも検証します。
「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました #dbts2018 #dbasSPR
Parquetファイルに変換する方法
一般にCSVファイルをParquetに変換するは、Apache Spark(AWS GlueやEMR)を用いることが一般的ですが、過去のブログでApache DrillでParquetに変換したり、Amazon AthenaのCTASでParquetに変換する方法についても紹介してきました。
Parquet用のPythonのライブラリとして以下の3つが有名です。
- fastparquet: PyDataの分散処理フレームワークのDaskプロジェクトで開発
- pyarrow: pandasの開発者、Wes McKinneyさんたちが開発
- Pandas(0.22以降):DataFrameをto_parquetすると直接Parquetファイルとして保存
今回は、Hadoopエコシステムとの親和性(分散ファイルシステムに置かれたデータセットの読み取り)が優れており、進化のスピードが早い、Wes McKinneyさんを応援したいという気持ち「大」のため、PyArrowをご紹介します。
Apache Arrow(PyArrow)とは
Apache Arrowは、インメモリの列指向データフォーマットです。Hadoop/Spark(Hadoopのエコシステム)やPandas(PyDataのエコシステム)をはじめとするツール間でゼロコピーデータアクセスを目指し、Apache Arrowがその共通データフォーマットになるように設計されています。つまり、Apache Arrowに読み込むと他のツールとデータが共有できたり、様々なデータファイルフォーマットに出力できるようになります。
詳しくは、(翻訳)Apache Arrowと「pandasの10項目の課題」をお読みください。
PyArrowによるParquetファイルに変換
Parquetファイルに変換する方法は、「方法1:PyArrowから直接CSVファイルを読み込んでParquet出力」と「方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力」の2つあります。それぞれに対して、サポートしているデータ型をそれぞれ検証します。
サポートしているデータ型
Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているか、以下のデータ型について検証します。それぞれのParquetに変換したときのデータ型の対応は以下のとおりです。Apache Arrowでサポートしているデータ型のすべてが、PyArrowでサポートしているわけではないことにご注意ください。
※ Apache Arrowがサポートしている全てのデータ型は、Data Types and Schemasをご覧ください。
Python3 | Pandas | PyArrow | Amazon Athena | Amazon Redshift Spectrum | 対応状況 |
---|---|---|---|---|---|
int | int64 | int16 | smallint | smallint | ○ |
int | int64 | int32 | int | integer | ○ |
int | int64 | int64 | bigint | bigint | ○ |
float | float64 | float32 | float | real | ○ |
float | float64 | float64 | double | double precision | ○ |
decimal | - | - | decimal | decimal | △ |
bool | bool | bool_ | boolean | boolean | ○ |
str | object | string | string | varchar(65535) | ○ |
str | object | string | char(n) | char(n) | ○ |
str | object | string | varchar(n) | varchar(n) | ○ |
date | - | - | date | date | ☓ |
datetime | datetime64 | timestamp | timestamp | timestamp | ○ |
今回は、この組み合わせを試すために以下のカラムのデータ(sales.tsv)を用意しました。
No | カラム名 | PyArroのデータ型の定義 | 実際のデータ | 備考 |
---|---|---|---|---|
1 | id | int16() | 1 | |
2 | price | int32() | 20000 | |
3 | total | int64() | 300000000 | |
4 | price_profit | float32() | 4.56 | |
5 | total_profit | float64() | 67.89 | |
6 | discount | decimal(10, 2) | 789012.34 | 方法1はfloat64で作成しました |
7 | visible | bool_() | True | |
8 | name | string() | QuietComfort 35 | |
9 | created | timestamp('s') | 2019-06-14 | 方法1と方法2ともにdate32をサポートしていないので、timestampで作成しました |
10 | updated | timestamp('s') | 2019-06-14 23:59:59 |
id price total price_profit total_profit discount visible name created updated 1 20000 300000000 4.56 67.89 789012.34 True QuietComfort 35 2019-06-14 2019-06-14 23:59:59
方法1:PyArrowから直接CSVファイルを読み込んでParquet出力
まずは最もシンプルなPyArrowで変換する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型定義の3つを指定するのみです。
処理の流れ
PyArrowの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()、pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。
制限事項
PyArrowのread_csv()では、decimalとdate32をサポートしていません。そのため、データ型はdecimalはfloat64、date32はtimestampで作成しました。PyArrowの将来的なアップデートで、decimalやdate32のサポートを期待したいところです。
decimalとdate32を指定すると、それぞれ以下のエラーが発生します。
Traceback (most recent call last): File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module> column_types File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table convert_options=convertoptions File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status pyarrow.lib.ArrowNotImplementedError: CSV conversion to date32[day] is not supported
Traceback (most recent call last): File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module> column_types File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table convert_options=convertoptions File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status pyarrow.lib.ArrowNotImplementedError: CSV conversion to decimal(10, 2) is not supported
サンプルスクリプト
- PyArrowのread_csv()では、入力ファイルの他に読み込みや変換、パースに関するオプション指定が可能です。(下記の10〜25行)
- カラムの定義は、カラム名とデータ型をPythonのディクショナリで定義します。(下記の58〜69行)
- PyArrowのwrite_table()では、出力ファイルの他に圧縮タイプやApache SparkのParquetフォーマット互換を指定します。(下記の42〜47行)
from pyarrow.csv import read_csv, ReadOptions, ParseOptions, ConvertOptions from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, parquet as pq from boto3.session import Session # # Functions # def get_pyarrow_table(input_files: str, column_types: dict) -> Table: readoptions = ReadOptions( use_threads=True, # 複数の読み取りスレッドの利用 block_size=1000 # 読み取りブロック数 ) # convertoptions = ConvertOptions( check_utf8=True, # 文字列カラムのUTF-8妥当性をチェック column_types=column_types, # 列のデータ型を辞書型で渡す null_values=[''] # データ内のNULLを表す文字列 ) parseoptions = ParseOptions( delimiter='\t', # タブ区切り指定 double_quote=False, # ダブルクオートで括らない escape_char='\'', # エスケープ文字の指定 header_rows=1 # 先頭がヘッダ行 ) pyarrow_table = read_csv( input_file=input_files, read_options=readoptions, parse_options=parseoptions, convert_options=convertoptions ) return pyarrow_table def tsv2parquet(input_file: str, output_file: str, column_types: dict): arrow_table = get_pyarrow_table( input_file, column_types ) pq.write_table( arrow_table, output_file, compression='snappy', # snappyで圧縮 flavor=['spark'], # spark互換の設定 ) # # main # def main(): pyarrow_home = '/Users/cmuser/Desktop' tablename = 'sales' input_files = pyarrow_home + '/' + tablename + '.tsv' output_file = pyarrow_home + '/' + tablename + '.parquet' column_types = { 'id': int16(), 'price': int32(), 'total': int64(), 'price_profit': float32(), 'total_profit': float64(), 'discount': float64(), 'visible': bool_(), 'name': string(), 'created': timestamp('s'), 'updated': timestamp('s') } tsv2parquet(input_files, output_file, column_types) # Parquet -> S3 s3bucket = 'cm-user' s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet' session = Session(profile_name='cm-user') s3 = session.resource('s3', region_name='ap-northeast-1') bucket = s3.Bucket(s3bucket) bucket.upload_file(output_file, s3key) if __name__ == "__main__": main()
テーブル定義
Glueのクローラで自動生成したDDLは以下のとおりです。createdカラムは日付のみですがdate型ではなくtimestamp型として定義されました。
CREATE EXTERNAL TABLE `sales`( `id` smallint, `price` int, `total` bigint, `price_profit` float, `total_profit` double, `discount` double, `visible` boolean, `name` string, `created` timestamp, `updated` timestamp) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://cm-user/tmp/sales/' TBLPROPERTIES ( 'CrawlerSchemaDeserializerVersion'='1.0', 'CrawlerSchemaSerializerVersion'='1.0', 'UPDATED_BY_CRAWLER'='sales', 'averageRecordSize'='673', 'classification'='parquet', 'compressionType'='none', 'objectCount'='1', 'recordCount'='1', 'sizeKey'='2214', 'typeOfData'='file')
方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力
PandasでCSVファイルを読み込んでPyArrowでParquet出力する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型や変換の定義などを指定する。
処理の流れ
Pandasの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()、pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。現状、decimalを定義するにはこの方式でParquetに変換する必要があります。入力ファイルにPandasを利用するメリットは、Dataframeに読み込んだデータならParquet出力できる点です。例えば、JSON、XML/HTML、ExcelファイルなんかもParquetに変換できる柔軟さがあります。
制限事項
データをPandasとPyArrowの2つのツールでメモリに持つことになるので、大きなファイルの変換には不向きです。
注意
- int型でNaNが存在する場合は、float型で定義 -> parquet作成時にint型に指定するように上記のソースに反映。
- float型でNaNが存在する場合は、回避方法が見つからない
サンプルコード
サンプルコードでは、データ構造に依存する部分とそれ以外(読み込み・変換・出力部分)の分離を試みましたが、引数が多くなってしまいました。
import pandas as pd from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, schema, parquet as pq import decimal as dc from boto3.session import Session # テキストファイル読込み def get_dataframe(input_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list) -> pd.DataFrame: dataframe = pd.read_csv( filepath_or_buffer=input_file, header=0, # 0:ヘッダーあり encoding="utf-8", # 文字コード sep='\t', # セパレータ文字 na_values='', # NaNを置き換える文字列 # compression='gzip', # gzip圧縮 dtype=pandas_dtype, converters=pandas_convert, parse_dates=pandas_parse_dates, infer_datetime_format=True # 日付データを読み込む速度を向上させる ) return dataframe # TSVファイルをParquetファイルに変換 def tsv2parquet(input_file: str, output_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list, pyarrow_schema: schema): dataframe = get_dataframe( input_file=input_file, pandas_dtype=pandas_dtype, pandas_convert=pandas_convert, pandas_parse_dates=pandas_parse_dates ) pyarrow_table = Table.from_pandas( dataframe, pyarrow_schema, preserve_index=False ) pq.write_table( pyarrow_table, output_file, compression='snappy', # 圧縮タイプ指定 flavor=['spark'], # 互換性のためsparkを設定 use_deprecated_int96_timestamps=True # int96のParquet形式で書き込み ) # # main # def main(): pyarrow_home = '/Users/cmsuer/Desktop' tablename = 'sales' input_files = pyarrow_home + '/' + tablename + '.tsv' output_file = pyarrow_home + '/' + tablename + '.parquet' # Pandasに読み込むデータ定義、 pandas_dtype = { 'id': float, # NaN対策(int -> float) 'price': float, # NaN対策(int -> float) 'total': float, # NaN対策(int -> float) 'price_profit': float, 'total_profit': float, # 'discount': decimal(10, 2), 'visible': bool, 'name': str, # 'created': timestamp, # 'updated': timestamp } # Pandasに読み込んだ型変換 pandas_convert = {'discount': dc.Decimal} # 日時型のカラムの指定 pandas_parse_dates = ['created', 'updated'] # PandasからPyArrowのテーブルに変換するテータ定義 pyarrow_schema = schema({ 'id': int16(), 'price': int32(), 'total': int64(), 'price_profit': float32(), 'total_profit': float64(), 'discount': decimal128(10, 2), 'visible': bool_(), 'name': string(), 'created': timestamp('s'), 'updated': timestamp('s') }) # TSVファイルをParquetファイルに変換 tsv2parquet(input_files, output_file, pandas_dtype, pandas_convert, pandas_parse_dates, pyarrow_schema) # ParquetファイルをS3にアップロード s3bucket = 'cm-user' s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet' session = Session(profile_name='cmuser') s3 = session.resource('s3', region_name='ap-northeast-1') bucket = s3.Bucket(s3bucket) bucket.upload_file(output_file, s3key) if __name__ == "__main__": main()
テーブル定義
Glueのクローラで自動生成したDDLは以下のとおりです。discountカラムが、decimal(10,2)で定義されました。
CREATE EXTERNAL TABLE `sales`( `id` smallint, `price` int, `total` bigint, `price_profit` float, `total_profit` double, `discount` decimal(10,2), `visible` boolean, `name` string, `created` timestamp, `updated` timestamp) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://cm-user/tmp/sales/' TBLPROPERTIES ( 'CrawlerSchemaDeserializerVersion'='1.0', 'CrawlerSchemaSerializerVersion'='1.0', 'UPDATED_BY_CRAWLER'='sales', 'averageRecordSize'='648', 'classification'='parquet', 'compressionType'='none', 'objectCount'='1', 'recordCount'='1', 'sizeKey'='3424', 'typeOfData'='file')
Amazon Athenaからクエリした結果
float型やdouble型除き、カラムがNullでもエラーにならず結果が返ります。
まとめ
PyArrowでdate型をサポートできるまで、ブログ化を我慢していましたが、シビレを切らして書いてしまいました。Aapche Arrowを使うと、Parquetファイルが非常に早く、直感的に作成できます。元々HiveやSparkでデータ分析していた方であれば、bitgint型とstring型があればよいのかもしれませんが、Redshift Spectrumとの連携を考えると、decimal型やdate型のサポートは望ましいです。サポートされたらブログにてまたご報告します。
参考文献
以下のブログは、大変参考になりました。