Amazon Redshift ストアドプロシージャをプログラムから呼び出す方法

2019.06.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

今回は、プログラムから Redshift のストアドプロシージャを実行して結果を得る方法と、Java(JDBCドライバ)とPython(psycopg2)から呼び出すコードについて解説します。Amazon Redshift は、PostgreSQLと概ね互換性がありますが、ストアドプロシージャのインタフェースは異なります。

ストアドプロシージャを実行して結果を取得するには

まず、Redshift のストアドプロシージャを実行して結果を取得する2つの方法について解説します。なお、ともに以下のストアドプロシージャのサンプルは同じテーブルから同じ結果が得られるように作成しています。

方法1:ストアドプロシージャの結果をカーソルから取得する方法

引数にrefcursor型(INOUT)の変数rs_outを受け取り、OPENしたカーソルを呼び出し元に返します。OPENしたカーソルをCLOSEせずに終わることで、同一ドランザクション内でカーソルを利用できるようになります。

-- DROP PROCEDURE sp_get_result_cursor(integer, refcursor);
CREATE OR REPLACE PROCEDURE sp_get_result_cursor(orderdate IN integer, rs_out INOUT refcursor)
AS $$
BEGIN
OPEN rs_out FOR SELECT * FROM public.lineorder WHERE lo_orderdate >= orderdate;
END;
$$ LANGUAGE plpgsql;

psqlコマンドでストアドプロシージャの実行、カーソルから結果を取得すると以下のようになります。カーソルから結果を取得するには、ストアドプロシージャの実行とカーソルからの結果の取得を同一トランザクションで実行する必要があります。そのため、下記の呼び出しでは、最初にBEGIN、最後にCOMMITを実行しています。最初にBEGINを実行せずにストアドプロシージャの実行すると、OPENしたはずのカーソルが参照できません。

cmdb=> BEGIN;
BEGIN
cmdb=> CALL sp_get_result_cursor(19920101, 'lineorder_cursor');
rs_out
------------------
lineorder_cursor
(1 row)

cmdb=> FETCH ALL FROM lineorder_cursor;
lo_orderkey | lo_linenumber | lo_custkey | lo_partkey | lo_suppkey | lo_orderdate | lo_orderpriority | lo_shippriority | lo_quantity | lo_extendedprice | lo_ordertotalprice | lo_discount | lo_revenue | lo_supplycost | lo_tax | lo_commitdate | lo_shipmode
-------------+---------------+------------+------------+------------+--------------+------------------+-----------------+-------------+------------------+--------------------+-------------+------------+---------------+--------+---------------+-------------
415775811 | 2 | 1031035 | 114370 | 589528 | 19920101 | 4-NOT SPECI | 0 | 0 | 0 | 0 | 5 | 0 | 83062 | 8 | 19920323 | SHIP
455439878 | 1 | 770221 | 15218 | 691002 | 19920101 | 1-URGENT | 0 | 21 | 2379741 | 20621925 | 6 | 2236956 | 67992 | 1 | 19920314 | SHIP
:
:

cmdb=> COMMIT;
COMMIT

方法2:ストアドプロシージャの結果を一時テーブルから取得する方法

引数にvarchar型(INOUT)の変数temp_tablenameを受け取り、一時テーブルを作成します。作成したテーブルは一時テーブルなので接続を切断すると自動的に削除されます。ストアドプロシージャの中では、コネクションプーリングなどにより、一時テーブルが既に作成されていることを考慮して、一時テーブルがもし存在したら削除するように実装しています。

-- DROP PROCEDURE sp_get_temp_table(integer, varchar);
CREATE PROCEDURE sp_get_temp_table(orderdate IN integer, temp_tablename INOUT varchar(256))
AS $$
DECLARE
row record;
BEGIN
EXECUTE 'DROP TABLE IF EXISTS ' || temp_tablename;
EXECUTE 'CREATE TEMP TABLE ' || temp_tablename || ' AS SELECT * from cm_user.lineorder WHERE lo_orderdate >= ' || orderdate;
END;
$$ LANGUAGE plpgsql;

psqlコマンドでストアドプロシージャの実行、一時テーブルから結果を取得すると以下のようになります。一時テーブルから結果を取得するには、同一コネクションでクエリを実行する必要があります。

