ちょっと話題の記事

Amazon Athena: カラムナフォーマット『Parquet』でクエリを試してみた #reinvent

2016.12.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日『AWS re:Invent 2016』にて発表された新サービス『Amazon Athena』は、マニュアルにもある通りAWSが提供するフルマネージドHiveサービスと言えるでしょう。DWH用途で考えるとレコードをフルスキャンするよりも特定のカラムを集計・フィルタするというユースケースが多くなりそうですので、カラムナフォーマット『Parquet』を試したみたいと思いました。Parquetファイルの変換や、一般的なCSVとの簡単な比較をしてみました。(意外な結果が...)

カラムナフォーマット『Parquet』とは

データ分析では大福帳フォーマットのテーブルデータに対して、特定の列の値を集計したり、フィルタリングすることが多いため、カラム毎にデータが連続して格納されていると必要なデータのみをピンポイントで読み込むことができるからです。また、列方向には同じ種類のデータが並んでいるため、圧縮アルゴリズムも効きやすくなることが期待できます。

20161205-row-column-oriented

Parquetは、この列方向にデータを分割して格納することを垂直パーティショニングと言いますが、さらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っています。

検証データ

Amazon Athenaは、第一印象はCSVでも結構速いという印象したので、少ないレコード数ではクエリの実行時間にブレが生じるので、検証データは、AWSが提供しているテストデータのlineorderテーブルの一部(75004739レコード)を利用します。

s3://awssampledb/ssbgz/lineorder0000_part_00.gz

カラムナフォーマット『Parquet』ファイルの作成

カラムなフォーマット『Parquet』ファイルの作成方法は、Converting to Columnar Formats のようにEMR(Hadoop)を利用する方法が紹介されていますが、ここではより簡単に Apache Drill を使う方法を紹介します。

lineorder0000_part_00.gzを展開して作成されたファイルは、「|」(パイプ文字)で区切られたファイルですので、これを「,」(カンマ)に置き換えて、先頭行にカラム名のヘッダ行を追加した、CSVHファイルを変換元ファイルとして作成します。 後は、Apache Drill で フォーマットに parquet を指定して、ファイルを生成します。

[ec2-user@ip-10-0-0-56 ~]$ cd apache-drill-1.9.0
[ec2-user@ip-10-0-0-56 apache-drill-1.9.0]$ bin/drill-embedded
12 03, 2016 11:35:54 午前 org.glassfish.jersey.server.ApplicationHandler initialize
情報: Initiating Jersey application, version Jersey: 2.8 2014-04-29 01:25:26...
apache drill 1.9.0
"got drill?"
0: jdbc:drill:zk=local> ALTER SESSION SET `store.format` = 'parquet';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.909 seconds)
0: jdbc:drill:zk=local>
0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.`/data/dest3/` AS (
. . . . . . . . . . . > SELECT
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_orderkey = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_orderkey AS INT)
. . . . . . . . . . . > END AS lo_orderkey,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_linenumber = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_linenumber AS INT)
. . . . . . . . . . . > END AS lo_linenumber,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_custkey = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_custkey AS INT)
. . . . . . . . . . . > END AS lo_custkey,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_partkey = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_partkey AS INT)
. . . . . . . . . . . > END AS lo_partkey,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_suppkey = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_suppkey AS INT)
. . . . . . . . . . . > END AS lo_suppkey,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_orderdate = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_orderdate AS INT)
. . . . . . . . . . . > END AS lo_orderdate,
. . . . . . . . . . . > lo_orderpriority,
. . . . . . . . . . . > lo_shippriority,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_quantity = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_quantity AS INT)
. . . . . . . . . . . > END AS lo_quantity,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_extendedprice = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_extendedprice AS INT)
. . . . . . . . . . . > END AS lo_extendedprice,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_ordertotalprice = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_ordertotalprice AS INT)
. . . . . . . . . . . > END AS lo_ordertotalprice,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_discount = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_discount AS INT)
. . . . . . . . . . . > END AS lo_discount,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_revenue = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_revenue AS INT)
. . . . . . . . . . . > END AS lo_revenue,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_supplycost = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_supplycost AS INT)
. . . . . . . . . . . > END AS lo_supplycost,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_tax = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_tax AS INT)
. . . . . . . . . . . > END AS lo_tax,
. . . . . . . . . . . > CASE
. . . . . . . . . . . >   WHEN lo_commitdate = '' THEN NULL
. . . . . . . . . . . >   ELSE CAST(lo_commitdate AS INT)
. . . . . . . . . . . > END AS lo_commitdate,
. . . . . . . . . . . > lo_shipmode
. . . . . . . . . . . > FROM dfs.tmp.`/data/source/lineorder.csvh`
. . . . . . . . . . . > );
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2016/12/03 11:37:30 情報: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 43,751,170B for [lo_orderkey] INT32: 10,936,946 values, 43,748,118B raw, 43,749,456B comp, 42 pages, encodings: [PLAIN, BIT_PACKED, RLE]
2016/12/03 11:37:30 情報: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 18,269,638B for [lo_linenumber] INT32: 10,936,946 values, 43,748,118B raw, 18,267,959B comp, 42 pages, encodings: [PLAIN, BIT_PACKED, RLE]
:
:
2016/12/03 11:44:02 情報: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 29,996,033B for [lo_orderdate] INT32: 9,387,027 values, 37,548,396B raw, 29,994,594B comp, 36 pages, encodings: [PLAIN, BIT_PACKED, RLE]
2016/12/03 11:44:02 情報: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 21,471,877B for [lo_orderpriority] BINARY: 
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 75004739                   |
+-----------+----------------------------+
1 row selected (463.332 seconds)
0: jdbc:drill:zk=local>

