AWS Python SDKからAthenaにクエリして結果をS3からローカルに保存

2018.01.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Amazon Athena を使うと、Amazon S3内のデータに爆速でSQLで問い合わせられます。

便利ではあるのですが、管理コンソールから操作すると

  1. SQL の記載
  2. SQL の実行
  3. 完了まで待つ
  4. ダウンロードリンクからSQLの実行結果を保存

と待ち時間が長く、まとめて実行した SQL が多くある場合は、少しばかり手間です。

このような問題を解決するため、API を使って SQL の実行から SQL の実行結果の保存までを API で行うスクリプトを紹介します。

実行イメージ

$ cat foo.sql
select count(*)
from bar

$ export AWS_DEFAULT_PROFILE=test

$ python athena.py foo.sql

$ ls -1
athena.log
athena.py
foo.sql     # クエリーする SQL
foo.sql.csv # クエリー結果

$ cat foo.sql.csv # クエリー結果を確認
"_col0"
"1234"

解説

主要なポイントだけかいつまんで説明します。

プログラム全体はブログ末尾を参照下さい

Amazon Athena にクエリーを投げる

Amazon Athena にクエリーを投げるには start_query_execution API を使います。

  • QueryString : SQL 文 を指定します。
  • QueryExecutionContext : クエリー先のデータベース名を指定します
  • ResultConfiguration : クエリーの保存先を指定します。AMCから利用する場合、デフォルトは「aws-athena-query-results--」です。
result = athena.start_query_execution(
    QueryString = sql,
    QueryExecutionContext = {
        'Database': DATABASE_NAME
    },
    ResultConfiguration = {
        'OutputLocation': 's3://' + S3BUCKET_NAME,
    }
)

クエリーの実行ステータスを確認

クエリーの実行ステータスを確認するには get_query_execution API を使います。

  • QueryExecutionId : start_query_execution 実行時に発行される ID を指定します。

レスポンス例です。

{
   "QueryExecution": {
      "Query": "string",
      "QueryExecutionContext": {
         "Database": "string"
      },
      "QueryExecutionId": "string",
      "ResultConfiguration": {
         "EncryptionConfiguration": {
            "EncryptionOption": "string",
            "KmsKey": "string"
         },
         "OutputLocation": "string"
      },
      "Statistics": {
         "DataScannedInBytes": number,
         "EngineExecutionTimeInMillis": number
      },
      "Status": {
         "CompletionDateTime": number,
         "State": "string",
         "StateChangeReason": "string",
         "SubmissionDateTime": number
      }
   }
}

QueryExecution -> Status -> State が開始直後の RUNNING から SUCCEEDED に遷移するまでポーリングします。

サンプルプログラムでは retrying モジュールを使い exponential backoff を使いながらポーリングしています。

retrying モジュールの使い方は、次の記事を参照下さい。

PythonでExponential Backoffをしたかったのでretryingモジュールを調べてみた

クエリー結果を S3 からローカル環境にコピー

Athena のクエリー結果は S3 に保存されます。

この保存先パスは s3://{OutputLocation}//{QueryExecutionId}.csv となります。

S3 の API で S3 からローカルサーバーにコピーします。

プログラム全体

以上の処理をまとめたプログラムが以下です

改善ポイント

今回したプログラムはクエリーを一つずつシーケンシャルに処理しています。

  • SQL の並列上限(デフォルトは5)を超えない範囲で並列実行し、 複数の実行ステータスを BatchGetQueryExecution でまとめて確認
  • S3バケット、データベース名 を引数で渡せるようにする

などすれば、もう少し使いやすくなると思います。

AWS CLI がクエリー処理完了をチェックする waiter に対応すると、ポーリング処理をよりシンプルにかけるようになります。でこの議論は以下のイシューで進行中です

  • https://github.com/boto/boto3/issues/1212
  • https://github.com/boto/botocore/pull/1358

CloudWatch Events が Athena のクエリーステータスと連動すると、ポーリングの代わりにイベントドリブンな処理を書けるようになります。

まとめ

今回は REST API を使い、以下の処理を行うプログラムを作成しました。

  1. Athena にクエリーを投げる
  2. クエリー実行ステータスをポーリングで確認
  3. クエリー実行結果を S3 から保存

Amazon Athena に一度にまとめて SQL を投げるエンジニアのヒントになれば幸いです。

参照

AWS CLI および AWS SDK for Python からの API の使い方は以下の過去の記事も参照下さい。

新機能 AWSCLIから Amazon Athena のクエリを実行する

AWS SDK for Python (Boto3) で Amazon Athena にクエリする