Amazon RedshiftからAmazon Redshift Spectrumにマイグレートするツールを公開しました

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Amazon RedshiftのテーブルとSpectrumレイヤの外部テーブルでは、持ち方やDDL構文が全く異なり、いざ移行を試みると煩雑な作業に多くの時間を要してハードルの高さを感じてしまいます。そこで、この問題を解決すべく、Amazon Redshiftのテーブル定義情報からAmazon Redshift Spectrumのテーブルを作成するDDLを自動生成するツールを作成し、公開するに至りました。今回はこのツールを用いてマイグレートとする方法について解説します。

GitHub - aws-datalake-migration/ddl/v_generate_redshift_spectrum_ddl.sql

Amazon Redshift Spectrumにマイグレートする目的

プレディケイトプッシュダウンによるスループットの向上

Amazon Redshift Spectrum は、S3上のファイルに対してWHEREGROUP BYなどの処理を効率的に行うための手法である「プレディケイトプッシュダウン」をサポートしています。プレディケイトプッシュダウンとは、S3上のファイルに対してSpectrumレイヤの各ワーカー内があらかじめWHEREGROUP BYをしておき、その結果を集約ワーカーに転送する、といったプロセスをとります。各ワーカーで事前に集約を行うことでデータの転送コストが下がり、結果的にパフォーマンスが向上します。

Redshiftのストレージの開放

これまでクエリを実行するにはRedshift内のテーブルにデータを事前にロードしなければならず、利用頻度にかかわらずストレージを専有していました。大容量で参照頻度の少ないテーブルのみをRedshift Spectrumに移行することで大きな変更を加えずストレージの開放が可能です。Redshiftのサイジングは、コンピューティングキャパシティとストレージキャパシティの2点で、ノードタイプとそのノード数を選定していましたが、Redshift Spectrumへの移行によってストレージキャパシティが緩和されます。これまでストレージキャパシティを満たすために選定していたDS2(ストレージ重視型ノード:HDD)からDC2(コンピューティング重視型ノード:NVMe-SSD)にサイジングという選択が増え、予算の範囲内でスループットの向上やコスト最適化が可能になります。

RedshiftのELTからGlueのETLに移行する

データがS3のファイルベースになることで、Redshiftのコンピューティングリソースを使うELTから、GlueのスケーラブルなETLに移行できます。

ロードレスなリアルタイム分析

IoTデバイスのセンサーデータやアプリのストリームデータは、Kinesis Data Firehose等を経由して、ハブストレージであるS3に出力されます。Reshift Spectrumであれば、ロードすることなく、S3上のデータに対して直接クエリを実行できます。

移行後もRedshiftのテーブルとシームレスに結合、従来通りクエリの実行が可能

フィルタや集約といったクエリのシンタックスは変わりませんので、従来のクエリに大きな変更を加えず実行可能です。Redshift Spectrumに移行した一部のテーブルとRedshift内のテーブル間もシームレスに結合(JOIN)できます。ビューを作成して利用者にこの違いを意識させないことも可能です。

マイグレートの手順

  1. Amazon Redshift Spectrumのテーブル作成用ビューの登録
  2. S3にデータファイルを配置する
  3. Redshift SpectrumにDBを作成する
  4. Redshift Spectrumにテーブルを作成する
  5. 作成したテーブルにパーティションを設定する
  6. クエリの実行

1. Amazon Redshift Spectrumのテーブル作成用ビューの作成

最初にAmazon Redshiftのテーブル定義情報からAmazon Redshift Spectrumのテーブルを作成するためのビューをRedshiftに作成します。なお、ビューを登録しないで実行したい場合はCREATE OR REPLACE VIEW v_generate_redshift_spectrum_ddl ASをコメントにしてそのまま実行すると全てのテーブルのDDLが生成します。

--DROP VIEW v_generate_redshift_spectrum_ddl;
/**********************************************************************************************
Purpose: View to get the Amazon Redshift Spectrum DDL for a table.
         This will contain the S3 LOCATION, TBLPROPERTIES(for AWS Glue), etc.
Notes:   
         The following filters are useful:
           where tablename in ('t1', 't2')     -- only get DDL for specific tables
           where schemaname in ('s1', 's2')    -- only get DDL for specific schemas
         So for example if you want to order DDL on tablename and only want the tables 't1', 't2'
         and 't4' you can do so by using a query like:
           select ddl from (
           ) where tablename in ('t1', 't2', 't4');
History:
2018-01-26 Satoru Ishikawa Created
**********************************************************************************************/
CREATE OR REPLACE VIEW v_generate_redshift_spectrum_ddl
AS
SELECT 
  REGEXP_REPLACE (schemaname, '^zzzzzzzz', '') AS schemaname, 
  REGEXP_REPLACE (tablename, '^zzzzzzzz', '') AS tablename, 
  seq, 
  ddl 
