[初心者向け]pandasでCloud StorageのCSVファイルを読み込んでみた

Cloud Storage上のCSVをpandasを用いて読み込み、parquet形式での出力を検証してみました。
2024.04.26

データアナリティクス事業本部の根本です。Cloud Functionsでpandasを用いてCloud Storageのファイルを読み込んでみました。2種類の読み込み方法を検証しています。そして読み込んだデータをparquet形式でCloud Storageに出力もしてみました。

この記事の対象者

  • pandasでCloud Storage上のCSVを読み込んでparquet形式で出力したいひと

前提条件

  • Cloud Functions、Cloud Storageが使用できるGoogle Cloudのプロジェクトがあること

やってみる

やりたいこと

  • Cloud Functionsでpandasを用いてCloud Storage上のCSVファイルを読み込む
  • 読み込んだデータをデータフレームにする
  • データフレームをparquet形式でCloud Storageに出力する
  • CSVファイルの読み込みは、2通り行う

前準備:requirements.txtの準備

まずはrequirements.txtを準備します。以下となります。

functions-framework==3.*
pandas==2.2.2
pyarrow==16.0.0
gcsfs==2024.3.1
google-cloud-storage==2.2.1

parquet形式での出力を行うのでpyarrowを含めています。pandasでparquet形式のファイルを扱う場合(to_parquetを用いる場合)はpyarrowまたはfastparquetのどちらかが必要になります。
(to_parquetengineパラメータを省略した場合、デフォルトではpyarrowを使用されます)

engine{‘auto’, ‘pyarrow’, ‘fastparquet’}, default ‘auto’
Parquet library to use. If ‘auto’, then the option io.parquet.engine is used. The default io.parquet.engine behavior is to try ‘pyarrow’, falling back to ‘fastparquet’ if ‘pyarrow’ is unavailable.

引用:https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html

前準備:CSVファイルのバケットへのアップロード

バケットを作成して、以下のCSVファイルを作成します。検証用なので簡潔なCSVです。
id列のみ、10行のデータ部のデータとなります。

id
1
2
3
4
5
6
7
8
9
10

以下のコマンドでバケットへCSVファイルをアップロードします。

gsutil cp ./sample.csv gs://作成済みバケット名/

Cloud Functionsのpythonコード

コードは以下になります。

main.py

import functions_framework
from io import BytesIO
import pandas as pd
from google.cloud import storage

def get_df_from_bytes(bucket_name, file_name):
  client = storage.Client()
  bucket = client.get_bucket(bucket_name)
  blob = bucket.blob(file_name)
  blob_data = blob.download_as_bytes()
  csv_df = pd.read_csv(BytesIO(blob_data))
  return csv_df

def get_df_from_direct(bucket_name, file_name):
  csv_df = pd.read_csv("gs://" + bucket_name + "/" + file_name)
  return csv_df

@functions_framework.http
def pandas_parquet(request):
  df1 = get_df_from_bytes("バケット名", "sample.csv")
  df2 = get_df_from_direct("バケット名", "sample.csv")
  df1.to_parquet("gs://バケット名/df1.parquet")
  df2.to_parquet("gs://バケット名/df2.parquet")
  return "len(df1)" + str(len(df1)) + " ,len(df2):" + str(len(df2))

CSVからデータを取得するのに2種類の関数を用意しました。
1. read_csv関数内でBytesIOを用いたもの
2. read_csv関数内でURIを指定したもの

BytesIOを用いた関数

get_df_from_bytes

def get_df_from_bytes(bucket_name, file_name):
  client = storage.Client()
  bucket = client.get_bucket(bucket_name)
  blob = bucket.blob(file_name)
  blob_data = blob.download_as_bytes()
  csv_df = pd.read_csv(BytesIO(blob_data))
  return csv_df

要所だけ見ていきます。
Blob(ファイル)の取得:

blob = bucket.blob(file_name)

bucket.blob(file_name)を使って、指定されたバケット内の特定のファイル名(file_name)に対応するblobオブジェクトを取得します。 ファイルデータのダウンロード:

