Amazon RedshiftからAmazon Athenaにマイグレートするツールを公開しました

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Amazon RedshiftとAmazon Athenaでは、データ型やデータの持ち方、特にDDL構文が全く異なり、いざ移行を試みると煩雑な作業に多くの時間を要してハードルの高さを感じてしまいます。そこで、この問題を解決すべく、Amazon Redshiftのテーブル定義情報からAmazon Athenaのテーブルを作成するDDLを自動生成するツールを作成し、公開するに至りました。今回はこのツールを用いてマイグレートとする方法について解説します。

GitHub - aws-datalake-migration/ddl/v_generate_athena_ddl.sql

Amazon Athenaにマイグレートする目的

アドホックなクエリをAmazon Athenaにオフロード

Redshiftへの予測できない不定期でアドホックなクエリは、日次や月次の定期バッチ処理に少なからず影響があります。そのため、大規模なRedshiftクラスタでは、キャパシティの上限を予測して、コンピュートノードのサイジングやWLMのチューニングが必要です。

この課題はアドホックなクエリをAmazon Athenaにオフロードすることで解決できます。Redshiftのデータマートや集計した累積データをS3にUNLOADしておき、Amazon AthenaでそのS3ファイルに対してクエリします。

Redshiftのストレージの開放

常にストレージキャパシティの限界までデータを格納することは、パフォーマンスや安定稼働の観点で望ましくありません。そこで、大容量で参照頻度の少ないテーブルのデータをS3ファイルとしてUNLOADして、Athenaでクエリできるように移行します。移行後は直接AthenaからS3のファイルに対してアドホックなクエリを実行できるようになります。

RedshiftのELTからGlueのETLに移行する

S3のファイルベースにすることで、Redshiftのコンピューティングリソースを使うELTから、GlueのスケーラブルなETLに移行できます。Amazon Athenaに移行することで、AWS Glue連携したサーバーレス・アナリティクス環境を構築できます。

マイグレートの手順

  1. Amazon Athenaのテーブル作成用ビューの登録
  2. S3にデータファイルを配置する
  3. AthenaにDBを作成する
  4. Athenaにテーブルを作成する
  5. 作成したテーブルにパーティションを設定する
  6. クエリの実行

1. Amazon Athenaのテーブル作成用ビューの作成

最初にAmazon Redshiftのテーブル定義情報からAmazon Athenaのテーブルを作成するためのビューをRedshiftに作成します。なお、ビューを登録しないで実行したい場合はCREATE OR REPLACE VIEW v_generate_athena_ddl ASをコメントにしてそのまま実行すると全てのテーブルのDDLが生成します。

--DROP VIEW v_generate_athena_ddl;
/**********************************************************************************************
Purpose: View to get the Amazon Athena DDL for a table.
         This will contain the S3 LOCATION, TBLPROPERTIES(for AWS Glue), etc.
Notes:   
         The following filters are useful:
           where tablename in ('t1', 't2')     -- only get DDL for specific tables
           where schemaname in ('s1', 's2')    -- only get DDL for specific schemas
         So for example if you want to order DDL on tablename and only want the tables 't1', 't2'
         and 't4' you can do so by using a query like:
           select ddl from (
           ) where tablename in ('t1', 't2', 't4');
History:
2018-01-26 Satoru Ishikawa Created
**********************************************************************************************/
CREATE OR REPLACE VIEW v_generate_athena_ddl
AS
SELECT 
  REGEXP_REPLACE (schemaname, '^zzzzzzzz', '') AS schemaname, 
  REGEXP_REPLACE (tablename, '^zzzzzzzz', '') AS tablename, 
  seq, 
  ddl 