FROM 
  (
    SELECT 
      schemaname, 
      tablename, 
      seq, 
      ddl 
    FROM 
      (
        --DROP TABLE
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          0 AS seq, 
          '-- DROP TABLE ' + QUOTE_IDENT(n.nspname || '_db') + '.' + QUOTE_IDENT(c.relname) + ';' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --CREATE TABLE
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          2 AS seq, 
          'CREATE EXTERNAL TABLE ' + QUOTE_IDENT(n.nspname || '_db') + '.' + QUOTE_IDENT(c.relname) + '' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --OPEN PAREN COLUMN LIST
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          5 AS seq, 
          '(' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --COLUMN LIST
        UNION 
        SELECT 
          schemaname, 
          tablename, 
          seq, 
          '\t' + col_delim + col_name + ' ' + col_datatype AS ddl 
        FROM 
          (
            SELECT 
              n.nspname AS schemaname, 
              c.relname AS tablename, 
              100000000 + a.attnum AS seq, 
              CASE WHEN a.attnum > 1 THEN ',' ELSE '' END AS col_delim, 
              QUOTE_IDENT(a.attname) AS col_name, 
              CASE 
                WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER VARYING'
                ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER VARYING', 
                'VARCHAR'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER', 
                'CHAR'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'NUMERIC'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'NUMERIC', 
                'DECIMAL'
              ) ELSE UPPER(
                format_type(a.atttypid, a.atttypmod)
              ) END AS col_datatype
            FROM 
              pg_namespace AS n 
              INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
              INNER JOIN pg_attribute AS a ON c.oid = a.attrelid 
              LEFT OUTER JOIN pg_attrdef AS adef ON a.attrelid = adef.adrelid 
              AND a.attnum = adef.adnum 
            WHERE 
              c.relkind = 'r' 
              AND a.attnum > 0 
            ORDER BY 
              a.attnum
          ) 
        --CLOSE PAREN COLUMN LIST
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          299999999 AS seq, 
          ')' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --END SEMICOLON
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          600000000 AS seq, 
          '-- PARTITIONED BY (col_name data_type [, … ])\n' ||
          'ROW FORMAT DELIMITED\n' ||
          '  FIELDS TERMINATED BY ''\\t''\n' ||
          'STORED AS TEXTFILE\n' ||
          '  LOCATION ''s3://bucket/folder/''\n' ||
          'TABLE PROPERTIES (\n' ||
          '--  ''CrawlerSchemaDeserializerVersion''=''1.0'',\n' ||
          '--  ''CrawlerSchemaSerializerVersion''=''1.0'',\n' ||
          '--  ''UPDATED_BY_CRAWLER''=''my_crawler'',\n' ||
          '--  ''averageRecordSize''=''100'',\n' ||
          '  ''classification''=''csv'',\n' ||
          '--  ''columnsOrdered''=''true'',\n' ||
          '--  ''compressionType''=''gzip'',\n' ||
          '  ''delimiter''=''\\t'',\n' ||
          '--  ''objectCount''=''1000'',\n' ||
          '--  ''recordCount''=''10000000000'',\n' ||
          '--  ''sizeKey''=''1000000000000'',\n' ||
          '--   ''skip.header.line.count''=''1'',\n' ||
          '  ''typeOfData''=''file'')\n' ||
          ';' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r'
      ) 
    WHERE 1=1
--       AND schemaname in ('s1', 's2') -- listed Traget schemas. 
--       AND tablename  in ('t1', 't2') -- listed Traget tables. 
    ORDER BY 
      schemaname, 
      tablename, 
      seq
  );

2. S3にデータファイルを配置する

データファイルがすでにある場合

Redshiftにロードした元のデータがS3上にある場合は、そのファイルをS3のフォルダの下にファイルを配置します。データファイルは圧縮形式(.gz等)で保存することを推奨します。

  • 1つのS3フォルダにすべてのデータを配置したい場合は全てのデータファイルをS3フォルダの下に単純にコピーするので構いません。
  • パーティション毎にS3フォルダにデータファイルが格納したい場合、例えば、accesslog-20180125.tsv.gzのようにファイルに日付毎にファイルが分かれているのであれば日付毎でフォルダを分割して保存します。

