External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるようにしてみた

2024.03.08

さがらです。

External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるようにしてみたので、その内容をまとめてみます。

External Network Accessとは

SnowflakeのExternal Network Accessですが、一言でいうと「Snowflakeから外部ネットワークにセキュアにアクセスできる」機能です。より具体例をもって書くと、External Network Accessがあれば、SnowflakeのUDFやストアドプロシージャから外部のSaaSアプリケーションのAPIを叩く処理が簡単に定義できるようになります。

また、External Network Accessは2024年2月のリリースで一般提供(General Availability)となっているため、安心して本番環境でも使用することができます!

External Network Accessについてはすでに日本語でも多くの情報が出ております。私も今回の検証の際に参考にさせていただきました。

検証内容について

dbt CloudではAPIが用意されており、そのうちのTrigger Job RunというAPIコマンドを用いることで、dbt Cloudで定義済のジョブをトリガーすることができます。

そこで、「前述のExternal Network Accessを用いてdbt CloudのAPIをSnowflakeから叩けるようにし、dbt CloudのAPIを叩くPython UDFをSnowflakeで定義し、Snowflakeのタスクで定義したPython UDFを実行すれば、Snowflakeのタスクで”COPYコマンドによるロード”から”dbt Cloudでのdbtジョブの実行”まですべて管理できるよね?」、という思いから試してみたのが本記事の内容となります。

1. dbt CloudでのService tokensの生成

dbt CloudのAPIを実行するために、dbt Cloud上でService tokensを生成する必要があります。

dbt Cloudの画面右上のAccount settingsから、左のService tokensを押し、Create service tokenを押します。

Token nameを入力し、APIを介してジョブを実行したいdbt projectへのJob Admin権限を付与します。付与後、右下のSaveを押します。

すると、tokenが表示されるのでこれをコピーしておきます。このタイミングでしかtokenを確認できないため注意しましょう。

2. SnowflakeでのNetwork Ruleの作成

Snowflakeから外部ネットワークへのアクセスの許可・制限を行うために、Network Ruleオブジェクトを作成します。

CREATE OR REPLACE NETWORK RULE dbt_cloud_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('cloud.getdbt.com');

3. Snowflakeでdbt CloudのService tokenを「Secret」として保存

先程1番の工程で取得したdbt CloudのService tokenを、SnowflakeのSecretとして保存します。

CREATE OR REPLACE SECRET dbt_cloud_token
  TYPE = GENERIC_STRING
  SECRET_STRING = '<your-access-token>';

4. External Access Integrationの作成

2番で作成したNetwork Ruleと3番で作成したSecretを統合して管理するオブジェクトとしてExternal Access Integrationがあるため、これを定義します。

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbt_cloud_apis_access
  ALLOWED_NETWORK_RULES = (dbt_cloud_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbt_cloud_token)
  ENABLED = true;

5. Python UDFの作成

続いて、実際にdbt Cloudに対してAPIを叩くPython UDFを定義します。

Python UDFでExternal Access Integrationを指定する方法など、各種オプションについてはこちらの記事がとてもわかりやすくまとまっているため、こちらを参考にしてもらえればと思います。

dbt Cloud特有の設定としては、以下2点に注意しました。

  • APIリクエストのURLにdbt CloudのAccount IDとJob IDが必要となるため、UDFの引数としてdbt CloudのAccount IDとJob IDを指定できるようにしました。

  • v_bodys = {"cause": "Triggered by Snowflake Task"}ですが、これはcauseに指定した文字列がdbt Cloudのジョブ履歴として下図のように表示されるようになります。

改めて、以下のコードがPython UDFを定義した際のコードとなります。

CREATE OR REPLACE FUNCTION trigger_dbt_cloud_job(account_id varchar ,job_id varchar)
  RETURNS variant
  LANGUAGE python
  RUNTIME_VERSION = 3.11
  HANDLER = 'main'
  EXTERNAL_ACCESS_INTEGRATIONS = (dbt_cloud_apis_access)
  PACKAGES = ('requests')
  SECRETS = ('cred' = dbt_cloud_token)
AS
$$
import _snowflake
import requests
import json

def main(p_account_id ,p_job_id):
     api_key = _snowflake.get_generic_secret_string('cred')
     url = 'https://cloud.getdbt.com/api/v2/accounts/{a_id}/jobs/{j_id}/run/'.format(a_id=p_account_id, j_id=p_job_id)
     v_headers = {"Authorization": "Token "+ api_key}
     v_bodys = {"cause": "Triggered by Snowflake Task"}
     response = requests.post(url, headers=v_headers, data=v_bodys)
     return response.json()
$$;

6. タスクの定義

Python UDFの作成が終わったので、タスクを定義します。Python UDFを使っている場合は、サーバーレスタスクが使用できない点だけ注意しましょう(私は引っかかりました。)

CREATE OR REPLACE TASK trigger_dbt
    SCHEDULE = 'USING CRON 0 1 1 1 * UTC'
    WAREHOUSE = SAGARA_DBT_WH
    AS
    SELECT
        trigger_dbt_cloud_job('<account_id>','<job_id>') as dbt_cloud_jobs_api_response;

-- タスクの有効化
ALTER TASK trigger_dbt RESUME;

実際にタスクを実行してみる

定義したタスクを実行してみます。

EXECUTE TASK trigger_dbt;

すると、下図のようにタスクが実行され、API経由でdbt Cloudのジョブが実行されました!

最後に

External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるように、検証してみました。External Network Accessのおかげで、Snowflakeのタスクでデータロードからdbtによる変換のワークフローを管理する道が見えたのではないでしょうか!

Snowpipeを使っている場合でも、ストリームと併せてタスクの設定でWHEN SYSTEM$STREAM_HAS_DATA('<ストリーム名>')を定義することで、加工前のデータに変化があったときにだけdbt Cloudのジョブを実行するということも出来ちゃいます!

あとは補足として、dbt CloudのAPI responseをログとして記録したい場合には、今回は試していませんがEvent Tableが使えると思います!Event Tableについては以下の記事がわかりやすくまとまっているため、参考にしてみてください。