[レポート] dbtとAirflowとGreat Expectationsで堅牢なデータパイプラインを構築する #dbtcoalesce

大いなる期待
2021.01.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大阪オフィスの玉井です。

12月7日〜11日の間、Fishtown Analytics社がcoalesceというオンラインイベントを開催していました(SQLを触っている方はピンとくるイベント名ではないでしょうか)。

「Fishtown Analytics社って何やってる会社?」という感じですが、dbtというツールを開発しているベンダーです。dbtについては、下記をご覧ください。

今回は、その中からBuilding a robust data pipeline with dbt, Airflow, and Great Expectationsというセッションを受講したので、レポートを記します。

イベント概要

公式

登壇者

概要

How do dbt and Great Expectations complement each other? In this video, Sam Bail of Superconductive will outline a convenient pattern for using these tools together and highlight where each one can play its strengths: Data pipelines are built and tested during development using dbt, while Great Expectations can handle data validation, pipeline control flow, and alerting in a production environment.

私なりの要約

Great Expectationsというデータテストツールの紹介がメインです。そして、Great Expectationsとdbtがどのように連携するのかも紹介されました。

Home | Great Expectations

セッションレポート

はじめに

アジェンダ

  • 本プレゼンは2部構成になっている
    • 第一部:データはテストするべきだということを説明する
    • 第二部:dbtとAirflowとGreat Expectationsでデータパイプラインをを作り、データをテストする方法を見せる

データはテストするべき

  • 「あなたはデータをテストするべき」
    • これをやっていない人は、どうやって夜寝れているのかわからない

  • もし私の言うことが信じられないのであれば、データをテストするべき理由を説明する

データをテストするべき理由

  • 当社のユーザーの発言
    • 「ステークホルダーが私たちよりも先にデータの問題に気づいてしまった…」
    • 「その結果、データと私たちのチームへの信頼が失われた…」
  • 信頼の低下は、データの品質の問題が原因で起こる
    • データを扱う誰もが経験したことあるはず
  • この経験が全く無い、という人は連絡をしてほしい
    • データの問題をどうやって避けてこれたのか興味がある

  • 当社のユーザーの発言
    • 「データ品質の問題を発見した後にデータパイプラインを再実行すると、計算環境のコストが再発生する」
  • データ品質(を維持するための対応)にはコストがかかる

  • データ品質の悪さ
  • データを修正する=データパイプラインのバグを修正する
    • データを扱う人ならわかると思う
    • 問題の根本原因を探るため
    • プロジェクトの後の段階で気づくと、精神的に参ってくる
  • 2018年にデータに関する障害があった
    • 毎月のデータ配信を修正するため、週末ずっと働いていた
    • 私達はこれをデータホラーストーリーと呼んでいる

dbt, Airflow, Great Expectationsでデータテストを行う方法

