S3のデータをAmazon Aurora PostgreSQLにUPSERTしてみた
AWSで構築したデータ基盤で、S3のデータを Aurora PostgreSQLにUPSERTする方法を紹介します。
UPSERT(INSERT/UPDATE)とは
データベースにレコード挿入する際、100%新規データを扱うのであれば、テーブルにINSERTするだけですみます。 更新データも存在する場合、新規の場合はINSERT、更新の場合は UPDATE というように処理を分ける必要があります。
このように、条件によってUPDATEとINSERTを使い分けて処理することを、UPdate と inSERT をくっつけて UPSERT と呼びます。
データ処理で頻出の更新パターンであり、AWSのデータウェアハウス Amazon Redshiftにも、UPSERT単独のドキュメントが存在します。
新しいデータの更新と挿入 - Amazon Redshift
このドキュメントでは
- テーブルの行すべてを上書き
- テーブルの特定の列を更新
の2種類が解説されています。
本ブログでは
- Amazon Redshiftの代わりにAmazon Aurora
- ステージテーブルのデータソースはS3
に置き換えて「1. テーブルの行すべてを上書き」を実現する方法を紹介します。
ポイント
- UPSERT 処理全体を1トランザクションで済ます
- PostgreSQLのS3インポート機能でS3からステージテーブルにロード
- ステージ <-> ターゲットテーブル間でUPSERT
- INSERT 文の
ON CONFLICT
句を使うと、 SQL 1本で UPSERT をかける - ステージテーブルは一時テーブルとしてとして作成。PostgreSQLの一時テーブルスキーマは永続化されない
環境
- データベース : Amazon Aurora PostgreSQL 13系
- データソース : S3
- 処理プログラム : Python + Psycopg2
1. 一時テーブルの作成
S3 データをロードするステージテーブルは UPSERT 処理中しか利用しないため、一時テーブルとしてCSV フォーマットに合わせたテーブルを作成します。
テーブル CREATE 時に TEMP(TEMPORARY)
を含めるだけです。
CREATE TEMP TABLE stage_tbl( ... )
PostgreSQL の一時テーブルは永続化されず、セッション終了時に削除されます。都度 CREATE TEMP TABLE
しましょう。
トランザクション終了時に削除したい場合は、 ON COMMIT DROP
をつけます。
CREATE TEMP TABLE stage_tbl( ... ) ON COMMIT DROP
ステージテーブルのスキーマがターゲットテーブルと同じ場合は、 ターゲットテーブルをソースにlike
でテーブル作成しましょう(CREATE TEMP TABLE stage_tbl (like target_tbl)
)。
2. S3 からステージテーブルにインポート
Aurora PostgreSQL にはネイティブで S3 からデータをロードする機能が備わっています。
本機能を利用し、SQL SELECT aws_s3.table_import_from_s3
を流します。
SELECT aws_s3.table_import_from_s3( 'stage_tbl', '', '(format csv)', aws_commons.create_s3_uri('BUCKET-NAME', 'KEY', 'REGION') )
事前準備として
- DB インスタンスにS3へのアクセスを許可
- DB インスタンスで S3 エクステンションを有効化
などの対応が必要です。
詳細は、次のドキュメントを参照ください。
3. ステージテーブルとターゲットテーブル間のマージ
ステージ・ターゲットテーブル間でデータをマージ(UPSERT)します。
2つのアプローチがあります。
- DELETE & INSERT 方式(Redshiftのドキュメント通り):両方に同じキーが存在する場合、ターゲットテーブルから対象レコードを
DELETE
。その後、バルクINSERT
- ON CONFLICT 方式(PostgreSQL固有):
ON CONFLICT
を用い、デフォルトはINSERT
、コンフリクト時はUPDATE
を実行
3-A : DELETE & INSERT 方式
ステージ・ターゲットテーブル間でキーが重複するレコードが存在する場合、コンフリクトにより INSERT が失敗します。
事前に重複するレコードをターゲットテーブルから DELETE
しておきます。
DELETE FROM target USING stage WHERE target.primarykey = stage.primarykey;
その後、ステージテーブルにあるレコードをターゲットテーブルにまとめて INSERT
します。
INSERT INTO target SELECT * FROM stage;
Redshift の UPSERT ドキュメントと同じアプローチです。
3-B : ON CONFLICT 方式
PostgreSQL は 9.5 から INSERT
文に ON CONFLICT
句が追加され、 UPSERT
に対応しました。
コンフリクトが発生して INSERT
できないときは、 DO UPDATE
で UPDATE
を走らせます。
INSERT INTO target(email, data) SELECT email, data FROM stage ON CONFLICT ON CONSTRAINT target_pkey DO UPDATE SET data = excluded.data;
ステージテーブルのデータには excluded
でアクセスできます。
当然のことながら、コンフリクトが発生しないと UPDATE
は起こりません。
ターゲットテーブルには制約を設定してください。
4 : Psycopg2 で一連の UPSERT 処理を書く
以上を Python/psycopg2 にまとめます。
import psycopg2 def upsert(): with psycopg2.connect(...) as conn: with conn.cursor() as cur: # 1. ステージテーブルの作成 cur.execute(SQL_CREATE_STAGE_TABLE) # 2. S3 のCSVをステージテーブルにロード cur.execute(SQL_LOAD_FROM_S3) # 3. UPSERT # 3-A:DELETE&INSERT方式 cur.execute(SQL_DELETE) cur.execute(SQL_INSERT) # 3-B:ON CONFLICT方式 cur.execute(SQL_ON_CONFLICT_UPSERT)
psycopg2 は 2.5 から コンテキストマネージャーに対応しているため、 connect
や cursor
に with
を使えます。
さらに、psycopg2 の connection
クラスの autocommit
はデフォルトが False
のため、
SQLを初回呼び出し後、commit/rollback が発生するまで同じトランザクションで処理されます。
ステージ(一時)テーブルのデータは一連のSQLで有効であり、すべてのSQLが成功すれば最後にトランザクションはコミットされ、失敗すればロールバックされます。
最後に
S3 のデータを Aurora PostgreSQL に UPSERT する方法を紹介しました。
2大処理
- S3 から PostgreSQL へのロード
- PostgreSQL 内のテーブル間の UPSERT
はともに SQL で完結しているため、アプリケーションのスキルをほぼ要求せずに、データ処理できます。
AWS Glueを使うほど複雑な処理をやっていなかったり、データサイズが小さいために Redshift ではなく PostgreSQL を使っているようなケースにおいて、有効なアプローチではないかと思います。
それでは。