FROM 
  (
    SELECT 
      schemaname, 
      tablename, 
      seq, 
      ddl 
    FROM 
      (
        --DROP TABLE
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          0 AS seq, 
          '-- DROP TABLE ' + QUOTE_IDENT(n.nspname || '_db') + '.' + QUOTE_IDENT(c.relname) + ';' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --CREATE TABLE
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          2 AS seq, 
          'CREATE EXTERNAL TABLE ' + QUOTE_IDENT(n.nspname || '_db') + '.' + QUOTE_IDENT(c.relname) + '' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --OPEN PAREN COLUMN LIST
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          5 AS seq, 
          '(' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --COLUMN LIST
        UNION 
        SELECT 
          schemaname, 
          tablename, 
          seq, 
          '\t' + col_delim + col_name + ' ' + col_datatype AS ddl 
        FROM 
          (
            SELECT 
              n.nspname AS schemaname, 
              c.relname AS tablename, 
              100000000 + a.attnum AS seq, 
              CASE WHEN a.attnum > 1 THEN ',' ELSE '' END AS col_delim, 
              QUOTE_IDENT(a.attname) AS col_name, 
              CASE 
                WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER VARYING'
                ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER VARYING', 
                'VARCHAR'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'CHARACTER', 
                'CHAR'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'INTEGER'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'INTEGER', 
                'INT'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'NUMERIC'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'NUMERIC', 
                'DECIMAL'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'DOUBLE'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'DOUBLE PRECISION', 
                'DOUBLE'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'REAL'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'REAL', 
                'FLOAT'
              ) WHEN STRPOS(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'TIMESTAMP'
              ) > 0 THEN REPLACE(
                UPPER(
                  format_type(a.atttypid, a.atttypmod)
                ), 
                'TIMESTAMP WITHOUT TIME ZONE', 
                'TIMESTAMP'
              ) ELSE UPPER(
                format_type(a.atttypid, a.atttypmod)
              ) END AS col_datatype
            FROM 
              pg_namespace AS n 
              INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
              INNER JOIN pg_attribute AS a ON c.oid = a.attrelid 
              LEFT OUTER JOIN pg_attrdef AS adef ON a.attrelid = adef.adrelid 
              AND a.attnum = adef.adnum 
            WHERE 
              c.relkind = 'r' 
              AND a.attnum > 0 
            ORDER BY 
              a.attnum
          ) 
        --CLOSE PAREN COLUMN LIST
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          299999999 AS seq, 
          ')' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r' 
        --END SEMICOLON
        UNION 
        SELECT 
          n.nspname AS schemaname, 
          c.relname AS tablename, 
          600000000 AS seq, 
          '-- PARTITIONED BY (col_name data_type [, … ])\n' ||
          'ROW FORMAT DELIMITED\n' ||
          '  FIELDS TERMINATED BY ''\\t''\n' ||
          'STORED AS INPUTFORMAT\n' ||
          '  ''org.apache.hadoop.mapred.TextInputFormat''\n' ||
          'OUTPUTFORMAT\n' ||
          '  ''org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat''\n' ||
          'LOCATION\n' ||
          '  ''s3://bucket/folder/''\n' ||
          'TBLPROPERTIES (\n' ||
          '--  ''CrawlerSchemaDeserializerVersion''=''1.0'',\n' ||
          '--  ''CrawlerSchemaSerializerVersion''=''1.0'',\n' ||
          '--  ''UPDATED_BY_CRAWLER''=''my_crawler'',\n' ||
          '--  ''averageRecordSize''=''100'',\n' ||
          '  ''classification''=''csv'',\n' ||
          '--  ''columnsOrdered''=''true'',\n' ||
          '--  ''compressionType''=''gzip'',\n' ||
          '  ''delimiter''=''\\t'',\n' ||
          '--  ''objectCount''=''1000'',\n' ||
          '--  ''recordCount''=''10000000000'',\n' ||
          '--  ''sizeKey''=''1000000000000'',\n' ||
          '--   ''skip.header.line.count''=''1'',\n' ||
          '  ''typeOfData''=''file'')\n' ||
          ';' AS ddl 
        FROM 
          pg_namespace AS n 
          INNER JOIN pg_class AS c ON n.oid = c.relnamespace 
        WHERE 
          c.relkind = 'r'
      ) 
    WHERE 1=1
--       AND schemaname in ('s1', 's2') -- listed Traget schemas. 
--       AND tablename  in ('t1', 't2') -- listed Traget tables. 
    ORDER BY 
      schemaname, 
      tablename, 
      seq
  );

2. S3にデータファイルを配置する

データファイルがすでにある場合

Redshiftにロードした元のデータがS3上にある場合は、そのファイルをS3のフォルダの下にファイルを配置します。データファイルは圧縮形式(.gz等)で保存することを推奨します。

  • 1つのS3フォルダにすべてのデータを配置したい場合は全てのデータファイルをS3フォルダの下に単純にコピーするので構いません。
  • パーティション毎にS3フォルダにデータファイルが格納したい場合、例えば、accesslog-20180125.tsv.gzのようにファイルに日付毎にファイルが分かれているのであれば日付毎でフォルダを分割して保存します。

データがRedshift上に存在する場合

ELT済みのデータなどRedshift上にデータが存在する場合は、RedshiftのUNLOADコマンドでLOCATIONに指定したS3のフォルダの下にファイルを配置します。データファイルは圧縮形式(GZIP等)で保存することを推奨します。

  • データが少ないのであれば、パーティション不要なので単一ファイルでアンロードします。
  • データが多いのであれば、パーティション毎にフォルダを作成してその下にファイルをアンロードします。

参考: カラム名あり/カラム名なしパーティション形式

