S3データを直接クエリ出来る新機能『Amazon Redshift Spectrum』を実際に試してみました

Amazon Redshiftの新機能として『Redshift Spectrum』が発表されました。Amazon S3にあるデータに対して、Amazon Redshiftから直接クエリを投げる事が出来る機能です。本日から利用可能(Generally Available Today)となっていますので、早速試してみました。

【速報】Amazon Redshift:S3のデータを直接検索出来る新機能『Redshift Spectrum』が発表されました! #awssummit

Amazon Redshift Spectrum – Exabyte-Scale In-Place Queries of S3 Data

Amazon Redshift Spectrum とは

Redshift Spectrumは Amazon S3にあるデータに対してAmazon Redshiftから直接クエリを投げる事が出来る機能です。

アーキテクチャ

Redshift Spectrumは、Redshiftクラスタとは独立した専用のAmazon Redshiftサーバーにあります。Redshift Spectrumは、条件フィルタリングや集約など、多くの計算中心型タスクをRedshift Spectrumレイヤーにプッシュするので、Redshift Spectrumクエリはクラスタの処理能力を大幅に削減します。Redshift Spectrumはインテリジェントにスケールします。Redshift Spectrumは、クエリの要求に基づいて、大量の並列処理を利用するために数千ものインスタンスを潜在的に使用できます。

メタデータの管理

Redshift Spectrumテーブルを作成するには、ファイルの構造を定義し、外部データカタログにテーブルとして登録します。外部データカタログは、Amazon Athenaに付属のデータカタログまたは独自のApache Hiveメタストアのいずれかになります。外部テーブルを作成および管理するには、Amazon Redshiftからデータ定義言語(DDL)コマンドを使用するか、外部データカタログに接続する他のツールを使用します。外部データカタログへの変更は、Amazon Redshiftクラスタですぐに利用できます。

パーティショニング

必要に応じて、外部テーブルを1つまたは複数の列にパーティション化できます。Amazon Redshiftクエリオプティマイザはクエリのデータを含まないパーティションを除外するため、外部テーブルの一部としてパーティションを定義するとパフォーマンスが向上します。

クエリー

Redshift Spectrumテーブルが定義されたら、ほかのAmazon Redshiftテーブルと同様に、テーブルを照会して結合することができます。Amazon Redshiftは外部テーブルに対する更新操作をサポートしていません。Redshift Spectrumテーブルを複数のAmazon Redshiftクラスタに追加し、同じリージョン内の任意のクラスタからAmazon S3の同じデータをクエリできます。Amazon S3データファイルを更新すると、そのデータはAmazon Redshiftクラスタからのクエリですぐに利用できます。

料金

クエリ処理中にS3から取得されたデータの量に基づいており、テラバイトあたり5ドルの料金で請求されます(データを圧縮したり、列指向の形式で保存してお金を節約できます)。Redshiftクラスタを実行してS3にデータを保存するには通常の料金を支払うが、クエリを実行していないときは料金はかかりません。

Spectrum の機能

Amazon Redshift Spectrumは、外部テーブルを使用してAmazon S3に格納されているデータをクエリします。他のAmazon Redshiftテーブルで使用するものと同じSELECT構文を使用して、外部テーブルを照会することができます。外部テーブルは読取り専用です。外部テーブルに書き込むことはできません。

Redshift または Athena または Hiveメタストアによる外部スキーマ・外部テーブルの管理

Amazon Redshift または Athena または Hiveメタストアで外部テーブルを作成できます。Athenaのテーブルを作成する方法については、Getting Started を参照してください。Hiveメタストアの使用方法については、Amazon EMRのドキュメントにあるApache Hiveを参照してください。

外部テーブルがAthenaまたはHiveメタストアで定義されている場合、まず外部データベースを参照する外部スキーマを作成します。次に、SELECT文で外部表を参照するには、表名の前にスキーマ名を付けます。詳細については、Creating External Schemas for Amazon Redshift Spectrum を参照してください。

