RedshiftのDB間でスキーマ単位で一括テーブルコピーをしてみた

はじめに

DI部のおおたきです。Redshiftでは1つのクラスター内に複数DBを作成することが可能ですが、DBのコピーって出来るのかなと調べたところそのような機能は残念ながらありませんでした。
そこで何か良い方法はないかと調べたところRedshiftにはスナップショットからテーブルを復元できる方法があります。この機能はDB間をまたいで出来るため新しく作成したDBに対してテーブルを復元することでコピーすることが可能です。
ただし、まとめてテーブルを指定して復元することは出来ないため、1つずつ操作するしかありません。
(1つのテーブルの復元が終わるのを待って、次のテーブルを復元させるという感じです)
数テーブル程度ならマネージメントコンソールから手作業で構わないと思いますが、テーブ数が増えると作業時間がかなりかかってしまい手間になります。
そこで今回Pythonを使ってスキーマ単位で一括テーブルコピーできるプログラムを書いてみました。

プログラムを書いてみた

AWS Lambdaから実行できるようにしようと思いましたが、処理時間が長くなのでLambdaは不向きのためローカル環境で動作をさせるようにしました。
boto3を使います。またテーブル一覧を取得するためpsycopg2ライブラリも使用しています。
実行する前はこのライブラリを事前にインストールして使えるようにしておいてください。
またboto3を使うにあたりアクセスキーとシークレットキーが必要なので事前にクレデンシャルの設定をしておいてください。
処理の流れとしては以下のような感じになります。

  1. Redshiftに接続して対象スキーマのテーブル一覧を取得する
  2. boto3のrestore_table_from_cluster_snapshotでテーブルを新しいDBに復元する
  3. boto3のdescribe_table_restore_statusでテーブルの復元状態をチェックし、処理が完了したら次のテーブルを復元する
  4. 処理が完了していなければ待機し、再度復元状態をチェックする

コード

コードは以下のような感じです。

from boto3.session import Session
import psycopg2
from time import sleep

CLUSTER_ID = '対象のクラスターID'
SNAPSHOT_ID = '復元対象のスナップショットID'
SCHEMA_NAME = '復元する対象のスキーマー名'
SOURCE_DB = '復元元のDB'
TARGET_DB = '復元先のDB'

def get_connection():
    return psycopg2.connect(
        host='Redshiftのホスト名',
        database=SOURCE_DB,
        port=5439,
        user='接続ユーザー名',
        password='接続パスワード'
);

profile = 'クレデンシャルが設定してあるプロファイル名'
session = Session(profile_name=profile)
client = session.client('redshift')

conn = get_connection()
cursor = conn.cursor()
cursor.execute("select \"table\" as table_name from SVV_TABLE_INFO where schema = '" \
               + SCHEMA_NAME  + "' and database = '" + SOURCE_DB + "' order by table_name")table_name")

results = cursor.fetchall()
cursor.close()
conn.close()

for row in results:
    table = row[0]
    print(table)
    response1 = client.restore_table_from_cluster_snapshot(
        ClusterIdentifier = CLUSTER_ID,
        SnapshotIdentifier = SNAPSHOT_ID,
        SourceDatabaseName = SOURCE_DB,
        SourceSchemaName = SCHEMA_NAME,
        SourceTableName = table,
        TargetDatabaseName = TARGET_DB,
        TargetSchemaName = SCHEMA_NAME,
        NewTableName = table
    )
    print(response1)

    while True:
        request_id = response1['TableRestoreStatus']['TableRestoreRequestId']
        print(request_id)
        response2 = client.describe_table_restore_status(
            ClusterIdentifier='csa-redshift-csanalytics-stack',
            TableRestoreRequestId=request_id
        )
        print(response2)
        status = response2['TableRestoreStatusDetails'][0]['Status']
        if (status == 'SUCCEEDED'):
            break
        else:
            sleep(15)

簡単に解説します。まず5~9行目で復元する環境のDBやスキーマ名などを設定をします。11~18行目でRedshiftに接続するコネクションを取得しています。26行目で対象となるスキーマのテーブル一覧を取得します。後は取得したテーブル数だけ36~45行目で復元処理を行い、48~60行目で復元状態をチェックしています。describe_table_restore_statusの引数にリクエストIDを指定することで復元処理中のテーブルの実行状態のみのデータを取得しています。また処理が終わっていなければ15秒待機させ終わっていれば次のテーブルを処理するようになっています。

動かす前の事前準備

