Amazon Redshift [アップデート] データレイクで用いるカラム名ありパーティション形式のアンロードをサポートしたので試してみた

2020.03.30

Amazon Redshiftは、クラスタバージョン1.0.14104からINCLUDEオプションとPARTITION BYを指定して、アンロードされたファイルにパーティション列を追加できるようになりました。個人的には待望の機能なので早速試してみました。

INCLUDEオプションとPARTITION BYの指定とは

INCLUDEオプションとPARTITION BYを指定して、アンロードされたファイルにパーティション列を追加できるようになりました。つまり、UNLOADコマンドを実行する際にPARTITION BYを指定することでデータレイクで用いるカラム名ありパーティション形式でアンロードできるようになりました。もちろん、出力ファイル形式はCSV、Parquetなどお好きなフォーマットで出力可能です。

PARTITION BY ( column_name [, ... ] ) [INCLUDE]

※ 執筆時点では、 上記のオプションは英語マニュアルにのみ記載されています。

PARTITION BYにカラム指定するとカラムの値ごとにパーティション出力されます。また、INCLUDEオプションでPARTITION BYを指定した場合、パーティションカラムはアンロードされたファイルから削除されません。

unload ('select * from lineitem')
to 's3://mybucket/lineitem/'
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
PARQUET
PARTITION BY (l_shipdate);

4 つのスライスがある場合、生成される Parquet ファイルはさまざまなフォルダに動的に分割されます。

s3://mybucket/lineitem/l_shipdate=1992-01-02/0000_part_00.parquet
                                             0001_part_00.parquet
                                             0002_part_00.parquet
                                             0003_part_00.parquet
s3://mybucket/lineitem/l_shipdate=1992-01-03/0000_part_00.parquet
                                             0001_part_00.parquet
                                             0002_part_00.parquet
                                             0003_part_00.parquet
s3://mybucket/lineitem/l_shipdate=1992-01-04/0000_part_00.parquet
                                             0001_part_00.parquet
                                             0002_part_00.parquet
                                             0003_part_00.parquet
...

検証用テーブルとデータ

検証用に用意したのテーブルのlo_orderdateカラムには、19920101〜19921231まで365日分のデータが含まれています。

cmdb=> \d lineorder_test
                  Table "cm_ishikawa_satoru.lineorder_test"
       Column       |         Type          | Collation | Nullable | Default
--------------------+-----------------------+-----------+----------+---------
 lo_orderkey        | integer               |           | not null |
 lo_linenumber      | integer               |           | not null |
 lo_custkey         | integer               |           | not null |
 lo_partkey         | integer               |           | not null |
 lo_suppkey         | integer               |           | not null |
 lo_orderdate       | integer               |           | not null |
 lo_orderpriority   | character varying(15) |           | not null |
 lo_shippriority    | character varying(1)  |           | not null |
 lo_quantity        | integer               |           | not null |
 lo_extendedprice   | integer               |           | not null |
 lo_ordertotalprice | integer               |           | not null |
 lo_discount        | integer               |           | not null |
 lo_revenue         | integer               |           | not null |
 lo_supplycost      | integer               |           | not null |
 lo_tax             | integer               |           | not null |
 lo_commitdate      | integer               |           | not null |
 lo_shipmode        | character varying(10) |           | not null |

cmdb=> select count(*) from lineorder_test;
  count
---------
 9106395
(1 row)

cmdb=> \x
Expanded display is on.
cmdb=> select * from lineorder_test limit 1;
-[ RECORD 1 ]------+----------
lo_orderkey        | 521799110
lo_linenumber      | 6
lo_custkey         | 739298
lo_partkey         | 783047
lo_suppkey         | 758132
lo_orderdate       | 19920102
lo_orderpriority   | 3-MEDIUM
lo_shippriority    | 0
lo_quantity        | 1
lo_extendedprice   | 113001
lo_ordertotalprice | 26969219
lo_discount        | 2
lo_revenue         | 110740
lo_supplycost      | 67800
lo_tax             | 7
lo_commitdate      | 19920317
lo_shipmode        | SHIP

