Dagsterで開発したデータパイプラインの処理をテストする

僕が知っているテストはExcel+スクショ
2021.04.30

大阪オフィスの玉井です。

データオーケストレーションツールのDagsterは、開発したデータパイプライン(の処理)に対して、テストコードを書くことができます。

Dagsterにおけるテスト

データパイプラインを開発して、それを統率することができるDagsterですが、1つ1つの処理に対して「生成されるデータが要件通りかどうか」という観点でテストが書けるのがDagsterの特徴となっています。

これを利用して、SolidやPipeline毎に単体テストを実装することで、生成されるデータの品質を担保することができます。

やってみた

参考

こちらのチュートリアルは、Solidと同じファイルにテストコードも書いていますが、当記事では、テストファイルは分けてやってみます。

検証環境

  • macOS Catalina 10.15.7
  • Python 3.9.4
  • pytest 6.2.3
  • Dagster 0.10.9

テスト対象のSolidとPipeline

まず、テストするSolidとPipelineを用意します。…といっても、こちらで使ったものをそのまま流用します。

hello_cereal_ta.py

import csv
import os

from dagster import execute_pipeline, pipeline, solid

@solid
def hello_cereal(context):
    # データセットがこのファイルと同じディレクトリにあると仮定
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # 標準のcsvライブラリを使用して行を読み込む
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "シリアルに関するデータは{n_cereals}件あります。".format(n_cereals=len(cereals))
    )

    return cereals

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

テストファイルを作成する

それでは早速テストを書いていきます。

Dagsterにおけるテストというと、何か特別な機能とかを使うような気がしますが、今回の処理内容はPythonで書かれているので、実際に記述するのは、いたって普通のPythonのassert文です。そして、テストの実行はpytestを使います。ただし、テストを書くのに便利なDagster用のライブラリがあるので、それを利用します。

test_hello_cereal.py

import csv
import os
import hello_cereal_ta

from dagster import execute_solid, execute_pipeline, pipeline, solid

def test_hello_cereal_solid():
    res = execute_solid(hello_cereal_ta.hello_cereal)
    # 処理が成功したかどうか
    assert res.success
    # 読み込んだファイルのレコード数が77件かどうか
    assert len(res.output_value()) == 77

def test_hello_cereal_pipeline():
   # やってることはsolidのテストと同じ
    res = execute_pipeline(hello_cereal_ta.hello_cereal_pipeline)
    assert res.success
    assert len(res.result_for_solid("hello_cereal").output_value()) == 77

execute_solidexecute_pipelineというものを使ってテストを書いています。これを使うことで、SolidやPipelineの実行結果(のオブジェクト)を取得することができます。このオブジェクトに対して色々な処理がかけられるので、非常に簡単にテストを書くことが出来ます。

テストする

テストコードを実際に実行してみます。pytestでサクッとテストしてみましょう。

$ pytest test_hello_cereal.py
================================================================== test session starts ===================================================================
platform darwin -- Python 3.9.4, pytest-6.2.3, py-1.10.0, pluggy-0.13.1
rootdir: xxxxxxxxx/test_dagster_pipeline
collected 2 items

test_hello_cereal.py ..                                                                                                                            [100%]

=================================================================== 2 passed in 1.74s ====================================================================

無事、テストが実行され、テスト自体も通過しました。

おわりに

今回は各処理単位のテスト、要するに単体テストレベルのものをやってみました。各処理に対するテストは、今回の方法で実行することができると思います。しかし、(当然のことながら)テストは単体テストだけではありません。

Dagsterにはユニットテストを行う仕組みもあります。そちらの方も試して、どこかのタイミングで記事にしたいと思います。