
AWS Python SDKからAthenaにクエリして結果をS3からローカルに保存
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Amazon Athena を使うと、Amazon S3内のデータに爆速でSQLで問い合わせられます。
便利ではあるのですが、管理コンソールから操作すると
- SQL の記載
- SQL の実行
- 完了まで待つ
- ダウンロードリンクからSQLの実行結果を保存
と待ち時間が長く、まとめて実行した SQL が多くある場合は、少しばかり手間です。
このような問題を解決するため、API を使って SQL の実行から SQL の実行結果の保存までを API で行うスクリプトを紹介します。
実行イメージ
$ cat foo.sql
select count(*)
from bar
$ export AWS_DEFAULT_PROFILE=test
$ python athena.py foo.sql
$ ls -1
athena.log
athena.py
foo.sql # クエリーする SQL
foo.sql.csv # クエリー結果
$ cat foo.sql.csv # クエリー結果を確認
"_col0"
"1234"
解説
主要なポイントだけかいつまんで説明します。
プログラム全体はブログ末尾を参照下さい
Amazon Athena にクエリーを投げる
Amazon Athena にクエリーを投げるには start_query_execution API を使います。
- QueryString : SQL 文 を指定します。
- QueryExecutionContext : クエリー先のデータベース名を指定します
- ResultConfiguration : クエリーの保存先を指定します。AMCから利用する場合、デフォルトは「aws-athena-query-results--」です。
result = athena.start_query_execution(
QueryString = sql,
QueryExecutionContext = {
'Database': DATABASE_NAME
},
ResultConfiguration = {
'OutputLocation': 's3://' + S3BUCKET_NAME,
}
)
クエリーの実行ステータスを確認
クエリーの実行ステータスを確認するには get_query_execution API を使います。
- QueryExecutionId :
start_query_execution
実行時に発行される ID を指定します。
レスポンス例です。
{
"QueryExecution": {
"Query": "string",
"QueryExecutionContext": {
"Database": "string"
},
"QueryExecutionId": "string",
"ResultConfiguration": {
"EncryptionConfiguration": {
"EncryptionOption": "string",
"KmsKey": "string"
},
"OutputLocation": "string"
},
"Statistics": {
"DataScannedInBytes": number,
"EngineExecutionTimeInMillis": number
},
"Status": {
"CompletionDateTime": number,
"State": "string",
"StateChangeReason": "string",
"SubmissionDateTime": number
}
}
}
QueryExecution -> Status -> State が開始直後の RUNNING
から SUCCEEDED
に遷移するまでポーリングします。
サンプルプログラムでは retrying モジュールを使い exponential backoff を使いながらポーリングしています。
retrying モジュールの使い方は、次の記事を参照下さい。
クエリー結果を S3 からローカル環境にコピー
Athena のクエリー結果は S3 に保存されます。
この保存先パスは s3://{OutputLocation}//{QueryExecutionId}.csv
となります。
S3 の API で S3 からローカルサーバーにコピーします。
プログラム全体
以上の処理をまとめたプログラムが以下です
#!/usr/bin/env python | |
# vim: set fileencoding=utf8 : | |
``` | |
$ pip install -U boto3 retrying | |
$ export AWS_DEFAULT_PROFILE=test | |
$ cat foo.sql | |
select count(*) | |
from bar | |
$ python athena.py foo.sql | |
$ ls -1 | |
athena.log # program log | |
athena.py # main program | |
foo.sql # sql | |
foo.sql.csv # query result | |
$ cat foo.sql.csv # check query result | |
"_col0" | |
"1234" | |
''' | |
import logging | |
import pprint | |
import sys | |
import boto3 | |
from retrying import retry | |
logging.basicConfig(filename='athena.log',level=logging.INFO) | |
athena = boto3.client('athena') | |
s3 = boto3.resource('s3') | |
S3BUCKET_NAME = 'XXX' | |
DATABASE_NAME = 'YYY' | |
@retry(stop_max_attempt_number = 10, | |
wait_exponential_multiplier = 30 * 1000, | |
wait_exponential_max = 10 * 60 * 1000) | |
def poll_status(_id): | |
''' | |
poll query status | |
''' | |
result = athena.get_query_execution( | |
QueryExecutionId = _id | |
) | |
logging.info(pprint.pformat(result['QueryExecution'])) | |
state = result['QueryExecution']['Status']['State'] | |
if state == 'SUCCEEDED': | |
return result | |
elif state == 'FAILED': | |
return result | |
else: | |
raise Exception | |
def query_to_athena(filename): | |
sql = open(filename, 'r').read() | |
result = athena.start_query_execution( | |
QueryString = sql, | |
QueryExecutionContext = { | |
'Database': DATABASE_NAME | |
}, | |
ResultConfiguration = { | |
'OutputLocation': 's3://' + S3BUCKET_NAME, | |
} | |
) | |
logging.info(pprint.pformat(result)) | |
QueryExecutionId = result['QueryExecutionId'] | |
result = poll_status(QueryExecutionId) | |
# save response | |
with open(filename + '.log', 'w') as f: | |
f.write(pprint.pformat(result, indent = 4)) | |
# save query result from S3 | |
if result['QueryExecution']['Status']['State'] == 'SUCCEEDED': | |
s3_key = QueryExecutionId + '.csv' | |
local_filename = filename + '.csv' | |
s3.Bucket(S3BUCKET_NAME).download_file(s3_key, local_filename) | |
def main(): | |
for filename in sys.argv[1:]: | |
try: | |
query_to_athena(filename) | |
except Exception, err: | |
print err | |
if __name__ == '__main__': | |
main() |
改善ポイント
今回したプログラムはクエリーを一つずつシーケンシャルに処理しています。
- SQL の並列上限(デフォルトは5)を超えない範囲で並列実行し、 複数の実行ステータスを BatchGetQueryExecution でまとめて確認
- S3バケット、データベース名 を引数で渡せるようにする
などすれば、もう少し使いやすくなると思います。
AWS CLI がクエリー処理完了をチェックする waiter に対応すると、ポーリング処理をよりシンプルにかけるようになります。でこの議論は以下のイシューで進行中です
- https://github.com/boto/boto3/issues/1212
- https://github.com/boto/botocore/pull/1358
CloudWatch Events が Athena のクエリーステータスと連動すると、ポーリングの代わりにイベントドリブンな処理を書けるようになります。
まとめ
今回は REST API を使い、以下の処理を行うプログラムを作成しました。
- Athena にクエリーを投げる
- クエリー実行ステータスをポーリングで確認
- クエリー実行結果を S3 から保存
Amazon Athena に一度にまとめて SQL を投げるエンジニアのヒントになれば幸いです。
参照
AWS CLI および AWS SDK for Python からの API の使い方は以下の過去の記事も参照下さい。