データがRedshift上に存在する場合

ELT済みのデータなどRedshift上にデータが存在する場合は、RedshiftのUNLOADコマンドでLOCATIONに指定したS3のフォルダの下にファイルを配置します。データファイルは圧縮形式(GZIP等)で保存することを推奨します。

  • データが少ないのであれば、パーティション不要なので単一ファイルでアンロードします。
  • データが多いのであれば、パーティション毎にフォルダを作成してその下にファイルをアンロードします。

参考: カラムあり/カラムなしパーティション形式

※ パーティションとは、指定したパーティションキー毎にデータファイルをフォルダに分けて格納して管理することです。クエリの条件にパーティションキーを指定したときに一致したフォルダのデータをスキャンするだけで済むので、無駄なスキャンをスキップして高速かつ利用費削減できます。

3. Redshift Spectrumに外部スキーマを作成する

Redshiftのスキーマとテーブルの関係と同様に、後述する外部テーブルを作成する際に指定する外部スキーマを作成します。ちょっとややこしいいのですが、この外部スキーマは、Amazon AthenaやAWS Glueからは「DB」として参照できます。Redshift Spectrumの外部スキーマ名と既存のRedshiftのスキーマ名は名前が重複しないように別の名前を設定します。上記のツールが生成するRedshift Spectrumの外部スキーマ名は、元になるRedshiftのスキーマ名の最後に_dbを付けた文字列をDB名として生成します。例えば、マイグレーション元であるRedshiftのスキーマ名がssbgzなので、Redshift Spectrumで作成する外部スキーマ名はssbgz_dbとします。

Redshift Spectrumのコンソールから以下のDDLを実行します。このDDLは、Redshift上にssbgz_dbという外部スキーマの作成と、Glueデータカタログ上にssbgz_dbというDBを作成しています。そして、Redshiftの外部スキーマssbgz_dbとGlueデータカタログのDBssbgz_dbがリンクしていることを定義しています。

cmdb=# create external schema ssbgz_db
cmdb-# from data catalog
cmdb-# database 'ssbgz_db'
cmdb-# iam_role 'arn:aws:iam::123456789012:role/AmazonRedshiftSpectrumRole'
cmdb-# create external database if not exists;
INFO:  External database "ssbgz_db" created
CREATE SCHEMA

4. Redshift Spectrumにテーブルを作成する

Redshiftのテーブル定義情報から自動生成したAmazon Redshift Spectrumのテーブル作成SQLを用いてテーブルを作成します。

Amazon Redshift Spectrumのテーブル作成用SQLの生成

そのまま実行すると全てのテーブルのDDLが生成しますが、必要なテーブルが明らかな場合は、スキーマ名やテーブル名を指定してください。

例えば、スキーマ名はssbgz、テーブル名はlineorderの場合:

cmdb=# select ddl from v_generate_athena_ddl where schemaname='ssbgz' and tablename='lineorder';
                          ddl
-------------------------------------------------------
 -- DROP TABLE ssbgz_db.lineorder;
 CREATE EXTERNAL TABLE ssbgz_db.lineorder
 (
         lo_orderkey INTEGER
         ,lo_linenumber INTEGER
         ,lo_custkey INTEGER
         ,lo_partkey INTEGER
         ,lo_suppkey INTEGER
         ,lo_orderdate INTEGER
         ,lo_orderpriority VARCHAR(15)
         ,lo_shippriority VARCHAR(1)
         ,lo_quantity INTEGER
         ,lo_extendedprice INTEGER
         ,lo_ordertotalprice INTEGER
         ,lo_discount INTEGER
         ,lo_revenue INTEGER
         ,lo_supplycost INTEGER
         ,lo_tax INTEGER
         ,lo_commitdate INTEGER
         ,lo_shipmode VARCHAR(10)
 )
 -- PARTITIONED BY (col_name data_type [, … ])        +
 ROW FORMAT DELIMITED                                 +
   FIELDS TERMINATED BY '\t'                          +
 STORED AS TEXTFILE                                   +
   LOCATION 's3://bucket/folder/'                     +
 TABLE PROPERTIES (                                   +
 --  'CrawlerSchemaDeserializerVersion'='1.0',        +
 --  'CrawlerSchemaSerializerVersion'='1.0',          +
 --  'UPDATED_BY_CRAWLER'='my_crawler',               +
 --  'averageRecordSize'='100',                       +
   'classification'='csv',                            +
 --  'columnsOrdered'='true',                         +
 --  'compressionType'='gzip',                        +
   'delimiter'='\t',                                  +
 --  'objectCount'='1000',                            +
 --  'recordCount'='10000000000',                     +
 --  'sizeKey'='1000000000000',                       +
 --   'skip.header.line.count'='1',                   +
   'typeOfData'='file')                               +
 ;
