AWS SDK for pandas (awswrangler)のredshift.to_sqlの利用ポイント

2023.09.08

データアナリティクス事業本部のueharaです。

今回は、AWS SDK for pandas (awswrangler)の redshift.to_sql 関数の利用ポイントを紹介したいと思います。

AWS SDK for pandas (awswrangler)とは

awswranglerはAWSの様々なDBサービスとpandasを連携することができるAWS公式のPythonライブラリです。

例えば、以下のようなサービスとの連携に対応しています。

  • S3
  • RDS
  • Athena
  • Redshift
  • Timestream
  • などなど

awswranglerを利用することで、上記AWSサービスに対してpandasのDataFrameから簡単にデータのロード/アンロードすることができます。

Lambdaで利用したいのであればAWSがGithubでLambda Layers用のzipファイルを公開しておりますし、Glue Python Shellで利用したいのであればlibrary-setにanalyticsを指定するだけで良いので簡単に利用ができます。

redshift.to_sql関数とは

redshift.to_sql 関数は、一言で言うとpandas DataFrameからRedshiftにデータを入れるためのawswranglerに用意されている関数になります。

ドキュメントもこちらのページにあるのですが、細かい仕様については触れられていない部分も多々あり、ライブラリのソースコードを直接見に行くことも多いので今回はこちらの関数の利用ポイントをいくつかピックアップして紹介したいと思います。

なお、以降の記載は2023/09/08時点で最新の「AWS SDK for pandas 3.3.0」のソースコードを元にしていますので予めご了承下さい。

ポイント①:3つのデータ挿入モード

データの挿入には append, overwrite, upsert の3つのモードがあり、mode 引数で渡すことができます。

appendはいわゆる「INSERT」、overwriteは「洗い替え(全件削除&INSERT)」、upsertは文字通り「UPSERT(UPDATE&INSERT)」となります。

overwriteについては更に overwrite_method でテーブルの削除方法を指定できます。指定できる方法と、特徴は以下の通りです。

方法 特徴
drop テーブルを削除します。ただし、テーブルに依存しているビューがある場合は失敗します。
cascade テーブル、およびそのテーブルに依存しているビューを全て削除します。
truncate トランザクションを即座にコミットする削除方法です。データ挿入時には新しいトランザクションを開始するため、DB用語でいう原子性(Atomicity)は担保されません。
delete データを削除するという点では他の3つと同じですが、 DELETE FROM ... コマンドにより全ての行を削除する操作になります。テーブル定義自体は保持されますが、他の方法に比べて処理速度は遅くなります。

truncate以外であれば、削除⇒挿入の流れで処理が失敗した場合、ロールバックする仕様になっています。

したがって、洗い替えの際「挿入データとして渡したDataFrameのカラム数が挿入先のテーブル定義と異なっていた」というようなデータ挿入エラーが発生したとしても、元のテーブルが削除されてしまい件数が0件になってしまうことはありません。

また、UPSERTの場合は更新キーを primary_keys として渡すことができます。

仮にここで引数として渡さなかった場合、挿入先のテーブルからPRIMARY KEYを取得し、そのキーを元にUPSERTを実施する仕様となっています。明示的に記載したい場合は primary_kyes 引数に渡すと良いと思います。

洗い替えとUPSERTのそれぞれの例をコードで表すと、以下のようになります。

example1.py

# 洗い替えの例
wr.redshift.to_sql(
    df=df,
    table="test_table",
    schema="public",
    con=con,
    mode="overwrite",
    overwrite_method="delete"
)

# UPSERTの例
wr.redshift.to_sql(
    df=df,
    table="test_table",
    schema="public",
    con=con,
    mode="upsert",
    primary_keys = ["pk1", "pk2"]
)

ポイント②:挿入先のテーブルがDBに存在しない場合

挿入先のテーブルが存在しない場合、テーブルが新規作成されます。