blob_data = blob.download_as_bytes()

blob.download_as_bytes()メソッドを呼び出して、blob(ファイル)の内容をバイトデータとしてダウンロードします。このバイトデータは、ファイルの内容をメモリ上に保持します。

csv_df = pd.read_csv(BytesIO(blob_data))

pd.read_csv()関数を使用して、ダウンロードしたバイトデータをpandasのDataFrameに読み込みます。ここで、BytesIO(blob_data)はバイトデータをファイルのように扱うために使用します。read_csv関数はこのデータストリームからCSVデータを読み取ることができます。  

read_csv関数内でURIを指定した関数

続いてもう1つの関数を見ていきます。

get_df_from_direct

def get_df_from_direct(bucket_name, file_name):
  csv_df = pd.read_csv("gs://" + bucket_name + "/" + file_name)
  return csv_df

BytesIOを用いたものよりシンプルかと思います。
read_csv関数内にはCloud StorageのURIを指定できます。
が、いくつか注意点があります。
1. gcsfsというライブラリが必要
pandasはfsspecというライブラリを介して多くのストレージからデータを読み込むことができますが、read_csv関数でCloud StorageのURIを指定した場合、 fsspecを介してgcsfsを用いて連携する形になります。fsspecはファイルシステムの操作を抽象化するためのインタフェースであり、gcsfsfsspecのインタフェースに基づいてCloud Storageとのやり取りを行うための具体的な実装を提供します。またgscfsfsspecに依存関係があるのでrequirements.txtにgcsfsを記載したのでpipの依存解決機構によって自動的にfsspecもインストールされます。
2. gcsfsはベータ版
以下の記載がgcsfsのサイトにありました

This software is beta, use at your own risk.

引用:https://gcsfs.readthedocs.io/en/latest/. ※2024/4/26時点の情報です。

シンプルな実装になるのですが、上記の点を踏まえた上で使用するかどうかを検討した方が良いと考えます。

トリガーされるメイン関数

処理としては、httpトリガーで起動された後、各CSV読み込み関数を起動してデータフレームを作成します。
作成したデータフレームを、to_parquet関数を用いてparquet形式でバケットに書き出します。また、各データフレームの長さをレスポンスで返却します。

pandas_parquet

@functions_framework.http
def pandas_parquet(request):
  df1 = get_df_from_bytes("バケット名", "sample.csv")
  df2 = get_df_from_direct("バケット名", "sample.csv")
  df1.to_parquet("gs://バケット名/df1.parquet")
  df2.to_parquet("gs://バケット名/df2.parquet")
  return "len(df1):" + str(len(df1)) + " ,len(df2):" + str(len(df2))

デプロイ

以下のgcloudコマンドでデプロイをしました。

gcloud functions deploy pandas_parquet_function \
--gen2 \
--region=asia-northeast1 \
--runtime=python311 \
--source=. \
--entry-point=pandas_parquet \
--trigger-http

関数の実行

それでは関数を以下のコマンドで実行します。httpトリガーなのでcurlで実行しています。

curl -X GET "関数のURL"

レスポンスは以下となっていました。

len(df1):10 ,len(df2):10

同じCSVを読み込んでいるので、長さは同じ[10]が返却されています。

Cloud Storageにparquet形式のファイルが作成されているかも確認します。

gsutil ls "バケット名"
gs://"バケット名"/df1.parquet
gs://"バケット名"/df2.parquet

2つのデータフレームから出力したparquet形式のファイルが出力されていました。

感想

Cloud Storageに対して、2種類のアプローチでCSVをpandasで読み込んでみました。gcsfsを用いると、コードがシンプルになるのでとても良いなと思いました。
今後もpandasを用いてCloud Storageを操作することがあるのでまた何か気になったことがあったら記事にしたいと思います。この記事が誰かの参考になれば嬉しいです。それでは。

参考

pandasリファレンス(to_parquet)
gcsfs
pandasリファレンス(read_csv)