S3のアップロードイベントでAWS Athenaのパーティションを設定してみた

S3のアップロードイベントでAWS Athenaのパーティションを設定してみた

Clock Icon2017.10.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

DI部のおおたきです。AWS Athenaを使う際にS3上のファイル数が多くなるようならパーテションの機能は必須です。 S3にHiveフォーマットのパスでアップロードされていればパーテションの設定は不要ですがそうでない場合もあると思います。 その場合はアップロードされたパスに対してalter tableでパーテションを追加する必要があります。 パーテションについてはこちらの記事に詳しく記載があるので参照してください。 そこで今回、S3にファイルがアップロードされたイベントをトリガーにAWS Lambdaでパーテションを設定するプログラムを書いてみました。

実装してみる

Athenaへのアクセスは発表当初はjdbc接続に対応していましたが、現在はpythonのboto3からも接続可能となっています。そのため今回はpythonで実装してみました。 まず、アップロードするファイルのS3のパスは以下のようにします。

{バケット}
    └─{テーブル名}       
        └─yyyy
            └─mm
                └─dd
                   xxxxxx.csv

yyyy/mm/dd/の部分にパーテションを設定することで、検索時に特定日付の配下のファイルのみをスキャンできるようにします。 今回作成したテーブルのDDLは以下になります。パーティションにyear,month ,dayを設定しています。

CREATE EXTERNAL TABLE IF NOT EXISTS testootakidb.customer (
  `id` string,
  `name` string 
) PARTITIONED BY (
  year string,
  month string,
  day string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://xxxxxxx/customer/'
TBLPROPERTIES ('has_encrypted_data'='false');

次にLambdaのソースコードはこんな感じです。S3のPUTイベントで発火するように設定します。

import boto3
import time

client = boto3.client('athena')

def lambda_handler(event, context):
    
    for record in event["Records"]:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        key_tmp = key.split('/')
        table_name = key_tmp[0]
        year = key_tmp[1]
        month = key_tmp[2]
        day = key_tmp[3]
        path = table_name + '/' + year + '/' + month + '/' + day + '/'

    sql = 'ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year=\'{}\',month=\'{}\',day=\'{}\') location \'{}\''.format \
        (table_name, year, month , day,'s3://' + bucket + '/' + path)
    print('sql=' + sql)
    client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={
            'Database': 'testootakidb'
        },
        ResultConfiguration={
            'OutputLocation': 's3://test-ootaki/athena-output'
        }
    )

実装としては単純でstart_query_executionでalter文を実行しているだけです。alter文にIF NOT EXISTSを設定することで既にパーテションが作成されていたら無視するようにしています。

実行してみる

以下のS3のパスにファイルをアップロードしてみます。 s3://{バケット}/customer/2017/10/24/cusotmer_20171024.csv パーテションが設定されたか確認してみます。パーテションの確認はshow partetions <テーブル名>で確認することができます。 マネージメントコンソールのQuery Editorから実行してみます。 show_partitions パーテションが追加されたのが確認できました。

まとめ

いかがだったでしょうか。Athenaのパーテションの追加は面倒だったりするのでLambdaからpythonを使って設定することで簡単に対応ができるかと思います。今回はS3のファイルアップロードイベントで実行しましたが、CloudWatchのイベントにLambdaを登録して定期実行させるのもいいかもしれません。 今回は以上です。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.