AthenaでCTAS / Insert でパーティション100件超の作成で発生するエラーはIcebergでも発生するのか確認してみた

AthenaでCTAS / Insert でパーティション100件超の作成で発生するエラーはIcebergでも発生するのか確認してみた

Athenaでは、CTASやInsertにてクエリ毎のパーティション作成数は最大100個に制限されています。100パーティションを超える追加が発生した場合は `HIVE_TOO_MANY_OPEN_PARTITIONS: Exceeded limit of 100 open writers for partitions/buckets` というエラーが発生します。 Icebergテーブルに対しても発生するのか確認し、回避方法も合わせて検討しました。
Clock Icon2024.08.07

データ事業本部の笠原です。

Athenaでは、CTASやInsertにてクエリ毎のパーティション作成数は最大100個に制限されています。100パーティションを超える追加が発生した場合は HIVE_TOO_MANY_OPEN_PARTITIONS: Exceeded limit of 100 open writers for partitions/buckets というエラーが発生します。

Icebergテーブルに対しても発生するのか、確認してみました。

結果としてはIcebergテーブルに対しても発生するので、回避方法を合わせて検討しました。

3行まとめ

  • Icebergテーブルでも、Athenaクエリの100パーティション超える追加発生時エラーが起こる
  • クエリの範囲を絞って100パーティション超える追加が発生しないようにすれば、エラーを回避できる
  • 他にGlueジョブやAthenaノートブックのSparkを使えば、エラーを回避できる

構成

AthenaのSQLクエリは今回AWSマネジメントコンソール上から実行します。
実行時にはGlueカタログを参照します。
Glueカタログで定義されたInput/OutputはいずれもS3バケットのファイルとします。
InputとOutputは同じバケット配下の各パスで分けています。InputはCSVファイルのテーブル、OutputはIcebergテーブルとして事前に定義します。

arch

準備

実際に試す前に、利用するデータとテーブル定義を行います。

入力データファイル作成

今回は試験用にCSVファイルのデータを作成しました。
Icebergテーブルには、日付パーティションで格納したいので、
日付項目となる target_date カラムの値が100超えるだけの行数を適当に生成します。

こんな感じでPythonで作成しました。

データファイル生成コード
generate_sample.py
import string
import uuid
import datetime
import csv

prefixes = []
chr_lowlist = string.ascii_lowercase

## id の先頭2文字を英小文字の組み合わせ (26*26 = 676) で作る
for chr1 in chr_lowlist:
    for chr2 in chr_lowlist:
        prefixes.append(chr1 + chr2)

target_date = datetime.date(2023,1,1)
header = ['id', 'uuid', 'target_date']

with open('./partition100_date_sample.csv', 'w') as f:
    writer = csv.writer(f)
    writer.writerow(header)
    for idx, prefix in enumerate(prefixes):
        writer.writerow([
            f'{prefix}{idx:0=5}',
            uuid.uuid4(),
            target_date.strftime('%Y-%m-%d'),
        ])
        target_date += datetime.timedelta(days=1)

作成したCSVファイルはこんな感じになります。

partition100_date_sample.csv
id,uuid,target_date
aa00000,c91da6e5-d315-4643-8735-42917648ba44,2023-01-01
ab00001,11e63f6d-7b10-425c-be68-00d02a8c0638,2023-01-02
ac00002,62c05364-44d0-4a7d-8375-409a326b11e8,2023-01-03
ad00003,cde01f0b-5585-4742-a8f2-405a62d9ebc1,2023-01-04
ae00004,95d230e1-7f9d-4fc2-8f0b-85b9398a385c,2023-01-05
af00005,d40425b6-e53c-4d79-b6b0-d3965c4f8636,2023-01-06

...<省略>...

zx00673,7abd5ed6-30c8-4344-aa35-4463eaaf2606,2024-11-04
zy00674,10b0e52c-e7bd-4a80-a829-69b3b13ebb79,2024-11-05
zz00675,2ccecb4c-19c5-4c72-b85c-89f0e3470326,2024-11-06

