Amazon Athena のPartitioningとBucketingによるパフォーマンス戦略

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

先日ご紹介したAmazon AthenaのCTASサポートで、出力ファイルのBucketingとPartitioningに対応しました。BucketingとPartitioningは、相反するものではなく、必要に応じて組み合わせることでパフォーマンスの改善が期待できます。本日はBucketingとPartitioningの具体的なユースケースとパフォーマンス戦略について解説します。

Amazon Athena が待望のCTAS(CREATE TABLE AS)をサポートしました!

PartitioningとBucketingの解説

Partitioningとは

Partitioningとは、データを指定したキーごとにファイルにまとめてグループ化します。指定したキーのことをパーティションキーと呼びます。一般的なDBのパーティショニングと概念は一緒ですが、Hiveのパーティショニングはキーごとにフォルダを作成してその下にデータファイルを配置します。フォルダ名は「キーと値」もしく「値」のみのいずれかで、前者をカラムありパーティション、後者をカラム名なしパーティションと呼びます。

Partitioningは、where句で読み込み範囲を絞るときに頻繁に使われるカラムをキーとして指定することでスキャンするファイルを限定できるため、パフォーマンスの改善とAWS利用費の削減の点で効果的です。

Bucketingとは

一方、Bucketingとは、データファイルを指定したキーとファイル数で分割します。Bucketingという言葉からS3のバケットを想像しがちですが全く関係ありません。Bucketingは複数のノードで水平分散処理ができるように大きなデータファイルを分割するのに効果的です。

2つの共通点と相違点

共通点は、指定したキーに基づきファイルを分割します。相違点はPartitioningによってスキャンする対象ファイルを限定することが目的であることに対して、Bucketingはファイルを水平分散できるように分割することが目的です。

PartitioningとBucketingのCTASクエリ

Partitioningの例

Athenaのパーティション数は100個に限定されているため、CTASクエリの結果のパーティション化は、予定しているパーティションの数が限られている場合に効果的です。下記の例では、100個以上ありあえないstate(日本なら都道府県)をパーティションキーに指定しています。例えば、where句でstateが「北海道」を指定すると、北海道でデータファイルのみがスキャンされるため、クエリが高速かつスキャン範囲を限定できます。

パーティションキーは、テーブルの最後のカラムでなけばなりません。ワークアラウンドは、SELECT句に任意でカラム名を順に指定して、最後のカラムをパーティションキーに指定します。

CREATE TABLE orders_partitioned 
WITH (
     format = 'TEXTFILE',  
     external_location = 's3://cm-bucket/orders_partitioned/', 
     partitioned_by = ARRAY['state']) 
AS 
SELECT row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,city,state  
FROM cm_db.orders

上記のクエリを実行するとorders_partitionedフォルダの下には以下にようなファイルが生成されます。例えば「state=三重」というフォルダ名の下のファイルは三重県のデータのみが格納されています。

$ aws s3 ls s3://cm-bucket/ --recursive
2018-10-15 18:47:03       4669 orders_partitioned/state=三重/20181015_094657_00006_7ci54_8f651f4c-b2de-46b6-b0a2-242403fb08eb.gz
2018-10-15 18:47:02        758 orders_partitioned/state=三重/20181015_094657_00006_7ci54_d3a8fc36-b6cc-457b-b3f7-be3801206bbd.gz
2018-10-15 18:47:02       1034 orders_partitioned/state=三重/20181015_094657_00006_7ci54_dd504252-b05a-4eed-ab2b-62a98bbbfc09.gz
2018-10-15 18:47:04       1770 orders_partitioned/state=京都/20181015_094657_00006_7ci54_6206fed7-1f72-4d77-848a-ee31b808a72f.gz
2018-10-15 18:47:02        400 orders_partitioned/state=京都/20181015_094657_00006_7ci54_d72900a1-b8f2-453e-aed8-db2f6a2aa457.gz
2018-10-15 18:47:03        255 orders_partitioned/state=京都/20181015_094657_00006_7ci54_e7edef17-2edb-4314-814b-e4d2a39d3adb.gz
:
:
2018-10-15 18:47:04        119 orders_partitioned/state=鹿児島/20181015_094657_00006_7ci54_75d42a00-1c8c-493a-9376-2bc5967a0e1a.gz
2018-10-15 18:47:03        105 orders_partitioned/state=鹿児島/20181015_094657_00006_7ci54_f45014f4-0e35-4d03-b00a-bc5df1405517.gz
2018-10-15 18:47:04        810 orders_partitioned/state=鹿児島/20181015_094657_00006_7ci54_fba665e8-fd78-497f-8d19-4fb87bb6e231.gz

