Cortex Search Service用にPDFをパースしてチャンキングするdbtのIncremental modelを考えてみた

Cortex Search Service用にPDFをパースしてチャンキングするdbtのIncremental modelを考えてみた

2026.02.11

さがらです。

Cortex Search Serviceを使用してRAGを構築するためには、PDFなどのドキュメントをAI_PARSE_DOCUMENT関数などでパースしたテーブルを用意する必要があります。

このテーブルについてdbtで構築する方法を考えてみたので、その内容について本記事でまとめます。

実装したdbt model

早速ですが、実装したdbt modelは以下となります。Snowflake社の事例PDFを置いたS3バケットのディレクトリテーブルに対して、ドキュメントのパースとチャンキングを行っています。

このdbt modelのポイントは以下となります。

  • unique_key='relative_path' + incremental_strategy='delete+insert'でIncremental modelとして構成。同一ファイルパスのデータは削除後に再挿入される
    • 既存のディレクトリテーブルに同一パス・同一またはそれ以降の更新日時のレコードがあればスキップ(未処理 or 更新されたファイルのみ処理されるため、AI_PARSE_DOCUMENTによるパース処理のコストを抑えることが可能です)
  • チャンキングはSNOWFLAKE.CORTEX.SPLIT_TEXT_MARKDOWN_HEADER関数を使用
{{
    config(
        materialized='incremental',
        unique_key='relative_path',
        incremental_strategy='delete+insert'
    )
}}

with directory_files as (

    select
        relative_path,
        last_modified,
        TO_FILE('@RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE', relative_path) as doc_file
    from
        DIRECTORY(@RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE) as d
    where
        relative_path like 'snowflake-case-studies/%.pdf'
        {% if is_incremental() %}
        and not exists (
            select 1
            from {{ this }} as t
            where t.relative_path = d.relative_path
              and t.last_modified >= d.last_modified
        )
        {% endif %}

),

parsed as (

    select
        relative_path,
        last_modified,
        SNOWFLAKE.CORTEX.AI_PARSE_DOCUMENT(
            doc_file,
            {'mode': 'LAYOUT', 'page_split': TRUE}
        ) as parsed_result
    from
        directory_files

),

concatenated as (

    select
        p.relative_path,
        p.last_modified,
        LISTAGG(f.value:content::varchar, '\n\n') within group (order by f.index) as full_content
    from
        parsed as p,
        lateral flatten(input => p.parsed_result:pages) as f
    group by
        p.relative_path, p.last_modified

),

chunked as (

    select
        c.relative_path,
        c.last_modified,
        ch.value:headers:header_1::varchar as header_1,
        ch.value:headers:header_2::varchar as header_2,
        ch.value:headers:header_3::varchar as header_3,
        ch.value:chunk::varchar as chunk,
        ch.index as chunk_index
    from
        concatenated as c,
        lateral flatten(input =>
            SNOWFLAKE.CORTEX.SPLIT_TEXT_MARKDOWN_HEADER(
                c.full_content,
                OBJECT_CONSTRUCT('#', 'header_1', '##', 'header_2', '###', 'header_3'),
                10000
            )
        ) as ch

)

select
    SHA2(relative_path || ':' || chunk_index, 256) as doc_chunk_id,
    relative_path,
    SPLIT_PART(relative_path, '/', -1) as file_name,
    REPLACE(REPLACE(SPLIT_PART(relative_path, '/', -1), 'Case_Study_', ''), '.pdf', '') as case_study_name,
    header_1,
    header_2,
    header_3,
    chunk_index,
    chunk,
    last_modified,
    CURRENT_TIMESTAMP() as _parsed_at
from
    chunked

また、dbt_project.ymlでは下記のようにpre_hookpost_hookを設定しています。modelsフォルダ内でcorpusフォルダを用意し、この中でCortex Search Service用にパースとチャンキングを行うmodelを定義していく前提です。

  • pre_hook:PDFを置いているS3に対する外部ステージをリフレッシュすることで、dbt modelがクエリするディレクトリテーブルをmodel構築前に更新
  • post_hook:Cortex Search Service用のテーブルはCHANGE_TRACKINGの有効化が必要のため、構築後に有効化するクエリを実施
name: 'sample_dbt_project'
version: '1.9.0'
config-version: 2

profile: 'sample_project'

models:
  sample_dbt_project:
    corpus:
      sample:
        +schema: corpus_sample
        +pre-hook:
          - "ALTER STAGE RAW_DB.SAMPLE_SOURCE_SNOWPIPE.STAGE_CORPUS_SAMPLE REFRESH"
        +post-hook:
          - "ALTER TABLE {{ this }} SET CHANGE_TRACKING = TRUE"

