S3上のCSVをAthenaでクエリ操作してみた(Boto3対応)
データアナリティクス事業本部のueharaです。
今回はS3上のCSVファイルに対して、Athenaでテーブル作成や抽出といったクエリによる操作を実施したいと思います。
Athenaを使う手順ですが、ざっくりと下記の3段階になります。
- データをS3に保存
- テーブルを定義
- クエリを実行
CSVファイルの用意
今回は簡単ですが次のようなCSVファイルを用意します。
id,name,price 1,note,100 2,pen,80 3,eraser,120
バケットの作成
まず「athena-test-uehara」という名前のS3バケットを作成し、その配下に下記2つのフォルダを作成します。
- data: csvファイル用
- athena-output: Athenaのクエリ保存用
CSVファイルのアップロード
先に作成したdataに、products.csvをアップロードします。
DB・テーブル作成
まず始めに、SQLを実行するために実行結果の保存先を設定する必要があるので、先程作成した athena-output を設定します。 ※Athenaのコンソール画面の「設定 -> クエリの結果と暗号化の設定」で設定。
次に、適当な名前でDBを作成します。(Athenaの画面上で実行)
CREATE DATABASE uehara_test_db;
DBの作成が完了すると、画面上で作成したDBが選択できるようになるので設定を行います。
DBが作成できたら、次はいよいよテーブルを作成します。
Athenaのテーブル定義に関するDDLはこちらで確認可能です。
今回はシンプルなCSVなので、次のように定義してみます。(Athenaの画面上で実行)
CREATE EXTERNAL TABLE products ( id int, name string, price int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' LOCATION 's3://athena-test-uehara/data/' TBLPROPERTIES ( 'has_encrypted_data'='false', 'skip.header.line.count'='1' );
パラメーターの補足
ROW FORMAT
では通常SerDe(シリアライザー/デシリアライザー)を指定しますが、今回は DELIMITED
のみの指定をしています。
DELIMITED
のみを指定した場合、 SerDe はorg.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
となります。
LOCATION 's3://athena-test-uehara/data/'
はcsvファイルを格納しているプレフィックスを指定しています。
TBLPROPERTIES
で 'skip.header.line.count'='1'
を指定していますが、これはcsvファイルのヘッダを読み飛ばすための指定です。
今回はテーブル作成時のcsvファイルがヘッダ付のためskipの設定をしていますが、テーブルへのINSERT時には注意が必要です。(詳細は後述)
テーブルの中身を覗いてみる
テーブルの作成までできたら、データが読み込めているか確認を行います。(Athenaの画面上で実行)
SELECT * FROM uehara_test_db.products ORDER BY id;
正しくテーブルが作成できていた場合は、データ取得ができることを確認できると思います。
Boto3(Python) でクエリを発行してみる
先程のSELECTをクエリを、今度はPythonから発行してみたいと思います。
全体のプログラムとしては次の通りです。
import time import boto3 import pandas as pd class Athena(object): def __init__(self, session=None): self.client = session.client('athena') def execute_sql(self, sql): athena_client = self.client exec_id = athena_client.start_query_execution( QueryString=sql, ResultConfiguration={ 'OutputLocation': 's3://athena-test-uehara/athena-output' }, WorkGroup='primary' )['QueryExecutionId'] print('Athena Execution ID: {}'.format(exec_id)) print('query: ') print(sql) status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status'] # ポーリング while status['State'] not in ['SUCCEEDED', 'FAILED', 'CANCELLED']: print('{}: wait query running...'.format(status['State'])) time.sleep(5) status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status'] return status['State'], exec_id def main(): session = boto3.session.Session() athena = Athena(session) sql = 'SELECT * FROM uehara_test_db.products;' status, exec_id = athena.execute_sql(sql) if status == 'SUCCEEDED': s3_client = session.client('s3') body = s3_client.get_object(Bucket='athena-test-uehara', Key='athena-output/{0}.csv'.format(exec_id))['Body'] df = pd.read_csv(body, lineterminator='n') print('[result]') print(df) if __name__ == '__main__': main()
実行結果
要点の解説
start_query_execution()
では、実行結果の保存先やワークグループを指定しています。
exec_id = athena_client.start_query_execution( QueryString=sql, ResultConfiguration={ 'OutputLocation': 's3://athena-test-uehara/athena-output' }, WorkGroup='primary' )['QueryExecutionId']
また、start_query_execution()
は非同期のため、ステータスが返るまでポーリングを行っています。
# ポーリング while status['State'] not in ['SUCCEEDED', 'FAILED', 'CANCELLED']: print('{}: wait query running...'.format(status['State'])) time.sleep(5) status = athena_client.get_query_execution(QueryExecutionId=exec_id)['QueryExecution']['Status']
実行結果はS3の指定バケットへ保存されるため、AthenaのExecution IDから実行結果のcsvを参照しています。
※pandasのDataFrameで読み込んで表示
s3_client = session.client('s3') body = s3_client.get_object(Bucket='athena-test-uehara', Key='athena-output/{0}.csv'.format(exec_id))['Body'] df = pd.read_csv(body, lineterminator='n') print('[result]') print(df)
INSERT時の注意点
ここでは 'skip.header.line.count'='1'
としたときの、INSERT時の注意点を記載します。
試しに、次のようなSQLをAthenaの画面上で実行してみます。
INSERT INTO uehara_test_db.products VALUES (4,'case',1500), (5,'ruler',250);
データのINSERTが完了したら、再度productsテーブルの中身を全件表示させてみます。
'case', 'ruler' のどちらも表示されると思いきや、表示されるのは'ruler'のみとなっています。
確認のため、INSERTクエリによって挿入されたデータを確認してみます。
最初に作成したdataバケットに新たに .gz ファイルが作成されているかと思いますが、それが今回のINSERTにより作成されたファイルです。
.gz ファイルをローカルにダウンロードし、展開して中身を確認してみます。
こちらを見ると、たしかに'case'のデータも挿入されています。
このように、'skip.header.line.count'='1'
を指定することにより、INSERT等により新たに挿入されたデータに対しても1行目がskipされるため、注意が必要です。
skip.header.line.count
について、詳細は下記のブログも参考にしてみて下さい。
補足
今回は初めからバケット内にcsvファイルが格納されているところから始めましたが、これは必須要件ではありません。
例えば、まず「data2」という空っぽのバケットを用意し、空のテーブルから作成してデータをINSERTしていくという処理でも問題ありません。
CREATE EXTERNAL TABLE products ( id int, name string, price int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' LOCATION 's3://athena-test-uehara/data2/' # ここが空でも問題ない TBLPROPERTIES ( 'has_encrypted_data'='false', );