cmdb=> CALL sp_get_temp_table(19920101, 'temp_lineorder');
INFO: Table "temp_lineorder" does not exist and will be skipped
temp_tablename
----------------
temp_lineorder
(1 row)

cmdb=> SELECT * FROM temp_lineorder;
lo_orderkey | lo_linenumber | lo_custkey | lo_partkey | lo_suppkey | lo_orderdate | lo_orderpriority | lo_shippriority | lo_quantity | lo_extendedprice | lo_ordertotalprice | lo_discount | lo_revenue | lo_supplycost | lo_tax | lo_commitdate | lo_shipmode
-------------+---------------+------------+------------+------------+--------------+------------------+-----------------+-------------+------------------+--------------------+-------------+------------+---------------+--------+---------------+-------------
415775811 | 2 | 1031035 | 114370 | 589528 | 19920101 | 4-NOT SPECI | 0 | 0 | 0 | 0 | 5 | 0 | 83062 | 8 | 19920323 | SHIP
415893895 | 2 | 2741387 | 809619 | 778335 | 19920101 | 5-LOW | 0 | 12 | 1834284 | 7969151 | 2 | 1797598 | 91714 | 7 | 19920326 | RAIL
:
:

方法1と方法2をどのように使い分けるのか?

方法1のストアドプロシージャから返されるカーソルには、DECLARE CURSOR で説明しているのと同じ制約とパフォーマンスに関する考慮事項が適用されます。2つの方式をご紹介しましたが、カーソルを使わなければならない場合を除き、制限が少ない「方法2」を推奨します。カーソルの制限事項は以下のとおりです。

  • 結果セット全体がリーダーノードにマテリアライズドされるため、大きな結果セットにカーソルを使用すると、パフォーマンスが低下する可能性がある
  • 1 つのセッションで、同時に開くことができるカーソルは 1 つのみ

カーソルについては、私が以前書いたブログ(Amazon Redshift ストアドプロシージャでカラムの中のカンマ区切りデータを複数レコードに分解する)のようにカーソルでなければならない場合を除き、可能な限り別の方法2をお勧めします。

方法2は一時テーブルを作成するため、コネクションプーリングする場合はクエリ結果を取得したら一時テーブルを必ず削除するようにハンドリンクしてください。

Java(JDBCドライバ)からストアドプロシージャを実行する

実行環境

  • Java JDK
  • OpenJDK Runtime Environment Corretto-8.212.04.1 (build 1.8.0_212-b04)

  • JDBCドライバ

  • RedshiftJDBC42-1.2.16.1027.jar

方式1:ストアドプロシージャの結果をカーソルから取得する方法

ストアドプロシージャは、SELECTステートメントと同様にexecuteで実行してカーソルを取得しました。FETCH ALLにて全件取得しています。28行目のコメントのようにFETCH FORWARD 5と指定して少量の結果セットを取得することも可能です。

