[新機能]SPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせしてみた

[新機能]SPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせしてみた

2025.10.14

さがらです。

先月、SPCS(Snowpark Container Services)上で実行されるOpenflow Snowflake Deploymentsがパブリックプレビューとなりました。(9月17日のリリースでしたが、実際にAWSの日本リージョンで利用できるようになったのは9月29日頃と認識しています。)

https://docs.snowflake.com/en/release-notes/openflow#sep-17-2025-snowflake-openflow-snowflake-deployments-preview

これまでのOpenflowはAWS上にデプロイする方式しかなかったのですが、EKS含む多くのAWSリソースがデプロイされるため、コストや運用面に課題がありました。

今回Snowpark Container Services上にデプロイできる方式となったことにより、今まで以上にマネージドサービスとしてOpenflowを使うことができるようになりました。

このSPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせしてみたので、その内容について本記事でまとめてみます。

検証内容

基本的に以下のQuickstartの内容とSlackコネクタのドキュメントに沿って、実施していきます。

https://quickstarts.snowflake.com/guide/getting_started_with_Openflow_spcs/index.html?index=..%2F..index#0

https://docs.snowflake.com/en/user-guide/data-integration/openflow/connectors/slack/setup

検証するSnowflakeアカウントは、有償契約しているSnowflakeのAWS東京リージョンのEnterpriseエディションです。

Slack側の準備:Slackアプリの作成とインストール

先に事前準備として、Slackアプリを作成しインストールします。

Slackアプリの作成

ブラウザでSlackのYour Appsのページを開き、右上のCreate New Appを押します。

2025-10-14_10h47_40

From a manifestを選択します。

2025-10-14_10h48_06

アプリをデプロイしたいワークスペースを選択します。

2025-10-14_10h48_43