※ パーティションとは、指定したパーティションキー毎にデータファイルをフォルダに分けて格納して管理することです。クエリの条件にパーティションキーを指定したときに一致したフォルダのデータをスキャンするだけで済むので、無駄なスキャンをスキップして高速かつ利用費削減できます。

3. AthenaにDBを作成する

AthenaのコンソールからDBを作成します。将来的なRedshift Spectrumとの連携を考慮して、AthenaのDB名とRedshiftのスキーマ名は別の名前を設定します。上記のツールが生成するAthenaのDB名は、元になるRedshiftのスキーマ名の最後に_dbを付けた文字列をDB名として生成します。例えば、マイグレーション元であるRedshiftのスキーマ名がssbgzなので、Athenaで作成するDB名はssbgz_dbとします。

Athenaのコンソールから以下のDDLを実行します。

CREATE DATABASE ssbgz_db;

4. Athenaにテーブルを作成する

Redshiftのテーブル定義情報から自動生成したAmazon Athenaのテーブル作成SQLを用いてテーブルを作成します。

Amazon Athenaのテーブル作成用SQLの生成

そのまま実行すると全てのテーブルのDDLが生成しますが、必要なテーブルが明らかな場合は、スキーマ名やテーブル名を指定してください。

例えば、スキーマ名はssbgz、テーブル名はlineorderの場合:

cmdb=# select ddl from v_generate_athena_ddl where schemaname='ssbgz' and tablename='lineorder';
                              ddl
----------------------------------------------------------------
 -- DROP TABLE ssbgz_db.lineorder;
 CREATE EXTERNAL TABLE ssbgz_db.lineorder
 (
         lo_orderkey INT
         ,lo_linenumber INT
         ,lo_custkey INT
         ,lo_partkey INT
         ,lo_suppkey INT
         ,lo_orderdate INT
         ,lo_orderpriority VARCHAR(15)
         ,lo_shippriority VARCHAR(1)
         ,lo_quantity INT
         ,lo_extendedprice INT
         ,lo_ordertotalprice INT
         ,lo_discount INT
         ,lo_revenue INT
         ,lo_supplycost INT
         ,lo_tax INT
         ,lo_commitdate INT
         ,lo_shipmode VARCHAR(10)
 )
 -- PARTITIONED BY (col_name data_type [, … ])                 +
 ROW FORMAT DELIMITED                                          +
   FIELDS TERMINATED BY '\t'                                   +
 STORED AS INPUTFORMAT                                         +
   'org.apache.hadoop.mapred.TextInputFormat'                  +
 OUTPUTFORMAT                                                  +
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'+
 LOCATION                                                      +
   's3://bucket/folder/'                                       +
 TBLPROPERTIES (                                               +
 --  'CrawlerSchemaDeserializerVersion'='1.0',                 +
 --  'CrawlerSchemaSerializerVersion'='1.0',                   +
 --  'UPDATED_BY_CRAWLER'='my_crawler',                        +
 --  'averageRecordSize'='100',                                +
   'classification'='csv',                                     +
 --  'columnsOrdered'='true',                                  +
 --  'compressionType'='gzip',                                 +
   'delimiter'='\t',                                           +
 --  'objectCount'='1000',                                     +
 --  'recordCount'='10000000000',                              +
 --  'sizeKey'='1000000000000',                                +
 --   'skip.header.line.count'='1',                            +
   'typeOfData'='file')                                        +
 ;
(22 rows)

Amazon Athenaのテーブル作成用SQLのカスタマイズ

S3に格納されているデータファイルは、TSVファイル形式を前提に生成しています。パーティション設定、データファイルの場所、TBLPROPERTIESの設定等は内容に応じて、設定が必要です。

PARTITION 設定

データを格納しているフォルダが単一フォルダの場合は指定は不要です。コメントのままでも構いません。

一方、データを格納しているフォルダがパーティション構成の場合は、パーティションのキーとなるカラム名とデータ型を指定してください。例えば、カラム名がp_date、データ型がDATEの場合は以下のように指定します。

PARTITIONED BY (p_date DATE)
ROW FORMAT DELIMITED 設定

TSVファイル形式を前提なので、そのままで構いません。S3ファイルの区切り文字(delimiter)を指定します。TSVなので\tを指定しています。CSVの場合は,を指定してください。

STORED AS INPUTFORMAT設定

TSVファイル形式を前提なので、そのままで構いません。データファイルの入力フォーマットを指定します。

OUTPUTFORMAT設定

TSVファイル形式を前提なので、そのままで構いません。データファイルの出力フォーマットを指定します。(但し、現状はデータの書き込みはサポートしていません)

LOCATION 設定

