この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
大阪オフィスの玉井です。
データオーケストレーションツールのDagsterは、開発したデータパイプライン(の処理)に対して、テストコードを書くことができます。
Dagsterにおけるテスト
データパイプラインを開発して、それを統率することができるDagsterですが、1つ1つの処理に対して「生成されるデータが要件通りかどうか」という観点でテストが書けるのがDagsterの特徴となっています。
これを利用して、SolidやPipeline毎に単体テストを実装することで、生成されるデータの品質を担保することができます。
やってみた
参考
こちらのチュートリアルは、Solidと同じファイルにテストコードも書いていますが、当記事では、テストファイルは分けてやってみます。
検証環境
- macOS Catalina 10.15.7
- Python 3.9.4
- pytest 6.2.3
- Dagster 0.10.9
テスト対象のSolidとPipeline
まず、テストするSolidとPipelineを用意します。…といっても、こちらで使ったものをそのまま流用します。
hello_cereal_ta.py
import csv
import os
from dagster import execute_pipeline, pipeline, solid
@solid
def hello_cereal(context):
# データセットがこのファイルと同じディレクトリにあると仮定
dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
with open(dataset_path, "r") as fd:
# 標準のcsvライブラリを使用して行を読み込む
cereals = [row for row in csv.DictReader(fd)]
context.log.info(
"シリアルに関するデータは{n_cereals}件あります。".format(n_cereals=len(cereals))
)
return cereals
@pipeline
def hello_cereal_pipeline():
hello_cereal()
テストファイルを作成する
それでは早速テストを書いていきます。
Dagsterにおけるテストというと、何か特別な機能とかを使うような気がしますが、今回の処理内容はPythonで書かれているので、実際に記述するのは、いたって普通のPythonのassert文です。そして、テストの実行はpytest
を使います。ただし、テストを書くのに便利なDagster用のライブラリがあるので、それを利用します。
test_hello_cereal.py
import csv
import os
import hello_cereal_ta
from dagster import execute_solid, execute_pipeline, pipeline, solid
def test_hello_cereal_solid():
res = execute_solid(hello_cereal_ta.hello_cereal)
# 処理が成功したかどうか
assert res.success
# 読み込んだファイルのレコード数が77件かどうか
assert len(res.output_value()) == 77
def test_hello_cereal_pipeline():
# やってることはsolidのテストと同じ
res = execute_pipeline(hello_cereal_ta.hello_cereal_pipeline)
assert res.success
assert len(res.result_for_solid("hello_cereal").output_value()) == 77
execute_solid
とexecute_pipeline
というものを使ってテストを書いています。これを使うことで、SolidやPipelineの実行結果(のオブジェクト)を取得することができます。このオブジェクトに対して色々な処理がかけられるので、非常に簡単にテストを書くことが出来ます。
テストする
テストコードを実際に実行してみます。pytest
でサクッとテストしてみましょう。
$ pytest test_hello_cereal.py
================================================================== test session starts ===================================================================
platform darwin -- Python 3.9.4, pytest-6.2.3, py-1.10.0, pluggy-0.13.1
rootdir: xxxxxxxxx/test_dagster_pipeline
collected 2 items
test_hello_cereal.py .. [100%]
=================================================================== 2 passed in 1.74s ====================================================================
無事、テストが実行され、テスト自体も通過しました。
おわりに
今回は各処理単位のテスト、要するに単体テストレベルのものをやってみました。各処理に対するテストは、今回の方法で実行することができると思います。しかし、(当然のことながら)テストは単体テストだけではありません。
Dagsterにはユニットテストを行う仕組みもあります。そちらの方も試して、どこかのタイミングで記事にしたいと思います。