以下のようにファイルが生成されましたので、S3にアップロードします。

[ec2-user@ip-10-0-0-56 ~]$ ll /tmp/data/source/
合計 7401036
-rw-rw-r-- 1 ec2-user ec2-user 7578653248 12月  2 04:02 lineorder.csvh

[ec2-user@ip-10-0-0-56 dest3]$ ls -la
合計 3599968
drwxrwxr-x 2 ec2-user ec2-user      4096 12月  3 11:43 .
drwxr-xr-x 8 ec2-user ec2-user      4096 12月  3 11:25 ..
-rw-r--r-- 1 ec2-user ec2-user   4166352 12月  3 11:37 .0_0_0.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   4166592 12月  3 11:38 .0_0_1.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   4166508 12月  3 11:39 .0_0_2.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   4166684 12月  3 11:40 .0_0_3.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   4166508 12月  3 11:42 .0_0_4.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   4166684 12月  3 11:43 .0_0_5.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user   3576732 12月  3 11:44 .0_0_6.parquet.crc
-rw-r--r-- 1 ec2-user ec2-user 533291834 12月  3 11:37 0_0_0.parquet
-rw-r--r-- 1 ec2-user ec2-user 533322269 12月  3 11:38 0_0_1.parquet
-rw-r--r-- 1 ec2-user ec2-user 533311842 12月  3 11:39 0_0_2.parquet
-rw-r--r-- 1 ec2-user ec2-user 533334192 12月  3 11:40 0_0_3.parquet
-rw-r--r-- 1 ec2-user ec2-user 533311533 12月  3 11:42 0_0_4.parquet
-rw-r--r-- 1 ec2-user ec2-user 533334166 12月  3 11:43 0_0_5.parquet
-rw-r--r-- 1 ec2-user ec2-user 457820356 12月  3 11:44 0_0_6.parquet

[ec2-user@ip-10-0-0-56 dest3]$ aws s3 sync /tmp/data/dest3/ s3://cm-datalake/lineorder/parquet/
upload: ./.0_0_5.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_5.parquet.crc
upload: ./.0_0_6.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_6.parquet.crc
upload: ./.0_0_0.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_0.parquet.crc
upload: ./.0_0_1.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_1.parquet.crc
upload: ./.0_0_3.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_3.parquet.crc
upload: ./.0_0_2.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_2.parquet.crc
upload: ./.0_0_4.parquet.crc to s3://cm-datalake/lineorder/parquet/.0_0_4.parquet.crc
upload: ./0_0_1.parquet to s3://cm-datalake/lineorder/parquet/0_0_1.parquet
upload: ./0_0_4.parquet to s3://cm-datalake/lineorder/parquet/0_0_4.parquet
upload: ./0_0_0.parquet to s3://cm-datalake/lineorder/parquet/0_0_0.parquet
upload: ./0_0_2.parquet to s3://cm-datalake/lineorder/parquet/0_0_2.parquet
upload: ./0_0_3.parquet to s3://cm-datalake/lineorder/parquet/0_0_3.parquet
upload: ./0_0_5.parquet to s3://cm-datalake/lineorder/parquet/0_0_5.parquet
upload: ./0_0_6.parquet to s3://cm-datalake/lineorder/parquet/0_0_6.parquet

テーブルの作成

マネジメントコンソールの Athenaの画面からフォーマット指定〜カラム指定を繰り返して定義しても構いませんが、実際の分析データは100カラム超えも少ないくないので、CREATE EXTERNAL TABLE を書くほうが楽ですし、変更管理ができる点でもおすすめです。

以下の定義をマネジメントコンソールから[Run Query]します。 20161205-create-external-table

