[小ネタ][python]S3上のGzip圧縮されたcsvファイルをローカルに保存させずにデータ加工してみた

pandasの使い方をもっと慣れていきたい最中で作った。
2020.05.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どーもsutoです。

機械学習関連の自学や業務でデータセットを扱うためにPythonを勉強しています。中には数百MB〜数GBのデータを扱うようになると、ローカルPCディスクの容量を多少気にしなくちゃならなくりますね。

てことで今回は、S3上のGzipファイルをローカルディスクに保存させずにデータ操作を行うスクリプトを作ってみました。

検証内容

  • S3に置かれているGzipファイル(csv or tsvを圧縮したもの)をpandasで読み込み
  • 読み込んだデータの指定の場所に新しいカラム(複数)を挿入する
  • 加工後のデータを書き込み、S3にPUTする

結果

スクリプト

import boto3
import io
import gzip
import datetime
from botocore.errorfactory import ClientError
import pandas as pd

region = 'ap-northeast-1'
bucket = 'cm-suto-testcsv'
session = boto3.Session(region_name=region, profile_name='suto')

s3 = session.resource('s3')
s3client = session.client('s3')

#
# functions
#
def check_s3_key_exists(bucket: str, key: str):
    try:
        s3client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError:
        return False

#
# main
#
def main():

    target_date = str(datetime.date.today())
    target_date_num = target_date.replace("-", "")

    inkey = 'csv/target_in.tsv.gz'
    outkey = 'csv/target_out-{}.tsv.gz'.format(target_date_num)

    colum_list = ['TEST1', 'TEST2'] # 追加したいカラムをリストで生成
    number = 2                      # 挿入したい列番号を代入

    if check_s3_key_exists(bucket, inkey):
        try:
            inobj = s3.Object(bucket, inkey)
            outobj = s3.Object(bucket, outkey)
            content = inobj.get()['Body'].read()
            bytes_io = io.BytesIO(content)
            fh_r = pd.read_csv(bytes_io, compression='gzip', header=0, index=False, sep='\t', encoding='utf-8')
            inmem = io.BytesIO()
            with gzip.GzipFile(fileobj=inmem, mode='wb') as fh_w:
                with io.TextIOWrapper(fh_w, encoding='utf-8') as wrapper:
                    for item in colum_list:
                        fh_r.insert(number, item, ' ') # 新規追加のValueには空白を挿入
                        number +=1
                    fh_r.to_csv(wrapper, sep='\t', encoding='utf-8')
            outobj.put(Body=inmem)

        except Exception as e:
            print(e)
            print('Error getting object {} from bucket {}. Make sure they exist and your bucket.'.format(inkey, bucket))
            raise e

if __name__ == "__main__":
    main()

実行前のデータ

実際にスクリプト実行前と後のデータを見てみると

実行後のデータ

ちゃんと更新されてS3に保存されています。 データ操作の処理は49〜51行の部分なのでここを自分のやりたい処理に変えれば応用が効くはず。

まとめ

以上、ローカルPCに保存させることなくS3上の圧縮ファイルのデータ編集を行う方法でした。

元インフラエンジニアの身としてまだまだコード書きは未熟ですが、いずれキレイに書けるようがんばります。