このとき、定義される型はDataFrameのdtypeに基づくものになります。変換を実施している部分のソースコードは以下の通りです。

例えば、DataFrameの型がuint32であれば、テーブルはBIGINTで定義されます。

dtypeがNULLなど変換できない値であった場合は、以下のようなエラーとなります。

UnsupportedType: Unsupported Redshift type: null

作成されるテーブルの型を明示的に指定したい場合は、 redshift.to_sqldtype を引数として渡すことができます。

example2.py

wr.redshift.to_sql(
    df=df,
    table="test_table",
    schema="public",
    con=con,
    mode="overwrite",
    dtype={"col1_name": "VARCHAR(10)", "col2_name": "FLOAT"})
)

dtypeを指定することにより、例えば TIMESTAMPTZ など、変換表にない型も定義することができます。

ただ、基本的には予めDDLを用意してテーブルを作成し、洗い替えの際削除は delete で実施するという方針が運用上管理がしやすいのではないかと思います。

ポイント③:データ挿入に関する内部処理

データの挿入は、当然と言えば当然ですが内部的には INSERT クエリを作成して実施されます。

例えば、DataFrameが以下だったとします。

df = pd.DataFrame({
    "col1": ["hoge1", np.NaN, "hoge3", "hoge4"],
    "col2": ["foo1", "foo2", "foo3", "foo4"],
    "col3": [np.NaN, "x", "y", "z"],
    "col4": [1, 2, 3, 4]
})

このとき、1行目のデータを挿入するのに作成されるSQLは以下の通りとなります。

INSERT INTO public.test_table VALUES ('hoge1', 'foo1', NULL, 1)

ちなみに、 redshift.to_sql 関数の use_column_names という引数を True にした場合は以下のようになります。

INSERT INTO public.test_table (col1, col2, col3, col4) VALUES ('hoge1', 'foo1', NULL, 1)

上記のように、単純にデータの値をINSERTクエリにVALUESとして渡しているだけなので、データの挿入時には特にDataFrameの型を気にしてはいません。

したがって、以下のDataFrameのcol1のdtypeはStringになりますが、テーブル定義の方が TIMESTAMPTZ であれば TIMESTAMPTZ 型で入ります。

"""
# テーブル定義
CREATE TABLE IF NOT EXISTS public.test_table (
    col1 TIMESTAMPTZ
);
"""

df = pd.DataFrame({
    "col1": ["2023-09-08 09:30:00 JST", "2023-09-08 10:30:00 UTC"] # 上のテーブルに入れると TIMESTAMPTZ で入る
})

先程説明に出た redshift.to_sql の引数で指定するdtypeはあくまでテーブルを新規作成する際に利用されるものであり、既に元テーブルがある場合は利用されることの無い値になるのでご注意下さい。

また、まとめてINSERTするデータの行数を chunksize 引数で指定することもできます。デフォルトは200です。

まとめて入れる方が効率的ですが、その分メモリ使用量も増えるので、ユースケースによって適切な値を設定して下さい。

example3.py

wr.redshift.to_sql(
    df=df,
    table="test_table",
    schema="public",
    con=con,
    mode="append",
    chunksize=500 # まとめて挿入する行数を指定
)

ポイント④:テーブル名の大文字・小文字で問題が発生するケース

Redshiftは、デフォルトではデータベースオブジェクト(テーブルや列など)の大文字・小文字を区別しません。

したがって、 CREATE TABLE クエリでテーブル名を大文字で指定したとしても、内部では小文字のテーブル名で管理されます。

redshift.to_sql 関数において、overwriteとupsertは既存のテーブルチェックを行います。

overwriteであれば既存のテーブルが存在すれば削除、upsertであれば既存のテーブルが存在すれば一時テーブルを作成して更新・挿入の処理を行います。

既存のテーブルが存在しない場合、どちらもappend(INSERT)と同じ挙動をします。

テーブルの存在確認について、awswranglerは内部的には以下のクエリを実行しています。

