この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
インメモリの列指向データフォーマットを持つApache Arrow(pyarrow)を用いて簡単かつ高速にParquetに変換できることを「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました のレポートで以前ご紹介しました。今回は最新のpyarrow バージョン0.13.0にてCSVファイルをParquetファイルに変換する方法と、Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているかも検証します。
「db analytics showcase Sapporo 2018」で玉川竜司さんのParquetの話を聞いてきました #dbts2018 #dbasSPR
Parquetファイルに変換する方法
一般にCSVファイルをParquetに変換するは、Apache Spark(AWS GlueやEMR)を用いることが一般的ですが、過去のブログでApache DrillでParquetに変換したり、Amazon AthenaのCTASでParquetに変換する方法についても紹介してきました。
Parquet用のPythonのライブラリとして以下の3つが有名です。
- fastparquet: PyDataの分散処理フレームワークのDaskプロジェクトで開発
- pyarrow: pandasの開発者、Wes McKinneyさんたちが開発
- Pandas(0.22以降):DataFrameをto_parquetすると直接Parquetファイルとして保存
今回は、Hadoopエコシステムとの親和性(分散ファイルシステムに置かれたデータセットの読み取り)が優れており、進化のスピードが早い、Wes McKinneyさんを応援したいという気持ち「大」のため、PyArrowをご紹介します。
Apache Arrow(PyArrow)とは
Apache Arrowは、インメモリの列指向データフォーマットです。Hadoop/Spark(Hadoopのエコシステム)やPandas(PyDataのエコシステム)をはじめとするツール間でゼロコピーデータアクセスを目指し、Apache Arrowがその共通データフォーマットになるように設計されています。つまり、Apache Arrowに読み込むと他のツールとデータが共有できたり、様々なデータファイルフォーマットに出力できるようになります。
詳しくは、(翻訳)Apache Arrowと「pandasの10項目の課題」をお読みください。
PyArrowによるParquetファイルに変換
Parquetファイルに変換する方法は、「方法1:PyArrowから直接CSVファイルを読み込んでParquet出力」と「方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力」の2つあります。それぞれに対して、サポートしているデータ型をそれぞれ検証します。
サポートしているデータ型
Amazon AthenaとAmazon Redshift Spectrumの両方でサポートしているデータ型がどこまでサポートしているか、以下のデータ型について検証します。それぞれのParquetに変換したときのデータ型の対応は以下のとおりです。Apache Arrowでサポートしているデータ型のすべてが、PyArrowでサポートしているわけではないことにご注意ください。
※ Apache Arrowがサポートしている全てのデータ型は、Data Types and Schemasをご覧ください。
Python3 | Pandas | PyArrow | Amazon Athena | Amazon Redshift Spectrum | 対応状況 |
---|---|---|---|---|---|
int | int64 | int16 | smallint | smallint | ○ |
int | int64 | int32 | int | integer | ○ |
int | int64 | int64 | bigint | bigint | ○ |
float | float64 | float32 | float | real | ○ |
float | float64 | float64 | double | double precision | ○ |
decimal | - | - | decimal | decimal | △ |
bool | bool | bool_ | boolean | boolean | ○ |
str | object | string | string | varchar(65535) | ○ |
str | object | string | char(n) | char(n) | ○ |
str | object | string | varchar(n) | varchar(n) | ○ |
date | - | - | date | date | ☓ |
datetime | datetime64 | timestamp | timestamp | timestamp | ○ |
今回は、この組み合わせを試すために以下のカラムのデータ(sales.tsv)を用意しました。
No | カラム名 | PyArroのデータ型の定義 | 実際のデータ | 備考 |
---|---|---|---|---|
1 | id | int16() | 1 | |
2 | price | int32() | 20000 | |
3 | total | int64() | 300000000 | |
4 | price_profit | float32() | 4.56 | |
5 | total_profit | float64() | 67.89 | |
6 | discount | decimal(10, 2) | 789012.34 | 方法1はfloat64で作成しました |
7 | visible | bool_() | True | |
8 | name | string() | QuietComfort 35 | |
9 | created | timestamp('s') | 2019-06-14 | 方法1と方法2ともにdate32をサポートしていないので、timestampで作成しました |
10 | updated | timestamp('s') | 2019-06-14 23:59:59 |
id price total price_profit total_profit discount visible name created updated
1 20000 300000000 4.56 67.89 789012.34 True QuietComfort 35 2019-06-14 2019-06-14 23:59:59
方法1:PyArrowから直接CSVファイルを読み込んでParquet出力
まずは最もシンプルなPyArrowで変換する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型定義の3つを指定するのみです。
処理の流れ
PyArrowの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()、pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。
制限事項
PyArrowのread_csv()では、decimalとdate32をサポートしていません。そのため、データ型はdecimalはfloat64、date32はtimestampで作成しました。PyArrowの将来的なアップデートで、decimalやdate32のサポートを期待したいところです。
decimalとdate32を指定すると、それぞれ以下のエラーが発生します。
Traceback (most recent call last):
File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module>
column_types
File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table
convert_options=convertoptions
File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv
File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: CSV conversion to date32[day] is not supported
Traceback (most recent call last):
File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 62, in <module>
column_types
File "/Users/cmuser/IdeaProjects/pyarrow/pyarrow_parquet.py", line 31, in get_pyarrow_table
convert_options=convertoptions
File "pyarrow/_csv.pyx", line 397, in pyarrow._csv.read_csv
File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: CSV conversion to decimal(10, 2) is not supported
サンプルスクリプト
- PyArrowのread_csv()では、入力ファイルの他に読み込みや変換、パースに関するオプション指定が可能です。(下記の10〜25行)
- カラムの定義は、カラム名とデータ型をPythonのディクショナリで定義します。(下記の58〜69行)
- PyArrowのwrite_table()では、出力ファイルの他に圧縮タイプやApache SparkのParquetフォーマット互換を指定します。(下記の42〜47行)
from pyarrow.csv import read_csv, ReadOptions, ParseOptions, ConvertOptions
from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, parquet as pq
from boto3.session import Session
#
# Functions
#
def get_pyarrow_table(input_files: str, column_types: dict) -> Table:
readoptions = ReadOptions(
use_threads=True, # 複数の読み取りスレッドの利用
block_size=1000 # 読み取りブロック数
)
#
convertoptions = ConvertOptions(
check_utf8=True, # 文字列カラムのUTF-8妥当性をチェック
column_types=column_types, # 列のデータ型を辞書型で渡す
null_values=[''] # データ内のNULLを表す文字列
)
parseoptions = ParseOptions(
delimiter='\t', # タブ区切り指定
double_quote=False, # ダブルクオートで括らない
escape_char='\'', # エスケープ文字の指定
header_rows=1 # 先頭がヘッダ行
)
pyarrow_table = read_csv(
input_file=input_files,
read_options=readoptions,
parse_options=parseoptions,
convert_options=convertoptions
)
return pyarrow_table
def tsv2parquet(input_file: str, output_file: str, column_types: dict):
arrow_table = get_pyarrow_table(
input_file,
column_types
)
pq.write_table(
arrow_table,
output_file,
compression='snappy', # snappyで圧縮
flavor=['spark'], # spark互換の設定
)
#
# main
#
def main():
pyarrow_home = '/Users/cmuser/Desktop'
tablename = 'sales'
input_files = pyarrow_home + '/' + tablename + '.tsv'
output_file = pyarrow_home + '/' + tablename + '.parquet'
column_types = {
'id': int16(),
'price': int32(),
'total': int64(),
'price_profit': float32(),
'total_profit': float64(),
'discount': float64(),
'visible': bool_(),
'name': string(),
'created': timestamp('s'),
'updated': timestamp('s')
}
tsv2parquet(input_files, output_file, column_types)
# Parquet -> S3
s3bucket = 'cm-user'
s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet'
session = Session(profile_name='cm-user')
s3 = session.resource('s3', region_name='ap-northeast-1')
bucket = s3.Bucket(s3bucket)
bucket.upload_file(output_file, s3key)
if __name__ == "__main__":
main()
テーブル定義
Glueのクローラで自動生成したDDLは以下のとおりです。createdカラムは日付のみですがdate型ではなくtimestamp型として定義されました。
CREATE EXTERNAL TABLE `sales`(
`id` smallint,
`price` int,
`total` bigint,
`price_profit` float,
`total_profit` double,
`discount` double,
`visible` boolean,
`name` string,
`created` timestamp,
`updated` timestamp)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://cm-user/tmp/sales/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='sales',
'averageRecordSize'='673',
'classification'='parquet',
'compressionType'='none',
'objectCount'='1',
'recordCount'='1',
'sizeKey'='2214',
'typeOfData'='file')
方法2:PandasでCSVファイルを読み込んでPyArrowでParquet出力
PandasでCSVファイルを読み込んでPyArrowでParquet出力する方法をご紹介します。入力ファイルのパス、出力ファイルのパス、カラムのデータ型や変換の定義などを指定する。
処理の流れ
Pandasの入力ファイル名をカラムのデータ型定義に基づいて読み込みread_csv()、pyarrow.Tableを作成します。作成したpyarrow.Tableから出力ファイルに出力write_table()します。現状、decimalを定義するにはこの方式でParquetに変換する必要があります。入力ファイルにPandasを利用するメリットは、Dataframeに読み込んだデータならParquet出力できる点です。例えば、JSON、XML/HTML、ExcelファイルなんかもParquetに変換できる柔軟さがあります。
制限事項
データをPandasとPyArrowの2つのツールでメモリに持つことになるので、大きなファイルの変換には不向きです。
注意
- int型でNaNが存在する場合は、float型で定義 -> parquet作成時にint型に指定するように上記のソースに反映。
- float型でNaNが存在する場合は、回避方法が見つからない
サンプルコード
サンプルコードでは、データ構造に依存する部分とそれ以外(読み込み・変換・出力部分)の分離を試みましたが、引数が多くなってしまいました。
import pandas as pd
from pyarrow import int16, int32, int64, float64, float32, bool_, date32, decimal128, timestamp, string, Table, schema, parquet as pq
import decimal as dc
from boto3.session import Session
# テキストファイル読込み
def get_dataframe(input_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list) -> pd.DataFrame:
dataframe = pd.read_csv(
filepath_or_buffer=input_file,
header=0, # 0:ヘッダーあり
encoding="utf-8", # 文字コード
sep='\t', # セパレータ文字
na_values='', # NaNを置き換える文字列
# compression='gzip', # gzip圧縮
dtype=pandas_dtype,
converters=pandas_convert,
parse_dates=pandas_parse_dates,
infer_datetime_format=True # 日付データを読み込む速度を向上させる
)
return dataframe
# TSVファイルをParquetファイルに変換
def tsv2parquet(input_file: str, output_file: str, pandas_dtype: dict, pandas_convert: dict, pandas_parse_dates: list, pyarrow_schema: schema):
dataframe = get_dataframe(
input_file=input_file,
pandas_dtype=pandas_dtype,
pandas_convert=pandas_convert,
pandas_parse_dates=pandas_parse_dates
)
pyarrow_table = Table.from_pandas(
dataframe,
pyarrow_schema,
preserve_index=False
)
pq.write_table(
pyarrow_table,
output_file,
compression='snappy', # 圧縮タイプ指定
flavor=['spark'], # 互換性のためsparkを設定
use_deprecated_int96_timestamps=True # int96のParquet形式で書き込み
)
#
# main
#
def main():
pyarrow_home = '/Users/cmsuer/Desktop'
tablename = 'sales'
input_files = pyarrow_home + '/' + tablename + '.tsv'
output_file = pyarrow_home + '/' + tablename + '.parquet'
# Pandasに読み込むデータ定義、
pandas_dtype = {
'id': float, # NaN対策(int -> float)
'price': float, # NaN対策(int -> float)
'total': float, # NaN対策(int -> float)
'price_profit': float,
'total_profit': float,
# 'discount': decimal(10, 2),
'visible': bool,
'name': str,
# 'created': timestamp,
# 'updated': timestamp
}
# Pandasに読み込んだ型変換
pandas_convert = {'discount': dc.Decimal}
# 日時型のカラムの指定
pandas_parse_dates = ['created', 'updated']
# PandasからPyArrowのテーブルに変換するテータ定義
pyarrow_schema = schema({
'id': int16(),
'price': int32(),
'total': int64(),
'price_profit': float32(),
'total_profit': float64(),
'discount': decimal128(10, 2),
'visible': bool_(),
'name': string(),
'created': timestamp('s'),
'updated': timestamp('s')
})
# TSVファイルをParquetファイルに変換
tsv2parquet(input_files, output_file, pandas_dtype, pandas_convert, pandas_parse_dates, pyarrow_schema)
# ParquetファイルをS3にアップロード
s3bucket = 'cm-user'
s3key = 'tmp/' + tablename + '/' + tablename + '.snappy.parquet'
session = Session(profile_name='cmuser')
s3 = session.resource('s3', region_name='ap-northeast-1')
bucket = s3.Bucket(s3bucket)
bucket.upload_file(output_file, s3key)
if __name__ == "__main__":
main()
テーブル定義
Glueのクローラで自動生成したDDLは以下のとおりです。discountカラムが、decimal(10,2)で定義されました。
CREATE EXTERNAL TABLE `sales`(
`id` smallint,
`price` int,
`total` bigint,
`price_profit` float,
`total_profit` double,
`discount` decimal(10,2),
`visible` boolean,
`name` string,
`created` timestamp,
`updated` timestamp)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://cm-user/tmp/sales/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='sales',
'averageRecordSize'='648',
'classification'='parquet',
'compressionType'='none',
'objectCount'='1',
'recordCount'='1',
'sizeKey'='3424',
'typeOfData'='file')
Amazon Athenaからクエリした結果
float型やdouble型除き、カラムがNullでもエラーにならず結果が返ります。
まとめ
PyArrowでdate型をサポートできるまで、ブログ化を我慢していましたが、シビレを切らして書いてしまいました。Aapche Arrowを使うと、Parquetファイルが非常に早く、直感的に作成できます。元々HiveやSparkでデータ分析していた方であれば、bitgint型とstring型があればよいのかもしれませんが、Redshift Spectrumとの連携を考えると、decimal型やdate型のサポートは望ましいです。サポートされたらブログにてまたご報告します。
参考文献
以下のブログは、大変参考になりました。