DA事業本部の横山です。
今回はboto3
を用いて実行したAthenaのクエリ結果を、AWS SDK for pandas (awswrangler)
を用いてpandas
のDataFrameとして取得してみました。
前提条件
本記事で利用している各ライブラリのバージョンは以下になります。
boto3
:1.26.135
awswrangler
:3.1.1
pandas
:2.0.1
注意事項
AWS SDK for pandas (awswrangler)
を用いれば、検索の実行や処理状況のポーリング自体もawswrangler
で行えます。しかし本記事ではあくまで検索の実行をboto3
で行い、検索結果の取得,DataFrameへの変換をawswrangler
で行うという内容の記事になっております。
重要な部分
本章移行に実行したコードや出力内容を記載しますが、重要な点は以下です。
- Athenaで検索開始
boto3
を使ってAthenaへ検索を行います
- レスポンス内容から
QueryExecutionId
を取得します- 実際にクエリが実行完了したかをポーリングして確認する必要がありますが、本記事では割愛(コードには載っています)
- Athenaから発行された
QueryExecutionId
を用いて検索結果をDataFrameとして取得するawswrangler
のget_query_results()
を利用する
# Athenaで検索開始
response = athena_cli.start_query_execution(
QueryString=sql,
QueryExecutionContext={
"Database": database,
},
ResultConfiguration={
"OutputLocation": s3_output,
},
WorkGroup=workgroup,
)
# QueryExecutionIdを取得
queryExecutionId = response.get("QueryExecutionId")
response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId)
state = response.get("QueryExecution").get("Status").get("State")
# QueryExecutionIdから検索結果をDataFrameで取得(awswrangler)
df = get_query_results(
query_execution_id=queryExecutionId,
)
awswrangler.athena.get_query_results()
のドキュメントは以下になります。
コード内容
import logging
import logging.config
import time
import traceback
import boto3
import pandas as pd
from awswrangler.athena import get_query_results
logging.config.fileConfig(
"logging.ini",
disable_existing_loggers=False,
)
logger = logging.getLogger(__name__)
def read_sql_query(
athena_cli: str,
sql: str,
database: str,
workgroup: str,
s3_output: str,
athena_query_wait_polling_delay: float = 0.25,
) -> pd.DataFrame:
"""Athenaでクエリを実行し、クエリ実行結果をDataFrameで取得する"""
try:
# クエリを準備
response = athena_cli.start_query_execution(
QueryString=sql,
QueryExecutionContext={
"Database": database,
},
ResultConfiguration={
"OutputLocation": s3_output,
},
WorkGroup=workgroup,
)
# QueryExecutionIdを取得
queryExecutionId = response.get("QueryExecutionId")
# クエリ実行中
while True:
response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId)
state = response.get("QueryExecution").get("Status").get("State")
if state == "SUCCEEDED":
logging.info("SUCCEEDED")
break
elif state in ("CANCELLED", "FAILED"):
logger.info(f"QueryExecution Status is {state}.")
raise Exception(f"QueryExecution Status is {state}.")
else:
time.sleep(athena_query_wait_polling_delay)
# クエリ実行時の統計情報を取得
logging.info(
f"QueryQueueTimeInMillis: {response.get('QueryExecution').get('Statistics').get('QueryQueueTimeInMillis')}"
)
logging.info(
f"TotalExecutionTimeInMillis: {response.get('QueryExecution').get('Statistics').get('TotalExecutionTimeInMillis')}"
)
logging.info(
f"DataScannedInBytes: {response.get('QueryExecution').get('Statistics').get('DataScannedInBytes')}"
)
# クエリ実行結果をDataFrameで取得
df = get_query_results(
query_execution_id=queryExecutionId,
)
except Exception as e:
logging.error(e)
logging.error(traceback.format_exc())
else:
logging.info("OK")
finally:
return df
def main():
database = "test_database"
s3_output = f"s3://path/to/athena/output"
workgroup = "test_workgroup"
sql = """
SELECT * FROM "test_database"."test_member_groups" limit 10;
"""
athena_cli = boto3.client("athena")
df = read_sql_query(
athena_cli=athena_cli,
sql=sql,
database=database,
workgroup=workgroup,
s3_output=s3_output,
)
logger.info(df)
if __name__ == "__main__":
main()
実行ログ
検索の実行、ポーリングを行い、Athenaの処理結果をDataFrameとして取得できているのがわかります。
利用シーンは限られますが、boto3
でAthenaのクエリ実行を行い結果をpandas
のDataFrameとして取得することができました。
boto3
で取得するAthenaの検索結果は通常JSON形式となりますがDataFrameとして受け取ることで、データ分析やデータの利活用が行いやすくなりますね。
[INFO] athena-query.py read_sql_query 48 SUCCEEDED
[INFO] athena-query.py read_sql_query 57 QueryQueueTimeInMillis: 124
[INFO] athena-query.py read_sql_query 60 TotalExecutionTimeInMillis: 588
[INFO] athena-query.py read_sql_query 63 DataScannedInBytes: 142
[INFO] athena-query.py read_sql_query 77 OK
[INFO] athena-query.py main 102 test_id test_name
0 test01 テスト01
1 test02 テスト02
2 test03 テスト03
3 test04 テスト04
4 test05 テスト05
おわりに
稀なケースではありますが、boto3
を用いて実行したAthenaのクエリ結果をAWS SDK for pandas (awswrangler)
を用いてpandas
のDataFrameとして取得してみました。
以上になります。この記事がどなたかの助けになれば幸いです。