CURのマニフェストファイルからAthena向けのDDLを作成する
はじめに
AWSが提供するCost and Usage Report(CUR)にはAWSの利用明細などの請求情報が格納されています。
過去に同様に提供されていたDetailed Billing Report(DBR)との大きな違いの一つとして、DBRがカラムが固定であったことに対してCURではカラムが流動的であることがあります。CURのカラム情報は、マニフェストファイルに格納され、CURの作成と同時に出力されます。
この記事では、カラム情報を持つマニフェストファイルをもとにAthena向けのDDLを作成し、CURの情報をAthenaで検索する方法を紹介します。
DDLを作成するスクリプト
マニフェストファイルからDDLを作成するために、以下のcreate_ddl.py
を作成します。
import re from pathlib import Path import json import boto3 from jinja2 import Environment, FileSystemLoader TEMPLATE_NAME = "template_ddl.hql.j2" def create_ddl(manifest_path, profile_name=None): """ CURのマニフェストファイルからAthena用のDDLを作成する。 params: manifest_path : マニフェストファイルのS3パス profile_name : S3にアクセスする際に利用するプロファイル。指定がなければdefault """ session = boto3.Session(profile_name=profile_name) bucket_name, manifest_path = __parse_s3path(manifest_path) manifest_data = __get_manifest_data(bucket_name, manifest_path, session) template = __get_template(TEMPLATE_NAME) cur_dirpath = Path(manifest_path).parent columns = manifest_data["columns"] s3_cur_dirpath = "s3://{}/{}/".format(bucket_name, cur_dirpath) render_params = { "s3_cur_dirpath" : s3_cur_dirpath, "columns" : columns } ddl_sql = template.render(**render_params) return ddl_sql def __parse_s3path(s3_path): """S3のパスをバケット名とオブジェクトのキーに分ける""" m = re.match(r"s3://(?P<bucket_name>.+?)/(?P<manifest_path>.+)", s3_path) bucket_name = m.group("bucket_name") object_key = m.group("manifest_path") return (bucket_name, object_key) def __get_manifest_data(bucket_name, manifest_path, session): """マニフェストファイルの内容をdictで返す""" s3 = session.client("s3") object_body = s3.get_object(Bucket=bucket_name, Key=manifest_path)["Body"] manifest_data = json.loads(object_body.read().decode()) return manifest_data def __get_template(template_name): """DDLのテンプレートを返す""" env = Environment(loader=FileSystemLoader(".")) template = env.get_template(template_name) return template if __name__ == '__main__': import argparse from create_ddl import create_ddl parser = argparse.ArgumentParser( description="CURのマニフェストファイルからAthena用のDDLを作成する") parser.add_argument('s3_manifest_path') parser.add_argument('--profile') args = parser.parse_args() s3_manifest_path = args.s3_manifest_path profile_name = args.profile template_dir = "." ddl = create_ddl(s3_manifest_path, profile_name) print(ddl)
また、上記のスクリプトとは別に、DDL用のテンプレート(template_ddl.hql.j2
)を作成します。
CREATE EXTERNAL TABLE cur( {%- for column in columns %} {{column.category}}_{{column.name | replace(":", "_")}} STRING{{"," if not loop.last }} {%- endfor %} ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = ',', 'quoteChar' = '\"', 'escapeChar' = '\\' )LOCATION '{{s3_cur_dirpath}}' TBLPROPERTIES ('has_encrypted_data'='false') ;
DDLの出力の仕方
上記の2つのファイルを同じディレクトリに配置したのち、以下の要領でコマンドを実行するとDDLが出力されます。
$ python create_ddl.py --profile <profile_name> <s3_manifest_path> CREATE EXTERNAL TABLE cur( identity_LineItemId STRING, identity_TimeInterval STRING, bill_InvoiceId STRING, ...
CURをAthenaで検索する際の注意点
カラム名の重複
Athenaでは、カラム名の英字は全て小文字に変換されるため、マニフェストファイルではユニークなカラムがAthenaでは重複したカラム名だと解釈される場合があります。この場合は、DDLのカラム名を適宜修正する必要があります。
CSVヘッダがデータに残る
Athenaでは、CSVをロードする際にヘッダ行を無視することができないため、単純にDDLを実行しただけではCSVヘッダがレコードとして残ってしまいます。このため、CURを事前にETLするか、検索の際にヘッダ行を含めないように変更する必要があります。