[新機能]SPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせしてみた
さがらです。
先月、SPCS(Snowpark Container Services)上で実行されるOpenflow Snowflake Deploymentsがパブリックプレビューとなりました。(9月17日のリリースでしたが、実際にAWSの日本リージョンで利用できるようになったのは9月29日頃と認識しています。)
これまでのOpenflowはAWS上にデプロイする方式しかなかったのですが、EKS含む多くのAWSリソースがデプロイされるため、コストや運用面に課題がありました。
今回Snowpark Container Services上にデプロイできる方式となったことにより、今まで以上にマネージドサービスとしてOpenflowを使うことができるようになりました。
このSPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせしてみたので、その内容について本記事でまとめてみます。
検証内容
基本的に以下のQuickstartの内容とSlackコネクタのドキュメントに沿って、実施していきます。
検証するSnowflakeアカウントは、有償契約しているSnowflakeのAWS東京リージョンのEnterpriseエディションです。
Slack側の準備:Slackアプリの作成とインストール
先に事前準備として、Slackアプリを作成しインストールします。
Slackアプリの作成
ブラウザでSlackのYour Appsのページを開き、右上のCreate New App
を押します。
From a manifest
を選択します。
アプリをデプロイしたいワークスペースを選択します。
以下のコードをコピーして貼り付けます。(EXAMPLE_NAME_CHANGE_THIS
の箇所は変更してください。)
{
"display_information": {
"name": "EXAMPLE_NAME_CHANGE_THIS"
},
"features": {
"bot_user": {
"display_name": "EXAMPLE_NAME_CHANGE_THIS",
"always_online": false
}
},
"oauth_config": {
"scopes": {
"bot": [
"channels:history",
"channels:read",
"groups:history",
"groups:read",
"im:history",
"im:read",
"mpim:history",
"mpim:read",
"users.profile:read",
"users:read",
"users:read.email",
"files:read",
"app_mentions:read",
"reactions:read"
]
}
},
"settings": {
"event_subscriptions": {
"bot_events": [
"message.channels",
"message.groups",
"message.im",
"message.mpim",
"reaction_added",
"reaction_removed",
"file_created",
"file_deleted",
"file_change"
]
},
"interactivity": {
"is_enabled": true
},
"org_deploy_enabled": false,
"socket_mode_enabled": true,
"token_rotation_enabled": false
}
}
貼り付けると下図のようになるため、Next
を押します。
Create
を押します。
すると下図のように表示されます。これでアプリの作成は完了です。
App-Level Tokensの生成
次に、App-Level Tokensの生成を行います。
作成したアプリのBasic Information
の中に、App-Level Tokens
という項目があるので、その項目の中でGenerate Token and Scopes
を押します。
スコープとしてconnections:write
を追加して、Generate
を押します。
トークンが生成されますので、控えておきます。
作成したSlackアプリをインストール
作成したSlackアプリを対象のワークスペースにインストールします。
作成したSlackアプリの画面で、左のメニューからInstall App
を押し、Install to <ワークスペース名>
を押します。
下図のようにOAuthの認証画面が表示されるため、Allow
を押します。
OAuth Tokens
が表示されるので、控えておきます。
作成したアプリをデータをロードしたいチャンネルに追加
対象のSlackワークスペースでデータをロードしたいチャンネルを開き、/invite @<作成したアプリ名>
とコマンドを実行します。
これにより、作成したSlackアプリがチャンネルに追加されました。
Snowflake側の準備
次にSnowflake側の準備をしていきます。
Openflow管理者用のロールを作成
まず、Openflow管理者用のロールを作成します。
-- Create the Openflow admin role (requires ACCOUNTADMIN or equivalent privileges)
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN;
-- Grant necessary privileges
GRANT CREATE DATABASE ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
-- Grant role to current user and ACCOUNTADMIN
GRANT ROLE OPENFLOW_ADMIN TO ROLE ACCOUNTADMIN;
GRANT ROLE OPENFLOW_ADMIN TO USER <ユーザー名>;
Termsに同意
Openflowの各コネクタを利用するには利用規約に同意する必要があるため、Admin
のTerms
から、Openflow Terms
とConnector Terms
に同意します。
同意後、下図のように表示されていれば問題ありません。
2025_06
を有効化 ※必要に応じて
バンドル最後に必要に応じて、バンドル2025_06
を有効化します。(今回Slackコネクタを利用するためです。)
-- Check the bundle status
USE ROLE ACCOUNTADMIN;
CALL SYSTEM$BEHAVIOR_CHANGE_BUNDLE_STATUS('2025_06');
-- Enable BCR Bundle 2025_06
CALL SYSTEM$ENABLE_BEHAVIOR_CHANGE_BUNDLE('2025_06');
Openflowのデプロイメントの作成
次に、Openflowのデプロイメントを作成していきます。
まず、Snowsightで使用するロールが作成したOPENFLOW_ADMIN
であることを確認します。
SnowsightのメニューからOpenflow
を押し、右上のLaunch Openflow
を押します。
ユーザー認証後、以下の画面となります。右上のロールがOPENFLOW_ADMIN
であることを確認してから作業しましょう。
Create a deployment
を押します。
右下のNext
を押します。
Deployment location
はSnowflake
、Name
はQUICKSTART_DEPLOYMENT
として、Next
を押します。
デプロイメントに対する権限を付与できる画面となりますが、ここでは何も設定せず、右下のCreate deployment
を押します。
下図のような画面となるので、15分~20分ほど待ちます。(私の場合は15分かかりました。)
一定時間経過後、下図のようにSTATE
列がActive
となっていれば、Openflowのデプロイメントは作成完了です!
ランタイム用のロールやリソースを作成
次に、Openflowのランタイム用のロールやリソースを作成します。
ランタイムは、実際にOpenflow用のコネクタをデプロイして動かす環境となります。(公式Docから引用した下図がわかりやすいです。)
まず、以下のクエリを実行してランタイム用のロールやリソースを作成します。(ウェアハウスのサイズと自動停止までの時間をQuickstartのクエリから変更しています。)
-- Create runtime role
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS QUICKSTART_ROLE;
-- Create database for Openflow resources
CREATE DATABASE IF NOT EXISTS QUICKSTART_DATABASE;
-- Create warehouse for data processing
CREATE WAREHOUSE IF NOT EXISTS QUICKSTART_WH
WAREHOUSE_SIZE = XSMALL
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE;
-- Grant privileges to runtime role
GRANT USAGE ON DATABASE QUICKSTART_DATABASE TO ROLE QUICKSTART_ROLE;
GRANT USAGE ON WAREHOUSE QUICKSTART_WH TO ROLE QUICKSTART_ROLE;
-- Grant runtime role to Openflow admin
GRANT ROLE QUICKSTART_ROLE TO ROLE OPENFLOW_ADMIN;
次に、Slack APIにアクセスするためのExternal Access Integrationを作成します。
-- Create schema for network rules
USE ROLE ACCOUNTADMIN;
CREATE SCHEMA IF NOT EXISTS QUICKSTART_DATABASE.NETWORKS;
-- Create network rule for Slack
CREATE OR REPLACE NETWORK RULE slack_api_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('slack.com', 'wss.slack.com');
-- Create external access integration with network rules
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION quickstart_access
ALLOWED_NETWORK_RULES = (slack_api_network_rule)
ENABLED = TRUE;
-- Grant usage to runtime role
GRANT USAGE ON INTEGRATION quickstart_access TO ROLE QUICKSTART_ROLE;
ランタイムを作成
先程作成したロールやリソースを用いて、ランタイムを作成していきます。
Openflowの画面で、Runtimes
タブに切り替えて、右上のCreate runtime
を押します。
ランタイムの作成画面が表示されるため、下図のように設定します。設定後、右下のCreate
を押します。
すると、下図のように表示されます。ランタイムの作成も5~10分必要です。(私の場合は4分かかりました。)
一定時間経過後、下図のようにSTATE
列がActive
となっていれば、ランタイムの作成は完了です!
作成したランタイムの名前をクリックすると、実際にコネクタの追加を行えるキャンバスの画面が開きます。
Slackコネクタの設定
Slackコネクタの設定を行っていきます。
まず、以下のクエリを実行して送信先となるデータベース・スキーマを作成し、必要な権限もQuickstart用のロールに付与します。(CREATE STAGE
とCREATE SEQUENCE
とCREATE CORTEX SEARCH SERVICE
の権限も付与しています。)
USE ROLE SYSADMIN;
CREATE DATABASE DESTINATION_DB;
CREATE SCHEMA DESTINATION_DB.DESTINATION_SCHEMA;
GRANT USAGE ON DATABASE DESTINATION_DB TO ROLE QUICKSTART_ROLE;
GRANT USAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE TABLE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE STAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE SEQUENCE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE CORTEX SEARCH SERVICE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
次に、Openflowのトップページに移動し、Featured connectors
からview more connectors
を押します。
Slackでは2種類ありますが、Openflowならではの機能を使いたいので、Slack(Cortex connect)
のAdd to runtime
を押します。
下図の画面となるため、作成したランタイムを設定して、Add
を押します。
するとランタイムのキャンバス画面が立ち上がり、Slack(Cortex connect)
が追加されていることがわかります。(この追加されたオブジェクトは、Openflow用語で「process group」と呼ぶようです。)
次に、追加されたprocess groupを右クリックし、Parameters
を押します。
下図のような画面となるため、Slackコネクタの公式Docの内容に沿って、各種Parameterを入れていきます。
各設定値の横の「・・・」を押して、Override
を押すことで入力することが出来ます。
入力後、右下のApply
を押します。(今回はUpload Interval
を60 secondsに設定しました。)
下図のようにパラメータのアップデートを確認する画面となります。右下のClose
を押します。
Slackのデータを同期してみる
対象のprocess groupに対して右クリックして、Enable all controller services
を押します。
そのあと、Start
を押します。
今回は60秒ごとに同期される設定としていたため、一定時間経過した後に正しくデータがロードされているか確認してみます。
一定時間経過すると、process groupのRead/Write
のスキャン量が増えているのがわかります。
対象のprocess groupに対して右クリックして、View status history
を押すと、どれだけの容量が読み込まれたかなどのメトリクスを確認できます。
この上で、実際にデータを確認してみます。
まず送信先として設定したスキーマを見ると、下図のようにテーブルなどのオブジェクトが作成されていることがわかります。
実際にテーブルを見ると、Slackのメッセージがデータとしてロードされていることがわかります。
おまけ:エラー発生時の対処方法
実際にエラーが起きていると、process groupの右上が赤く表示され、マウスオーバーするとエラーメッセージが表示されます。
このエラーの原因調査ですが、対象のprocess groupをダブルクリックするとより細かくどのような処理が行われているかわかります。今回自分が試した際は、Create Stage
とCreate Doc ID Sequence
でエラーが出ていたので、「CREATE STAGE
とCREATE SEQUENCE
の権限が足りないのかも?」と予想し、権限を付与したらエラーが解消されました。
Snowflake Intelligenceから問い合わせしてみる
また、Cortex Search Serviceも自動で作成されているため、Snowflake-managed MCP ServerやSnowflake Intelligence経由でSlackのデータに対して問い合わせを行える仕組みが簡単に構築できます!
※以下は参考までに、Snowflake Intelligence用のロールに作成されたCortex Search Serviceの権限を付与して、Snowflake Intelligenceから自然言語で問い合わせしてみた例です。(Snowflake Intelligenceについてはこちらのブログをご覧ください。)
- 作成されたCortex Search Serviceの権限をSnowflake Intelligence用のロールに付与
GRANT USAGE ON DATABASE DESTINATION_DB TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;
GRANT USAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;
GRANT USAGE ON CORTEX SEARCH SERVICE DESTINATION_DB.DESTINATION_SCHEMA.SLACK_CORTEX TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;
- Cortex Search Serviceをエージェントに追加(設定値が適切ではないかもしれないためご注意ください。)
- 実際にSnowflake Intelligenceで問い合わせしてみた結果
ちゃんと、Slackのメッセージ内容を元に回答していることがわかりますね!これは凄い!
Slackで追加メッセージを作成し、自動で同期されるかを確認してみる
※こちらはエラーが起きてうまく行きませんでした…
続いて、Slackアプリを追加しているチャンネルで下図のように追加の投稿を行って、自動で同期されるかを確認してみます。
この後でprocess groupをダブルクリックすると、Ingest Messagesのprocess group
の中のIngest Messages Into Snowflake
とLog Failures
でエラーとなりました。
エラーメッセージは下記のようなものでした。このエラーがわからず、ここで検証を一旦終えることにしました…
Ingest Messages Into Snowflake
07:53:33 GMT
All Nodes
ERROR
PutSnowpipeStreaming[id=5bb92c9c-64b7-39d0-9520-e980ef038888] Failed to flush Channels for StandardFlowFileRecord[uuid=dcdf9349-a8ae-478b-8f19-cbc48914f3db,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1760425882818-2, container=repo1, section=2], offset=106084, length=876],offset=0,name=73eb0ff9-0bf1-4fff-b86c-caf9536589d4,size=876]: org.apache.nifi.processor.exception.ProcessException: Failed to flush rows to Invalid Ingest Channels [DESTINATION_DB.DESTINATION_SCHEMA.SLACK_MESSAGES.OPENFLOW.QUICKSTARTRUNTIME2-0.0]
Log Failuers
LogAttribute[id=26a93f83-4ff1-318e-b735-5dbd0773705c] logging for flow file StandardFlowFileRecord[uuid=dcdf9349-a8ae-478b-8f19-cbc48914f3db,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1760425882818-2, container=repo1, section=2], offset=106084, length=876],offset=0,name=73eb0ff9-0bf1-4fff-b86c-caf9536589d4,size=876]
--------------------------------------------------
FlowFile Properties
Key: 'entryDate'
Value: 'Tue Oct 14 07:52:33 GMT 2025'
Key: 'lineageStartDate'
Value: 'Tue Oct 14 07:52:33 GMT 2025'
Key: 'fileSize'
Value: '876'
FlowFile Attribute Map Content
Key: 'QueryRecord.Route'
Value: 'messages'
Key: 'filename'
Value: '73eb0ff9-0bf1-4fff-b86c-caf9536589d4'
Key: 'merge.bin.age'
Value: '60043'
Key: 'merge.completion.reason'
Value: 'Bin has reached Max Bin Age'
Key: 'merge.count'
Value: '1'
Key: 'mime.type'
Value: 'application/json'
Key: 'path'
Value: './'
Key: 'record.count'
Value: '1'
Key: 'slack.channel.id'
Value: 'C03TL0W1VDF'
Key: 'slack.channel.name'
Value: 'random'
Key: 'slack.message.count'
Value: '1'
Key: 'uuid'
Value: 'dcdf9349-a8ae-478b-8f19-cbc48914f3db'
また、このエラーログが出てから一定時間経過すると、データロードに成功していないのにエラー表示が下図のように消えていました。データロードに成功していないのにエラーが解消されると困ってしまうため、このエラー周りは改善されることを期待したいです!
おまけ:process groupの削除方法
process groupの削除方法ですが、以下のような手順となります。
1.Stop
を押して一時停止
2.Disable all controller services
を押して無効化
3.Delete
を押して削除
お片付け
ランタイムとデプロイメントの削除
下図のように、ランタイムとデプロイメントをそれぞれ削除します。
- ランタイム
- デプロイメント(ランタイムの削除後でないと、削除できません)
各種Snowflakeリソースの削除
以下のクエリを実行して、各種Snowflakeリソースを削除します。
-- Switch to ACCOUNTADMIN
USE ROLE ACCOUNTADMIN;
-- Drop external access integration
DROP INTEGRATION IF EXISTS quickstart_access;
-- Drop network rules
DROP NETWORK RULE IF EXISTS slack_api_network_rule;
-- Drop warehouse
DROP WAREHOUSE IF EXISTS QUICKSTART_WH;
-- Drop database
DROP DATABASE IF EXISTS QUICKSTART_DATABASE;
DROP DATABASE IF EXISTS DESTINATION_DB;
-- Drop role
DROP ROLE IF EXISTS QUICKSTART_ROLE;
DROP ROLE IF EXISTS OPENFLOW_ADMIN;
参考:SPCS版のOpenflowのコストについて
SPCS版のOpenflowはとても簡単に利用できますが、やはり気になるのはコスト面だと思います。
コストについてはSnowflake公式からもドキュメントが出ており、こちらが参考になると思います。コストの基本的な考えとしては、「常時稼働するコントロールプールとして使っているコンピュートプールの費用がベースとなり、ランタイムに指定したコンピュートプールが稼働した分の費用と、ストレージやSnowpipeなどのSnowflakeの機能の利用料も発生する」という考えになります。
最後に
SPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせてみました。
データを追加した際の2回目のロードがうまくいかなかったりエラーの通知が消えてしまったことなどは気になったのですが、Cortex Search Serviceまで自動で作成してくれるため、すぐにロードしたデータに対してSnowflake-managed MCP ServerやSnowflake Intelligence経由で問い合わせできるのは素晴らしいなと感じました!
まだパブリックプレビュー段階のため、今後のアップデートに期待しています!