Athenaのパーティションを事前に一括作成する方法

AthenaでHive互換のパーティションに対応していないログに一括でパーティションを作成します。ALTER TABLEを都度適用する運用負荷が軽減できます。
2020.03.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Hive互換になっていないログなどでは ALTER TABLE ADD PARTITIONを実行する必要があります。新しくデータが保存されたタイミングで作成するのは、運用の中で行うのは面倒です。実はデータがなくでもパーティションを事前に作成しておくことができるので、今回は一括でパーティションを作成するスクリプトを作成してみました。これを使って一年先までのパーティションを作成してみたいと思います。

今回はVPCフローログをサンプルデータとして使用します。

テーブルを作成

VPCフローログは下記の形式で出力されますので、year/month/day を利用してパーティション分割したいと思います。

bucket_ARN/optional_folder/AWSLogs/aws_account_id/vpcflowlogs/region/year/month/day/log_file_name.log.gz

下記のCREATE文を実行してAthenaテーブルを作成します。テーブル名やログのパスは適宜変更してください。

create_vpc_flow_logs.sql

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
  version int,
  account string,
  interfaceid string,
  sourceaddress string,
  destinationaddress string,
  sourceport int,
  destinationport int,
  protocol int,
  numpackets int,
  numbytes bigint,
  starttime int,
  endtime int,
  action string,
  logstatus string,
  vpcid string,
  subnetid string,
  instanceid string,
  tcpflags int,
  traffictype string,
  packetsourceaddress string,
  packetdestinationaddress string
)  
PARTITIONED BY (year int, month int, day int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/'
TBLPROPERTIES ("skip.header.line.count"="1");

パーティションの一括作成

下記のスクリプトを実行してパーティションを作成します。2020/01/01 ~ 2020/12/31 のパーティションを作成しています。レートリミットを考慮してクエリは同期実行しています。

create_partitions.py

#!/usr/bin/env python
import boto3
import time
from datetime import datetime
from datetime import timedelta

athena = boto3.client('athena')
""" :type : pyboto3.athena """

dbname = 'default'
table_name = 'vpc_flow_logs'
result_location = 's3://aws-athena-query-results-xxxxxxxxxxx-ap-northeast-1'
bucket = 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/'
start = datetime(2020, 1, 1)
end = datetime(2021, 1, 1)

MAX_RETRY = 10

days = (end - start).days
for i in range(0, days):
    dt = start + timedelta(days=i)
    sql = 'ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year={},month={},day={}) location \'{}\'' \
        .format(table_name, dt.year, dt.month, dt.day, bucket + dt.strftime('%Y/%m/%d'))

    query = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': dbname},
        ResultConfiguration={'OutputLocation': result_location}
    )
    query_execution_id = query['QueryExecutionId']

    counter = 1
    while True:
        time.sleep(1)
        query_execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        state = query_execution['QueryExecution']['Status']['State']

        if state == 'SUCCEEDED':
            print(state + ': ' + sql)
            break
        elif state == 'FAILED':
            print(state + ': ' + sql)
            break
        elif counter >= MAX_RETRY:
            print('Retry Error: ' + sql)
            break
        else:
            counter += 1
            continue

実行結果

下記のようになっていればパーティションが作成されています。

SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=1,day=1) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/01/01'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=1,day=2) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/01/02'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=1,day=3) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/01/03'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=1,day=4) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/01/04'
・
・
・
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=12,day=28) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/12/28'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=12,day=29) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/12/29'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=12,day=30) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/12/30'
SUCCEEDED: ALTER TABLE vpc_flow_logs ADD IF NOT EXISTS PARTITION (year=2020,month=12,day=31) location 's3://your_log_bucket/prefix/AWSLogs/{subscribe_account_id}/vpcflowlogs/{region_code}/2020/12/31'

下記のクエリでパーティションを確認できます。

SHOW PARTITIONS vpc_flow_logs;

まとめ

簡単にパーティションを一括で作成することができました。1年と言わず数年分作成してもよいでしょう。少しカスタマイズすれば1時間単位のパーティション作成も簡単にできます。

運用負荷の軽減に役立てば幸いです。