プログラムを動かす前にやっておく必要なことがあります。コピー先のDBは当然作成ししていないとだめですが、それ以外にスキーマも作成していないとエラーとなってしまいますので事前に作成が必要です。またコピー先に既に同一名のテーブルが存在する場合もエラーとなってしまうので気をつけてください。
あと自分が嵌ったこととして、対象とするスナップショットは起動中のRedshiftから作成したものでないとエラーになります。例えば現在起動中のRedshiftがAというスナップショットからリストアしたもののとします。そのAというスナップショットを指定してテーブルの復元をしようとするとエラーとなってしまいます。したがって現在起動している状態のRedshiftからBというスナップショットを作成し、そのスナップショットを指定してテーブルの復元をするようにしてください。

動かしてみた

実際に動かすと以下のようなログが出力され、StatusがPENDING⇒IN_PROGRESS⇒SUCCEEDEDと変わるのが分かります。また少し見づらいですがStatusがSUCCEEDEDのログのRequestTimeとdateを比較することで処理にどれぐらいかかったかも分かります。

{'TableRestoreStatusDetails': [{'TableRestoreRequestId': 'f80a2911-794f-4d27-97c5-f0ffb8273978', 'Status': 'PENDING', 'RequestTime': datetime.datetime(2018, 4, 12, 2, 6, 42, 591000, tzinfo=tzutc()), 'ClusterIdentifier': 'cm-redshift', 'SnapshotIdentifier': 'manual-cm-redshift20180412', 'SourceDatabaseName': 'cmdb', 'SourceSchemaName': 'test', 'SourceTableName': 'test1_tb', 'TargetDatabaseName': 'cmdb2', 'TargetSchemaName': 'test', 'NewTableName': 'test1_tb'}], 'ResponseMetadata': {'RequestId': '24ff24a1-3df6-11e8-bdf9-93f64a64c59d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '24ff24a1-3df6-11e8-bdf9-93f64a64c59d', 'content-type': 'text/xml', 'content-length': '1138', 'date': 'Thu, 12 Apr 2018 02:06:42 GMT'}, 'RetryAttempts': 0}}
・
・
・
{'TableRestoreStatusDetails': [{'TableRestoreRequestId': 'f80a2911-794f-4d27-97c5-f0ffb8273978', 'Status': 'IN_PROGRESS', 'RequestTime': datetime.datetime(2018, 4, 12, 2, 6, 42, 591000, tzinfo=tzutc()), 'ClusterIdentifier': 'cm-redshift', 'SnapshotIdentifier': 'manual-cm-redshift20180412', 'SourceDatabaseName': 'cmdb', 'SourceSchemaName': 'test', 'SourceTableName': 'test1_tb', 'TargetDatabaseName': 'cmdb2', 'TargetSchemaName': 'vietnam', 'NewTableName': 'test1_tb'}], 'ResponseMetadata': {'RequestId': '6d20b81b-3df6-11e8-9dc9-5f625d07a268', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6d20b81b-3df6-11e8-9dc9-5f625d07a268', 'content-type': 'text/xml', 'content-length': '1142', 'date': 'Thu, 12 Apr 2018 02:08:43 GMT'}, 'RetryAttempts': 0}}
・
・
・
{'TableRestoreStatusDetails': [{'TableRestoreRequestId': 'f80a2911-794f-4d27-97c5-f0ffb8273978', 'Status': 'SUCCEEDED', 'RequestTime': datetime.datetime(2018, 4, 12, 2, 6, 42, 591000, tzinfo=tzutc()), 'ProgressInMegaBytes': 16, 'TotalDataInMegaBytes': 16, 'ClusterIdentifier': 'cm-redshift', 'SnapshotIdentifier': 'manual-cm-redshift20180412', 'SourceDatabaseName': 'cmdb', 'SourceSchemaName': 'test', 'SourceTableName': 'test1_tb', 'TargetDatabaseName': 'cmdb2', 'TargetSchemaName': 'test', 'NewTableName': 'test1_tb'}], 'ResponseMetadata': {'RequestId': '7f2b66d6-3df6-11e8-ac9a-41ed610ba1dd', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7f2b66d6-3df6-11e8-ac9a-41ed610ba1dd', 'content-type': 'text/xml', 'content-length': '1250', 'date': 'Thu, 12 Apr 2018 02:09:13 GMT'}, 'RetryAttempts': 0}}

まとめ

いかがでしたでしょうか。今回はスキーマ単位での別DBへのテーブル復元でしたが、テーブル一覧を取得するSQLを修正することで複数スキーマも一括で復元できるかと思います。クラスターを複数立てると費用がかかって困るけど環境を分けて使用したいなどの場合複数DBを作成することがあるかと思いますが、そんな時に一括でテーブルを復元できるのは便利ではないでしょうか。今回は以上です。