dbt model初回実行&Cortex Search Service構築

S3には下図のように、ブルボン様とアイシン高丘様の事例だけ入れておきます。(ステージ作成のクエリなどは割愛します。)

2026-02-11_06h59_06

上述のdbt modelをdbt buildなどで実行すると、下図のように事例PDFをパースしてチャンキングしたテーブルがSnowflakeに作られます。

2026-02-11_07h03_28

このdbt modelに対するCortex Search ServiceとCortex Agentを以下のクエリで構築します。

-- Cortex Search Service作成
CREATE OR REPLACE CORTEX SEARCH SERVICE PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES
    ON chunk
    ATTRIBUTES file_name, case_study_name, header_1, header_2, header_3, chunk_index, last_modified
    WAREHOUSE = CORTEX_SEARCH_WH
    TARGET_LAG = '365 day'
    EMBEDDING_MODEL = 'voyage-multilingual-2'
    INITIALIZE = ON_CREATE
    COMMENT = 'Snowflake導入事例PDFのチャンクデータに対するCortex Search Service'
AS
SELECT
    file_name,
    case_study_name,
    header_1,
    header_2,
    header_3,
    chunk_index,
    chunk,
    last_modified
FROM PROD_DB.CORPUS_SAMPLE.COR_SNOWFLAKE_CASE_STUDIES_PARSED;

--  Agent作成
CREATE OR REPLACE AGENT PROD_DB.AIML_SAMPLE.AGENT_SNOWFLAKE_CASE_STUDIES
    COMMENT = 'Snowflake導入事例PDFを検索・回答するCortex Agent'
    PROFILE = '{"display_name": "Snowflake事例検索", "avatar": "search", "color": "#29B5E8"}'
    FROM SPECIFICATION
    $$
    instructions:
      system: |
        あなたはSnowflake導入事例PDFの内容について質問に回答するアシスタントです。
        ユーザーからの質問に対して、Cortex Search Serviceを使って関連する事例情報を検索し、
        正確な情報に基づいて回答してください。
        検索結果に含まれない情報については、推測せずにその旨を伝えてください。
      response: |
        日本語で簡潔に回答してください。
        回答には参照した事例名(case_study_name)を明記してください。

    tools:
      - tool_spec:
          type: "cortex_search"
          name: "SearchCaseStudies"
          description: "Snowflake導入事例PDFの検索"

    tool_resources:
      SearchCaseStudies:
        name: "PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES"
        max_results: "5"
    $$;

-- Snowflake Intelligence への追加

ALTER SNOWFLAKE INTELLIGENCE SNOWFLAKE_INTELLIGENCE_OBJECT_DEFAULT
    ADD AGENT PROD_DB.AIML_SAMPLE.AGENT_SNOWFLAKE_CASE_STUDIES;

この上でSnowflake Intelligenceから問い合わせすると、下図のように結果を得られます。

2026-02-11_07h10_29

S3バケットのデータを更新し、dbt model 2回目の実行

まず、差分更新がちゃんと効いているかを検証するため、S3バケットに以下2ファイルの変更を加えます。

  • アイシン高丘様の事例のPDFの2ページ目を削除し、同じファイル名でS3バケットに配置
  • 千葉銀行様の事例のPDFファイルを新規でS3バケットに配置

2026-02-11_07h16_02

この状態で、dbt modelをdbt buildなどで実行して更新すると、アイシン高丘様のページ数が減ったためチャンク数が減っており、新しく千葉銀行様のレコードが追加されていることがわかります。

2026-02-11_07h36_34

また、パースを行ったタイムスタンプを記録する_PARSED_AT列を見ると、差分があったアイシン高丘様と千葉銀行様のレコードのみ、タイムスタンプが更新されていることがわかります。

2026-02-11_07h47_39

テーブルが更新できたので、以下のクエリを実行してCortex Search Serviceも更新します。

ALTER CORTEX SEARCH SERVICE PROD_DB.AIML_SAMPLE.CSS_SNOWFLAKE_CASE_STUDIES REFRESH;

この状態でSnowflake Intelligenceから再度質問をしてみると、更新したデータに対して問い合わせできていることがわかります。

2026-02-11_07h41_02

最後に

Cortex Search Service用にPDFをパースしてチャンキングするdbtのIncremental modelを考えてみたので、その内容についてまとめてみました。

dbtのIncremental modelを使うことで、差分が発生したファイルだけ検知してAI_PARSE_DOCUMENT関数の対象とできるので、コストを削減できます。

ぜひご活用ください!

この記事をシェアする

FacebookHatena blogX

関連記事