External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるようにしてみた
さがらです。
External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるようにしてみたので、その内容をまとめてみます。
External Network Accessとは
SnowflakeのExternal Network Accessですが、一言でいうと「Snowflakeから外部ネットワークにセキュアにアクセスできる」機能です。より具体例をもって書くと、External Network Accessがあれば、SnowflakeのUDFやストアドプロシージャから外部のSaaSアプリケーションのAPIを叩く処理が簡単に定義できるようになります。
また、External Network Accessは2024年2月のリリースで一般提供(General Availability)となっているため、安心して本番環境でも使用することができます!
External Network Accessについてはすでに日本語でも多くの情報が出ております。私も今回の検証の際に参考にさせていただきました。
検証内容について
dbt CloudではAPIが用意されており、そのうちのTrigger Job Run
というAPIコマンドを用いることで、dbt Cloudで定義済のジョブをトリガーすることができます。
そこで、「前述のExternal Network Accessを用いてdbt CloudのAPIをSnowflakeから叩けるようにし、dbt CloudのAPIを叩くPython UDFをSnowflakeで定義し、Snowflakeのタスクで定義したPython UDFを実行すれば、Snowflakeのタスクで”COPYコマンドによるロード”から”dbt Cloudでのdbtジョブの実行”まですべて管理できるよね?」、という思いから試してみたのが本記事の内容となります。
1. dbt CloudでのService tokensの生成
dbt CloudのAPIを実行するために、dbt Cloud上でService tokensを生成する必要があります。
dbt Cloudの画面右上のAccount settings
から、左のService tokens
を押し、Create service token
を押します。
Token name
を入力し、APIを介してジョブを実行したいdbt projectへのJob Admin
権限を付与します。付与後、右下のSaveを押します。
すると、tokenが表示されるのでこれをコピーしておきます。このタイミングでしかtokenを確認できないため注意しましょう。
2. SnowflakeでのNetwork Ruleの作成
Snowflakeから外部ネットワークへのアクセスの許可・制限を行うために、Network Ruleオブジェクトを作成します。
CREATE OR REPLACE NETWORK RULE dbt_cloud_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('cloud.getdbt.com');
3. Snowflakeでdbt CloudのService tokenを「Secret」として保存
先程1番の工程で取得したdbt CloudのService tokenを、SnowflakeのSecretとして保存します。
CREATE OR REPLACE SECRET dbt_cloud_token TYPE = GENERIC_STRING SECRET_STRING = '<your-access-token>';
4. External Access Integrationの作成
2番で作成したNetwork Ruleと3番で作成したSecretを統合して管理するオブジェクトとしてExternal Access Integrationがあるため、これを定義します。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbt_cloud_apis_access ALLOWED_NETWORK_RULES = (dbt_cloud_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbt_cloud_token) ENABLED = true;
5. Python UDFの作成
続いて、実際にdbt Cloudに対してAPIを叩くPython UDFを定義します。
Python UDFでExternal Access Integrationを指定する方法など、各種オプションについてはこちらの記事がとてもわかりやすくまとまっているため、こちらを参考にしてもらえればと思います。
dbt Cloud特有の設定としては、以下2点に注意しました。
- APIリクエストのURLにdbt CloudのAccount IDとJob IDが必要となるため、UDFの引数としてdbt CloudのAccount IDとJob IDを指定できるようにしました。
-
v_bodys = {"cause": "Triggered by Snowflake Task"}
ですが、これはcause
に指定した文字列がdbt Cloudのジョブ履歴として下図のように表示されるようになります。
改めて、以下のコードがPython UDFを定義した際のコードとなります。
CREATE OR REPLACE FUNCTION trigger_dbt_cloud_job(account_id varchar ,job_id varchar) RETURNS variant LANGUAGE python RUNTIME_VERSION = 3.11 HANDLER = 'main' EXTERNAL_ACCESS_INTEGRATIONS = (dbt_cloud_apis_access) PACKAGES = ('requests') SECRETS = ('cred' = dbt_cloud_token) AS $$ import _snowflake import requests import json def main(p_account_id ,p_job_id): api_key = _snowflake.get_generic_secret_string('cred') url = 'https://cloud.getdbt.com/api/v2/accounts/{a_id}/jobs/{j_id}/run/'.format(a_id=p_account_id, j_id=p_job_id) v_headers = {"Authorization": "Token "+ api_key} v_bodys = {"cause": "Triggered by Snowflake Task"} response = requests.post(url, headers=v_headers, data=v_bodys) return response.json() $$;
6. タスクの定義
Python UDFの作成が終わったので、タスクを定義します。Python UDFを使っている場合は、サーバーレスタスクが使用できない点だけ注意しましょう(私は引っかかりました。)
CREATE OR REPLACE TASK trigger_dbt SCHEDULE = 'USING CRON 0 1 1 1 * UTC' WAREHOUSE = SAGARA_DBT_WH AS SELECT trigger_dbt_cloud_job('<account_id>','<job_id>') as dbt_cloud_jobs_api_response; -- タスクの有効化 ALTER TASK trigger_dbt RESUME;
実際にタスクを実行してみる
定義したタスクを実行してみます。
EXECUTE TASK trigger_dbt;
すると、下図のようにタスクが実行され、API経由でdbt Cloudのジョブが実行されました!
最後に
External Network Accessを用いてSnowflakeのタスクでdbt Cloudのジョブを実行できるように、検証してみました。External Network Accessのおかげで、Snowflakeのタスクでデータロードからdbtによる変換のワークフローを管理する道が見えたのではないでしょうか!
Snowpipeを使っている場合でも、ストリームと併せてタスクの設定でWHEN SYSTEM$STREAM_HAS_DATA('<ストリーム名>')
を定義することで、加工前のデータに変化があったときにだけdbt Cloudのジョブを実行するということも出来ちゃいます!
あとは補足として、dbt CloudのAPI responseをログとして記録したい場合には、今回は試していませんがEvent Tableが使えると思います!Event Tableについては以下の記事がわかりやすくまとまっているため、参考にしてみてください。