S3のデータをAmazon Aurora PostgreSQLにUPSERTしてみた

Python/Psycopgを使いS3のデータをAurora PostgreSQLにロードし、UPSERTするELT方法を紹介します。
2021.11.17

AWSで構築したデータ基盤で、S3のデータを Aurora PostgreSQLにUPSERTする方法を紹介します。

UPSERT(INSERT/UPDATE)とは

データベースにレコード挿入する際、100%新規データを扱うのであれば、テーブルにINSERTするだけですみます。 更新データも存在する場合、新規の場合はINSERT、更新の場合は UPDATE というように処理を分ける必要があります。

このように、条件によってUPDATEとINSERTを使い分けて処理することを、UPdate と inSERT をくっつけて UPSERT と呼びます。

データ処理で頻出の更新パターンであり、AWSのデータウェアハウス Amazon Redshiftにも、UPSERT単独のドキュメントが存在します。

新しいデータの更新と挿入 - Amazon Redshift

このドキュメントでは

  1. テーブルの行すべてを上書き
  2. テーブルの特定の列を更新

の2種類が解説されています。

本ブログでは

  • Amazon Redshiftの代わりにAmazon Aurora
  • ステージテーブルのデータソースはS3

に置き換えて「1. テーブルの行すべてを上書き」を実現する方法を紹介します。

ポイント

  • UPSERT 処理全体を1トランザクションで済ます
  • PostgreSQLのS3インポート機能でS3からステージテーブルにロード
  • ステージ <-> ターゲットテーブル間でUPSERT
  • INSERT 文の ON CONFLICT 句を使うと、 SQL 1本で UPSERT をかける
  • ステージテーブルは一時テーブルとしてとして作成。PostgreSQLの一時テーブルスキーマは永続化されない

環境

  • データベース : Amazon Aurora PostgreSQL 13系
  • データソース : S3
  • 処理プログラム : Python + Psycopg2

1. 一時テーブルの作成

S3 データをロードするステージテーブルは UPSERT 処理中しか利用しないため、一時テーブルとしてCSV フォーマットに合わせたテーブルを作成します。

テーブル CREATE 時に TEMP(TEMPORARY) を含めるだけです。

CREATE TEMP TABLE stage_tbl(
  ...
)

PostgreSQL の一時テーブルは永続化されず、セッション終了時に削除されます。都度 CREATE TEMP TABLE しましょう。 トランザクション終了時に削除したい場合は、 ON COMMIT DROP をつけます。

CREATE TEMP TABLE stage_tbl(
  ...
) ON COMMIT DROP

ステージテーブルのスキーマがターゲットテーブルと同じ場合は、 ターゲットテーブルをソースにlike でテーブル作成しましょう(CREATE TEMP TABLE stage_tbl (like target_tbl))。

2. S3 からステージテーブルにインポート

Aurora PostgreSQL にはネイティブで S3 からデータをロードする機能が備わっています。

本機能を利用し、SQL SELECT aws_s3.table_import_from_s3 を流します。

SELECT aws_s3.table_import_from_s3(
'stage_tbl',
'',
'(format csv)',
aws_commons.create_s3_uri('BUCKET-NAME', 'KEY', 'REGION')
)

事前準備として

  • DB インスタンスにS3へのアクセスを許可
  • DB インスタンスで S3 エクステンションを有効化

などの対応が必要です。

詳細は、次のドキュメントを参照ください。

3. ステージテーブルとターゲットテーブル間のマージ

ステージ・ターゲットテーブル間でデータをマージ(UPSERT)します。

2つのアプローチがあります。

  1. DELETE & INSERT 方式(Redshiftのドキュメント通り):両方に同じキーが存在する場合、ターゲットテーブルから対象レコードを DELETE。その後、バルク INSERT
  2. ON CONFLICT 方式(PostgreSQL固有): ON CONFLICT を用い、デフォルトは INSERT、コンフリクト時は UPDATE を実行

3-A : DELETE & INSERT 方式

ステージ・ターゲットテーブル間でキーが重複するレコードが存在する場合、コンフリクトにより INSERT が失敗します。

事前に重複するレコードをターゲットテーブルから DELETE しておきます。

DELETE FROM target 
USING stage 
WHERE target.primarykey = stage.primarykey;

その後、ステージテーブルにあるレコードをターゲットテーブルにまとめて INSERT します。

INSERT INTO target 
SELECT * FROM stage;

Redshift の UPSERT ドキュメントと同じアプローチです。

3-B : ON CONFLICT 方式

PostgreSQL は 9.5 から INSERT 文に ON CONFLICT 句が追加され、 UPSERT に対応しました。

コンフリクトが発生して INSERT できないときは、 DO UPDATEUPDATE を走らせます。

INSERT INTO target(email, data)
SELECT email, data
  FROM stage
ON CONFLICT ON CONSTRAINT target_pkey
DO UPDATE SET
  data = excluded.data;

ステージテーブルのデータには excluded でアクセスできます。

当然のことながら、コンフリクトが発生しないと UPDATE は起こりません。 ターゲットテーブルには制約を設定してください。

4 : Psycopg2 で一連の UPSERT 処理を書く

以上を Python/psycopg2 にまとめます。

import psycopg2

def upsert():
    with psycopg2.connect(...) as conn:
        with conn.cursor() as cur:
 
            # 1. ステージテーブルの作成
            cur.execute(SQL_CREATE_STAGE_TABLE)

            # 2. S3 のCSVをステージテーブルにロード
            cur.execute(SQL_LOAD_FROM_S3)

            # 3. UPSERT
            # 3-A:DELETE&INSERT方式
            cur.execute(SQL_DELETE)
            cur.execute(SQL_INSERT)
            # 3-B:ON CONFLICT方式
            cur.execute(SQL_ON_CONFLICT_UPSERT)

psycopg2 は 2.5 から コンテキストマネージャーに対応しているため、 connectcursorwith を使えます。

さらに、psycopg2 の connection クラスの autocommit はデフォルトが False のため、 SQLを初回呼び出し後、commit/rollback が発生するまで同じトランザクションで処理されます。

ステージ(一時)テーブルのデータは一連のSQLで有効であり、すべてのSQLが成功すれば最後にトランザクションはコミットされ、失敗すればロールバックされます。

最後に

S3 のデータを Aurora PostgreSQL に UPSERT する方法を紹介しました。

2大処理

  • S3 から PostgreSQL へのロード
  • PostgreSQL 内のテーブル間の UPSERT

はともに SQL で完結しているため、アプリケーションのスキルをほぼ要求せずに、データ処理できます。

AWS Glueを使うほど複雑な処理をやっていなかったり、データサイズが小さいために Redshift ではなく PostgreSQL を使っているようなケースにおいて、有効なアプローチではないかと思います。

それでは。

参考