AWS Data Wranglerを使ってpandas -> AthenaとAthena -> pandasをやってみた

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どうも、DA 事業本部の大澤です。

AWS Data Wrangler を使って、”pandas DataFrame のデータを Athena のテーブルへ”、”Athena でのクエリ実行結果を pandas DataFrame へ”というのを試してみました。かなり便利だったので、どんな感じでできるかをご紹介します。

AWS Data Wrangler

2019年9月、Github上にAWS Data Wrangler(以下、Data Wrangler)が公開されました。Data Wranglerは、各種AWSサービスからデータを取得して、コーディングをサポートしてくれるPythonのモジュールです。

現在、Python を用いて、Amazon Athena(以下、Athena)や Amazon Redshift(以下、Redshift)からデータを取得して、ETL 処理を行う際、PyAthena や boto3、Pandas などを利用して行うことが多いかと思います。その際、本来実施したい ETL のコーディングまでに、接続設定を書いたり、各種コーディングが必要でした。Data Wragler を利用することで、Athena や Amazon S3(以下、S3)上の CSV から Pandas を利用するのが、数行で実施できたり、PySpark から Redshift に連携できるなど、お客様側は ETL の処理の記述内容に集中することができます。 本モジュールはインスタンスに対して pip でインストールできることに加え、Lambda Layer としての利用や Glue 上で egg ファイルをアップロードして利用することができます。

AWS Data Wrangler を使って、簡単に ETL 処理を実現する | Amazon Web Services ブログより

やってみる

インストール

Aws Data Wrangler を pip でインストールします。

pip install awswrangler

ライブラリの読み込みと準備

使用するライブラリを読み込み、各パラメータの設定等を行います。パラメータは必要に応じて変更してください。

import awswrangler
import numpy as np
import pandas as pd
from os import path

database = 'test_data_wrangler' # 検証用に使用するGlue/AthenaのDB名
bucket = '' # 使用するS3のバケット名
prefix = 'data_wrangler' # データの配置場所
s3_prefix = f's3://{bucket}/{prefix}'

# AWS Data Wrangler の各処理はこのセッションを通じて行います
session = awswrangler.Session()

検証用のデータベース作成

Glueのクライアントを使って、検証用にデータベースを作成します。

glue = session.boto3_session.client('glue')
glue.create_database(DatabaseInput={'Name':database})

データ作成

乱数を生成し、1000行4列のデータを作成します。1-3列目は[0, 1)の実数、4列目は[0,10)の整数にします。

df = pd.DataFrame(np.hstack((np.random.rand(1000, 3), np.random.randint(0, 10, (1000, 1)))) , columns=['a', 'b', 'c', 'd'])
df

pandas DataFrame -> Athena

AWS Data Wranglerを使って先ほど作成したDataFrameのデータをAthenaのテーブルへ挿れます。 自動的にParquet化し、指定した場所にデータをアップロードしてくれます。また、テーブルが存在しない場合は自動的に作成されます。テーブルがすでに存在し、レコードもすでに入ってる場合の挙動は引数のmodeによってコントロールできます。

そのほかの引数についてはドキュメントをご覧ください。

table='test'
session.pandas.to_parquet(
    dataframe=df,
    database=database,
    table=table,
    path=path.join(s3_prefix, table), # データの保存場所
    partition_cols='d', 
    mode='overwrite_partitions', # 'append', 'overwrite', 'overwrite_partitions'
    preserve_index=False,
    compression='gzip' # None, 'snappy', 'gzip', 'lzo' 
)

実行後、アップロードされたParquetファイルのパス一覧が返されます。列dでパーティション化されていることが分かります。

Athena -> pandas DataFrame

次は先ほど作成したテーブルに対してクエリを実行し、その結果を取得してみます。

query = '''
    select * from test
'''
athena_df = session.pandas.read_sql_athena(
    sql=query,
    database=database
)
athena_df

検証用のデータベース削除

検証が完了したので、最初に作成したデータベースを削除します。

glue.delete_database(Name=database)

さいごに

AWS Data Wrangler を使って、”pandas DataFrame-> Athena”と”Athena -> pandas DataFrame”をやってみた様子をご紹介しました。AWS Data Wrangler には今回ご紹介した内容以外にも多くのことが可能です。 うまく活用することで ETL 処理がかなり楽になりそうです!

参考