Amazon Redshiftは、AthenaのデータカタログとAmazon S3のデータファイルにアクセスするための認証が必要です。この承認を提供するには、IAMロールを作成し、そのロールをクラスタに添付して、外部スキーマを作成する時のCREATE EXTERNAL SCHEMAステートメントでロールARNを指定します。認可の詳細については、Amazon Redshift SpectrumのIAM Policy を参照してください。

ファイル形式

Redshift Spectrumは、以下の構造化データフォーマットおよび半構造化データフォーマットをサポートしています。

  • PARQUET
  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE

Parquetなどのカラムナストレージファイル形式を使用することをお勧めします。カラム型ストレージファイルフォーマットを使用すると、必要なカラムだけを選択して、Amazon S3からのデータ転送を最小限に抑えることができます。

圧縮

ストレージスペースを削減し、パフォーマンスを向上させ、コストを最小限に抑えるために、データファイルを圧縮することを強くお勧めします。Amazon Redshift Spectrumは、次の圧縮タイプをサポートしています。

  • gzip
  • bz2
  • snappy (PARQUET)

暗号化

Redshift Spectrumは、次の暗号化オプションを使用して暗号化されたデータファイルを透過的に復号化します。

  • Amazon S3によって管理されるAES-256暗号化キーを使用するサーバー側の暗号化(SSE-S3)
  • デフォルト鍵を使用するAWS鍵管理サービス(SSE-KMS)によって管理される鍵によるサーバ側暗号化

Spectrumの設定・利用手順

今回は、Getting Started with Amazon Redshift Spectrum の手順に従い、Amazon Redshiftからデータ定義言語(DDL)コマンドを使用してテーブルを定義してみたいと思います。

ステップ1:Amazon Redshift 用の IAM ロールを作成する

バックエンドではAmazon Athenaと連携しているので、Amazon Athenaの外部データカタログとAmazon S3のデータファイルにアクセス持つIAMロールを作成してRedshiftクラスタに付与、もしくは権限を追加する必要があります。Amazon Redshiftでロールを使用する方法の詳細については、「IAMロールを使用したCOPYおよびUNLOAD操作の承認」を参照してください。

ここでは mySpectrumRole というロール名に、AmazonS3ReadOnlyAccess と AmazonAthenaFullAccessの2つのマネージドポリシーを付与しています。

20170420-Step1

ステップ2:IAM ロールをクラスタに関連付ける

Amazon Redshiftが外部データカタログとAmazon S3にアクセスすることを許可するIAMロールを作成したら、そのロールをAmazon Redshiftクラスタに関連付けます。ステップ1で作成した IAMロールの ロール ARN は外部テーブルを定義する際に利用します。

20170420-Step2

AWS Management Consoleにサインインし、Amazon Redshiftコンソール を開き、ステップ1で作成したmySpectrumRole を選択します。

20170420-Step2-attachedrole

ステップ3:外部スキーマと外部テーブルの作成

外部テーブルは外部スキーマに作成する必要がありますので、外部スキーマと外部テーブルを作成します。外部スキーマは外部データカタログ内のデータベースを参照し、あなたの代わりにAmazon S3にアクセスするためにクラスタを認可する IAMロールARN を提供します。Amazon Redshiftで、またはAmazon EMRなどのAmazon AthenaまたはApache Hiveメタストアを使用して、外部データベースを作成できます。この例では、外部スキーマを作成するときにAmazon Redshiftで外部データベースを作成します。

外部テーブルを作成すると、Amazon Redshiftで列を定義できます。AthenaのHiveデータ定義言語(DDL)やAmazon EMRなどのHiveメタストアを使用して定義することもできます。この例では、Amazon RedshiftのCREATE EXTERNAL TABLE定義内で列とデータ型を指定します。詳細については、Creating External Schemas for Amazon Redshift Spectrum を参照してください。

