データオーケストレーションツールのDagsterを使ってみた

ダグといえば、ダグ・アルドリッチ
2021.04.19

大阪オフィスの玉井です。

dbt界隈の人たちがこぞって推奨している(ように思える)ツールであるDagsterを使ってみました。

Dasterとは?

公式の紹介文を引用します。

Dagster is a data orchestrator. It lets you define pipelines (DAGs) in terms of the data flow between logical components called solids. These pipelines can be developed locally and run anywhere.

「データオーケストレーター」と言われると、なかなかピンときませんが、ジョブ管理ツールの一種と思っていただければわかりやすいと思います(Apache Airflow等と同じカテゴリ)。データパイプラインの開発はもちろん、一連の処理の運用まで行えるツールになっています。

基本的な使い方をやってみた

検証環境

  • macOS Catalina 10.15.7
  • Dagster 0.10.9
  • Python 3.9.2
  • pip 21.0.1

Dagsterをインストールする

インストールはpipで行います。

$ pip install dagster dagit

dagsterというのは、その名の通り、ツール本体です。dagitというのは、Dagsterを操作できるUI等になります。

1つの処理を動かす

まずは、こちらのチュートリアルをやってみます。

まず、処理する元データを入手しておきます。

$ curl -O https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv

そして、いきなりですが、下記をDagsterで動かしてみます。こちらはDagsterのチュートリアルのコードです(一部日本語に変えています)。

import csv
import os

from dagster import execute_pipeline, pipeline, solid

@solid
def hello_cereal(context):
    # データセットがこのファイルと同じディレクトリにあると仮定
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # 標準のcsvライブラリを使用して行を読み込む
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "シリアルに関するデータは{n_cereals}件あります。".format(n_cereals=len(cereals))
    )

    return cereals

@pipeline
def hello_cereal_pipeline():
    hello_cereal()

Dagsterの特徴的な要素として、solidがあります。一連のワークフローにおいて、中の一つ一つの処理を、Dagsterではsolidという単位で定義します。めちゃくちゃ雑にいうと、基本、1処理1solidみたいな感じですね。

そして、作った処理はpipelineでつなげていきます。ただ、このコードでは、solidは1つしかないので、そのsolidを普通に呼び出すだけになっています。

作成した一連の処理の実行方法には色々ありますが、今回はDagsterをUIで操作できるdagitを使ってみます。dagitを起動するには、下記のコマンドを実行します。

$ dagit -f hello_cereal.py
...
Loading repository...
Serving on http://127.0.0.1:3000 in process xxxxx

ローカル内にdagitが立ち上がるので、表示されたIPを、Webブラウザで接続すると、DagsterのUIが表示されます。

画面には、定義されているsolidが表示されます。今回のsolidは1つだけなので、ぽつんと1つだけ存在しています。solidを選ぶと、処理の詳細情報を確認することができます。

処理の実行は、PlaygroundというメニューからLanch Executionというボタンを選択して行うことが出来ます。

Lanch Executionを選択すると、solidとして定義した処理が実際に実行されました。上記のコードを見てもらえばわかる通り、このsolidは、同ディレクトリに存在するcsvファイルを読み込み、件数がメッセージとともに出力されるという内容になります。処理結果は画面下部にログ形式で表示されます。

依存関係のある2つの処理を動かす

先程は単体のsolid一つだけでしたが、今度は2つのsolidを組み合わせてみます。最初のsolidでデータを取り出し、そのデータを2つめのsolidで処理する…という流れです。

import csv
import os

from dagster import execute_pipeline, pipeline, solid

@solid
def load_cereals(context):
    csv_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(csv_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(f"シリアルに関するデータは {len(cereals)} 件あります。".format())
    return cereals

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )

    context.log.info(f'一番カロリーが高いのは {sorted_cereals[-1]["name"]} です。')

@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())

※コードは公式チュートリアルのもので、メッセージ等だけ日本語に変えてます。

load_cerealsでcsvファイルを読み込み、データ部分だけを出力します。そして、sort_by_caloriesでそのデータを受け取り、caloriesというカラムでデータを並べ替え、一番上にきたデータをメッセージとして出力します。

sort_by_caloriesは、load_cerealsの返り値があることが前提の処理となっています。言い換えれば、sort_by_caloriesload_cerealsに依存していることになります。データパイプライン等の、データの一連の処理というのは、往々にして、各処理がこのような依存関係になっています。

各々の処理(solid)に対する依存関係は、Dagsterではpipelineという部分で表現します。例えば、今回の処理は、sort_by_calories(load_cereals())と記述します。

dagitで読みこむと、下記のようになります。依存関係が視覚的に表現されていますね。

実際に実行してみました。結果も正しく出力されています。

並行処理

今度は、2つのsolidを並行処理するパターンをやってみます。チュートリアルのコードはこちら(一部だけ日本語に変えてます)。

import csv
import os

from dagster import execute_pipeline, pipeline, solid

@solid
def load_cereals(_):
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]
    return cereals

@solid
def sort_by_calories(_, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )
    most_calories = sorted_cereals[-1]["name"]
    return most_calories

@solid
def sort_by_protein(_, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["protein"])
    )
    most_protein = sorted_cereals[-1]["name"]
    return most_protein

@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"一番カロリーが高いのは {most_calories} です")
    context.log.info(f"一番プロテインが含まれているのは {most_protein} です")

@pipeline
def complex_pipeline():
    cereals = load_cereals()
    display_results(
        most_calories=sort_by_calories(cereals),
        most_protein=sort_by_protein(cereals),
    )

ここらへんまでくると、とりあえずdagitで見たほうが処理の流れがわかりやすいので、先に見てみましょう。

csvファイルを読み込んだ後(load_cereals)、先程のカロリー別のソートに加えて、もう一つのソート処理が増えています(sort_by_protein)。これは、sort_by_caloriesと並行して処理されるようになっています。そしてこの2つの処理の返り値を受け取って、最後にdisplay_resultsが実行されるようになっています。

2つのソート処理は、両方ともload_cerealsに依存しています。しかし、ソート処理同士は依存していません。ですので、この2つの処理は並行して実行されるというわけです。最後のdisplay_resultsは、2つのソート処理に依存しています。これをpipelineで表現すると、上記のようになります。

実行結果は下記の通り。

おわりに

Dagsterは「データオーケストレーションツール」ということで、作成した処理をどういう風に運用するか等の観点での機能も色々あります。また、処理の開発方法についても(solidや他の概念など)、今回はほんの基本レベルしか触れられていません。さらに、本番運用する場合は、別途本番環境にデプロイする必要がありますが、その方法についても色々あります。

Dagsterについても、色々試してみては、ブログに残していきたいですね。