この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
AWSが提供するCost and Usage Report(CUR)にはAWSの利用明細などの請求情報が格納されています。
過去に同様に提供されていたDetailed Billing Report(DBR)との大きな違いの一つとして、DBRがカラムが固定であったことに対してCURではカラムが流動的であることがあります。CURのカラム情報は、マニフェストファイルに格納され、CURの作成と同時に出力されます。
この記事では、カラム情報を持つマニフェストファイルをもとにAthena向けのDDLを作成し、CURの情報をAthenaで検索する方法を紹介します。
DDLを作成するスクリプト
マニフェストファイルからDDLを作成するために、以下のcreate_ddl.py
を作成します。
import re
from pathlib import Path
import json
import boto3
from jinja2 import Environment, FileSystemLoader
TEMPLATE_NAME = "template_ddl.hql.j2"
def create_ddl(manifest_path, profile_name=None):
"""
CURのマニフェストファイルからAthena用のDDLを作成する。
params:
manifest_path : マニフェストファイルのS3パス
profile_name : S3にアクセスする際に利用するプロファイル。指定がなければdefault
"""
session = boto3.Session(profile_name=profile_name)
bucket_name, manifest_path = __parse_s3path(manifest_path)
manifest_data = __get_manifest_data(bucket_name, manifest_path, session)
template = __get_template(TEMPLATE_NAME)
cur_dirpath = Path(manifest_path).parent
columns = manifest_data["columns"]
s3_cur_dirpath = "s3://{}/{}/".format(bucket_name, cur_dirpath)
render_params = {
"s3_cur_dirpath" : s3_cur_dirpath,
"columns" : columns
}
ddl_sql = template.render(**render_params)
return ddl_sql
def __parse_s3path(s3_path):
"""S3のパスをバケット名とオブジェクトのキーに分ける"""
m = re.match(r"s3://(?P<bucket_name>.+?)/(?P<manifest_path>.+)", s3_path)
bucket_name = m.group("bucket_name")
object_key = m.group("manifest_path")
return (bucket_name, object_key)
def __get_manifest_data(bucket_name, manifest_path, session):
"""マニフェストファイルの内容をdictで返す"""
s3 = session.client("s3")
object_body = s3.get_object(Bucket=bucket_name, Key=manifest_path)["Body"]
manifest_data = json.loads(object_body.read().decode())
return manifest_data
def __get_template(template_name):
"""DDLのテンプレートを返す"""
env = Environment(loader=FileSystemLoader("."))
template = env.get_template(template_name)
return template
if __name__ == '__main__':
import argparse
from create_ddl import create_ddl
parser = argparse.ArgumentParser(
description="CURのマニフェストファイルからAthena用のDDLを作成する")
parser.add_argument('s3_manifest_path')
parser.add_argument('--profile')
args = parser.parse_args()
s3_manifest_path = args.s3_manifest_path
profile_name = args.profile
template_dir = "."
ddl = create_ddl(s3_manifest_path, profile_name)
print(ddl)
また、上記のスクリプトとは別に、DDL用のテンプレート(template_ddl.hql.j2
)を作成します。
CREATE EXTERNAL TABLE cur(
{%- for column in columns %}
{{column.category}}_{{column.name | replace(":", "_")}} STRING{{"," if not loop.last }}
{%- endfor %}
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '\"',
'escapeChar' = '\\'
)LOCATION '{{s3_cur_dirpath}}'
TBLPROPERTIES ('has_encrypted_data'='false')
;
DDLの出力の仕方
上記の2つのファイルを同じディレクトリに配置したのち、以下の要領でコマンドを実行するとDDLが出力されます。
$ python create_ddl.py --profile <profile_name> <s3_manifest_path>
CREATE EXTERNAL TABLE cur(
identity_LineItemId STRING,
identity_TimeInterval STRING,
bill_InvoiceId STRING,
...
CURをAthenaで検索する際の注意点
カラム名の重複
Athenaでは、カラム名の英字は全て小文字に変換されるため、マニフェストファイルではユニークなカラムがAthenaでは重複したカラム名だと解釈される場合があります。この場合は、DDLのカラム名を適宜修正する必要があります。
CSVヘッダがデータに残る
Athenaでは、CSVをロードする際にヘッダ行を無視することができないため、単純にDDLを実行しただけではCSVヘッダがレコードとして残ってしまいます。このため、CURを事前にETLするか、検索の際にヘッダ行を含めないように変更する必要があります。