(22 rows)

Amazon Redshift Spectrumのテーブル作成用SQLのカスタマイズ

S3に格納されているデータファイルは、TSVファイル形式を前提に生成しています。パーティション設定、データファイルの場所、TBLPROPERTIESの設定等は内容に応じて、設定が必要です。

PARTITION 設定

データを格納しているフォルダが単一フォルダの場合は指定は不要です。コメントのままでも構いません。

一方、データを格納しているフォルダがパーティション構成の場合は、パーティションのキーとなるカラム名とデータ型を指定してください。例えば、カラム名がp_date、データ型がDATEの場合は以下のように指定します。

PARTITIONED BY (p_date DATE)
ROW FORMAT DELIMITED 設定

TSVファイル形式を前提なので、そのままで構いません。S3ファイルの区切り文字(delimiter)を指定します。TSVなので\tを指定しています。CSVの場合は,を指定してください。

STORED AS TEXTFILE 設定

LOCATIONは、S3ファイルを格納しているフォルダのURLを指定します。S3のファイルのURLやS3のプレフィックスを指定するとエラーになります。

LOCATION
   's3://bucket/folder/'
TABLE PROPERTIES 設定

キーバリュー形式でプロパティを設定します。コメントインしているプロパティは設定を推奨するプロパティです。property_nameproperty_valueは、ともにシングルクォーテーションで括ります。

property_name property_value 必須 補足
CrawlerSchemaDeserializerVersion 1.0 現状は常に`1.0`指定されている。
CrawlerSchemaSerializerVersion 1.0 現状は常に`1.0`指定されている。
UPDATED_BY_CRAWLER my_crawler AWS Glueのクローラの名前を指定します。
averageRecordSize 215 レコード長の平均値。(sizeKey ÷ recordCount)
classification csv ファイルのフォーマットタイプを指定します。AWS Glueのデータソースとして参照する場合は必須項目です。
columnsOrdered true classificationが`csv`の場合は`true`指定します。
compressionType gzip 圧縮タイプを指定する。非圧縮の場合は`none`を指定します。
delimiter \t classificationが`csv`の区切り文字を指定します。ここではTab区切りなので`\t`を指定します。
objectCount 8 S3のファイル数を指定します。
recordCount 10766 レコード数を指定します。Redshiftのクエリオプティマイザが利用するので概算値でも良いので指定することを推奨します。
sizeKey 2314879 ファイルのバイト数
skip.header.line.count 1 スキップするファイルの先頭行の数を指定します。
typeOfData file データタイプとして`file`が指定されています。
補足:コストベースオプティマイザの考慮

Redshift Spectrmは、Redshiftのコストベースオプティマイザを活用(Dive deep to Amazon Redshift Spectrum: Now query exabytes of data in S3のP.29)と記載がありますので、一般的な統計情報で必要となるrecordCountなどを設定したほうが、より最適な実行プランが得られる可能性があります。現状は、AWS GlueのクローラでDDLを自動生成すると、TABLE PROPERTIES プロパティの殆どに値が設定されます。しかし、実行プランのrowsは常に10000000000と解釈され、recordCountを参照しているようには見えませんでしたので、上記の表では「必須」にしていません。

cmawsteamdb=# explain SELECT * FROM ssbgz_db.lineorder limit 1;
                                                                                        QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Limit  (cost=0.00..0.02 rows=1 width=136)
   ->  XN S3 Query Scan lineorder_test_test_xx  (cost=0.00..200000000.00 rows=10000000000 width=136)
         ->  S3 Seq Scan ssbgz_db.lineorder location:"s3://bucket/ssbgz/lineorder" format:TEXT  (cost=0.00..100000000.00 rows=10000000000 width=136)
(3 rows)

テーブル作成DDLを実行する

カスタマイズ後のDDLは以下のとおりです。このDDLをRedshift Spectrumのコンソールから実行します。