-- DROP TABLE default.lineorder_parquet;
CREATE EXTERNAL TABLE IF NOT EXISTS default.lineorder_parquet (
  lo_orderkey int,
  lo_linenumber int,
  lo_custkey int,
  lo_partkey int,
  lo_suppkey int,
  lo_orderdate int,
  lo_orderpriority string,
  lo_shippriority string,
  lo_quantity int,
  lo_extendedprice int,
  lo_ordertotalprice int,
  lo_discount int,
  lo_revenue int,
  lo_supplycost int,
  lo_tax int,
  lo_commitdate int,
  lo_shipmode string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://cm-datalake/lineorder/parquet/';

-- DROP TABLE default.lineorder_csv;
CREATE EXTERNAL TABLE IF NOT EXISTS default.lineorder_csv (
  lo_orderkey int,
  lo_linenumber int,
  lo_custkey int,
  lo_partkey int,
  lo_suppkey int,
  lo_orderdate int,
  lo_orderpriority string,
  lo_shippriority string,
  lo_quantity int,
  lo_extendedprice int,
  lo_ordertotalprice int,
  lo_discount int,
  lo_revenue int,
  lo_supplycost int,
  lo_tax int,
  lo_commitdate int,
  lo_shipmode string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://cm-datalake/lineorder/csv/';

Perquet vs CSV

以下の計測結果は2回目以降の計測値の平均となります。

10レコード表示

SELECT * FROM default.lineorder_parquet limit 10;
SELECT * FROM default.lineorder_csv limit 10;

行方向に10レコード取り出すので、CSVフォーマットが速い結果になりました。

Run time Data scanned
Parquet 平均7.63 s 436.61MB
CSV 平均0.56 s 5.64MB

レコードカウント

SELECT COUNT(*) FROM default.lineorder_parquet;
SELECT COUNT(*) FROM default.lineorder_csv;

CSVフォーマットは全データスキャンが発生しているので遅いようです。と言っても7.06GBを3.43秒はかなり速い。

Run time Data scanned
Parquet 平均1.93 s 0KB
CSV 平均3.43 s 7.06GB

条件指定によるレコード取得

SELECT * FROM default.lineorder_parquet WHERE lo_orderdate = 19950101
SELECT * FROM default.lineorder_csv WHERE lo_orderdate = 19950101

Parquetフォーマットの方がスキャンしているデータが少ないですがCSVフォーマットよりも時間を明らかに遅い結果になりました。

Run time Data scanned
Parquet 平均20.2 s 3.41GB
CSV 平均2.66 s 7.06GB

カラム指定によるレコード取得

SELECT lo_custkey, lo_partkey, lo_suppkey, lo_orderdate 
FROM default.lineorder_parquet
WHERE lo_orderdate = 19950101;

SELECT lo_custkey, lo_partkey, lo_suppkey, lo_orderdate 
FROM default.lineorder_csv
WHERE lo_orderdate = 19950101;

カラムを限定すると早くなるだろうと思っていましたが、これでもCSVフォーマットのほうが速いようです。

Run time Data scanned
Parquet 平均5.69 s 1.06GB
CSV 平均2.36 s 7.06GB

今度は条件と抽出カラムを同じにしたクエリを実行します。

SELECT lo_orderdate 
FROM default.lineorder_parquet
WHERE lo_orderdate = 19950101;

SELECT lo_orderdate 
FROM default.lineorder_csv
WHERE lo_orderdate = 19950101;

この条件でやっとParquetの方が良い結果が得られました。

Run time Data scanned
Parquet 平均3.14 s 228.56MB
CSV 平均3.32 s 7.06GB

集計

SELECT lo_orderdate, sum(lo_ordertotalprice) AS lo_ordertotalprice_daily 
FROM lineorder_parquet 
GROUP BY lo_orderdate 
ORDER BY lo_orderdate;

SELECT lo_orderdate, sum(lo_ordertotalprice) AS lo_ordertotalprice_daily 
FROM lineorder_csv 
GROUP BY lo_orderdate 
ORDER BY lo_orderdate;

この条件の集計では大きな差は見られませんでしたが、集計カラムを増やすとParquetのスキャンが増えて遅くなりました。

Run time Data scanned
Parquet 平均3.82 s 484.49MB
CSV 平均3.65 s 7.06GB

まとめ

「CSVファイルよりParquetファイルの方が速い」という決めつけで始めた検証でしたが、実際にはCSVファイルの方が早いケースが多いという結果に終わりました。Parquetファイルの作成の段階でチューニングの余地があるのかもしれません。一方、Parquetファイルにすることで、data scannedは大きく削減されたことが確認できました。サービス利用費(Amazon Athena Pricing)には「Price Per Query:$5 per TB of data scanned.」との記載があり、データ転送量を少ないParquetファイルは利用費削減に効果が期待できます。

Amazon Athenaは、S3上のCSVファイルにテーブル定義(CREATE EXTERNAL TABLE)を適用するだけで、ファイルに変更を加える必要なく、高速にクエリを実行することができます。他社の類似サービスのように、CSVファイルをロードし直したり、変換する必要が無い点は強みと言えるでしょう。