S3上のCSVをAthenaでクエリ操作してみた(Boto3対応)

2022.10.13

データアナリティクス事業本部のueharaです。

今回はS3上のCSVファイルに対して、Athenaでテーブル作成や抽出といったクエリによる操作を実施したいと思います。

Athenaを使う手順ですが、ざっくりと下記の3段階になります。

  1. データをS3に保存
  2. テーブルを定義
  3. クエリを実行

CSVファイルの用意

今回は簡単ですが次のようなCSVファイルを用意します。

products.csv

id,name,price
1,note,100
2,pen,80
3,eraser,120

バケットの作成

まず「athena-test-uehara」という名前のS3バケットを作成し、その配下に下記2つのフォルダを作成します。

  • data: csvファイル用
  • athena-output: Athenaのクエリ保存用

CSVファイルのアップロード

先に作成したdataに、products.csvをアップロードします。

DB・テーブル作成

まず始めに、SQLを実行するために実行結果の保存先を設定する必要があるので、先程作成した athena-output を設定します。 ※Athenaのコンソール画面の「設定 -> クエリの結果と暗号化の設定」で設定。

次に、適当な名前でDBを作成します。(Athenaの画面上で実行)

CREATE DATABASE uehara_test_db;

DBの作成が完了すると、画面上で作成したDBが選択できるようになるので設定を行います。

DBが作成できたら、次はいよいよテーブルを作成します。

Athenaのテーブル定義に関するDDLはこちらで確認可能です。

今回はシンプルなCSVなので、次のように定義してみます。(Athenaの画面上で実行)

CREATE EXTERNAL TABLE products (
id int, 
name string, 
price int)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      ESCAPED BY '\\'
      LINES TERMINATED BY '\n'
LOCATION
's3://athena-test-uehara/data/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
);

パラメーターの補足

ROW FORMAT では通常SerDe(シリアライザー/デシリアライザー)を指定しますが、今回は DELIMITED のみの指定をしています。 DELIMITED のみを指定した場合、 SerDe はorg.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe となります。

LOCATION 's3://athena-test-uehara/data/' はcsvファイルを格納しているプレフィックスを指定しています。

TBLPROPERTIES'skip.header.line.count'='1' を指定していますが、これはcsvファイルのヘッダを読み飛ばすための指定です。 今回はテーブル作成時のcsvファイルがヘッダ付のためskipの設定をしていますが、テーブルへのINSERT時には注意が必要です。(詳細は後述)

テーブルの中身を覗いてみる

テーブルの作成までできたら、データが読み込めているか確認を行います。(Athenaの画面上で実行)

SELECT * FROM uehara_test_db.products
ORDER BY id;

正しくテーブルが作成できていた場合は、データ取得ができることを確認できると思います。

Boto3(Python) でクエリを発行してみる

先程のSELECTをクエリを、今度はPythonから発行してみたいと思います。  

全体のプログラムとしては次の通りです。

athena-query.py

import time

import boto3
import pandas as pd


class Athena(object):
    def __init__(self, session=None):
        self.client = session.client('athena')

    def execute_sql(self, sql):
        athena_client = self.client

        exec_id = athena_client.start_query_execution(
            QueryString=sql,
            ResultConfiguration={
                'OutputLocation': 's3://athena-test-uehara/athena-output'
            },
            WorkGroup='primary'
        )['QueryExecutionId']

        print('Athena Execution ID: {}'.format(exec_id))
        print('query: ')
        print(sql)

        status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status']

        # ポーリング
        while status['State'] not in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            print('{}: wait query running...'.format(status['State']))
            time.sleep(5)
            status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status']

        return status['State'], exec_id


def main():
    session = boto3.session.Session()

    athena = Athena(session)

    sql = 'SELECT * FROM uehara_test_db.products;'

    status, exec_id = athena.execute_sql(sql)

    if status == 'SUCCEEDED':
        s3_client = session.client('s3')
        body = s3_client.get_object(Bucket='athena-test-uehara',
                                    Key='athena-output/{0}.csv'.format(exec_id))['Body']

        df = pd.read_csv(body, lineterminator='n')

        print('[result]')
        print(df)


if __name__ == '__main__':
    main()

実行結果

要点の解説

start_query_execution() では、実行結果の保存先やワークグループを指定しています。

exec_id = athena_client.start_query_execution(
    QueryString=sql,
    ResultConfiguration={
        'OutputLocation': 's3://athena-test-uehara/athena-output'
    },
    WorkGroup='primary'
)['QueryExecutionId']

また、start_query_execution()は非同期のため、ステータスが返るまでポーリングを行っています。

# ポーリング
while status['State'] not in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
    print('{}: wait query running...'.format(status['State']))
    time.sleep(5)
    status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status']

実行結果はS3の指定バケットへ保存されるため、AthenaのExecution IDから実行結果のcsvを参照しています。
※pandasのDataFrameで読み込んで表示

s3_client = session.client('s3')
body = s3_client.get_object(Bucket='athena-test-uehara',
                            Key='athena-output/{0}.csv'.format(exec_id))['Body']

df = pd.read_csv(body, lineterminator='n')

print('[result]')
print(df)

INSERT時の注意点

ここでは 'skip.header.line.count'='1'としたときの、INSERT時の注意点を記載します。

試しに、次のようなSQLをAthenaの画面上で実行してみます。

INSERT INTO uehara_test_db.products
VALUES (4,'case',1500), (5,'ruler',250);

データのINSERTが完了したら、再度productsテーブルの中身を全件表示させてみます。

'case', 'ruler' のどちらも表示されると思いきや、表示されるのは'ruler'のみとなっています。

確認のため、INSERTクエリによって挿入されたデータを確認してみます。

最初に作成したdataバケットに新たに .gz ファイルが作成されているかと思いますが、それが今回のINSERTにより作成されたファイルです。

.gz ファイルをローカルにダウンロードし、展開して中身を確認してみます。

こちらを見ると、たしかに'case'のデータも挿入されています。

このように、'skip.header.line.count'='1'を指定することにより、INSERT等により新たに挿入されたデータに対しても1行目がskipされるため、注意が必要です。

skip.header.line.countについて、詳細は下記のブログも参考にしてみて下さい。

Amazon Athenaがヘッダ行のスキップをサポートしました!

補足

今回は初めからバケット内にcsvファイルが格納されているところから始めましたが、これは必須要件ではありません。

例えば、まず「data2」という空っぽのバケットを用意し、空のテーブルから作成してデータをINSERTしていくという処理でも問題ありません。

CREATE EXTERNAL TABLE products (
id int, 
name string, 
price int)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      ESCAPED BY '\\'
      LINES TERMINATED BY '\n'
LOCATION
's3://athena-test-uehara/data2/' # ここが空でも問題ない
TBLPROPERTIES (
  'has_encrypted_data'='false',
);