dbt-athenaでAudit_helperのテーブル監査支援機能を試してみた

Audit_helperのcompare_queriesマクロは、モデル間のレコードの差分の要約がとても手軽にできて便利でした。
2023.10.22

データアナリティクス事業本部 機械学習チームの鈴木です。

dbtのAudit_helperを試してみたので、簡単にですがご紹介します。

今回はdbt-athena-communityで使うことを想定して動作を確認しました。

Audit_helperについて

モデルをリファクタリングする際などに、元のモデルと同じ出力を提供するか確認するテーブル監査プロセスを支援するためのPackageです。

以下のブログ記事が大変参考になります。

ブログ記事では非dbtの仕組みで作成していたテーブルとdbtモデルにより作成されるテーブルを比較することを例としていますが、例えばマートのモデルのロジック修正によって期待値を格納したテーブルと差分が出ないかのような回帰テストにも利用できないかと思い、今回調べてみました。

非常に便利にテーブル差分の要約情報を取得できるPackageですが、記事執筆時点ではレコードの重複があるケースで、dbt-athena-communityで使う分には挙動を知っておくべき点があったので後述します。

Audit_helperのインストール

Packageの追加については以下のブログを参考に進めました。

dbtプロジェクトのルート直下に以下のファイルを作成しました。

packages.yml

packages:
  - package: dbt-labs/audit_helper
    version: 0.9.0

dbtプロジェクトでdbt depsコマンドを実行することで、Packageをインストールしました。

dbt deps

なお、dbt Coreおよびdbt-athena-communityのバージョンは以下です。

  • dbt Core: 1.5.6
  • dbt-athena-community: 1.5.1

compare_queriesマクロを試す

いくつかのマクロが利用できるようになりますが、今回はそのうちのcompare_queriesマクロを試しました。

1. モデルの作成

以下のようにモデルを作成しました。

models
├── audit
│   └── audit_sample.sql
├── mart
│   └── iris_new.sql
└── staging
    └── sample__sources.yml

順番に確認していきます。

sample__sources.yml

S3バケットに格納したIrisデータセットを読み出すGlueテーブルを参照するソーステーブルです。

※ S3バケットへのデータの配置とGlueテーブルの作成はすでにされているものとします。

sample__sources.yml

version: 2

sources:
  - name: iris_raw
    database: awsdatacatalog
    schema: cm-nayuts-dbt-athena
    tables:
      - name: iris

なお、データセットは、下記リンクにて公開されています。

https://archive.ics.uci.edu/ml/datasets/iris

種別ごとの件数は以下のように50件ずつ、合わせて150件のレコードがあります。

集計結果

レコードの重複を加味すると以下の件数になりました。

集計結果2

iris_new.sql

Irisデータセットを加工したマートです。

iris_new.sql

SELECT 
    class,
    sepal_length,
    sepal_width,
    petal_length,
    petal_width
FROM {{ source('iris_raw','iris') }}
WHERE class <> 'Iris-setosa'
UNION ALL
SELECT 
    'Iris-setosa' AS class,
    CAST(5.2 AS real) AS sepal_length,
    CAST(3.3 AS real) AS sepal_width,
    CAST(1.5 AS real) AS petal_length,
    CAST(0.3 AS real) AS petal_width
;

処理としては、Iris-setosaの種別のデータを削除し、1件の別のデータに差し替えました。あまりこのようなマート作成処理は書かないようには思いますが、後で見るようにAudit_heplerの出力が見やすいのでこのようにしておきました。

今回はこのマートとデータソースの差分をAudit_heplerを使って確認します。

audit_sample.sql

Audit_heplerが提供する機能のcompare_queriesマクロを含む、監査用のモデルです。

audit_sample.sql