Partitioningは、自動生成されたテーブルの定義にも影響を与えます。パーティション設定したテーブルはPARTITIONED BY句でパーティションキーが指定されています。

CREATE EXTERNAL TABLE `orders_partitioned`(
  `row_id` bigint COMMENT '', 
  `order_id` varchar(255) COMMENT '', 
  `order_date` date COMMENT '', 
  `ship_date` date COMMENT '', 
  `ship_mode` varchar(255) COMMENT '', 
  `customer_id` varchar(255) COMMENT '', 
  `customer_name` varchar(255) COMMENT '', 
  `segment` varchar(255) COMMENT '', 
  `city` varchar(255) COMMENT '')
PARTITIONED BY ( 
  `state` varchar(255) COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://cm-bucket/orders_partitioned'
TBLPROPERTIES (
  'has_encrypted_data'='false')

Bucketingの例

ファイル分割するキーは、bucketed_byで指定します。同じキーのデータは同じファイルに格納されます。ファイルの分割する数は、bucket_countで指定します。bucket_countを調整することで、最適なファイルサイズに調整できます。

CREATE TABLE orders_bucketed 
WITH (
      format = 'TEXTFILE', 
      external_location = 's3://cm-bucket/orders_bucketed/', 
      bucketed_by = ARRAY['city'], 
      bucket_count = 5) 
AS 
SELECT row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,city,state  
FROM cm_db.orders

上記のクエリを実行するとorders_bucketedフォルダの下には以下にようなファイルが生成されます。bucket_count「5」に指定したので、ファイルは5つに分割されています。逆にファイルを纏めたい場合はbucket_count「1」に指定することになります。

$ aws s3 ls s3://cm-bucket/orders_bucketed/ --recursive --profile tableau-labo
2018-10-15 23:20:49      54632 orders_bucketed/20181015_142047_00007_68vrb_bucket-00000.gz
2018-10-15 23:20:49      26120 orders_bucketed/20181015_142047_00007_68vrb_bucket-00001.gz
2018-10-15 23:20:49      28953 orders_bucketed/20181015_142047_00007_68vrb_bucket-00002.gz
2018-10-15 23:20:50      36960 orders_bucketed/20181015_142047_00007_68vrb_bucket-00003.gz
2018-10-15 23:20:49      36646 orders_bucketed/20181015_142047_00007_68vrb_bucket-00004.gz

PartitioningとBucketingの組み合わせ例

PartitioningとBucketingの組み合わせる事が可能です。

CREATE TABLE orders_partitioned_bucketed 
WITH (
      format = 'TEXTFILE', 
      external_location = 's3://cm-bucket/orders_partitioned_bucketed/', 
      partitioned_by = ARRAY['state'], 
      bucketed_by = ARRAY['city'], 
      bucket_count = 1) 
AS 
SELECT row_id,order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,city,state  
FROM cm_db.orders

PartitioningとBucketingの組み合わせると、都道府県ごとにフォルダを分割して、フォルダ内のファイルは1つにまとめています。

$ aws s3 ls s3://cm-bucket/orders_partitioned_bucketed/ --recursive --profile tableau-labo
2018-10-15 23:27:32       5916 orders_partitioned_bucketed/state=三重/20181015_142727_00006_vf3ey_bucket-00000.gz
2018-10-15 23:27:32       2124 orders_partitioned_bucketed/state=京都/20181015_142727_00006_vf3ey_bucket-00000.gz
2018-10-15 23:27:32       1371 orders_partitioned_bucketed/state=佐賀/20181015_142727_00006_vf3ey_bucket-00000.gz
:
:
2018-10-15 23:27:32       1621 orders_partitioned_bucketed/state=鳥取/20181015_142727_00006_vf3ey_bucket-00000.gz
2018-10-15 23:27:33        883 orders_partitioned_bucketed/state=鹿児島/20181015_142727_00006_vf3ey_bucket-00000.gz

パフォーマンス戦略

Partitioningのキーは100以内、かつカラムの値に偏りがないこと

CTASクエリを実行すると、Athenaは結果をAmazon S3の指定された場所に書き込みます。パーティションを指定すると、パーティションが作成され、各パーティションは同じ場所の別々のパーティションフォルダに格納されます。CTASクエリ結果で構成できるパーティションの最大数は100です。

同じ部門のレコードなど、類似の特性を持ち、限られた数の可能な値(組織内の限られた数の部門など)を持つカラムによってデータの分割します。この特性はカーディナリティとして知られています。たとえば、departmentカラムで区切って、このカラムに固有の値の数が限られている場合は、パーティション分割がうまく機能し、クエリの待ち時間が短くなります。

Amazon S3にパーティションを持たせることで、Athenaのクエリパフォーマンスが向上します。これは、特定のパーティションのみをターゲットにしたクエリを実行するのに役立ちます。Athenaはこれらのパーティションのみをスキャンし、クエリコストとクエリ時間を節約します。

Bucketingのキーはカーディナリティが高く、かつカラムの値に偏りがないこと

カーディナリティが高く、値が均等に分散されたカラムでデータをバケット化すると、CTASクエリ結果のBucketingがうまく機能します。

たとえば、timestampデータを格納するカラムには、非常に多数の異なる値が存在する可能性があり、そのデータはデータセット全体に均等に分散されます。つまり、timestamp型データを格納するカラムは値を持つ可能性が高く、NULLを持たないことを意味します。NULLを持たないカラムのデータを多数のファイルに入れることができることを意味します。各ファイルは、Amazon S3に格納されているデータの量とほぼ同じになります。

1つまたは複数のカラムをバケット名として使用して、CTASクエリ結果に任意の数のバケットを指定できます。CTASクエリ結果をバケットするカラムを選択するには、値の高い(高カーディナリティー)カラムを使用し、データをほぼ同じ量のデータを持つ多数のバケットに格納するために分割できます。たとえば、まばらに値が設定されたカラムは、バケット化の候補にはなりません。これは、データの量が少ないバケットと多くのデータを持つバケットで終わるためです。timestampバケット化のための良い候補です。これは、データのカーディナリティが高く、ほぼ等しいチャンクに格納できるためです。

PartitioningとBucketingのキーは目的が異なるので別のキーを指定する

PartitioningとBucketingによるデータを書き込むためのテクニックはお互いを相反しません。通常、バケット作成に使用するカラムは、パーティショニングに使用するカラムとは異なります。

たとえば、データセットにdepartmentカラム、 sales_quarterカラムおよびtsカラム(timestamp型データを格納するための)がある場合は、CTASクエリ結果departmentを sales_quarterこれらのカラムの値の基数が比較的小さいためです。たとえば、限られた数の部署と営業部署があります。また、パーティションの場合、データセットの一部のレコードにnullが設定されているか、これらのカラムに値が割り当てられていないかどうかは関係ありません。重要なのは、同じ部門のデータなど、同じ特性を持つデータがAthenaでクエリできる1つのパーティションにあることです。

同時に、すべてのデータにはtsカラムにtimestamp格納された型の値が格納されているため、同じクエリ結果に対してカラムによってバケットを設定することができますts。このカラムはカーディナリティが高く、そのデータはAmazon S3の多くのバケットに分散して格納できます。逆のシナリオを考えてみましょう。タイムスタンプタイプのデータ用のバケットを作成せずに、特定の日付または時刻の値に対してクエリを実行すると、Amazon S3の単一の場所に格納された非常に大量のデータをスキャンする必要があります。代わりに、日付と時刻に関連する結果を格納するバケットを構成する場合は、バリューを持つバケットをスキャンして照会するだけで、大量のデータをスキャンする長期実行クエリを回避できます。

ファイルサイズは100MB以上1GB未満を推奨

分割されたデータファイルは読み込みが並列で行われ、データブロックがシーケンシャルに読み込まれる場合に、クエリが効率的に実行されます。ただしファイルサイズが非常に小さい場合、特に 128MB 未満の場合には、実行エンジンは S3ファイルのオープン、ディレクトリのリスト表示、オブジェクトメタデータの取得、データ転送のセットアップ、ファイルヘッダーの読み込み、圧縮ディレクトリの読み込み、といった処理に余分な時間がかかります。その一方で、ファイルのサイズが非常に大きいときには、単一のリーダーがファイル全体の読み込みを完了するまで、クエリ自体の処理は行われません。この場合、並列性が下がってしまいます。

最後に

本日ご紹介した Partitioningによってデータのスキャンを削減し、Bucketingによってファイルサイズの最適化によるスキャンの効率化や並列性を向上します。 Amazon Athena のPartitioningのキーは100以内なので、指定できるキーが限られます。しかし、PartitioningとBucketingが非常に簡単かつ素早くできるのは画期的な機能といえます。Amazon Athena のクエリエンジンPrestoはインメモリなのでメモリ不足にならないように配慮してご利用ください。メモリ不足に陥った場合は、キャパシティの調整が可能なAWS GlueやEMRのクエリエンジンSparkのPartitioningとBucketingもユースケースに応じて使い分けることもご検討すると良いでしょう。