データのテストはちょっと難しい

  • データをテストした方がいいということはわかったと思う
  • データ品質を保つプロジェクトに着手する際には、多くのことを知る必要がある
    • どのツールを使えばいいのか?
    • 正しいテストをしているということをどうやって知るのか?
    • テストが失敗したときはどうすればいいのか?
    • 最新の状態に保つにはどうすればいいのか?
    • ステークホルダーがデータの状態を知るにはどうすればいいのか?

  • 私達が「dAGスタック」と呼んでいるものを紹介する
    • dbt, Airflow, Great Expectations
  • Airflowについて
    • 他のワークフローオーケストレーションツールに置き換えても問題ない
    • 鍵はdbtとGreat Expectations
    • Dagsterを使っても良い

  • 最初に使うのはdbt
  • 今回、dbt自体を詳細に説明しない

  • 次はAirflow
  • 私はAirflowを「ステロイドを使ったcron」と呼んでいる
    • cronジョブに加えて、さらに色々なことができる
    • 条件式、分岐、アラート等ができる
  • Airflow以外にも同系統のツールが存在している理由
    • Airflowにはいくつか欠点があるから
    • でも今回やりたいことについてはAirflowで十分できる

  • dbtのDAGとの関係について

    • dbtのDAGを、AirflowのDAGのノードの一つして持つことが出来る
  • Airflowからdbtモデルを実行する方法
    • 様々なパターンがある
    • Airflowで、個別のdbtモデルごとにスケジュールを組んで、dbtモデルをAirflow DAG内のタスクにマッピングする等
    • dbtのDAGが、AirflowのDAGの1ノードに対応していることになる

  • 最後はGreat Expectations
    • オープンソースのデータ検証・文書化ツール
    • オープンソースなので誰でも利用可能
    • Pythonで開発されている
    • 私はこれを開発しているsuperconductive社で働いている
  • Great Expectationsで出来ること
    • データセットに対してバリデーションすることができる
    • これを「プロセスバリデーション」と言う
    • 例「このカラムの値は、常に1〜6でなければならない」というバリデーションをかませる
  • プロセスバリデーションに対しては「パス」するか「失敗」するかのどちらか
    • これがExpectationの概念

  • 「Expectation」とは何か?
    • データに何を期待しているのかを指すもの
    • これはコードで表現することができる
  • スライドの上部のコード例
    • 「カラムの値が1〜6であること」を記述している
  • 「Expectation」はJSONで保存することができる
    • 別プロジェクト等で再利用することができる
  • 「Expectation」はドキュメンテーションすることできる
    • 後ほど詳細説明
  • 「Expectation」の内容はどうやって考えるのか?
    • データをよく理解しているメンバーが基準を策定する
    • 受入テストを行うユーザー側に基準を策定してもらう
    • 「このカラムの範囲はX〜Yで、NULLではダメ」という仕様を作り、それを「Expectation」に落とす

  • Great Expectationsには「自動プロファイリング」という機能がある
    • 過去のデータを参考に「Expectation」を自動作成する
  • スライドの例でいうと…
    • 過去のデータを参照した結果、値の範囲が「1〜6」だった
    • 今後も「1〜6という値が期待されるだろう」ということで、「1〜6であるべき」というExpectationが自動作成される
  • 当然、過去データが本当に正しいデータを常に持っているとは限らない
    • ある程度の専門知識を持ってチェックする必要がある
    • Expectationの追加や微調整は必要

  • Expectationを保存すると、expectation suiteというものに保存される
    • Expectationのセットのようなもの
    • 今後発生するデータや他のデータに再利用できる

  • Great ExpectationsにはData Docsという機能がある
    • 基本的にはバリデーション結果を表示するもの
    • 通らなかったデータや、通らなかった理由もある程度わかるようになっている
  • スライドの例
    • 「1〜6の値でなければならない」というバリデーションの結果
    • 1579個の不合格が発生した(赤字)
    • 不合格の例として、0の値があったことがわかる
  • 「データ品質レポート」と呼ぶ人もいる

  • 3つのツールを紹介した
  • 次は「dAG」スタックの紹介に入る

チームやシステム間でハンドオフ(データの受け渡し)がある場合は絶対にテストをするべき

  • スライドは左から右に見ていく
  • 一番左はソースデータ
    • DBからソースデータを取得する
    • 何らかのAPIからデータを取得する
    • 失敗した場合は何らかのレスポンスを得る
    • 成功した場合は、そのデータをステージング環境に抽出する
  • 抽出が正しいか確認するためにバリデーションを行う
    • 問題ない場合はデータ変換のステップに移行する
  • データ変換を経て、最終的にデータを本番環境に移行する
    • ここで別のテストを追加することもできる
  • (ここで突拍子もないことをいうが)データが別チームや別システムに移動する=ハンドオフが発生する
    • ハンドオフにはバリデーションのステップがある

  • 具体的な例
  • Airflowプロジェクトがあると仮定する
    • dbtプロジェクトもある
    • Great Expectationのプロジェクトもある

  • Airflowに色々組み込んでいる
    • Great ExpectationのAirflow演算子を使用している
    • 純粋なPythonを使用しているデータロードのタスクがいくつかある
    • dbt_ run演算子も使用している

  • 最終的なAirflowのプロジェクトとしては、スライドのスクリーンショットのようになる
    • ソースデータをバリデート
    • ソースデータをロード
    • dbtを実行して分析結果を出力してバリデート
    • 最終的にデータを公開

  • ソースデータでは、期待された形式と一致しているかどうかをテストしている
    • 手元のCSVをアップロードしてすぐにテストできる
    • すぐにテストする理由→ここのテストが通過できないと、後続のジョブに流す意味がなくなるから

  • 次はソースデータが正しくロードできたかどうかをテストする
  • ほとんどの場合、ソースデータと比較して、行数が減ってないかどうかを見てテストする
  • もしくは値の不一致
    • 特殊文字、テキスト、切り捨て、浮動小数点など
  • データを移動するたびにテストの必要がある

  • ロードしたデータに対してdbtを実行する
  • dbt自体もテストができる
  • 実際のデータをテストすることと、データ変換のコードをテストすることは区別する必要がある
    • dbtのテストは「コード(SQL)のテスト」に重点を置く
    • 変換中の結合処理でファンアウト発生していないか
    • 変換中に特定のカラムにNULLが発生していないか

まとめ

  • 複数の場所でデータをテストする
  • 異なるタイプのテストをする

おわりに

アプリケーション開発におけるテストの重要性はエンジニアにとって当たり前のことですが、データ分析のために生成したデータに対するテストというのは、日本ではあまり馴染みがないかもしれません。データのテストに特化したツールというものがあることに驚きでした。