Pandasを使ってS3バケット上のExcelファイルをParquet形式に変換する

2023.10.13

データアナリティクス事業本部インテグレーション部コンサルティングチーム・新納(にいの)です。

S3バケット上にあるExcelファイルを変換し、Amazon Athenaでクエリできるようにしたくなる時が誰しもあるかと思います。今回はS3バケット上にあるExcelファイルをParquet形式に変換してAthenaでテーブル化してみました。

前提

こんなことがやりたい

S3バケット上にあるExcelファイルを別の保存先バケットにParquet形式に変換した上で保存します。将来的にはこのファイルを使ってAmazon Athenaでテーブルを作成します。

実行環境

以下の環境で動作確認しています。

  • Python:3.11.4

ExcelからParquetへ変換する

実行したPythonスクリプトは以下の通りです。

import boto3
import pandas as pd
import io 

PROFILE_NAME = '<プロファイル名>'

# S3バケット名とプレフィックス(フォルダパス)を設定
TARGET_BUCKET = '<ソースバケット名>'  
prefix = 'exceldata/' 

# 変換後のTSVファイルを保存するフォルダパス
TO_BUCKET = '<保存先バケット名>'
parquet_folder = 'parquetdata/'

# S3クライアントを作成
session = boto3.Session()
if PROFILE_NAME in boto3.Session().available_profiles:
    session = boto3.Session(profile_name=PROFILE_NAME)
s3 = session.resource('s3')
s3_client = session.client('s3')

# Bucketからオブジェクトの一覧を取得する
def get_all_objects_high(bucket):
    bucket = s3.Bucket(bucket)
    return bucket.objects.all()

# ExcelファイルをParquetに変換して保存する関数
def convert_excel_to_parquet(target_bucket, to_bucket, obj):
    # S3からExcelファイルを読み取り
    excel_object = s3_client.get_object(Bucket=target_bucket, Key=obj.key)
    excel_data = excel_object['Body'].read()
 
    # ExcelファイルをDataFrameとして読み込み(Sheetは1枚目を取得)
    df = pd.read_excel(io.BytesIO(excel_data), engine='openpyxl', sheet_name=0)
    
    # Excelデータが0件の場合終了(ヘッダーしか無い場合や空ファイルの場合も終了)
    if df.empty==True:
        print(f'{obj.key} has no data')
        return

    # Parquetファイルとして保存
    buffer = io.BytesIO()
    parquet_file = parquet_folder + obj.key.split('/')[-1].replace('.xlsx', '.parquet')
    df.to_parquet(buffer)
    s3_client.put_object(Bucket=to_bucket, Key=parquet_file, Body=buffer.getvalue())
    print(f'created {parquet_file}')

def main():
    print('boto3 vertion is {0}'.format(boto3.__version__))
    objs = get_all_objects_high(TARGET_BUCKET)
    for i,obj in enumerate(iter(objs)):
        if obj.size == 0 or obj.key[-5:] != '.xlsx':
            print(f'{obj.key} is not converted')
            continue
        convert_excel_to_parquet(TARGET_BUCKET,TO_BUCKET,obj)

    
if __name__ == "__main__":
    main()

大切なポイントに絞って解説します。

Bucketからオブジェクトの一覧を取得する

まずはS3バケット内のExcelファイルをすべて取得します。オブジェクトの取得といえばlist_objects_v2でしたが、今回は高レベルAPIを使って記述量を減らしています。

S3のバケット内のオブジェクトをすべて取得する | DevelopersIO

# Bucketからオブジェクトの一覧を取得する
def get_all_objects_high(bucket):
    bucket = s3.Bucket(bucket)
    return bucket.objects.all()

Excelファイル読み込み

read_excel()でExcelファイルを読み込みします。

今回はS3上に置かれたExcelの中身を['Body'].read()で読み込み、BytesIOでラップし、DataFrameに格納しました。BytesIOを使うことでメモリ上でデータの受け渡しをしています。

    # S3からExcelファイルを読み取り
    excel_object = s3_client.get_object(Bucket=target_bucket, Key=obj.key)
    excel_data = excel_object['Body'].read()
     
    # ExcelファイルをDataFrameとして読み込み(Sheetは1枚目を取得)
    df = pd.read_excel(io.BytesIO(excel_data), engine='openpyxl', sheet_name=0)

参考:pandas.read_excel — pandas 2.1.1 documentation

Parquetファイルに変換して保存する

to_parquetでParquetファイルに変換可能です。ここでもBytesIOでラップしたデータを使っています。デフォルトではsnappyで圧縮されます。

    # Parquetファイルとして保存
    buffer = io.BytesIO()
    parquet_file = parquet_folder + obj.key.split('/')[-1].replace('.xlsx', '.parquet')
    df.to_parquet(buffer)
    s3_client.put_object(Bucket=to_bucket, Key=parquet_file, Body=buffer.getvalue())
    print(f'created {parquet_file}')

参考:pandas.DataFrame.to_parquet — pandas 2.1.1 documentation

pyarrowかfastparquetがインストールされていない環境ではto_parquet()を実行すると以下のエラーが発生します。

Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.

Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.

to_parquet()ではengineオプションでPandasで使用するParquetライブラリを指定可能です。デフォルトだとautoで、io.parquet.engineが使われます。io.parquet.engineはデフォルトでpyarrowを利用し、pyarrowが利用できない場合はfastparquetを利用します。明示的にengineオプションで指定したライブラリをインストールしておくか、指定しないのであればpyarrowかfastparquetをインストールしておきましょう。

pip install pyarrow

S3バケット配下のフォルダをスキップする

かなり地味ですが、少しハマったのがS3バケット配下のフォルダをスキップする処理です。

今回はバケットの下にフォルダがあり、そのフォルダの中にExcelファイルが置かれています。フォルダも処理しようとして変換処理がエラーになったため、if obj.size == 0でsizeが0のオブジェクトをスキップする処理を追加して回避しています。

        if obj.size == 0 or obj.key[-5:] != '.xlsx':
            print(f'{obj.key} is not converted')
            continue
        convert_excel_to_parquet(TARGET_BUCKET,TO_BUCKET,obj)

最後に

知っておくとちょっと便利なExcelファイルからParquetファイルへの変換方法でした。ExcelファイルのままだとAthenaで直接クエリができませんが、Parquet形式に変換することで圧縮や列志向フォーマットとして使えて、Athenaでのクエリが高速になります。この記事が誰かのお役に立てば幸いです。