dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみた

dbt-athenaはHIVEテーブルに一度に100を超えるパーティションの書き込みをした際も内部的にCTASとINSERT INTOを組み合わせてデータ作成をしてくれてとても便利でした。
2023.11.05

データアナリティクス事業本部 機械学習チームの鈴木です。

dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみました。

この記事について

試した内容について

Athenaはパーティション分割されたテーブルに対してINSERT INTOおよびCTASでデータ作成する際に、一度に100個を超えるパーティションにデータを作成することはできないのでした。

100個を超えるパーティションにデータを作成したい場合、以下のガイドのように100個以下のパーティションが対象になるようにフィルタリングし、繰り返しデータ作成をすることになります。

Athenaではパーティションキーは100個以内の限られたカーディナリティのものを使うことになりますが、実際のところ年月日のような値はパーティションキーによく使われているので、このような設計にしている場合、100個くらいはあっという間に超えてしまいます。このとき過去のデータを作り直す機能としては、例えば先に記載したようにforなどを使って100個ずつAthenaで実行し直すというような実装をする必要がありました。

ところで、dbt-athenaを使っている場合、特にIncremental modelsなどではパーティション分割していることも多いかと思いますが、データの作り直しをする際にdbt run --full-refreshを実行すると100個の制限にかかって失敗してしまうとかはないのかなと疑問に思ったので、試してみた内容をご紹介します。

結果としては、その制限にかかってモデル作成が失敗することはなく、dbt-athena側で100個に収まるようにパーティションを分けてデータ作成してくれることが分かり、とても使いやすいなと思いました。

環境

  • dbt: 1.6.6
  • dbt-athena-community: 1.6.4
  • Python: 3.9.6
  • Athena エンジンバージョン 3

検証の準備

データの作成

以下のように2つのカラムを持つCSVファイルを手作りしました。partiton_keyカラムは名前通り、パーティショニングするときのパーティションキーにします。

partiton_key,item_value
1,sample
2,sample
3,sample
4,sample
5,sample
6,sample
7,sample
8,sample
9,sample
10,sample

上のようなCSVファイルを、以下の2パターン作成しました。

  1. partiton_keyが100個のCSVファイル
  2. partiton_keyが101個のCSVファイル

今回100個の制限に引っかかるか確認するなら、No.2のCSVファイルでパーティションを作成した際にエラーになるかどうかを見れば良さそうですね。

上記のファイルをS3にアップロードし、以下のGlueテーブルから検索できるようにしておきました。