外部スキーマの作成

外部スキーマを作成するには、CREATE EXTERNAL SCHEMAコマンドを実行します。次のコマンドのIAMロールARNを、手順1で作成したロールARNに置き換えてから、SQLクライアントでコマンドを実行します。

cmdb=# create external schema spectrum
cmdb-# from data catalog
cmdb-# database 'spectrumdb'
cmdb-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole'
cmdb-# create external database if not exists;
INFO: External database "spectrumdb" created
CREATE SCHEMA

外部テーブルの作成

外部テーブルを作成するには、CREATE EXTERNAL TABLEコマンドを実行します。
cmdb=# create external table spectrum.sales(
cmdb(# salesid integer,
cmdb(# listid integer,
cmdb(# sellerid integer,
cmdb(# buyerid integer,
cmdb(# eventid integer,
cmdb(# dateid smallint,
cmdb(# qtysold smallint,
cmdb(# pricepaid decimal(8,2),
cmdb(# commission decimal(8,2),
cmdb(# saletime timestamp)
cmdb-# row format delimited
cmdb-# fields terminated by '\t'
cmdb-# stored as textfile
cmdb-# location 's3://awssampledb/tickit/spectrum/sales/';
CREATE EXTERNAL TABLE

psqlの\dコマンドでは、外部スキーマのテーブル名一覧や外部テーブルの定義は参照できませんでした。

cmdb=# \d spectrum.sales
Did not find any relation named "spectrum.sales".

pg_external_schemaは、外部スキーマ名の一覧を取得できます。

cmdb=# select * FROM pg_external_schema pe join pg_namespace pn ON pe.esoid = pn.oid;
 esoid  | eskind |  esdbname  |                                esoptions                                | nspname  | nspowner | nspacl
--------+--------+------------+-------------------------------------------------------------------------+----------+----------+--------
 707623 |      1 | spectrumdb | {"IAM_ROLE":"arn:aws:iam::123456789012:role/mySpectrumRole"} | spectrum |      100 |
(1 row)

また、Athenaのコンソールを参照すると、自動的に外部スキーマがAthenaのデータベース名、外部テーブルがAthenaのテーブルとして登録されていることが確認できました。

20170420-Step2-athena

ステップ4:Amazon S3でデータを照会する

外部テーブルが作成されたら、他のAmazon Redshift表を照会するために使用するのと同じSELECTステートメントを使用して外部表を照会することができます。これらのSELECT文のクエリには、テーブルの結合、データの集計、および条件によるフィルタリングが含まれます。

SPECTRUM.SALESテーブルの行数を取得します。


Timing is on.
cmdb=# select count(*) from spectrum.sales;
 count
--------
 172462
(1 row)

Time: 903.104 ms

 

ベストプラクティスは、Amazon S3で大きなファクトテーブルを保持し、Amazon Redshiftで小さなディメンションテーブルを保持することです。Amazon Redshift入門で提供されているサンプルデータをにEVENTという名前のテーブルを作成します。


cmdb=# create table event(
cmdb(# eventid integer not null distkey,
cmdb(# venueid smallint not null,
cmdb(# catid smallint not null,
cmdb(# dateid smallint not null sortkey,
cmdb(# eventname varchar(200),
cmdb(# starttime timestamp);
CREATE TABLE

次の COPY コマンドの IAM ロール ARN を、手順1で作成したロールARNに置き換えて、EVENTテーブルをロードします。

cmdb=# copy event from 's3://awssampledb/tickit/allevents_pipe.txt'
cmdb-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole'
cmdb-# delimiter '|'
cmdb-# timeformat 'YYYY-MM-DD HH:MI:SS'
cmdb-# region 'us-east-1';
INFO: Load into table 'event' completed, 8798 record(s) loaded successfully.
COPY
Time: 21852.458 ms

次の例では、外部テーブル SPECTRUM.SALES をローカル表EVENTに結合し、上位10個のイベントの合計売上高を求めます。


cmdb=# select top 10 event.eventname, sum(spectrum.sales.pricepaid) from spectrum.sales, event
cmdb-# where spectrum.sales.eventid = event.eventid
cmdb-# group by eventname
cmdb-# order by 2 desc;
eventname | sum
------------------+------------
Mamma Mia! | 1135454.00
Spring Awakening | 972855.00
The Country Girl | 910563.00
Macbeth | 862580.00
Jersey Boys | 811877.00
Legally Blonde | 804583.00
Chicago | 790993.00
Spamalot | 714307.00
Hedda Gabler | 661018.00
Thurgood | 639877.00
(10 rows)

Time: 6148.114 ms

前のクエリのクエリプランを表示します。Amazon S3のデータに対して実行されたS3 Seq Scan、S3 HashAggregate、およびS3 Query Scan手順に注意してください。 Amazon Redshiftは外部表の統計を更新しないため、外部表を含む問合せ計画では"Tables missing statistics"というメッセージが表示されます。 クエリ実行プランは、外部テーブルが大きいテーブルであり、ローカルテーブルが小さいテーブルであるという前提に基づいて生成されます。

cmdb=# explain
cmdb-# select top 10 event.eventname, sum(spectrum.sales.pricepaid)
cmdb-# from spectrum.sales, event
cmdb-# where spectrum.sales.eventid = event.eventid
cmdb-# group by eventname
cmdb-# order by 2 desc;
                                                                                       QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Limit  (cost=1003290360137.83..1003290360137.86 rows=10 width=29)
   ->  XN Merge  (cost=1003290360137.83..1003290360139.27 rows=576 width=29)
         Merge Key: sum(sales.pricepaid)
         ->  XN Network  (cost=1003290360137.83..1003290360139.27 rows=576 width=29)
               Send to leader
               ->  XN Sort  (cost=1003290360137.83..1003290360139.27 rows=576 width=29)
                     Sort Key: sum(sales.pricepaid)
                     ->  XN HashAggregate  (cost=3290360109.99..3290360111.43 rows=576 width=29)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=109.98..3240360109.99 rows=10000000000 width=29)
                                 Hash Cond: ("outer".eventid = "inner".eventid)
                                 ->  XN S3 Query Scan sales  (cost=0.00..200000000.00 rows=10000000000 width=16)
                                       ->  S3 Seq Scan spectrum.sales location:"s3://awssampledb/tickit/spectrum/sales" format:TEXT  (cost=0.00..100000000.00 rows=10000000000 width=16)
                                 ->  XN Hash  (cost=87.98..87.98 rows=8798 width=21)
                                       ->  XN Seq Scan on event  (cost=0.00..87.98 rows=8798 width=21)
 ----- Tables missing statistics: spectrum_sales -----
 ----- Update statistics by running the ANALYZE command on these tables -----
(16 rows)

ステップ5:おまけ

ステップ4では、Spectrumとの組み合わせたときのベストプラクティスを紹介しましたが、実行プランで再分散(DS_BCAST_INNER)が時発生していますので改善を試みます。


cmdb=# select count(*) from event;
count
-------
8798
(1 row)

Time: 1956.364 ms
cmdb=# select count(distinct(eventid)) from event;
count
-------
8798
(1 row)

データを確認したところ、eventidがユニークなので、eventのテーブル定義はeventidを主キー、再分散が発生していたので分散タイプALLに見直します。

cmdb=# drop table event;
DROP TABLE
Time: 362.221 ms
cmdb=# create table event(
cmdb(# eventid integer not null,
cmdb(# venueid smallint not null,
cmdb(# catid smallint not null,
cmdb(# dateid smallint not null sortkey,
cmdb(# eventname varchar(200),
cmdb(# starttime timestamp,
cmdb(# primary key (eventid)
cmdb(# )
cmdb-# diststyle all
cmdb-# ;
CREATE TABLE
Time: 347.209 ms
cmdb=# copy event from 's3://awssampledb/tickit/allevents_pipe.txt'
cmdb-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole'
cmdb-# delimiter '|'
cmdb-# timeformat 'YYYY-MM-DD HH:MI:SS'
cmdb-# region 'us-east-1';
INFO: Load into table 'event' completed, 8798 record(s) loaded successfully.
COPY
Time: 16159.887 ms

cmdb=# select top 10 event.eventname, sum(spectrum.sales.pricepaid) from spectrum.sales, event
cmdb-# where spectrum.sales.eventid = event.eventid
cmdb-# group by eventname
cmdb-# order by 2 desc;
eventname | sum
------------------+------------
Mamma Mia! | 1135454.00
Spring Awakening | 972855.00
The Country Girl | 910563.00
Macbeth | 862580.00
Jersey Boys | 811877.00
Legally Blonde | 804583.00
Chicago | 790993.00
Spamalot | 714307.00
Hedda Gabler | 661018.00
Thurgood | 639877.00
(10 rows)

Time: 4792.301 ms

cmdb=# explain
cmdb-# select top 10 event.eventname, sum(spectrum.sales.pricepaid)
cmdb-# from spectrum.sales, event
cmdb-# where spectrum.sales.eventid = event.eventid
cmdb-# group by eventname
cmdb-# order by 2 desc;
                                                                                          QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Limit  (cost=1000150007638.33..1000150007638.36 rows=10 width=44)
   ->  XN Merge  (cost=1000150007638.33..1000150007639.77 rows=576 width=44)
         Merge Key: sum(sales.derived_col1)
         ->  XN Network  (cost=1000150007638.33..1000150007639.77 rows=576 width=44)
               Send to leader
               ->  XN Sort  (cost=1000150007638.33..1000150007639.77 rows=576 width=44)
                     Sort Key: sum(sales.derived_col1)
                     ->  XN HashAggregate  (cost=150007610.48..150007611.92 rows=576 width=44)
                           ->  XN Hash Join DS_DIST_ALL_NONE  (cost=150000109.97..150006610.48 rows=200000 width=44)
                                 Hash Cond: ("outer".derived_col2 = "inner".eventid)
                                 ->  XN S3 Query Scan sales  (cost=150000000.00..150002000.50 rows=200000 width=31)
                                       ->  S3 HashAggregate  (cost=150000000.00..150000000.50 rows=200000 width=16)
                                             ->  S3 Seq Scan spectrum.sales location:"s3://awssampledb/tickit/spectrum/sales" format:TEXT  (cost=0.00..100000000.00 rows=10000000000 width=16)
                                 ->  XN Hash  (cost=87.98..87.98 rows=8798 width=21)
                                       ->  XN Seq Scan on event  (cost=0.00..87.98 rows=8798 width=21)
 ----- Tables missing statistics: spectrum_sales -----
 ----- Update statistics by running the ANALYZE command on these tables -----
(17 rows)

以上で、再分散が回避され性能が改善しました。

まとめ

SpectrumはS3のデータに対して既存のRedshiftのテーブルと組み合わせてクエリーが実行できることが確認できました。Spectrumは条件フィルタリングや集約など、多くの計算中心型タスクを既存のRedshiftとは別のRedshift Spectrumにオフロードできるので既存クラスタの処理負荷の削減が期待できます。一方、クエリ実行プランは、外部テーブルが大きいテーブルであり、ローカルテーブルが小さいテーブルであるという前提に基づいて生成されるという挙動についてはチューニングの観点で抑えておいた方が良いでしょう。

これまでは、Redshiftでデータ変換「ELT」する場合は、データのロードと変換後のテーブルのINSERTが必要でしたが、SpectrumでELT済みデータを直接RedshiftにINSERTすることができるようになり、処理時間の短縮やIOの削減などの期待ができるので私も使い倒したいと思います。