# ※{schema}, {table} はそれぞれredshift.to_sql関数に引数として与えられている値
SELECT true WHERE EXISTS (
    SELECT * FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}'
)

ここで、元のテーブルを大文字で作成したからと、 redshift.to_sql 関数に大文字でテーブル名を渡すと、上記の存在チェックでtrueとならないため想定しない挙動となります。

具体的には、例えばoverwriteであれば元テーブルが削除されずそのままデータがINSERTされるという事象が起こります。

example4.py

wr.redshift.to_sql(
    df=df,
    table="TEST_table", # 内部的にはtest_tableのため、存在チェッククエリでtrueとならない
    schema="public",
    con=con,
    mode="overwrite",
    overwrite_method="delete"
)

大文字と小文字を区別するオプションもあり、それによって対応も変わりますが、デフォルトの設定で使用している場合は注意しましょう。

ポイント⑤:chunksizeを大きくしすぎたときのエラー

テーブルに挿入するカラム数が多く、かつchunksizeを大きくしすぎた時、以下のようなエラーが発生します。

error: 'h' format requires -32768 <= number <= 32767

これは以下のようなクエリ内の placeholders (%s) に関する制限であり、このplaceholdersのカウンターがshort型であるため、このようなエラーが発生します。

INSERT INTO "public"."test_table" VALUES (%s, %s, %s)

本エラーがでた場合、chunksizeを小さくすることで利用するplaceholdersの数を少なくし、エラーを解消することができます。

ポイント⑥:INT型のテーブルにnanを含む値を入れる際の注意点

NULLを含む整数値のカラムを持つcsvファイルをpandasのDataFrameで読み込んだ場合、デフォルトでは float64 と判定されます。

例えば、下記の場合col2カラムは float64 で読み込まれます。

data.csv

col1,col2,col3
1,,7
2,5,8
3,6,9

これは、DataFrameのデフォルトの整数型である int64nan という値を持てないためです。

この nan を含む float64 のカラム持つDataFrameを、RedshiftのINT型のテーブルに redshift.to_sql でデータを挿入しようとするとエラーとなります。

invalid input syntax for integer: "nan"

対策としては、DataFrameを全て object 型にすることが考えられます。

ポイント③でお話した通り、 redshift.to_sql 関数では単にINSERTクエリのVALUESに値を渡しているだけなので文字列としてしまっても問題ありません。

そうしてしまえば INSERT クエリ発行時に nan の部分は NULL となるので、問題なく挿入できるようになります。

...

df = df.astype(object)

wr.redshift.to_sql(
    df=df,
    table="test_table",
    schema="public",
    con=con,
    mode="append"
)

ポイント⑦:処理実行後のcommitタイミング

まず大前提として、AWS SDK for pandas (awswrangler)が裏側で利用している redshift_connector はデフォルトで autocommit がデフォルトで False になっています。

また、仮に上記を Trueに設定していたとしても、redshift.to_sql 関数では引数で渡されたコネクションの autocommit 設定を以下の通り参照しません。

autocommit_temp: bool = con.autocommit
con.autocommit = False

redshift.to_sql 関数の commit_transaction 引数はデフォルトで True になっているため、正常に終了が終了した後に con.commit() が実行されます。

従って、仮にその前段で con.cursor().execute() を実行してcommitしていないものがあっても、このタイミングでcommitされます。

commit_transactionFalse を設定すると redshift.to_sql 関数の処理終了後にもcommitはしませんので、その方が都合が良いという場合は False に設定して下さい。

ただし、redshift.to_sql 関数の実行が失敗した場合は(ポイント①で述べたように)truncate以外は con.rollback() が呼ばれますので、その時点でcommitしていないcon.cursor().execute() の実行処理分は破棄されることになります。

最後に

今回は、AWS SDK for pandas (awswrangler)の redshift.to_sql 関数の利用ポイントを紹介させていただきました。

参考になりましたら幸いです。

参考文献