以下のコードをコピーして貼り付けます。(EXAMPLE_NAME_CHANGE_THISの箇所は変更してください。

			
			{
    "display_information": {
        "name": "EXAMPLE_NAME_CHANGE_THIS"
    },
    "features": {
        "bot_user": {
            "display_name": "EXAMPLE_NAME_CHANGE_THIS",
            "always_online": false
        }
    },
    "oauth_config": {
        "scopes": {
            "bot": [
                "channels:history",
                "channels:read",
                "groups:history",
                "groups:read",
                "im:history",
                "im:read",
                "mpim:history",
                "mpim:read",
                "users.profile:read",
                "users:read",
                "users:read.email",
                "files:read",
                "app_mentions:read",
                "reactions:read"
            ]
        }
    },
    "settings": {
        "event_subscriptions": {
            "bot_events": [
                "message.channels",
                "message.groups",
                "message.im",
                "message.mpim",
                "reaction_added",
                "reaction_removed",
                "file_created",
                "file_deleted",
                "file_change"
            ]
        },
        "interactivity": {
            "is_enabled": true
        },
        "org_deploy_enabled": false,
        "socket_mode_enabled": true,
        "token_rotation_enabled": false
    }
}

		

貼り付けると下図のようになるため、Nextを押します。

2025-10-14_10h51_06

Createを押します。

2025-10-14_10h52_05

すると下図のように表示されます。これでアプリの作成は完了です。

2025-10-14_10h54_41

App-Level Tokensの生成

次に、App-Level Tokensの生成を行います。

作成したアプリのBasic Informationの中に、App-Level Tokensという項目があるので、その項目の中でGenerate Token and Scopesを押します。

2025-10-14_10h55_54

スコープとしてconnections:writeを追加して、Generateを押します。

2025-10-14_10h59_33

トークンが生成されますので、控えておきます。

作成したSlackアプリをインストール

作成したSlackアプリを対象のワークスペースにインストールします。

作成したSlackアプリの画面で、左のメニューからInstall Appを押し、Install to <ワークスペース名>を押します。

2025-10-14_11h04_38

下図のようにOAuthの認証画面が表示されるため、Allowを押します。

2025-10-14_11h07_05

OAuth Tokensが表示されるので、控えておきます。

2025-10-14_11h07_50

作成したアプリをデータをロードしたいチャンネルに追加

対象のSlackワークスペースでデータをロードしたいチャンネルを開き、/invite @<作成したアプリ名>とコマンドを実行します。

2025-10-14_11h24_37

これにより、作成したSlackアプリがチャンネルに追加されました。

2025-10-14_11h25_11

Snowflake側の準備

次にSnowflake側の準備をしていきます。

https://quickstarts.snowflake.com/guide/getting_started_with_Openflow_spcs/index.html?index=..%2F..index#1

Openflow管理者用のロールを作成

まず、Openflow管理者用のロールを作成します。

			
			-- Create the Openflow admin role (requires ACCOUNTADMIN or equivalent privileges)
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN;

-- Grant necessary privileges
GRANT CREATE DATABASE ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE OPENFLOW_ADMIN;

-- Grant role to current user and ACCOUNTADMIN
GRANT ROLE OPENFLOW_ADMIN TO ROLE ACCOUNTADMIN;
GRANT ROLE OPENFLOW_ADMIN TO USER <ユーザー名>;

		

Termsに同意

Openflowの各コネクタを利用するには利用規約に同意する必要があるため、AdminTermsから、Openflow TermsConnector Termsに同意します。

2025-10-14_14h45_57

2025-10-14_14h47_47

同意後、下図のように表示されていれば問題ありません。

2025-10-14_14h48_25

バンドル2025_06を有効化 ※必要に応じて

最後に必要に応じて、バンドル2025_06を有効化します。(今回Slackコネクタを利用するためです。)

			
			-- Check the bundle status
USE ROLE ACCOUNTADMIN;
CALL SYSTEM$BEHAVIOR_CHANGE_BUNDLE_STATUS('2025_06');

-- Enable BCR Bundle 2025_06
CALL SYSTEM$ENABLE_BEHAVIOR_CHANGE_BUNDLE('2025_06');

		

Openflowのデプロイメントの作成

次に、Openflowのデプロイメントを作成していきます。

https://quickstarts.snowflake.com/guide/getting_started_with_Openflow_spcs/index.html?index=..%2F..index#2

まず、Snowsightで使用するロールが作成したOPENFLOW_ADMINであることを確認します。

2025-10-14_13h45_44

SnowsightのメニューからOpenflowを押し、右上のLaunch Openflowを押します。

2025-10-14_13h48_34

2025-10-14_13h49_52

ユーザー認証後、以下の画面となります。右上のロールがOPENFLOW_ADMINであることを確認してから作業しましょう。

Create a deploymentを押します。

2025-10-14_13h52_36

右下のNextを押します。

2025-10-14_13h53_55

Deployment locationSnowflakeNameQUICKSTART_DEPLOYMENTとして、Nextを押します。

2025-10-14_13h55_26

デプロイメントに対する権限を付与できる画面となりますが、ここでは何も設定せず、右下のCreate deploymentを押します。

2025-10-14_13h57_54

下図のような画面となるので、15分~20分ほど待ちます。(私の場合は15分かかりました。)

2025-10-14_13h58_23

一定時間経過後、下図のようにSTATE列がActiveとなっていれば、Openflowのデプロイメントは作成完了です!

2025-10-14_14h14_14

ランタイム用のロールやリソースを作成

次に、Openflowのランタイム用のロールやリソースを作成します。

https://quickstarts.snowflake.com/guide/getting_started_with_Openflow_spcs/index.html?index=..%2F..index#3

ランタイムは、実際にOpenflow用のコネクタをデプロイして動かす環境となります。(公式Docから引用した下図がわかりやすいです。)

openflow-architecture

まず、以下のクエリを実行してランタイム用のロールやリソースを作成します。(ウェアハウスのサイズと自動停止までの時間をQuickstartのクエリから変更しています。)

			
			-- Create runtime role
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS QUICKSTART_ROLE;

-- Create database for Openflow resources
CREATE DATABASE IF NOT EXISTS QUICKSTART_DATABASE;

-- Create warehouse for data processing
CREATE WAREHOUSE IF NOT EXISTS QUICKSTART_WH
  WAREHOUSE_SIZE = XSMALL
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

-- Grant privileges to runtime role
GRANT USAGE ON DATABASE QUICKSTART_DATABASE TO ROLE QUICKSTART_ROLE;
GRANT USAGE ON WAREHOUSE QUICKSTART_WH TO ROLE QUICKSTART_ROLE;

-- Grant runtime role to Openflow admin
GRANT ROLE QUICKSTART_ROLE TO ROLE OPENFLOW_ADMIN;

		

次に、Slack APIにアクセスするためのExternal Access Integrationを作成します。

			
			-- Create schema for network rules
USE ROLE ACCOUNTADMIN;
CREATE SCHEMA IF NOT EXISTS QUICKSTART_DATABASE.NETWORKS;

-- Create network rule for Slack
CREATE OR REPLACE NETWORK RULE slack_api_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('slack.com', 'wss.slack.com');

-- Create external access integration with network rules
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION quickstart_access
  ALLOWED_NETWORK_RULES = (slack_api_network_rule)
  ENABLED = TRUE;

-- Grant usage to runtime role
GRANT USAGE ON INTEGRATION quickstart_access TO ROLE QUICKSTART_ROLE;

		

ランタイムを作成

先程作成したロールやリソースを用いて、ランタイムを作成していきます。

Openflowの画面で、Runtimesタブに切り替えて、右上のCreate runtimeを押します。

2025-10-14_14h17_06

ランタイムの作成画面が表示されるため、下図のように設定します。設定後、右下のCreateを押します。

2025-10-14_14h18_33

2025-10-14_14h19_12

すると、下図のように表示されます。ランタイムの作成も5~10分必要です。(私の場合は4分かかりました。)

2025-10-14_14h20_30

一定時間経過後、下図のようにSTATE列がActiveとなっていれば、ランタイムの作成は完了です!

2025-10-14_14h24_56

作成したランタイムの名前をクリックすると、実際にコネクタの追加を行えるキャンバスの画面が開きます。

2025-10-14_14h35_02

2025-10-14_14h36_02

Slackコネクタの設定

Slackコネクタの設定を行っていきます。

https://docs.snowflake.com/en/user-guide/data-integration/openflow/connectors/slack/setup

まず、以下のクエリを実行して送信先となるデータベース・スキーマを作成し、必要な権限もQuickstart用のロールに付与します。(CREATE STAGECREATE SEQUENCECREATE CORTEX SEARCH SERVICEの権限も付与しています。)

			
			USE ROLE SYSADMIN;

CREATE DATABASE DESTINATION_DB;
CREATE SCHEMA DESTINATION_DB.DESTINATION_SCHEMA;
GRANT USAGE ON DATABASE DESTINATION_DB TO ROLE QUICKSTART_ROLE;
GRANT USAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE TABLE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE STAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE SEQUENCE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;
GRANT CREATE CORTEX SEARCH SERVICE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE QUICKSTART_ROLE;

		

次に、Openflowのトップページに移動し、Featured connectorsからview more connectorsを押します。

2025-10-14_14h41_22

Slackでは2種類ありますが、Openflowならではの機能を使いたいので、Slack(Cortex connect)Add to runtimeを押します。

2025-10-14_14h52_26

下図の画面となるため、作成したランタイムを設定して、Addを押します。

2025-10-14_14h53_29

するとランタイムのキャンバス画面が立ち上がり、Slack(Cortex connect)が追加されていることがわかります。(この追加されたオブジェクトは、Openflow用語で「process group」と呼ぶようです。)

2025-10-14_14h54_50

次に、追加されたprocess groupを右クリックし、Parametersを押します。

2025-10-14_14h56_32

下図のような画面となるため、Slackコネクタの公式Docの内容に沿って、各種Parameterを入れていきます。

各設定値の横の「・・・」を押して、Overrideを押すことで入力することが出来ます。

2025-10-14_14h59_52

2025-10-14_15h00_22

入力後、右下のApplyを押します。(今回はUpload Intervalを60 secondsに設定しました。)

2025-10-14_15h05_07

下図のようにパラメータのアップデートを確認する画面となります。右下のCloseを押します。

2025-10-14_15h06_12

Slackのデータを同期してみる

対象のprocess groupに対して右クリックして、Enable all controller servicesを押します。

2025-10-14_15h12_47

そのあと、Startを押します。

2025-10-14_16h13_40

今回は60秒ごとに同期される設定としていたため、一定時間経過した後に正しくデータがロードされているか確認してみます。

一定時間経過すると、process groupのRead/Writeのスキャン量が増えているのがわかります。

2025-10-14_15h33_13

対象のprocess groupに対して右クリックして、View status historyを押すと、どれだけの容量が読み込まれたかなどのメトリクスを確認できます。

2025-10-14_15h21_09

2025-10-14_15h23_10

この上で、実際にデータを確認してみます。

まず送信先として設定したスキーマを見ると、下図のようにテーブルなどのオブジェクトが作成されていることがわかります。

2025-10-14_16h25_59

実際にテーブルを見ると、Slackのメッセージがデータとしてロードされていることがわかります。

2025-10-14_16h14_56

おまけ:エラー発生時の対処方法

実際にエラーが起きていると、process groupの右上が赤く表示され、マウスオーバーするとエラーメッセージが表示されます。

2025-10-14_16h16_42

このエラーの原因調査ですが、対象のprocess groupをダブルクリックするとより細かくどのような処理が行われているかわかります。今回自分が試した際は、Create StageCreate Doc ID Sequenceでエラーが出ていたので、「CREATE STAGECREATE SEQUENCEの権限が足りないのかも?」と予想し、権限を付与したらエラーが解消されました。

2025-10-14_16h18_07

Snowflake Intelligenceから問い合わせしてみる

また、Cortex Search Serviceも自動で作成されているため、Snowflake-managed MCP ServerやSnowflake Intelligence経由でSlackのデータに対して問い合わせを行える仕組みが簡単に構築できます!

※以下は参考までに、Snowflake Intelligence用のロールに作成されたCortex Search Serviceの権限を付与して、Snowflake Intelligenceから自然言語で問い合わせしてみた例です。(Snowflake Intelligenceについてはこちらのブログをご覧ください。)

  • 作成されたCortex Search Serviceの権限をSnowflake Intelligence用のロールに付与
			
			GRANT USAGE ON DATABASE DESTINATION_DB TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;
GRANT USAGE ON SCHEMA DESTINATION_DB.DESTINATION_SCHEMA TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;
GRANT USAGE ON CORTEX SEARCH SERVICE DESTINATION_DB.DESTINATION_SCHEMA.SLACK_CORTEX TO ROLE SNOWFLAKE_INTELLIGENCE_ADMIN_RL;

		
  • Cortex Search Serviceをエージェントに追加(設定値が適切ではないかもしれないためご注意ください。)

2025-10-14_16h43_01

  • 実際にSnowflake Intelligenceで問い合わせしてみた結果

ちゃんと、Slackのメッセージ内容を元に回答していることがわかりますね!これは凄い!

2025-10-14_16h46_15

Slackで追加メッセージを作成し、自動で同期されるかを確認してみる

※こちらはエラーが起きてうまく行きませんでした…

続いて、Slackアプリを追加しているチャンネルで下図のように追加の投稿を行って、自動で同期されるかを確認してみます。

2025-10-14_16h51_21

この後でprocess groupをダブルクリックすると、Ingest Messagesのprocess groupの中のIngest Messages Into SnowflakeLog Failuresでエラーとなりました。

2025-10-14_16h58_14

エラーメッセージは下記のようなものでした。このエラーがわからず、ここで検証を一旦終えることにしました…

  • Ingest Messages Into Snowflake
			
			07:53:33 GMT

All Nodes

ERROR

PutSnowpipeStreaming[id=5bb92c9c-64b7-39d0-9520-e980ef038888] Failed to flush Channels for StandardFlowFileRecord[uuid=dcdf9349-a8ae-478b-8f19-cbc48914f3db,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1760425882818-2, container=repo1, section=2], offset=106084, length=876],offset=0,name=73eb0ff9-0bf1-4fff-b86c-caf9536589d4,size=876]: org.apache.nifi.processor.exception.ProcessException: Failed to flush rows to Invalid Ingest Channels [DESTINATION_DB.DESTINATION_SCHEMA.SLACK_MESSAGES.OPENFLOW.QUICKSTARTRUNTIME2-0.0]

		
  • Log Failuers
			
			LogAttribute[id=26a93f83-4ff1-318e-b735-5dbd0773705c] logging for flow file StandardFlowFileRecord[uuid=dcdf9349-a8ae-478b-8f19-cbc48914f3db,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1760425882818-2, container=repo1, section=2], offset=106084, length=876],offset=0,name=73eb0ff9-0bf1-4fff-b86c-caf9536589d4,size=876]

