SnowflakeのSnowpipe REST APIを利用して内部ステージからデータロードをしてみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
Snowflakeでは、REST API経由でSnowpipeを実行することによって「内部ステージ」および「外部ステージ」に配置したファイルをテーブルにロードすることができます。
これまで試したことがなかったので、今回は「内部ステージ(テーブルステージ)」からのデータロードを実際に試してみました。
前提
キーペア認証については事前に設定しています。キーペア認証の設定についてはこちらをご参照ください。
OSはMacOS、Pythonは3.7を利用しています。Python環境ではpipenvを利用していますが、基本的には通常のPython環境でも変わらないかと思います。
テーブルとデータの事前準備
まずは、Snowflake上で空のテーブルを作成しておきます。今回はサンプルデータベースのregion
テーブルと同じ定義のテーブルを用意します。なお、データベースは事前に用意しているものを利用しています。
USE DATABASE ootaka_sandbox_db; USE SCHEMA public; CREATE TABLE region LIKE snowflake_sample_data.tpch_sf1.region;
また、下記のCSVファイルも後でテーブルステージに登録するために用意しておきます。
R_REGIONKEY,R_NAME,R_COMMENT 0,AFRICA,This is the AFRICA 1,AMERICA,This is the AMERICA 2,ASIA,This is the ASIA 3,EUROPE,This is the EUROPE 4,MIDDLE EAST,This is the MIDDLE EAST
データロードの準備
では、こちらのドキュメントを参照しつつ、データロードの準備をしていきます。
Python SDKのインストール
まずは必要なSDKを下記でインストールします。
$ pipenv --python 3.7 $ pipenv install snowflake-ingest
ここは特に問題ないかと思います。
パイプの作成
今回は内部ステージのテーブルステージを利用するので、ステージについて特に何もせずにパイプの作成を進めます。
このとき、PIPE作成時にFILE_FORMATを指定することに注意をします。
USE DATABASE ootaka_sandbox_db; USE SCHEMA public; CREATE PIPE rest_api_pipe IF NOT EXISTS AS COPY INTO region FROM @%region FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);
先程作成したregion
テーブルのテーブルステージからregion
テーブルにデータをCOPYするrest_api_pipe
というパイプが作成できました。
セキュリティの構成
ドキュメントに記載のある通り、本来であれば最小権限を与えるため、個別のユーザとロールを作成するべきですが今回は個別作成はスキップします。実際の運用時には適切なユーザと、ロールを作成してください。
また、「前提」に記載した通りキーペア認証については設定済みとなります。
データファイルのステージ
SnowSQLのPUTコマンドを利用して、テーブルステージに先程用意したregion.csv
ファイルをPUTします。
foo_bar#(no warehouse)@OOTAKA_SANDBOX_DB.PUBLIC>PUT file://./region.csv @%region; region.csv_c.gz(0.00MB): [##########] 100.00% Done (0.079s, 0.00MB/s). ╒════════════╤═══════════════╤═════════════╤═════════════╤════════════════════╤════════════════════╤══════════╤═════════╕ │ source │ target │ source_size │ target_size │ source_compression │ target_compression │ status │ message │ ╞════════════╪═══════════════╪═════════════╪═════════════╪════════════════════╪════════════════════╪══════════╪═════════╡ │ region.csv │ region.csv.gz │ 179 │ 135 │ NONE │ GZIP │ UPLOADED │ │ ╘════════════╧═══════════════╧═════════════╧═════════════╧════════════════════╧════════════════════╧══════════╧═════════╛ 1 Row(s) produced. Time Elapsed: 2.332s
この結果のtarget
に注意します。ファイルが自動圧縮されているのでregion.csv.gz
に変わっているのがわかります。このファイルは後でPythonコードで指定するので覚えておきます。
Snowpipe REST APIを利用してデータロード
以上で準備ができたので、PythonコードからREST APIを利用してロードをしてみます。
今回はPythonコードはドキュメントのサンプルプログラムをそのまま利用してみます。
また、ドキュメントで記載されている通り、各プレースホルダーの値は実際の値に置き換える必要があるので、適切な値に置き換えます。file_list
の値に指定するファイル名は、先程控えておいたファイル名region.csv.gz
を指定することに注意します。
from logging import getLogger from snowflake.ingest import SimpleIngestManager from snowflake.ingest import StagedFile from snowflake.ingest.utils.uris import DEFAULT_SCHEME from datetime import timedelta from requests import HTTPError from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PrivateFormat from cryptography.hazmat.primitives.serialization import NoEncryption import time import datetime import os import logging logging.basicConfig( filename='/tmp/ingest.log', level=logging.DEBUG) logger = getLogger(__name__) # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. def get_private_key_passphrase(): return 'foobar' with open("/Users/ootaka.daisuke/.snowsql/keys/rsa_key.p8", 'rb') as pem_in: pemlines = pem_in.read() private_key_obj = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) private_key_text = private_key_obj.private_bytes( Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8') # Assume the public key has been registered in Snowflake: # private key in PEM format # List of files in the stage specified in the pipe definition file_list=['region.csv.gz'] ingest_manager = SimpleIngestManager(account='foobar', host='foobar.ap-northeast-1.aws.snowflakecomputing.com', user='foo_bar', pipe='ootaka_sandbox_db.public.rest_api_pipe', private_key=private_key_text) # List of files, but wrapped into a class staged_file_list = [] for file_name in file_list: staged_file_list.append(StagedFile(file_name, None)) try: resp = ingest_manager.ingest_files(staged_file_list) except HTTPError as e: # HTTP error, may need to retry logger.error(e) exit(1) # This means Snowflake has received file and will start loading assert(resp['responseCode'] == 'SUCCESS') # Needs to wait for a while to get result in history while True: history_resp = ingest_manager.get_history() if len(history_resp['files']) > 0: print('Ingest Report:\n') print(history_resp) break else: # wait for 20 seconds time.sleep(20) hour = timedelta(hours=1) date = datetime.datetime.utcnow() - hour history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z') print('\nHistory scan report: \n') print(history_range_resp)
このPythonコードを実行してみます。
$ pipenv run python ingest_file.py History scan report: {'files': [], 'startTimeInclusive': '2021-03-17T12:53:04.979Z', 'endTimeExclusive': '2021-03-17T13:53:05.106Z', 'rangeStartTime': '2021-03-17T12:53:04.979Z', 'rangeEndTime': '2021-03-17T13:53:05.106Z', 'pipe': 'ootaka_sandbox_db.public.rest_api_pipe', 'completeResult': 'true'} History scan report: {'files': [{'path': 'region.csv.gz', 'stageLocation': 'tables/773095533178/', 'fileSize': 144, 'timeReceived': '2021-03-17T13:52:44.652Z', 'lastInsertTime': '2021-03-17T13:53:17.008Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'startTimeInclusive': '2021-03-17T12:53:25.271Z', 'endTimeExclusive': '2021-03-17T13:53:25.387Z', 'rangeStartTime': '2021-03-17T13:53:17.008Z', 'rangeEndTime': '2021-03-17T13:53:17.008Z', 'pipe': 'ootaka_sandbox_db.public.rest_api_pipe', 'completeResult': 'true'} Ingest Report: {'pipe': 'OOTAKA_SANDBOX_DB.PUBLIC.REST_API_PIPE', 'completeResult': True, 'nextBeginMark': '1_0', 'files': [{'path': 'region.csv.gz', 'stageLocation': 'tables/773095533178/', 'fileSize': 144, 'timeReceived': '2021-03-17T13:52:44.652Z', 'lastInsertTime': '2021-03-17T13:53:17.029Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'statistics': {'activeFilesCount': 0}}
無事にロードされました!念の為、テーブルも確かめてみます。
foo_bar#X_SMALL_WH@OOTAKA_SANDBOX_DB.PUBLIC>SELECT * FROM region; ╒═════════════╤═════════════╤═════════════════════════╕ │ R_REGIONKEY │ R_NAME │ R_COMMENT │ ╞═════════════╪═════════════╪═════════════════════════╡ │ 0 │ AFRICA │ This is the AFRICA │ ├─────────────┼─────────────┼─────────────────────────┤ │ 1 │ AMERICA │ This is the AMERICA │ ├─────────────┼─────────────┼─────────────────────────┤ │ 2 │ ASIA │ This is the ASIA │ ├─────────────┼─────────────┼─────────────────────────┤ │ 3 │ EUROPE │ This is the EUROPE │ ├─────────────┼─────────────┼─────────────────────────┤ │ 4 │ MIDDLE EAST │ This is the MIDDLE EAST │ ╘═════════════╧═════════════╧═════════════════════════╛ 5 Row(s) produced. Time Elapsed: 0.571s
問題ありませんね。
後片付け
不要になったものを削除しておきます。
USE DATABASE ootaka_sandbox_db; USE SCHEMA public; DROP TABLE region; DROP PIPE rest_api_pipe;
まとめ
以上、Snowpipe REST APIを利用して内部ステージからデータロードをしてみました。
記事中では、まるでスッとできたかのように書いていますが、いくつか はまった点としては以下がありました。
- PIPE作成時のFILE_FORMAT
- 最初に明示的に記載するのを忘れており、エラーを何度も出しました。
- ロードするcsvファイル
- ファイルフォーマットに問題があり修正しました。
ingest_manager.ingest_files
に渡すファイル名- 何を指定したらよいのか、何度も迷いました。
- 結論としては、
LIST @%region;
のようにして実行した、LISTコマンドで表示されたfile
の値を指定すれば問題ありませんでした。
2021/03/18時点では、外部ステージの場合は取り込みの自動化(AUTO INGEST)ができますが、内部ステージでは自動化ができないので、Snowpipe REST APIを活用することで自動化することも出来そうですね。
どなたかのお役に立てば幸いです。それでは!