{# in dbt Develop #}


{% set old_fct_orders_query %}
select
    sepal_length,
    sepal_width,
    petal_length,
    petal_width,
    class
from {{ref('iris_new')}}
{% endset %}


{% set new_fct_orders_query %}
select
    sepal_length,
    sepal_width,
    petal_length,
    petal_width,
    class
from {{ source('iris_raw','iris') }}
{% endset %}


{{ audit_helper.compare_queries(
    a_query=old_fct_orders_query,
    b_query=new_fct_orders_query
) }}

マートとデータソースの中身をそれぞれクエリA, Bとして比較しました。compare_queriesマクロにはprimary_keyが指定できますが、今回は該当するキーがないので指定しませんでした。レポジトリのマクロの記載によると、compare_relationsマクロと非常に類似しているとのことで、こちらのマクロではオプションの項目だったので省略可と判断しています。

2. モデルの実行

dbt runコマンドでモデルを実行しました。

dbt run

# 05:08:59  Running with dbt=1.5.6
# 05:08:59  Registered adapter: athena=1.5.1
# 05:08:59  Found 2 models, 0 tests, 0 snapshots, 0 analyses, 467 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics, 0 groups
# 05:08:59  
# 05:09:03  Concurrency: 1 threads (target='dev')
# 05:09:03  
# 05:09:03  1 of 2 START sql table model cm-nayuts-dbt-athena.iris_new ..................... [RUN]
# 05:09:08  1 of 2 OK created sql table model cm-nayuts-dbt-athena.iris_new ................ [OK 101 in 5.12s]
# 05:09:08  2 of 2 START sql table model cm-nayuts-dbt-athena.audit_sample ................. [RUN]
# 05:09:13  2 of 2 OK created sql table model cm-nayuts-dbt-athena.audit_sample ............ [OK 3 in 5.12s]
# 05:09:13  
# 05:09:13  Finished running 2 table models in 0 hours 0 minutes and 13.86 seconds (13.86s).
# 05:09:13  
# 05:09:13  Completed successfully
# 05:09:13  
# 05:09:13  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

audit_sampleテーブルが作成されているので中身を確認しました。

実行した監査結果

in_aはマート、in_bはソーステーブルからのデータになります。それぞれにあるデータかどうかで分けて件数と割合を出力してくれています。

結果は想定とは若干ズレがあり、Athenaエンジンv3のexceptの仕様に由来するものだと思われました。重複するレコードは1件分となっているようです。dbt-athena-communityプラグインを使った場合に、重複したレコード件数は1件にまとめない結果の出力を求めている場合は注意した方がよさそうです。PKがあるデータに対してであれば問題ないと推測されます。

参考までに以下が実行されたモデル作成のSQLになります。

-- /* {"app": "dbt", "dbt_version": "1.5.6", "profile_name": "sample_project", "target_name": "dev", "node_id": "model.sample_project.audit_sample"} */

  
    create table "awsdatacatalog"."cm-nayuts-dbt-athena"."audit_sample"
  with (
    table_type='hive',
    is_external=true,external_location='s3://データを入れたバケット名/data/cm-nayuts-dbt-athena/audit_sample/eb5177c1-abbe-40ec-9f49-4f6d6a21fa9b',
    format='parquet'
  )
  as
    










with a as (

select
    sepal_length,
    sepal_width,
    petal_length,
    petal_width,
    class
from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris_new"


),

b as (

select
    sepal_length,
    sepal_width,
    petal_length,
    petal_width,
    class
from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris"


),

a_intersect_b as (
select * from a
    
intersect

select * from b

),

a_except_b as (
select * from a
    
except

select * from b

),

b_except_a as (
select * from b
    
except

select * from a

),

all_records as (
select
        *,
        true as in_a,
        true as in_b
    from a_intersect_b
union all
select
        *,
        true as in_a,
        false as in_b
    from a_except_b
union all
select
        *,
        false as in_a,
        true as in_b
    from b_except_a

),

summary_stats as (
select
    in_a,
        in_b,
        count(*) as count
from all_records
    group by 1, 2

),

final as (
select
    *,
        round(100.0 * count / sum(count) over (), 2) as percent_of_total
from summary_stats
    order by in_a desc, in_b desc

)

select * from final

該当のマクロのソースは以下になります。

https://github.com/dbt-labs/dbt-audit-helper/blob/main/macros/compare_queries.sql

終わりに

dbt-athena-communityプラグインでAudit_helperを試してみたご紹介でした。

参考になりましたら幸いです。