boto3で実行したAthenaの検索結果を、pandasのDataFrameとして受け取る方法

`boto3`を用いて実行したAthenaのクエリ結果を`AWS SDK for pandas (awswrangler)`を用いて`pandas`のDataFrameとして取得してみました。
2023.05.26

DA事業本部の横山です。

今回はboto3を用いて実行したAthenaのクエリ結果を、AWS SDK for pandas (awswrangler)を用いてpandasのDataFrameとして取得してみました。

前提条件

本記事で利用している各ライブラリのバージョンは以下になります。

  • boto3: 1.26.135
  • awswrangler: 3.1.1
  • pandas: 2.0.1

注意事項

  • AWS SDK for pandas (awswrangler)を用いれば、検索の実行や処理状況のポーリング自体もawswranglerで行えます。しかし本記事ではあくまで検索の実行をboto3で行い、検索結果の取得,DataFrameへの変換をawswranglerで行うという内容の記事になっております。

重要な部分

本章移行に実行したコードや出力内容を記載しますが、重要な点は以下です。

  • Athenaで検索開始
    • boto3を使ってAthenaへ検索を行います
  • レスポンス内容からQueryExecutionIdを取得します
    • 実際にクエリが実行完了したかをポーリングして確認する必要がありますが、本記事では割愛(コードには載っています)
  • Athenaから発行されたQueryExecutionIdを用いて検索結果をDataFrameとして取得する
    • awswranglerget_query_results()を利用する
        # Athenaで検索開始
        response = athena_cli.start_query_execution(
            QueryString=sql,
            QueryExecutionContext={
                "Database": database,
            },
            ResultConfiguration={
                "OutputLocation": s3_output,
            },
            WorkGroup=workgroup,
        )

        # QueryExecutionIdを取得
        queryExecutionId = response.get("QueryExecutionId")

            response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId)
            state = response.get("QueryExecution").get("Status").get("State")

        # QueryExecutionIdから検索結果をDataFrameで取得(awswrangler)
        df = get_query_results(
            query_execution_id=queryExecutionId,
        )

awswrangler.athena.get_query_results()のドキュメントは以下になります。

コード内容

import logging
import logging.config
import time
import traceback

import boto3
import pandas as pd
from awswrangler.athena import get_query_results

logging.config.fileConfig(
    "logging.ini",
    disable_existing_loggers=False,
)
logger = logging.getLogger(__name__)


def read_sql_query(
    athena_cli: str,
    sql: str,
    database: str,
    workgroup: str,
    s3_output: str,
    athena_query_wait_polling_delay: float = 0.25,
) -> pd.DataFrame:
    """Athenaでクエリを実行し、クエリ実行結果をDataFrameで取得する"""

    try:
        # クエリを準備
        response = athena_cli.start_query_execution(
            QueryString=sql,
            QueryExecutionContext={
                "Database": database,
            },
            ResultConfiguration={
                "OutputLocation": s3_output,
            },
            WorkGroup=workgroup,
        )

        # QueryExecutionIdを取得
        queryExecutionId = response.get("QueryExecutionId")

        # クエリ実行中
        while True:
            response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId)
            state = response.get("QueryExecution").get("Status").get("State")
            if state == "SUCCEEDED":
                logging.info("SUCCEEDED")
                break
            elif state in ("CANCELLED", "FAILED"):
                logger.info(f"QueryExecution Status is {state}.")
                raise Exception(f"QueryExecution Status is {state}.")
            else:
                time.sleep(athena_query_wait_polling_delay)

        # クエリ実行時の統計情報を取得
        logging.info(
            f"QueryQueueTimeInMillis: {response.get('QueryExecution').get('Statistics').get('QueryQueueTimeInMillis')}"
        )
        logging.info(
            f"TotalExecutionTimeInMillis: {response.get('QueryExecution').get('Statistics').get('TotalExecutionTimeInMillis')}"
        )
        logging.info(
            f"DataScannedInBytes: {response.get('QueryExecution').get('Statistics').get('DataScannedInBytes')}"
        )

        # クエリ実行結果をDataFrameで取得
        df = get_query_results(
            query_execution_id=queryExecutionId,
        )

    except Exception as e:
        logging.error(e)
        logging.error(traceback.format_exc())

    else:
        logging.info("OK")

    finally:
        return df


def main():
    database = "test_database"
    s3_output = f"s3://path/to/athena/output"
    workgroup = "test_workgroup"

    sql = """
    SELECT * FROM "test_database"."test_member_groups" limit 10;
    """

    athena_cli = boto3.client("athena")

    df = read_sql_query(
        athena_cli=athena_cli,
        sql=sql,
        database=database,
        workgroup=workgroup,
        s3_output=s3_output,
    )
    logger.info(df)


if __name__ == "__main__":
    main()

実行ログ

検索の実行、ポーリングを行い、Athenaの処理結果をDataFrameとして取得できているのがわかります。

利用シーンは限られますが、boto3でAthenaのクエリ実行を行い結果をpandasのDataFrameとして取得することができました。 boto3で取得するAthenaの検索結果は通常JSON形式となりますがDataFrameとして受け取ることで、データ分析やデータの利活用が行いやすくなりますね。

[INFO] athena-query.py read_sql_query 48 SUCCEEDED
[INFO] athena-query.py read_sql_query 57 QueryQueueTimeInMillis: 124
[INFO] athena-query.py read_sql_query 60 TotalExecutionTimeInMillis: 588
[INFO] athena-query.py read_sql_query 63 DataScannedInBytes: 142
[INFO] athena-query.py read_sql_query 77 OK
[INFO] athena-query.py main 102   test_id test_name
0   test01      テスト01
1   test02      テスト02
2   test03      テスト03
3   test04      テスト04
4   test05      テスト05

おわりに

稀なケースではありますが、boto3を用いて実行したAthenaのクエリ結果をAWS SDK for pandas (awswrangler)を用いてpandasのDataFrameとして取得してみました。

以上になります。この記事がどなたかの助けになれば幸いです。