S3ファイルを格納しているフォルダのURLを指定します。S3のファイルのURLやS3のプレフィックスを指定するとエラーになります。

LOCATION
   's3://bucket/folder/'
TBLPROPERTIES 設定

キーバリュー形式でプロパティを設定します。コメントインしているプロパティは設定を推奨するプロパティです。property_nameproperty_valueは、ともにシングルクォーテーションで括ります。

property_name property_value 必須 補足
CrawlerSchemaDeserializerVersion 1.0 現状は常に`1.0`指定されている。
CrawlerSchemaSerializerVersion 1.0 現状は常に`1.0`指定されている。
UPDATED_BY_CRAWLER my_crawler AWS Glueのクローラの名前を指定します。
averageRecordSize 215 レコード長の平均値。(sizeKey ÷ recordCount)
classification csv ファイルのフォーマットタイプを指定します。AWS Glueのデータソースとして参照する場合は必須項目です。
columnsOrdered true classificationが`csv`の場合は`true`指定します。
compressionType gzip 圧縮タイプを指定する。非圧縮の場合は`none`を指定します。
delimiter \t classificationが`csv`の区切り文字を指定します。ここではTab区切りなので`\t`を指定します。
objectCount 8 S3のファイル数を指定します。
recordCount 10766 レコード数を指定します。Redshiftのクエリオプティマイザが利用するので概算値でも良いので指定することを推奨します。
sizeKey 2314879 ファイルのバイト数
skip.header.line.count 1 スキップするファイルの先頭行の数を指定します。
typeOfData file データタイプとして`file`が指定されています。

テーブル作成DDLを実行する

カスタマイズ後のDDLは以下のとおりです。このDDLをAthenaのコンソールから実行します。

CREATE EXTERNAL TABLE ssbgz_db.lineorder
(
        lo_orderkey INT
        ,lo_linenumber INT
        ,lo_custkey INT
        ,lo_partkey INT
        ,lo_suppkey INT
        ,lo_orderdate INT
        ,lo_orderpriority VARCHAR(15)
        ,lo_shippriority VARCHAR(1)
        ,lo_quantity INT
        ,lo_extendedprice INT
        ,lo_ordertotalprice INT
        ,lo_discount INT
        ,lo_revenue INT
        ,lo_supplycost INT
        ,lo_tax INT
        ,lo_commitdate INT
        ,lo_shipmode VARCHAR(10)
)
PARTITIONED BY (p_date DATE)
ROW FORMAT DELIMITED                                          
  FIELDS TERMINATED BY '\t'                                   
STORED AS INPUTFORMAT                                         
  'org.apache.hadoop.mapred.TextInputFormat'                  
OUTPUTFORMAT                                                  
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION                                                      
  's3://bucket/folder/'                                       
TBLPROPERTIES (                                               
  'classification'='csv',                                     
  'columnsOrdered'='true',                                  
  'compressionType'='gzip',                                 
  'delimiter'='\t',                                           
  'typeOfData'='file')                                        
;

5. 作成したテーブルにパーティションを設定する

パーティション形式でデータファイルを分割していない場合、この作業は不要です。パーティション形式にデータファイルを分割している場合はパーティションを設定します。DDLでパーティションを指定しているのに以下の設定を忘れるとデータが全く見えない状態になりますのでご注意ください。

  • カラム名ありパーティション
    • カラム名ありパーティション形式にデータファイルを配置していれば、以下のコマンドを実行するとLOCATIONに指定したパスをスキャンしてパーティション設定を自動更新できます。
MSCK REPAIR TABLE <テーブル名>;
  • カラムなしパーティション
    • カラムなしパーティション形式はパーティション毎に以下のコマンドを実行してパーティションを設定します。
ALTER TABLE <テーブル名> ADD PARTITION (p_date='2018-01-28');

6. クエリの実行

最後に

Amazon RedshiftのテーブルをAmazon Athenaに移行すると、それ以降Amazon Redshiftの影響は全く受けずにクエリの実行が可能になります。Amazon Athenaのskip.header.line.countのサポートAmazon Redshift SpectrumのDATE型サポートによって、AWSのデータレイクサービス(Amazon Athena、Amazon Redshift Spectrum、Amazon EMR、AWS Glue)間で同じS3のデータファイルに対して、実用レベルのクエリーを相互に実行できるようになりました。これまでデータ分析はDWHである Amazon Redshiftが中心でしたが、今後はAWSのデータレイクサービスが中心になります。これまで以上の大きなデータに、データロードすることなく素早くデータ分析が可能になります。その移行の一助になれば幸いです。

関連ブログ

Amazon RedshiftからAmazon Redshift Spectrumにマイグレートするツールを公開しました