--------------------------------------------------

FlowFile Properties
Key: 'entryDate'
Value: 'Tue Oct 14 07:52:33 GMT 2025'
Key: 'lineageStartDate'
Value: 'Tue Oct 14 07:52:33 GMT 2025'
Key: 'fileSize'
Value: '876'
FlowFile Attribute Map Content
Key: 'QueryRecord.Route'
Value: 'messages'
Key: 'filename'
Value: '73eb0ff9-0bf1-4fff-b86c-caf9536589d4'
Key: 'merge.bin.age'
Value: '60043'
Key: 'merge.completion.reason'
Value: 'Bin has reached Max Bin Age'
Key: 'merge.count'
Value: '1'
Key: 'mime.type'
Value: 'application/json'
Key: 'path'
Value: './'
Key: 'record.count'
Value: '1'
Key: 'slack.channel.id'
Value: 'C03TL0W1VDF'
Key: 'slack.channel.name'
Value: 'random'
Key: 'slack.message.count'
Value: '1'
Key: 'uuid'
Value: 'dcdf9349-a8ae-478b-8f19-cbc48914f3db'

		

また、このエラーログが出てから一定時間経過すると、データロードに成功していないのにエラー表示が下図のように消えていました。データロードに成功していないのにエラーが解消されると困ってしまうため、このエラー周りは改善されることを期待したいです!

