データアナリティクス事業本部 機械学習チームの鈴木です。
dbtのAudit_helperを試してみたので、簡単にですがご紹介します。
今回はdbt-athena-communityで使うことを想定して動作を確認しました。
Audit_helperについて
モデルをリファクタリングする際などに、元のモデルと同じ出力を提供するか確認するテーブル監査プロセスを支援するためのPackageです。
以下のブログ記事が大変参考になります。
ブログ記事では非dbtの仕組みで作成していたテーブルとdbtモデルにより作成されるテーブルを比較することを例としていますが、例えばマートのモデルのロジック修正によって期待値を格納したテーブルと差分が出ないかのような回帰テストにも利用できないかと思い、今回調べてみました。
非常に便利にテーブル差分の要約情報を取得できるPackageですが、記事執筆時点ではレコードの重複があるケースで、dbt-athena-communityで使う分には挙動を知っておくべき点があったので後述します。
Audit_helperのインストール
Packageの追加については以下のブログを参考に進めました。
dbtプロジェクトのルート直下に以下のファイルを作成しました。
packages.yml
packages:
- package: dbt-labs/audit_helper
version: 0.9.0
dbtプロジェクトでdbt deps
コマンドを実行することで、Packageをインストールしました。
dbt deps
なお、dbt Coreおよびdbt-athena-communityのバージョンは以下です。
- dbt Core: 1.5.6
- dbt-athena-community: 1.5.1
compare_queriesマクロを試す
いくつかのマクロが利用できるようになりますが、今回はそのうちのcompare_queries
マクロを試しました。
1. モデルの作成
以下のようにモデルを作成しました。
models
├── audit
│ └── audit_sample.sql
├── mart
│ └── iris_new.sql
└── staging
└── sample__sources.yml
順番に確認していきます。
sample__sources.yml
S3バケットに格納したIrisデータセットを読み出すGlueテーブルを参照するソーステーブルです。
※ S3バケットへのデータの配置とGlueテーブルの作成はすでにされているものとします。
sample__sources.yml
version: 2
sources:
- name: iris_raw
database: awsdatacatalog
schema: cm-nayuts-dbt-athena
tables:
- name: iris
なお、データセットは、下記リンクにて公開されています。
https://archive.ics.uci.edu/ml/datasets/iris
種別ごとの件数は以下のように50件ずつ、合わせて150件のレコードがあります。
レコードの重複を加味すると以下の件数になりました。
iris_new.sql
Irisデータセットを加工したマートです。
iris_new.sql
SELECT
class,
sepal_length,
sepal_width,
petal_length,
petal_width
FROM {{ source('iris_raw','iris') }}
WHERE class <> 'Iris-setosa'
UNION ALL
SELECT
'Iris-setosa' AS class,
CAST(5.2 AS real) AS sepal_length,
CAST(3.3 AS real) AS sepal_width,
CAST(1.5 AS real) AS petal_length,
CAST(0.3 AS real) AS petal_width
;
処理としては、Iris-setosa
の種別のデータを削除し、1件の別のデータに差し替えました。あまりこのようなマート作成処理は書かないようには思いますが、後で見るようにAudit_heplerの出力が見やすいのでこのようにしておきました。
今回はこのマートとデータソースの差分をAudit_heplerを使って確認します。
audit_sample.sql
Audit_heplerが提供する機能のcompare_queries
マクロを含む、監査用のモデルです。
audit_sample.sql
{# in dbt Develop #}
{% set old_fct_orders_query %}
select
sepal_length,
sepal_width,
petal_length,
petal_width,
class
from {{ref('iris_new')}}
{% endset %}
{% set new_fct_orders_query %}
select
sepal_length,
sepal_width,
petal_length,
petal_width,
class
from {{ source('iris_raw','iris') }}
{% endset %}
{{ audit_helper.compare_queries(
a_query=old_fct_orders_query,
b_query=new_fct_orders_query
) }}
マートとデータソースの中身をそれぞれクエリA, Bとして比較しました。compare_queries
マクロにはprimary_key
が指定できますが、今回は該当するキーがないので指定しませんでした。レポジトリのマクロの記載によると、compare_relations
マクロと非常に類似しているとのことで、こちらのマクロではオプションの項目だったので省略可と判断しています。
2. モデルの実行
dbt run
コマンドでモデルを実行しました。
dbt run
# 05:08:59 Running with dbt=1.5.6
# 05:08:59 Registered adapter: athena=1.5.1
# 05:08:59 Found 2 models, 0 tests, 0 snapshots, 0 analyses, 467 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics, 0 groups
# 05:08:59
# 05:09:03 Concurrency: 1 threads (target='dev')
# 05:09:03
# 05:09:03 1 of 2 START sql table model cm-nayuts-dbt-athena.iris_new ..................... [RUN]
# 05:09:08 1 of 2 OK created sql table model cm-nayuts-dbt-athena.iris_new ................ [OK 101 in 5.12s]
# 05:09:08 2 of 2 START sql table model cm-nayuts-dbt-athena.audit_sample ................. [RUN]
# 05:09:13 2 of 2 OK created sql table model cm-nayuts-dbt-athena.audit_sample ............ [OK 3 in 5.12s]
# 05:09:13
# 05:09:13 Finished running 2 table models in 0 hours 0 minutes and 13.86 seconds (13.86s).
# 05:09:13
# 05:09:13 Completed successfully
# 05:09:13
# 05:09:13 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
audit_sample
テーブルが作成されているので中身を確認しました。
in_a
はマート、in_b
はソーステーブルからのデータになります。それぞれにあるデータかどうかで分けて件数と割合を出力してくれています。
結果は想定とは若干ズレがあり、Athenaエンジンv3のexcept
の仕様に由来するものだと思われました。重複するレコードは1件分となっているようです。dbt-athena-communityプラグインを使った場合に、重複したレコード件数は1件にまとめない結果の出力を求めている場合は注意した方がよさそうです。PKがあるデータに対してであれば問題ないと推測されます。
参考までに以下が実行されたモデル作成のSQLになります。
-- /* {"app": "dbt", "dbt_version": "1.5.6", "profile_name": "sample_project", "target_name": "dev", "node_id": "model.sample_project.audit_sample"} */
create table "awsdatacatalog"."cm-nayuts-dbt-athena"."audit_sample"
with (
table_type='hive',
is_external=true,external_location='s3://データを入れたバケット名/data/cm-nayuts-dbt-athena/audit_sample/eb5177c1-abbe-40ec-9f49-4f6d6a21fa9b',
format='parquet'
)
as
with a as (
select
sepal_length,
sepal_width,
petal_length,
petal_width,
class
from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris_new"
),
b as (
select
sepal_length,
sepal_width,
petal_length,
petal_width,
class
from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris"
),
a_intersect_b as (
select * from a
intersect
select * from b
),
a_except_b as (
select * from a
except
select * from b
),
b_except_a as (
select * from b
except
select * from a
),
all_records as (
select
*,
true as in_a,
true as in_b
from a_intersect_b
union all
select
*,
true as in_a,
false as in_b
from a_except_b
union all
select
*,
false as in_a,
true as in_b
from b_except_a
),
summary_stats as (
select
in_a,
in_b,
count(*) as count
from all_records
group by 1, 2
),
final as (
select
*,
round(100.0 * count / sum(count) over (), 2) as percent_of_total
from summary_stats
order by in_a desc, in_b desc
)
select * from final
該当のマクロのソースは以下になります。
https://github.com/dbt-labs/dbt-audit-helper/blob/main/macros/compare_queries.sql
終わりに
dbt-athena-communityプラグインでAudit_helperを試してみたご紹介でした。
参考になりましたら幸いです。