クライアント側のメモリ不足エラーが発生する場合、JDBC の fetch size パラメータを設定することで、ユーザーが少量の結果セットを取得するように指定できます。詳細については、「JDBC フェッチサイズパラメータの設定」を参照してください。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class RunSPCursorFetch {
protected static final String url = "jdbc:redshift://cm-cluster.abcdefghijkl.us-east-1.redshift.amazonaws.com:5439/cmdb";
protected static final String user = "cm_user";
protected static final String pass = "Cm20190609";

public static void main(String[] args) {
{
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", pass);
props.setProperty("ssl","true");
props.setProperty("sslfactory","com.amazon.redshift.ssl.NonValidatingFactory");

try (Connection conn = DriverManager.getConnection(url, props)) {
conn.setAutoCommit(false);
PreparedStatement proc = conn.prepareStatement("CALL sp_get_result_cursor(19920101, 'lineorder_cursor');");
proc.execute();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("FETCH ALL FROM lineorder_cursor;"); // 全てのレコードを取得
// ResultSet rs = stmt.executeQuery("FETCH FORWARD 5 FROM lineorder_cursor;"); // 先頭の5レコードを取得
while (rs.next()) {
int lo_orderkey = rs.getInt(1);
System.out.println("orderkey: " + lo_orderkey);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}

方法2:ストアドプロシージャの結果を一時テーブルから取得する方法

ストアドプロシージャの実行はexecuteします。結果を取得するためのSELECTステートメントは、executeQueryで実行しました。

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class RunSPTempSelect {
protected static final String url = "jdbc:redshift://cm-cluster.abcdefghijkl.us-east-1.redshift.amazonaws.com:5439/cmdb";
protected static final String user = "cm_user";
protected static final String pass = "Cm20190609";

public static void main(String[] args) {
{
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", pass);
props.setProperty("ssl","true");
props.setProperty("sslfactory","com.amazon.redshift.ssl.NonValidatingFactory");

try (Connection conn = DriverManager.getConnection(url, props)) {
conn.setAutoCommit(false);
PreparedStatement proc = conn.prepareStatement("CALL sp_get_temp_table(19920101, 'temp_lineorder');");
proc.execute();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM temp_lineorder;"); // 全てのレコードを取得
// ResultSet rs = stmt.executeQuery("SELECT * FROM temp_lineorder LIMIT 5;"); // 先頭の5レコードを取得
while (rs.next()) {
int lo_orderkey = rs.getInt(1);
System.out.println("orderkey: " + lo_orderkey);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}

Python(psycopg2)からストアドプロシージャを実行する

実行環境

  • Python 3.6.3
  • psycopg2 version 2.8.2
  • postgresql 11.1_1 (libpq.5.11)

方式1:ストアドプロシージャの結果をカーソルから取得する方法

ストアドプロシージャは、psycopg2のcallprocではエラーとなるため、executeで実行してカーソルを取得しました。カーソルから結果セットを取得します。

import psycopg2

# DBへの接続
conn = psycopg2.connect(
host="cm-cluster.abcdefghijkl.us-east-1.redshift.amazonaws.com",
database="cmdb",
port="5439",
user="cm_user",
password="Cm20190609"
)

cursor = conn.cursor();
# cursor.callproc("sp_get_result_cursor", ["19920101", "lineorder_cursor"]) # error
cursor.execute("CALL sp_get_result_cursor(19920101, 'lineorder_cursor');");

cursor2 = conn.cursor('lineorder_cursor');
for row in cursor2:
print("orderkey: " + str(row[0]))

cursor.close();
cursor2.close();
conn.close();

callprocを実行したときのエラーメッセージは以下のとおりです。

/Users/cmuser/venv/python3.6.0/bin/python /Users/cmuser/IdeaProjects/RedshiftSP/run_sp_cursor_fetch.py
Traceback (most recent call last):
File "/Users/cmuser/IdeaProjects/RedshiftSP/run_sp_cursor_fetch.py", line 13, in
cursor.callproc("sp_get_result_cursor", ["19920101", "lineorder_cursor"])
psycopg2.errors.WrongObjectType: sp_get_result_cursor("unknown", "unknown") is a procedure
HINT: To call a procedure, use CALL.

方法2:ストアドプロシージャの結果を一時テーブルから取得する方法

ストアドプロシージャと結果を取得するためのSELECTステートメントは、ともにexecuteで実行します。

import psycopg2

# DBへの接続
conn = psycopg2.connect(
host="cm-cluster.abcdefghijkl.us-east-1.redshift.amazonaws.com",
database="cmdb",
port="5439",
user="cm_user",
password="Cm20190609"
)

cursor = conn.cursor();
cursor.execute("CALL sp_get_temp_table(19920101, 'temp_lineorder');");
cursor.execute("SELECT * FROM temp_lineorder;");
results = cursor.fetchall()
for row in results:
print("orderkey: " + str(row[0]))

cursor.close();
conn.close();

まとめ

データ分析でよく用いられる言語として、Java(JDBCドライバ)とPython(psycopg2)からストアドプロシージャの呼び出しについてまとめました。Pythonについては試行錯誤の結果なので、よりスマートな方法があるかもしれません。

あくまでも個人的な見解ですが、カーソルはモノリシックなDBを前提にして機能なので、大きなデータを取り扱うシェアードナッシングアーキテクチャのRedshiftには方式2のような水平分散の仕組みを活かす方式を可能な限り選択することおおすすめします。

合わせて読みたい

Amazon Redshift カスタムJDBCドライバを使い倒す

psycopg2によるPython2.7からのAmazon Redshiftアクセスサンプル