PyAthena+SQLAlchemyでAthenaをPythonicに実行して結果を取得してみた

PyAthenaを使って「AmazonAthena上でのSQL実行〜実行結果をS3から取得〜取得したデータの展開」のプロセスをほんの数行のロジックで完了させてみました。
2019.01.08

Athenaを使う業務のバッチ処理にて、ORM併用やS3から結果を取得するフローをより楽にできたりしないものかと思っていたところ、まさしくそのものの組み合わせがあったため試してみました。

PyAthenaについて

Python DB API 2.0 (PEP 249)準拠のAmazon Athena用クライアントです。PEP249については以下のリンク先を参照してください。

実際の利用例としては、Amazon Web Services ブログのAmazon Athena を使用した SageMaker ノートブックからの SQL クエリの実行方法に掲載されています。

なお、PyAthenaに依存したCLIクライアントとしてathenacliがあります。

PyAthena + SQLAlchemy

サポートされているSQLAlchemyのバージョンは1.0.0以上となります。

Install SQLAlchemy with pip install SQLAlchemy>=1.0.0 or pip install PyAthena[SQLAlchemy]. Supported SQLAlchemy is 1.0.0 or higher.

via https://pypi.org/project/PyAthena/

% pip install PyAthena[SQLAlchemy]

使い方

今回はSQLAlchemy内でPyAthenaを経由してAthena上でSQLを実行し、結果をS3から取得します。create_engineに渡すURLスキームがathenaベース指定となる部分以外、SQLAlchemyそのものとなります。

% vim main.py
import boto3
from urllib.parse import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

S3_BACKET = "XXXXXXXXXXXXXXXX"
def main():
    conn_str = 'awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}'
    engine = create_engine(conn_str.format(
        region_name='ap-northeast-1',
        schema_name='sampledb',
        s3_staging_dir=quote_plus('s3://%s/' % S3_BACKET)
    ))
    many_rows = Table('test', MetaData(bind=engine), autoload=True)
    print(select([func.count('*')], from_obj=many_rows).scalar())
if __name__ == '__main__':
    main()
% python main.py
30

PyAthena単体実行

SQLAlchemyの記述スタイルが上手く使えない場合は、PyAthenaでの直接実行を利用する方法もあります。

from pyathena import connect
import boto3

session = boto3.Session(profile_name='default')
credentials = session.get_credentials()
current_credentials = credentials.get_frozen_credentials()
cursor = connect(aws_access_key_id=current_credentials.access_key,
                 aws_secret_access_key=current_credentials.secret_key,
                 s3_staging_dir='s3://XXXXXXXXXXXXXXXX/',
                 region_name='ap-northeast-1').cursor()
cursor.execute("""
               SELECT count(*) FROM test
               """)
print(cursor.fetchall())

まとめ

これまでにboto3を用いて、

  • AthenaでのSQL実行
  • SQLの実行完了待ち
  • S3より実行結果のダウンロード
  • ダウンロードしたデータの展開

と、1つ1つ順を追って処理してきたことが数行で完了できてしまいました。

正直なところ呆然とした感覚もあるのですが、ライブラリの有用性が自力で手間をかけて実装した経験により更に実感できたという点もあります。

PyAthenaは、Athenaの処理実行待ちとS3からダウンロードしたファイルの展開までを自動で行うため、実施するSQLの記述だけに専念できます。 バッチ処理等でAthenaでの集計結果をダウンロードするシーンにはとても有効だと思われますので、同じような状況の方は試してみてはいかがでしょうか。