SnowflakeのSnowpipe REST APIを利用して内部ステージからデータロードをしてみた

2021.03.18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

Snowflakeでは、REST API経由でSnowpipeを実行することによって「内部ステージ」および「外部ステージ」に配置したファイルをテーブルにロードすることができます。

これまで試したことがなかったので、今回は「内部ステージ(テーブルステージ)」からのデータロードを実際に試してみました。

前提

キーペア認証については事前に設定しています。キーペア認証の設定についてはこちらをご参照ください。

OSはMacOS、Pythonは3.7を利用しています。Python環境ではpipenvを利用していますが、基本的には通常のPython環境でも変わらないかと思います。

テーブルとデータの事前準備

まずは、Snowflake上で空のテーブルを作成しておきます。今回はサンプルデータベースのregionテーブルと同じ定義のテーブルを用意します。なお、データベースは事前に用意しているものを利用しています。

USE DATABASE ootaka_sandbox_db;
USE SCHEMA public;

CREATE TABLE region
LIKE snowflake_sample_data.tpch_sf1.region;

また、下記のCSVファイルも後でテーブルステージに登録するために用意しておきます。

region.csv

R_REGIONKEY,R_NAME,R_COMMENT
0,AFRICA,This is the AFRICA
1,AMERICA,This is the AMERICA
2,ASIA,This is the ASIA
3,EUROPE,This is the EUROPE
4,MIDDLE EAST,This is the MIDDLE EAST

データロードの準備

では、こちらのドキュメントを参照しつつ、データロードの準備をしていきます。

Python SDKのインストール

まずは必要なSDKを下記でインストールします。

$ pipenv --python 3.7
$ pipenv install snowflake-ingest

ここは特に問題ないかと思います。

パイプの作成

今回は内部ステージのテーブルステージを利用するので、ステージについて特に何もせずにパイプの作成を進めます。

このとき、PIPE作成時にFILE_FORMATを指定することに注意をします。

USE DATABASE ootaka_sandbox_db;
USE SCHEMA public;

CREATE PIPE rest_api_pipe IF NOT EXISTS AS
COPY INTO region
FROM @%region
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

先程作成したregionテーブルのテーブルステージからregionテーブルにデータをCOPYするrest_api_pipeというパイプが作成できました。

セキュリティの構成

ドキュメントに記載のある通り、本来であれば最小権限を与えるため、個別のユーザとロールを作成するべきですが今回は個別作成はスキップします。実際の運用時には適切なユーザと、ロールを作成してください。

また、「前提」に記載した通りキーペア認証については設定済みとなります。

データファイルのステージ

SnowSQLのPUTコマンドを利用して、テーブルステージに先程用意したregion.csvファイルをPUTします。