2025-10-14_17h09_55

おまけ:process groupの削除方法

process groupの削除方法ですが、以下のような手順となります。

1.Stopを押して一時停止

2025-10-14_17h19_09

2.Disable all controller servicesを押して無効化

2025-10-14_17h20_06

3.Deleteを押して削除

2025-10-14_17h20_35

お片付け

ランタイムとデプロイメントの削除

下図のように、ランタイムとデプロイメントをそれぞれ削除します。

  • ランタイム

2025-10-14_17h23_23

  • デプロイメント(ランタイムの削除後でないと、削除できません)

2025-10-14_17h24_36

各種Snowflakeリソースの削除

以下のクエリを実行して、各種Snowflakeリソースを削除します。

			
			-- Switch to ACCOUNTADMIN
USE ROLE ACCOUNTADMIN;

-- Drop external access integration
DROP INTEGRATION IF EXISTS quickstart_access;

-- Drop network rules
DROP NETWORK RULE IF EXISTS slack_api_network_rule;

-- Drop warehouse
DROP WAREHOUSE IF EXISTS QUICKSTART_WH;

-- Drop database
DROP DATABASE IF EXISTS QUICKSTART_DATABASE;
DROP DATABASE IF EXISTS DESTINATION_DB;

-- Drop role
DROP ROLE IF EXISTS QUICKSTART_ROLE;
DROP ROLE IF EXISTS OPENFLOW_ADMIN;

		

参考:SPCS版のOpenflowのコストについて

SPCS版のOpenflowはとても簡単に利用できますが、やはり気になるのはコスト面だと思います。

コストについてはSnowflake公式からもドキュメントが出ており、こちらが参考になると思います。コストの基本的な考えとしては、「常時稼働するコントロールプールとして使っているコンピュートプールの費用がベースとなり、ランタイムに指定したコンピュートプールが稼働した分の費用と、ストレージやSnowpipeなどのSnowflakeの機能の利用料も発生する」という考えになります。

https://docs.snowflake.com/en/user-guide/data-integration/openflow/cost-spcs

最後に

SPCS上で実行されるOpenflow Snowflake Deploymentsを用いてSlackのデータをロードしてSnowflake Intelligenceから問い合わせてみました。

データを追加した際の2回目のロードがうまくいかなかったりエラーの通知が消えてしまったことなどは気になったのですが、Cortex Search Serviceまで自動で作成してくれるため、すぐにロードしたデータに対してSnowflake-managed MCP ServerやSnowflake Intelligence経由で問い合わせできるのは素晴らしいなと感じました!

まだパブリックプレビュー段階のため、今後のアップデートに期待しています!

この記事をシェアする

FacebookHatena blogX

関連記事