-- DROP TABLE ssbgz_db.lineorder;
CREATE EXTERNAL TABLE ssbgz_db.lineorder
(
        lo_orderkey INTEGER
        ,lo_linenumber INTEGER
        ,lo_custkey INTEGER
        ,lo_partkey INTEGER
        ,lo_suppkey INTEGER
        ,lo_orderdate INTEGER
        ,lo_orderpriority VARCHAR(15)
        ,lo_shippriority VARCHAR(1)
        ,lo_quantity INTEGER
        ,lo_extendedprice INTEGER
        ,lo_ordertotalprice INTEGER
        ,lo_discount INTEGER
        ,lo_revenue INTEGER
        ,lo_supplycost INTEGER
        ,lo_tax INTEGER
        ,lo_commitdate INTEGER
        ,lo_shipmode VARCHAR(10)
)
PARTITIONED BY (p_date DATE)
ROW FORMAT DELIMITED                                 
  FIELDS TERMINATED BY '\t'                          
STORED AS TEXTFILE                                   
  LOCATION 's3://bucket/folder/'  
TABLE PROPERTIES (                                   
  'classification'='csv',                            
  'columnsOrdered'='true',                         
  'compressionType'='gzip',                        
  'delimiter'='\t',                                  
  'typeOfData'='file')                               
;

5. 作成したテーブルにパーティションを設定する

パーティション形式でデータファイルを分割していない場合、この作業は不要です。パーティション形式にデータファイルを分割している場合はパーティションを設定します。DDLでパーティションを指定しているのに以下の設定を忘れるとデータが全く見えない状態になりますのでご注意ください。

  • カラムありパーティション
    • カラムありパーティション形式にデータファイルを配置していれば、以下のコマンドを実行するとLOCATIONに指定したパスをスキャンしてパーティション設定を自動更新できます。

現在、Redshiftからこのコマンド実行はサポートされていません。このコマンドを実行するにはAthenaのコンソールからコマンド実行します。

MSCK REPAIR TABLE <テーブル名>;

自動化するのであれば、AWSCLIから実行することも可能です。

aws athena start-query-execution \
--query-string "MSCK REPAIR TABLE ssbgz_db.lineorder;" \
--result-configuration OutputLocation=s3://mybucket/athena_staging_dir 
{
    "QueryExecutionId": "953a5f3f-0e87-44e9-9a94-4aee4aa0cd2e"
}
  • カラムなしパーティション
    • カラムなしパーティション形式はパーティション毎に以下のコマンドを実行してパーティションを設定します。
ALTER TABLE ssbgz_db.lineorder ADD PARTITION (p_date='2018-01-28');

6. クエリの実行

Redshiftから通常のテーブルと変わりなくクエリが実行できます。

cmdb=# SELECT * FROM ssbgz_db.lineorder limit 1;
-[ RECORD 1 ]------+----------
lo_orderkey        | 526035458
lo_linenumber      | 3
lo_custkey         | 2358557
lo_partkey         | 468166
lo_suppkey         | 346150
lo_orderdate       | 19930404
lo_orderpriority   | 3-MEDIUM
lo_shippriority    | 0
lo_quantity        | 38
lo_extendedprice   | 4309732
lo_ordertotalprice | 13788549
lo_discount        | 5
lo_revenue         | 4094245
lo_supplycost      | 68048
lo_tax             | 6
lo_commitdate      | 19930516
lo_shipmode        | TRUCK

最後に

Amazon RedshiftのテーブルをAmazon Redshift Spectrumに移行するとRedshiftのストレージが削減されるだけではなく、移行したテーブルはSpectrumレイヤにプレディケイトプッシュダウンすることで、Redshiftクラスタのスループットは向上します。Redshift Spectrumは、DWHとデータレイクの境界をユーザーが意識することなく、テーブル間でフィルタ、集計、結合をこれまでと同じようにシンタックスでクエリできます。また、Redshift Spectrumに移行したテーブルは、AWS Glueのデータカタログに登録されますので、AWSのデータレイクサービスからもクエリができるようになります。

Amazon Athenaのskip.header.line.countのサポートAmazon Redshift SpectrumのDATE型サポートによって、AWSのデータレイクサービス(Amazon Athena、Amazon Redshift Spectrum、Amazon EMR、AWS Glue)間で同じS3のデータファイルに対して、実用レベルの相互にクエリーが実行できるようになりました。これまでデータ分析はDWHである Amazon Redshiftが中心でしたが、今後はAWSのデータレイクサービスが中心になります。これまで以上の大きなデータに、データロードすることなく素早くデータ分析が可能になります。その一助になれば幸いです。

関連ブログ

Amazon RedshiftからAmazon Athenaにマイグレートするツールを公開しました