Glue PythonShellジョブからRedshift Spectrumパーティションを更新する

Glue PythonShellジョブからRedshift Spectrumパーティションを更新する

Glue PythonShellからSpectrumのadd partitionを行うため、オートコミットを有効にした上でSQLクエリを実行してみました。
Clock Icon2020.10.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

GlueのPythonShellを使って Redshift Spectrumで参照するS3パスにファイルを追加していくというジョブの作成をするという場面がありました。 Spectrumは基本的に容量が多いデータが対象になると思いますので、 データをS3に出力したらパーティション更新のクエリ実行もほぼ必須となります。 今回はこのクエリを実行する方法を調べました。

PyGreSQLでの実行を試す

PythonShellからRedshiftでSQLクエリを実行するにあたってはPyGreSQLがデフォルトで使えますので、 それで実装をすれば良いと思っていました。 具体的には以下のような形です。

import pgdb

query = '''
    alter table spectrum_schema.table_name
    add if not exists partition(date='{0}')
    location 's3://bucket_name/spectrum/date={0}/';
'''.format(date)

conn = pgdb.connect(host=xxx, database=xxx, user=xxx, password=xxx)
cur = conn.cursor()
cur.execute(query)
conn.commit()

しかし、これを実行しようとするとこんな感じのエラーが出ました。

INSERT INTO EXTERNAL TABLE cannot run inside a transaction block

このエラーコード自体はEXTERNAL TABLEにINSERT INTOしようとした際の文言ですが、ほぼ同等のものが出ました。 着目すべきは後半で、トランザクションの中では実行できないとのことです。 SpectrumはRedshiftだけに閉じた話ではないので、トランザクションを使えないというのは特に違和感はないですね。

トランザクション内で実行できないので、クエリをオートコミットで実行することで解決するはずです。 PyGreSQLのマニュアルをみるとConnection.autocommitというプロパティがあり、これを使えばOKそうです。

・・・が、結果的にPythonShellでは、これではダメでした。 というのも、このautocommitNew in version 5.1。 一方PythonShellでpygresqlのバージョンを調べてみると5.0.6でした。

import pkg_resources
print(pkg_resources.get_distribution("pygresql").version)
# => 5.0.6

ということでPyGreSQLを使ってのオートコミットは諦めることにしました。

解決方法

PyGreSQLが使えないので、潔くpsycopg2を使うことにしました。 GlueジョブはS3においたwhlファイルを読み込ませることができるので、その準備をします。 psycopg2-binaryは、whlファイルがPyPIにて公開されているので、これを持ってくるのが一番簡単そうです。 以下の場所からダウンロードできます。

https://pypi.org/project/psycopg2-binary/#files

PythonShellで使用する場合は下記の名前のファイルが適合します。 Pythonのバージョンは今の所3.6で固定ですので、そこもご注意を。

psycopg2_binary-2.8.6-cp36-cp36m-manylinux1_x86_64.whl

これを適当なS3パスでアップロードします。 そしてアップロードしたwhlファイルが読み込めるように、 実行時には「Python ライブラリパス」としてS3パスを指定します。

これでimport psycopg2ができるようになります。 サンプルコードはこんな感じです。

import psycopg2

query = '''
    alter table spectrum_schema.table_name
    add if not exists partition(date='{0}')
    location 's3://bucket_name/spectrum/date={0}/';
'''.format(date)

conn = psycopg2.connect(url)
conn.set_isolation_level(0)
with conn.cursor() as cursor:
    cursor.execute(query)

set_isolation_level(0)を指定することでオートコミットでクエリが実行されます。 これでパーティションの更新ができました。

まとめ

GlueのPythonShellでオートコミットを有効にしたクエリの実行を行いました。 中途半端にPyGreSQLがデフォルトで使える状況だったので苦戦してしまいましたが、 単純にpsycopg2を使うことで実現できました。

Spectrumに限らず、Athenaでも同様の状況になることが予想できますが、 同じ方法で解決できるものと思います。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.