以降では、クラスタバージョン1.0.14104dc2.largeの2ノードで機能の確認とともにUNLOAD時間も計測します。

UNLOAD(CSV出力)

通常のCSV出力は、約25秒でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。

cmdb=> UNLOAD ('select * FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_tsv/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> GZIP
cmdb-> DELIMITER '\t'
cmdb-> HEADER
cmdb-> ;
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 21955.632 ms (00:21.956)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_tsv/ --recursive
2020-03-29 23:16:55   97117240 lineorder_test/lineorder_tsv/0000_part_00.gz
2020-03-29 23:16:54   97531482 lineorder_test/lineorder_tsv/0001_part_00.gz
2020-03-29 23:16:55   97121201 lineorder_test/lineorder_tsv/0002_part_00.gz
2020-03-29 23:16:55   97311619 lineorder_test/lineorder_tsv/0003_part_00.gz

上記にPARTITION BYの指定してカラム名ありパーティション出力すると約2分10秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。先頭の0000_part_00.gz〜0003_part_00.gz(L.15~18)の4つのファイルも生成されますのでご注意ください。

cmdb=> UNLOAD ('select * FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_tsv/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> GZIP
cmdb-> DELIMITER '\t'
cmdb-> HEADER
cmdb-> PARTITION BY (lo_orderdate);
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 130235.088 ms (02:10.235)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_tsv/ --recursive
2020-03-29 23:17:35        136 lineorder_test/lineorder_partitioned_tsv/0000_part_00.gz
2020-03-29 23:17:35        136 lineorder_test/lineorder_partitioned_tsv/0001_part_00.gz
2020-03-29 23:17:35        136 lineorder_test/lineorder_partitioned_tsv/0002_part_00.gz
2020-03-29 23:17:35        136 lineorder_test/lineorder_partitioned_tsv/0003_part_00.gz
2020-03-29 23:17:35     251358 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0000_part_00.gz
2020-03-29 23:17:35     261823 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0001_part_00.gz
2020-03-29 23:17:35     256716 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0002_part_00.gz
2020-03-29 23:17:35     256881 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19920102/0003_part_00.gz
:
:
2020-03-29 23:19:44     262736 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19921231/0002_part_00.gz
2020-03-29 23:19:44     257813 lineorder_test/lineorder_partitioned_tsv/lo_orderdate=19921231/0003_part_00.gz

UNLOAD(PARTITION BYによるParquet出力)

通常のCSV出力は、約20秒で最速でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。通常のCSV出力は、約25秒で最速でした。dc2.largeの2ノードなのでファイルは4つずつ出力します。

cmdb=> UNLOAD ('select * FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_parquet/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> FORMAT AS PARQUET
cmdb-> ;
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 20348.743 ms (00:20.349)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_parquet/ --recursive
2020-03-29 21:14:35   78859742 lineorder_test/lineorder_parquet/0000_part_00.parquet
2020-03-29 21:14:35   79212678 lineorder_test/lineorder_parquet/0001_part_00.parquet
2020-03-29 21:14:35   78876263 lineorder_test/lineorder_parquet/0002_part_00.parquet
2020-03-29 21:14:35   79049744 lineorder_test/lineorder_parquet/0003_part_00.parquet

上記にPARTITION BYの指定してカラム名ありパーティション出力すると約2分14秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。

cmdb=> UNLOAD ('select * FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> FORMAT AS PARQUET
cmdb-> PARTITION BY (lo_orderdate);
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 134429.230 ms (02:14.429)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet/ --recursive
2020-03-29 21:15:30     255578 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0000_part_00.parquet
2020-03-29 21:15:30     264993 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0001_part_00.parquet
2020-03-29 21:15:30     260896 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0002_part_00.parquet
2020-03-29 21:15:30     260852 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19920102/0003_part_00.parquet
:
:
2020-03-29 21:17:37     265705 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19921231/0002_part_00.parquet
2020-03-29 21:17:38     261312 lineorder_test/lineorder_partitioned_parquet/lo_orderdate=19921231/0003_part_00.parquet

生成されたParquetファイルの中をS3Selectで確認します。PARTITION BYに指定したlo_orderdateカラムは含まれないParquetファイルが作成されます。

[
    {
        "lo_orderkey": 597459780,
        "lo_linenumber": 3,
        "lo_custkey": 2221423,
        "lo_partkey": 666256,
        "lo_suppkey": 820069,
        "lo_orderpriority": "3-MEDIUM",
        "lo_shippriority": "0",
        "lo_quantity": 43,
        "lo_extendedprice": 5255546,
        "lo_ordertotalprice": 15438046,
        "lo_discount": 9,
        "lo_revenue": 4782546,
        "lo_supplycost": 73333,
        "lo_tax": 8,
        "lo_commitdate": 19920208,
        "lo_shipmode": "FOB"
    },
    {
        "lo_orderkey": 540891970,
:
:
:

UNLOAD(INCLUDEオプション付きPARTITION BYによるParquet出力)

INCLUDEオプション付きINCLUDEオプション付きPARTITION BYの指定してカラム名ありパーティション出力すると約2分9秒で365パーティション作成されました。パーティションごとに4つのファイルが作成されます。

cmdb=> UNLOAD ('select * FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_inc_parquet/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> FORMAT AS PARQUET
cmdb-> PARTITION BY (lo_orderdate) INCLUDE;
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 129937.515 ms (02:09.938)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_inc_parquet/ --recursive
2020-03-29 21:22:06     255819 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0000_part_00.parquet
2020-03-29 21:22:06     265234 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0001_part_00.parquet
2020-03-29 21:22:06     261137 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0002_part_00.parquet
2020-03-29 21:22:06     261093 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19920102/0003_part_00.parquet
:
:
2020-03-29 21:24:10     265946 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19921231/0002_part_00.parquet
2020-03-29 21:24:13     261553 lineorder_test/lineorder_partitioned_inc_parquet/lo_orderdate=19921231/0003_part_00.parquet

生成されたParquetファイルの中をS3Selectで確認します。PARTITION BYに指定したlo_orderdateカラムがそのまま含まれたParquetファイルが作成されます。

[
    {
        "lo_orderkey": 597459780,
        "lo_linenumber": 3,
        "lo_custkey": 2221423,
        "lo_partkey": 666256,
        "lo_suppkey": 820069,
        "lo_orderdate": 19920102,
        "lo_orderpriority": "3-MEDIUM",
        "lo_shippriority": "0",
        "lo_quantity": 43,
        "lo_extendedprice": 5255546,
        "lo_ordertotalprice": 15438046,
        "lo_discount": 9,
        "lo_revenue": 4782546,
        "lo_supplycost": 73333,
        "lo_tax": 8,
        "lo_commitdate": 19920208,
        "lo_shipmode": "FOB"
    },
    {
        "lo_orderkey": 540891970,
:
:
:

出力ファイルからGlueクローラで外部テーブルを自動作成

「UNLOAD(Parquet出力)」で作成したファイルを、Glueクローラを用いて外部テーブルを自動作成します。出力したファイルを外部テーブルとして登録することで、Redshift Spectrum、Athena、Glue、EMRなどさまざまなAWSのビッグデータソリューションとの連携が可能になります。手順は画面に従いクローラの作成・実行するだけです。

クローラの名前

クローラのSourceTypeの指定

データストアの追加

別のデータストアの追加

IAMロールの選択

クローラのスケジュール設定

クローラの出力設定

クローラの設定確認

クローラの実行

Athenaからクエリを実行

[Tips] パーティションキーのカスタマイズ

上記の例では、lo_orderdateカラムの年月日の値でパーティション分割しましたが、必ずしも既存のカラムの値に限りません。例えば、lo_orderdateカラムの年月でパーティション分割したい場合は、以下のように年月のlo_ordermonthカラムを導出して、lo_ordermonthでパーティション分割することが可能です。

cmdb=> UNLOAD ('select *, (lo_orderdate / 100) AS lo_ordermonth FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet_ordermonth/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> FORMAT AS PARQUET
cmdb-> PARTITION BY (lo_ordermonth);
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 20248.052 ms (00:20.248)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet_ordermonth/ --recursive
2020-03-30 01:04:45    7557230 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199201/0000_part_00.parquet
2020-03-30 01:04:45    7577753 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199201/0001_part_00.parquet
:
:
2020-03-30 01:05:01    7815467 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199212/0002_part_00.parquet
2020-03-30 01:05:02    7830526 lineorder_test/lineorder_partitioned_parquet_ordermonth/lo_ordermonth=199212/0003_part_00.parquet

[Tips] Parquetファイルサイズの最適化

最も処理効率の良いParquetファイルサイズは256MB〜1GBですが、そのままUNLOADするとRedshiftのスライスサイズでファイルが小さな分割されてしまい、AthenaやGlue(Spark)で処理する際にパフォーマンスに悪影響が生じる可能性があります。そこで、PARALLEL OFF指定することで、パーティション内のファイルを1つのファイルにまとめることが可能です。

以下の例では、ファイルをまとめられることを確認しています。

cmdb=> UNLOAD ('select *, (lo_orderdate / 100) AS lo_ordermonth FROM lineorder_test;')
cmdb-> TO 's3://cm-user/lineorder_test/lineorder_partitioned_parquet_opt/'
cmdb-> IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'
cmdb-> FORMAT AS PARQUET
cmdb-> PARALLEL OFF
cmdb-> PARTITION BY (lo_ordermonth);
INFO:  UNLOAD completed, 9106395 record(s) unloaded successfully.
UNLOAD
Time: 43878.377 ms (00:43.878)

---

% aws s3 ls s3://cm-user/lineorder_test/lineorder_partitioned_parquet_opt/ --recursive
2020-03-30 01:47:20   24458111 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199201/000.parquet
2020-03-30 01:47:24   23843710 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199202/000.parquet
2020-03-30 01:47:27   25346415 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199203/000.parquet
2020-03-30 01:47:31   24630728 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199204/000.parquet
2020-03-30 01:47:34   25378878 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199205/000.parquet
2020-03-30 01:47:38   24564990 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199206/000.parquet
2020-03-30 01:47:41   25310834 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199207/000.parquet
2020-03-30 01:47:45   25375873 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199208/000.parquet
2020-03-30 01:47:49   24491352 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199209/000.parquet
2020-03-30 01:47:52   25345431 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199210/000.parquet
2020-03-30 01:47:56   24632677 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199211/000.parquet
2020-03-30 01:47:59   25328213 lineorder_test/lineorder_partitioned_parquet_opt/lo_ordermonth=199212/000.parquet

最後に

今回の新機能でRedshiftはパーティションした形式でデータを直接、データレイクにアンロードできるようになりました。Redshiftはスキーマオンライトなので、データのロードが必要になりますがクエリ可能なデータであることが保証されます。一方、データレイクはスキーマオンリードなので本来S3のデータがクエリ可能であるかが保証できませんが、Redshiftからアンロードされたファイルはクエリ可能なデータであることが保証できます。この連携を用いることでデータレイクでもスキーマオンライトと同様の信頼性が得られるようになります。

GlueやEMRで大量のデータファイルを読み込み、パーティション形式で出力するとパーティションサイズの大きさや分割されたファイルを纏める処理など、スケールに応じて、ワーカー数やメモリサイズの調整、コードを書き換える煩雑な作業がありました。今後は命令的コードよりも宣言的クエリ言語で記述できるので、Redshiftにデータをロードして、Parquetでアンロードするだけで初期のデータ移行が済ませてしまうケースが増えるのではないかと思います。私も積極的に活用していきたいと思います。