CURのマニフェストファイルからAthena向けのDDLを作成する

CURのマニフェストファイルからAthena向けのDDLを作成する

Clock Icon2017.12.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWSが提供するCost and Usage Report(CUR)にはAWSの利用明細などの請求情報が格納されています。

過去に同様に提供されていたDetailed Billing Report(DBR)との大きな違いの一つとして、DBRがカラムが固定であったことに対してCURではカラムが流動的であることがあります。CURのカラム情報は、マニフェストファイルに格納され、CURの作成と同時に出力されます。

この記事では、カラム情報を持つマニフェストファイルをもとにAthena向けのDDLを作成し、CURの情報をAthenaで検索する方法を紹介します。

DDLを作成するスクリプト

マニフェストファイルからDDLを作成するために、以下のcreate_ddl.pyを作成します。

import re
from pathlib import Path

import json
import boto3
from jinja2 import Environment, FileSystemLoader

TEMPLATE_NAME = "template_ddl.hql.j2"

def create_ddl(manifest_path, profile_name=None):
    """
    CURのマニフェストファイルからAthena用のDDLを作成する。
    params:
        manifest_path : マニフェストファイルのS3パス
        profile_name : S3にアクセスする際に利用するプロファイル。指定がなければdefault
    """
    session = boto3.Session(profile_name=profile_name)

    bucket_name, manifest_path = __parse_s3path(manifest_path)

    manifest_data = __get_manifest_data(bucket_name, manifest_path, session)
    template = __get_template(TEMPLATE_NAME)

    cur_dirpath = Path(manifest_path).parent

    columns = manifest_data["columns"]
    s3_cur_dirpath = "s3://{}/{}/".format(bucket_name, cur_dirpath)

    render_params = {
        "s3_cur_dirpath" : s3_cur_dirpath,
        "columns" : columns
    }
    ddl_sql = template.render(**render_params)

    return ddl_sql

def __parse_s3path(s3_path):
    """S3のパスをバケット名とオブジェクトのキーに分ける"""
    m = re.match(r"s3://(?P<bucket_name>.+?)/(?P<manifest_path>.+)", s3_path)

    bucket_name = m.group("bucket_name")
    object_key = m.group("manifest_path")

    return (bucket_name, object_key)

def __get_manifest_data(bucket_name, manifest_path, session):
    """マニフェストファイルの内容をdictで返す"""
    s3 = session.client("s3")

    object_body = s3.get_object(Bucket=bucket_name, Key=manifest_path)["Body"]
    manifest_data = json.loads(object_body.read().decode())
    return manifest_data

def __get_template(template_name):
    """DDLのテンプレートを返す"""
    env = Environment(loader=FileSystemLoader("."))
    template = env.get_template(template_name)
    return template

if __name__ == '__main__':
    import argparse

    from create_ddl import create_ddl

    parser = argparse.ArgumentParser(
        description="CURのマニフェストファイルからAthena用のDDLを作成する")

    parser.add_argument('s3_manifest_path')
    parser.add_argument('--profile')

    args = parser.parse_args()

    s3_manifest_path = args.s3_manifest_path
    profile_name = args.profile
    template_dir = "."

    ddl = create_ddl(s3_manifest_path, profile_name)

    print(ddl)

また、上記のスクリプトとは別に、DDL用のテンプレート(template_ddl.hql.j2)を作成します。

CREATE EXTERNAL TABLE cur(
  {%- for column in columns %}
    {{column.category}}_{{column.name | replace(":", "_")}} STRING{{"," if not loop.last }}
  {%- endfor %}
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar' = '\"',
   'escapeChar' = '\\'
)LOCATION '{{s3_cur_dirpath}}'
TBLPROPERTIES ('has_encrypted_data'='false')
;

DDLの出力の仕方

上記の2つのファイルを同じディレクトリに配置したのち、以下の要領でコマンドを実行するとDDLが出力されます。

$ python create_ddl.py --profile <profile_name> <s3_manifest_path>
CREATE EXTERNAL TABLE cur(
    identity_LineItemId STRING,
    identity_TimeInterval STRING,
    bill_InvoiceId STRING,
...

CURをAthenaで検索する際の注意点

カラム名の重複

Athenaでは、カラム名の英字は全て小文字に変換されるため、マニフェストファイルではユニークなカラムがAthenaでは重複したカラム名だと解釈される場合があります。この場合は、DDLのカラム名を適宜修正する必要があります。

CSVヘッダがデータに残る

Athenaでは、CSVをロードする際にヘッダ行を無視することができないため、単純にDDLを実行しただけではCSVヘッダがレコードとして残ってしまいます。このため、CURを事前にETLするか、検索の際にヘッダ行を含めないように変更する必要があります。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.