foo_bar#(no warehouse)@OOTAKA_SANDBOX_DB.PUBLIC>PUT file://./region.csv @%region;
region.csv_c.gz(0.00MB): [##########] 100.00% Done (0.079s, 0.00MB/s).          
╒════════════╤═══════════════╤═════════════╤═════════════╤════════════════════╤════════════════════╤══════════╤═════════╕
│ source     │ target        │ source_size │ target_size │ source_compression │ target_compression │ status   │ message │
╞════════════╪═══════════════╪═════════════╪═════════════╪════════════════════╪════════════════════╪══════════╪═════════╡
│ region.csv │ region.csv.gz │         179 │         135 │ NONE               │ GZIP               │ UPLOADED │         │
╘════════════╧═══════════════╧═════════════╧═════════════╧════════════════════╧════════════════════╧══════════╧═════════╛
1 Row(s) produced. Time Elapsed: 2.332s

この結果のtargetに注意します。ファイルが自動圧縮されているのでregion.csv.gzに変わっているのがわかります。このファイルは後でPythonコードで指定するので覚えておきます。

Snowpipe REST APIを利用してデータロード

以上で準備ができたので、PythonコードからREST APIを利用してロードをしてみます。

今回はPythonコードはドキュメントのサンプルプログラムをそのまま利用してみます。

また、ドキュメントで記載されている通り、各プレースホルダーの値は実際の値に置き換える必要があるので、適切な値に置き換えます。file_listの値に指定するファイル名は、先程控えておいたファイル名region.csv.gzを指定することに注意します。

ingest_file.py

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return 'foobar'

with open("/Users/ootaka.daisuke/.snowsql/keys/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
file_list=['region.csv.gz']
ingest_manager = SimpleIngestManager(account='foobar',
                                     host='foobar.ap-northeast-1.aws.snowflakecomputing.com',
                                     user='foo_bar',
                                     pipe='ootaka_sandbox_db.public.rest_api_pipe',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)

このPythonコードを実行してみます。

$ pipenv run python ingest_file.py

History scan report: 

{'files': [], 'startTimeInclusive': '2021-03-17T12:53:04.979Z', 'endTimeExclusive': '2021-03-17T13:53:05.106Z', 'rangeStartTime': '2021-03-17T12:53:04.979Z', 'rangeEndTime': '2021-03-17T13:53:05.106Z', 'pipe': 'ootaka_sandbox_db.public.rest_api_pipe', 'completeResult': 'true'}

History scan report: 

{'files': [{'path': 'region.csv.gz', 'stageLocation': 'tables/773095533178/', 'fileSize': 144, 'timeReceived': '2021-03-17T13:52:44.652Z', 'lastInsertTime': '2021-03-17T13:53:17.008Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'startTimeInclusive': '2021-03-17T12:53:25.271Z', 'endTimeExclusive': '2021-03-17T13:53:25.387Z', 'rangeStartTime': '2021-03-17T13:53:17.008Z', 'rangeEndTime': '2021-03-17T13:53:17.008Z', 'pipe': 'ootaka_sandbox_db.public.rest_api_pipe', 'completeResult': 'true'}
Ingest Report:

{'pipe': 'OOTAKA_SANDBOX_DB.PUBLIC.REST_API_PIPE', 'completeResult': True, 'nextBeginMark': '1_0', 'files': [{'path': 'region.csv.gz', 'stageLocation': 'tables/773095533178/', 'fileSize': 144, 'timeReceived': '2021-03-17T13:52:44.652Z', 'lastInsertTime': '2021-03-17T13:53:17.029Z', 'rowsInserted': 5, 'rowsParsed': 5, 'errorsSeen': 0, 'errorLimit': 1, 'complete': True, 'status': 'LOADED'}], 'statistics': {'activeFilesCount': 0}}

無事にロードされました!念の為、テーブルも確かめてみます。

foo_bar#X_SMALL_WH@OOTAKA_SANDBOX_DB.PUBLIC>SELECT * FROM region;
╒═════════════╤═════════════╤═════════════════════════╕                         
│ R_REGIONKEY │ R_NAME      │ R_COMMENT               │
╞═════════════╪═════════════╪═════════════════════════╡
│           0 │ AFRICA      │ This is the AFRICA      │
├─────────────┼─────────────┼─────────────────────────┤
│           1 │ AMERICA     │ This is the AMERICA     │
├─────────────┼─────────────┼─────────────────────────┤
│           2 │ ASIA        │ This is the ASIA        │
├─────────────┼─────────────┼─────────────────────────┤
│           3 │ EUROPE      │ This is the EUROPE      │
├─────────────┼─────────────┼─────────────────────────┤
│           4 │ MIDDLE EAST │ This is the MIDDLE EAST │
╘═════════════╧═════════════╧═════════════════════════╛
5 Row(s) produced. Time Elapsed: 0.571s

問題ありませんね。

後片付け

不要になったものを削除しておきます。

USE DATABASE ootaka_sandbox_db;
USE SCHEMA public;

DROP TABLE region;
DROP PIPE rest_api_pipe;

まとめ

以上、Snowpipe REST APIを利用して内部ステージからデータロードをしてみました。

記事中では、まるでスッとできたかのように書いていますが、いくつか はまった点としては以下がありました。

  • PIPE作成時のFILE_FORMAT
    • 最初に明示的に記載するのを忘れており、エラーを何度も出しました。
  • ロードするcsvファイル
    • ファイルフォーマットに問題があり修正しました。
  • ingest_manager.ingest_filesに渡すファイル名
    • 何を指定したらよいのか、何度も迷いました。
    • 結論としては、 LIST @%region; のようにして実行した、LISTコマンドで表示されたfileの値を指定すれば問題ありませんでした。

2021/03/18時点では、外部ステージの場合は取り込みの自動化(AUTO INGEST)ができますが、内部ステージでは自動化ができないので、Snowpipe REST APIを活用することで自動化することも出来そうですね。

どなたかのお役に立てば幸いです。それでは!