S3バケット作成

今回のデータファイルを格納するS3バケットを1つ作成しておきます。
AWS CLIでもマネジメントコンソールでも良いので、作りやすい方法で作成しましょう。
また、S3バケット名は適宜変更してください。

バケット作成
aws s3 mb s3://sample-partition-100-db

create_bucket

また、入力データとなるCSVファイルは、この後定義するCSVファイルのGlueテーブルの LOCATION パスに合わせて配置しておきましょう。

入力データファイルを配置
aws s3 cp ./partition100_date_sample.csv \
  s3://sample-partition-100-db/partition100_date_sample_data/

input_csv_file

Glueカタログ定義

まずは、この後定義するテーブルのデータベースをGlueカタログに定義します。
今回はAthenaのクエリエディタで作成しますが、マネジメントコンソールで作成しても構いません。

データベース作成
CREATE DATABASE `sample_partition_100_db`
LOCATION 's3://sample-partition-100-db/'
;

athena_create_database

そして、入力となるCSVファイルをGlueカタログにテーブルとして定義します。

CSVテーブル作成
CREATE EXTERNAL TABLE `sample_partition_100_db`.`partition100_date_csv` (
  id string,
  uuid string,
  target_date string
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://sample-partition-100-db/partition100_date_sample_data'
TBLPROPERTIES (
  'classification'='csv',
  'columnsOrdered'='false',
  'compressionType'='none',
  'delimiter'=',',
  'skip.header.line.count'='1'
)
;

続いて、出力先となるIcebergテーブルを定義します。
TBLPROPERTIES にて、 'table_type'='ICEBERG' を指定することで、Icebergテーブルとして振舞います。
また、日付カラム target_date を日別にパーティションを組むようにします。

Icebergテーブル作成
CREATE TABLE `sample_partition_100_db`.`partition100_date_iceberg` (
  id string,
  uuid string,
  target_date date
) 
PARTITIONED BY (day(target_date))
LOCATION 's3://sample-partition-100-db/partition100_date_iceberg'
TBLPROPERTIES (
  'table_type'='ICEBERG'
)
;

これで準備ができました。

Athenaでやってみる

IcebergテーブルにPartitionが100を超えるInsert処理を実行します。
Athenaクエリエディタで以下のSQLを実行してみます。

パーティション100超発生のInsert
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
;

すると、以下のようなエラーが表示されるはずです。

ICEBERG_TOO_MANY_OPEN_PARTITIONS: Exceeded limit of 100 open writers for partitions. 

athena_partition_100over_error

Icebergテーブルに対しても、100パーティションを超える追加の際にエラーが発生することがわかりました。

ついでに、CTASでも同様のエラーが発生するか確認します。

Icebergテーブルをdropした後、
以下のSQLを実行します。

パーティション100超発生のCTAS
create table partition100_date_iceberg
with (
  table_type = 'ICEBERG',
  is_external = false,
  location = 's3://sample-partition-100-db/partition100_date_iceberg/',
  partitioning = ARRAY[ 'day(target_date)' ]
) as
select id, uuid, cast(target_date as date) as target_date
from partition100_date_csv
;

実行すると、同様のエラーが発生することがわかります。

athena_partition_100over_error_ctas

対処法

パーティションが100個を超える新規作成を実施する必要がある場合の対処法を考えてみます。

1. 100個を超えないようにクエリを分割する

これはAWSのドキュメントでも案内されている方法です。
パーティションを100個を超えないように新規作成するように範囲を指定して実行します。

例えば、先ほどの INSERT クエリは以下のように分けて実行します。

分割クエリ1
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2023-01-01' AND target_date < '2023-04-11'
;
分割クエリ2
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2023-04-11' AND target_date < '2023-07-20'
;
分割クエリ3
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2023-07-20' AND target_date < '2023-10-28'
;
分割クエリ4
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2023-10-28' AND target_date < '2024-02-05'
;
分割クエリ5
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2024-02-05' AND target_date < '2024-05-15'
;
分割クエリ6
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2024-05-15' AND target_date < '2024-08-23'
;
分割クエリ7
INSERT INTO partition100_date_iceberg
SELECT id, uuid, CAST(target_date AS date)
FROM partition100_date_csv
WHERE target_date >= '2024-08-23' AND target_date < '2024-12-01'
;

上記のクエリはパーティションを新規に100個まで作られる範囲毎に分けています。
このようにすることで、必要なデータが無事Insertされました。

athena_partition100less

2. Glueジョブを使う

初回連携時にデータを一気に投入する場合等、
件数が多い場合等クエリを分割して実行することが困難な場合もあると思います。
その場合はAthenaクエリではなく、例えばGlueジョブを使うと一気に投入することができます。

例えばVisual ETLでジョブを作成すると、以下のようになります。

glue_visual_etl_1

入力はs3から取得。Glue Data CatalogのCSVテーブルを使っておきます。

glue_visual_etl_2

途中、date型への変換用のtransformを入れてます。

glue_visual_etl_3

出力はGlue Data CatalogのIcebergテーブルを指定します。

glue_visual_etl_4

参考までに、Visual ETLで生成されたコードの内容は以下のとおりです。

Visual ETLで生成されたコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node Amazon S3
AmazonS3_node1719451067133 = glueContext.create_dynamic_frame.from_catalog(database="sample_partition_100_db", table_name="partition100_date_csv", transformation_ctx="AmazonS3_node1719451067133")

# Script generated for node Change Schema
ChangeSchema_node1719461645275 = ApplyMapping.apply(frame=AmazonS3_node1719451067133, mappings=[("id", "string", "id", "string"), ("uuid", "string", "uuid", "string"), ("target_date", "string", "target_date", "date")], transformation_ctx="ChangeSchema_node1719461645275")

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1719451091694_df = ChangeSchema_node1719461645275.toDF()
AWSGlueDataCatalog_node1719451091694 = glueContext.write_data_frame.from_catalog(frame=AWSGlueDataCatalog_node1719451091694_df, database="sample_partition_100_db", table_name="partition100_date_iceberg", additional_options={})

job.commit()

3. AthenaのSparkノートブックを使う

他にも、AthenaのクエリではなくSparkノートブックを使う方法があります。
Athenaのノートブックを新規作成しましょう。
その際、Sparkに対応するワークグループがなければ、ワークグループから作成しましょう。

ノートブック作成時には、「Apache Sparkのプロパティ」にて「Apache Iceberg」を選択しましょう。選択時に表示されるキーと値はデフォルトのままでOKです。

athena_spark_setting

ノートブック起動したら、以下のコードを実行します。

PySpark SQLで実行するコード
spark.sql('''
    INSERT INTO sample_partition_100_db.partition100_date_iceberg
    SELECT id, uuid, CAST(target_date AS date)
    FROM sample_partition_100_db.partition100_date_csv
    WHERE id <> 'id'
'''
)

athena_spark_insert_select

この方法でも、新規作成パーティションが100個を超えるデータのInsertは問題なく実行可能です。

ちなみに、入力テーブル partition100_date_csv に設定している、最初のヘッダ行1行を無視する設定 'skip.header.line.count'='1' が、Athena Sparkでの spark.sql() 実行時にはうまく有効化してくれないようです。

athena_spark_select

そのため、今回は WHERE id <> 'id' としてカラム名が入っている行を除くように条件を付与しています。

まとめ

Icebergテーブルでも、Athenaクエリの100パーティション超える追加発生時エラーが起こることがわかりました。
クエリの範囲を絞って100パーティション超える追加が発生しないようにすれば、エラーを回避できます。
また、GlueジョブやAthenaノートブックのSparkを使えば、エラーを回避できます。

この記事が誰かのお役に立てることができれば幸いです。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.