PyAthena+SQLAlchemyでAthenaをPythonicに実行して結果を取得してみた
Athenaを使う業務のバッチ処理にて、ORM併用やS3から結果を取得するフローをより楽にできたりしないものかと思っていたところ、まさしくそのものの組み合わせがあったため試してみました。
PyAthenaについて
Python DB API 2.0 (PEP 249)準拠のAmazon Athena用クライアントです。PEP249については以下のリンク先を参照してください。
実際の利用例としては、Amazon Web Services ブログのAmazon Athena を使用した SageMaker ノートブックからの SQL クエリの実行方法に掲載されています。
なお、PyAthenaに依存したCLIクライアントとしてathenacliがあります。
PyAthena + SQLAlchemy
サポートされているSQLAlchemyのバージョンは1.0.0以上となります。
Install SQLAlchemy with pip install SQLAlchemy>=1.0.0 or pip install PyAthena[SQLAlchemy]. Supported SQLAlchemy is 1.0.0 or higher.
via https://pypi.org/project/PyAthena/
% pip install PyAthena[SQLAlchemy]
使い方
今回はSQLAlchemy内でPyAthenaを経由してAthena上でSQLを実行し、結果をS3から取得します。create_engineに渡すURLスキームがathenaベース指定となる部分以外、SQLAlchemyそのものとなります。
% vim main.py import boto3 from urllib.parse import quote_plus from sqlalchemy.engine import create_engine from sqlalchemy.sql.expression import select from sqlalchemy.sql.functions import func from sqlalchemy.sql.schema import Table, MetaData S3_BACKET = "XXXXXXXXXXXXXXXX" def main(): conn_str = 'awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}' engine = create_engine(conn_str.format( region_name='ap-northeast-1', schema_name='sampledb', s3_staging_dir=quote_plus('s3://%s/' % S3_BACKET) )) many_rows = Table('test', MetaData(bind=engine), autoload=True) print(select([func.count('*')], from_obj=many_rows).scalar()) if __name__ == '__main__': main()
% python main.py 30
PyAthena単体実行
SQLAlchemyの記述スタイルが上手く使えない場合は、PyAthenaでの直接実行を利用する方法もあります。
from pyathena import connect import boto3 session = boto3.Session(profile_name='default') credentials = session.get_credentials() current_credentials = credentials.get_frozen_credentials() cursor = connect(aws_access_key_id=current_credentials.access_key, aws_secret_access_key=current_credentials.secret_key, s3_staging_dir='s3://XXXXXXXXXXXXXXXX/', region_name='ap-northeast-1').cursor() cursor.execute(""" SELECT count(*) FROM test """) print(cursor.fetchall())
まとめ
これまでにboto3を用いて、
- AthenaでのSQL実行
- SQLの実行完了待ち
- S3より実行結果のダウンロード
- ダウンロードしたデータの展開
と、1つ1つ順を追って処理してきたことが数行で完了できてしまいました。
正直なところ呆然とした感覚もあるのですが、ライブラリの有用性が自力で手間をかけて実装した経験により更に実感できたという点もあります。
PyAthenaは、Athenaの処理実行待ちとS3からダウンロードしたファイルの展開までを自動で行うため、実施するSQLの記述だけに専念できます。 バッチ処理等でAthenaでの集計結果をダウンロードするシーンにはとても有効だと思われますので、同じような状況の方は試してみてはいかがでしょうか。