CREATE EXTERNAL TABLE `partitioning_sample_table`(
  `partition_key` string, 
  `item_value` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://データをアップロードしたS3バケット名/データをアップロードしたパス'
TBLPROPERTIES (
  'classification'='csv', 
  'columnsOrdered'='false', 
  'compressionType'='none', 
  'delimiter'=',', 
  'skip.header.line.count'='1',
  'transient_lastDdlTime'='1697863217')

dbtのモデルの準備

以下のようにモデルを作成しました。partition_keyカラムでパーティション分割したモデルです。

models/staging/stg_partitioning_sample_model.sql

{{ config(
    materialized='table',
    partitioned_by=['partition_key'],
) }}

SELECT 
    item_value,
    partition_key
FROM {{ source('sample','partitioning_sample_table') }}

ソーステーブルは以下のように定義しておきました。

models/staging/sample__sources.yml

version: 2

sources:
  - name: sample
    database: awsdatacatalog
    schema: cm-nayuts-dbt-athena
    tables:
      - name: partitioning_sample_table

モデルを作成する

100個までのとき

No.1のデータをS3バケットに配置してからdbt runでモデルを実行し、AthenaでどのようなSQLが実行されるかを確認しました。

100個までのときはシンプルで、CTASでstg_partitioning_sample_modelを作成していました。

-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model"
  with (
    table_type='hive',
    is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/215c9454-40c2-403f-9424-73bf5991200f',
    partitioned_by=ARRAY['partition_key'],
    format='parquet'
  )
  as
    

SELECT 
    item_value,
    partition_key
FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
SELECT '{"rowcount":100,"data_scanned_in_bytes":1115}'

101個までのとき

次に、No.2のデータをS3バケットに配置してからdbt runでモデルを実行し、AthenaでどのようなSQLが実行されるかを確認しました。

こちらはまずHIVE_TOO_MANY_OPEN_PARTITIONS: Exceeded limit of 100 open writers for partitions/buckets.エラーでSQLが失敗しました。

パーティション超過のエラー

以降は一時用のテーブルを作成し、stg_partitioning_sample_modelを100を超えないようにパーティションをWHERE句で選択してCTASで作成し、溢れた分はINSERT INTOで格納していました。

-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
  with (
    table_type='hive',
    is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model__tmp_not_partitioned/b2e1c475-47a7-45f6-9364-7edec9d3801b',
    format='parquet'
  )
  as
    

SELECT 
    item_value,
    partition_key
FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
select distinct partition_key from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" order by partition_key
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model"
  with (
    table_type='hive',
    is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/a74f65db-69fa-45e6-a18e-ae69ae0b7e84',
    partitioned_by=ARRAY['partition_key'],
    format='parquet'
  )
  as
    select "item_value", "partition_key"
                from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
                where (partition_key='1') or (partition_key='10') or (partition_key='100') or (partition_key='101') or (partition_key='11') or (partition_key='12') or (partition_key='13') or (partition_key='14') or (partition_key='15') or (partition_key='16') or (partition_key='17') or (partition_key='18') or (partition_key='19') or (partition_key='2') or (partition_key='20') or (partition_key='21') or (partition_key='22') or (partition_key='23') or (partition_key='24') or (partition_key='25') or (partition_key='26') or (partition_key='27') or (partition_key='28') or (partition_key='29') or (partition_key='3') or (partition_key='30') or (partition_key='31') or (partition_key='32') or (partition_key='33') or (partition_key='34') or (partition_key='35') or (partition_key='36') or (partition_key='37') or (partition_key='38') or (partition_key='39') or (partition_key='4') or (partition_key='40') or (partition_key='41') or (partition_key='42') or (partition_key='43') or (partition_key='44') or (partition_key='45') or (partition_key='46') or (partition_key='47') or (partition_key='48') or (partition_key='49') or (partition_key='5') or (partition_key='50') or (partition_key='51') or (partition_key='52') or (partition_key='53') or (partition_key='54') or (partition_key='55') or (partition_key='56') or (partition_key='57') or (partition_key='58') or (partition_key='59') or (partition_key='6') or (partition_key='60') or (partition_key='61') or (partition_key='62') or (partition_key='63') or (partition_key='64') or (partition_key='65') or (partition_key='66') or (partition_key='67') or (partition_key='68') or (partition_key='69') or (partition_key='7') or (partition_key='70') or (partition_key='71') or (partition_key='72') or (partition_key='73') or (partition_key='74') or (partition_key='75') or (partition_key='76') or (partition_key='77') or (partition_key='78') or (partition_key='79') or (partition_key='8') or (partition_key='80') or (partition_key='81') or (partition_key='82') or (partition_key='83') or (partition_key='84') or (partition_key='85') or (partition_key='86') or (partition_key='87') or (partition_key='88') or (partition_key='89') or (partition_key='9') or (partition_key='90') or (partition_key='91') or (partition_key='92') or (partition_key='93') or (partition_key='94') or (partition_key='95') or (partition_key='96') or (partition_key='97') or (partition_key='98')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
insert into "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" ("item_value", "partition_key")
                select "item_value", "partition_key"
                from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
                where (partition_key='99')
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
SELECT '"awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with many partitions created'

厳密にはdbt-athena-communityの実装を確認すべきですが、パーティション数超過のエラーを受け取るとCTASとINSERT INTOを組み合わせてデータ格納をするロジックに切り替えてくれるようですね。

Incremental modelsの場合

Incremental modelsの場合でも同様に実行できるか確認しました。

まず、モデルの定義を以下のように変更しました。

models/staging/stg_partitioning_sample_model.sql

{{ config(
    materialized='incremental',
    partitioned_by=['partition_key'],
) }}

SELECT 
    item_value,
    partition_key
FROM {{ source('sample','partitioning_sample_table') }}

{% if is_incremental() %}

  where partitioned_by > (select max(partitioned_by) from {{ this }})

{% endif %}

次に、No.2のファイルをS3バケットに配置した状態で、dbt run --full-refreshを実行し、以下の順にSQLが実行されることを確認しました。SQLの内容としては、微妙に違いはあるものの、ロジックとしては先に確認したものと同じでした。

パーティション超過のエラー Incremental

-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
  with (
    table_type='hive',
    is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model__tmp_not_partitioned/59ea3001-4893-4a65-8ceb-84c238a3cc43',
    format='parquet'
  )
  as
    

SELECT 
    item_value,
    partition_key
FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
select distinct partition_key from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" order by partition_key
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model"
  with (
    table_type='hive',
    is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/3eca27c2-6a2a-46ae-bd90-a9fd48f3f78e',
    partitioned_by=ARRAY['partition_key'],
    format='parquet'
  )
  as
    select "item_value", "partition_key"
                from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
                where (partition_key='1') or (partition_key='10') or (partition_key='100') or (partition_key='101') or (partition_key='11') or (partition_key='12') or (partition_key='13') or (partition_key='14') or (partition_key='15') or (partition_key='16') or (partition_key='17') or (partition_key='18') or (partition_key='19') or (partition_key='2') or (partition_key='20') or (partition_key='21') or (partition_key='22') or (partition_key='23') or (partition_key='24') or (partition_key='25') or (partition_key='26') or (partition_key='27') or (partition_key='28') or (partition_key='29') or (partition_key='3') or (partition_key='30') or (partition_key='31') or (partition_key='32') or (partition_key='33') or (partition_key='34') or (partition_key='35') or (partition_key='36') or (partition_key='37') or (partition_key='38') or (partition_key='39') or (partition_key='4') or (partition_key='40') or (partition_key='41') or (partition_key='42') or (partition_key='43') or (partition_key='44') or (partition_key='45') or (partition_key='46') or (partition_key='47') or (partition_key='48') or (partition_key='49') or (partition_key='5') or (partition_key='50') or (partition_key='51') or (partition_key='52') or (partition_key='53') or (partition_key='54') or (partition_key='55') or (partition_key='56') or (partition_key='57') or (partition_key='58') or (partition_key='59') or (partition_key='6') or (partition_key='60') or (partition_key='61') or (partition_key='62') or (partition_key='63') or (partition_key='64') or (partition_key='65') or (partition_key='66') or (partition_key='67') or (partition_key='68') or (partition_key='69') or (partition_key='7') or (partition_key='70') or (partition_key='71') or (partition_key='72') or (partition_key='73') or (partition_key='74') or (partition_key='75') or (partition_key='76') or (partition_key='77') or (partition_key='78') or (partition_key='79') or (partition_key='8') or (partition_key='80') or (partition_key='81') or (partition_key='82') or (partition_key='83') or (partition_key='84') or (partition_key='85') or (partition_key='86') or (partition_key='87') or (partition_key='88') or (partition_key='89') or (partition_key='9') or (partition_key='90') or (partition_key='91') or (partition_key='92') or (partition_key='93') or (partition_key='94') or (partition_key='95') or (partition_key='96') or (partition_key='97') or (partition_key='98')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
insert into "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" ("item_value", "partition_key")
                select "item_value", "partition_key"
                from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned"
                where (partition_key='99')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */
select '"awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with many partitions created'
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')

終わりに

dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみました。

パーティション分割している際に、過去データの作り直しで悩むことがありましたが、dbt-athenaでモデル作成しているときは考えなくてもdbtがよしなにやってくることが分かり安心しました。

補足

パーティション数の制限について

今回の検証では100個を超えてもdbt-athenaがよしなに対処してくれることが分かりましたが、パーティション数が非常に大きい場合はパーティションキーの設計自体を考えた方が良いです。

例えばパーティションにデータを作成すると、S3にオブジェクトを作成するためのPUTリクエストが行われるため、東京リージョンだと0.0047USD/1000リクエストの費用がかかります。一見は安く感じますが、細かくパーティションを分けているとオブジェクト作成時やその後のファイル取得時などに思いがけず費用がかかってしまう可能性があります。

100個の制限を設けてユーザーが意識的にパーティションを作りすぎないようになっているのは大事なことだと思います。そもそも本来は非常に高いパーティションキーでパーティションを分けることはせず、Bucketingと組み合わせたり、Icebergテーブルにしてhidden partitioningのようなパーティションキーのカーディナリティを低くするような仕組みを利用したりすると良いように思います。