[dagster] solidの中で使われる特定の値をパラメータ化する

はじめてpandasってやつを触りました
2021.06.01

大阪オフィスの玉井です。

dagsterは、パイプラインの実行時に指定した値を使用して、データ処理を流すことができます。

やってみた

環境

  • macOS Big Sur 11.3.1
  • dagster 0.10.9

パラメータ化する

下記のような処理で考えてみます。

@solid
def download_csv(context):
    df = pd.read_csv("https://www.police.pref.nara.jp/cmsfiles/contents/0000004/4634/nara_2020zidouhanbaikinerai.csv", encoding="shift_jis")
    return df

ファイルのダウンロード先のURL自体がハードコーディングされています。これでも問題なく動きますが、場合によっては、URL自体が変わることも考えられます。その度にコードを修正するのもアレですので、あまりURLベタ書きは望ましく有りません。

dagsterの場合、下記のように書くことができます。

@solid(config_schema={"url": str})
def download_csv(context):
    df = pd.read_csv(context.solid_config["url"], encoding="shift_jis")
    return df

こう記述することで、urlという変数に格納されるURLを、ファイルのダウンロード先として参照するようにできます。また、@solidの横のconfig_schemaという記述で、指定する値の型を指定することができます(今回はURLを想定しているので文字列を期待する)。

この変数に実際に入れる値はどうするのか?というのは、dagitのUI上で指定します。

パラメータを指定してパイプラインを動かす

今回は下記のパイプラインを動かしてみます。

download_nara.py

import pandas as pd
from dagster import execute_pipeline, pipeline, solid


@solid(config_schema={"url": str})
def download_csv(context):
    df = pd.read_csv(context.solid_config["url"], encoding="shift_jis")
    return df


@solid
def count_municipal(context, df):
    result = df['市区町村(発生地)'].value_counts().index[0]
    context.log.info(f'このデータ上で一番犯罪が多かった奈良県の市区町村は{result}です')


@pipeline
def configurable_pipeline():
    count_municipal(download_csv())

dagitを実行します。

dagit -f donwload_nara.py

playrgroundというタブを開きます。

ここに、urlという変数に入れる値を設定します。設定の仕方は、下記のように、yaml形式で記述します。

solids:
  download_csv:
    config:
      url: "https://www.police.pref.nara.jp/cmsfiles/contents/0000004/4634/nara_2020zidouhanbaikinerai.csv

パイプラインを実行します。ちゃんと、出力が出てきましたね。

playrgroundタブのConfigで、ファイルのダウンロードURLを別のものに変えれば、当然、出力も変わります(出力メッセージを変えています)。

おわりに

この機能を使うことで、より色々な人が汎用的に使えるパイプラインを組むことが出来るようになります。

参考