DagsterをKubernetes環境で構築してみた

個人的に気になっていたデータオーケストレーションツールのDagster

このブログでは、以下の2つを実施していきます。

  1. サンプルコードのまま、DagsterをKubernetes環境で構築する
  2. 独自のパイプラインを定義したイメージをデプロイし、Dagsterの環境を更新する

準備

DagsterのKubernetesデプロイには、パッケージマネージャーのHelmを使用しています。

この公式ドキュメントの中にインストール方法についてのREADMEがあったので、こちらを参考に構築を進めていきます。

  • 環境
    • macOS Big Sur 11.5.1
    • Docker Desktop v3.5.2
    • Kubernetes on Docker Desktop v1.21.2

Helmの環境がなかったため、Homebrewでインストールしておきます。

$ brew install helm

$ helm version
version.BuildInfo{Version:"v3.6.3", GitCommit:"d506314abfb5d21419df8c7e7e68012379db2354", GitTreeState:"dirty", GoVersion:"go1.16.5"}

手順

Helmを使えば、GitHubに上がっているソースからそのままインストールできるみたいです。やってみます。

$ helm repo add dagster https://dagster-io.github.io/helm
"dagster" has been added to your repositories

$ helm install dagster-test dagster/dagster \
    --namespace dagster \
    --create-namespace

NAME: dagster-test
LAST DEPLOYED: Thu Sep  9 14:46:34 2021
NAMESPACE: dagster
STATUS: deployed
REVISION: 1
NOTES:
Launched. You can access Dagit by running the following commands:

export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to open Dagit"
kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80

Launched. You can access Dagit by running the following commands: とあるので、実行してhttp://127.0.0.1:8080にアクセスします。

$ export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}")

$ echo "Visit http://127.0.0.1:8080 to open Dagit"

$ kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80

立ち上がりました!コンテナ環境でも爆速でデプロイできました。

Dagsterのチュートリアルが気になる方は、こちらの記事をご参照ください。

コンポーネントの構成

先のデプロイで使用した各種パラメータは、values.yamlというHelmで使用するYAMLに記載されています。このパラメータを調節することで、導入したい環境に合わせたDagsterを構築することができます。下図のコンポーネントアーキテクチャ別に分類すると、以下のように分けられそうです。

※出典: Deploying with Helm | Dagster

Daemon

  • Dagster Daemon
    • キューに実行を作成したり、スケジュールやセンサーを稼働させるデーモン?
  • Scheduler
    • ジョブのスケジューラー
  • Run Launcher
    • 実行状況を制御し、Run Workerのジョブを作成する

Dagit

  • Dagit
    • User Code DeploymentのgRPCサーバーとやり取りするWebサーバー

Database

  • PostgreSQL
    • 実行状態や履歴などを保管するDB

Run Worker

  • Pipeline Run
    • ジョブの実行を担う?

User Code Deployment

  • User Code Deployments
    • DagitやDagsterがリポジトリ情報や現在のイメージ情報を取得するためのgRPCサーバーを構築する
    • ここにユーザーが定義するパイプラインやジョブが含まれている

その他

  • Compute Log Manager
    • Dagsterは処理中の中間データ・ファイルをS3等に配置できるが、それを管理する機能?
    • 先のデプロイでは使用されていない
  • RabbitMQ
    • Celeryのバックエンドとなるメッセージブローカー
  • Redis
    • Celeryのバックエンドとなるメッセージブローカー
  • Flower
    • Celery用のWebインターフェイス
  • Ingress
    • DagitやFlower用のIngress
  • busybox
    • 接続状態の監視を行うbusybox

上の概観的に、ユーザーはUser Code Deploymentに対して、ジョブやパイプラインの定義を更新してい流れになりそう。

というわけで、話はBuild Docker image for User Code | Dagsterに繋がってきます。

独自のパイプラインのデプロイ

パイプラインデプロイの前に、kubectlにconfigを設定します。

$ kubectl config set-context dagster --namespace default --cluster docker-desktop --user=docker-desktop
Context "dagster" created.
$ kubectl config get-contexts
CURRENT   NAME             CLUSTER          AUTHINFO         NAMESPACE
          dagster          docker-desktop   docker-desktop   default
*         docker-desktop   docker-desktop   docker-desktop 
$ kubectl config use-context dagster
Switched to context "dagster".
$ kubectl config current-context
dagster

自身で実装したパイプラインをデプロイするには、まずそのパイプラインコードが含まれているDagsterリポジトリのDockerイメージをビルドする必要があります。

サンプルで用意されているDockerfileはリポジトリの以下の位置にいます。

k8s-example/
 ├ build_cache/
 │ └ example_project/
 │   ├ example_repo/
 |   |  └ repo.py
 │   ├ run_config/
 |   |  ├ celery_k8s.yaml
 |   |  ├ celery_k8s_grpc.yaml
 |   |  └ pipeline.yaml
 │   └ workspace.yaml
 ├ Dockerfile                 <- ココ
 ├ last_updated.yaml
 └ versions.yaml

このDockerfileではPythonライブラリのインストールや、build_cacheディレクトリをコピーする処理を行なっています。

ARG BASE_IMAGE
FROM "${BASE_IMAGE}"

ARG DAGSTER_VERSION

# ==> Add Dagster layer
RUN \
# Cron
       apt-get update -yqq \
    && apt-get install -yqq cron \
# Dagster
    && pip install \
        dagster==${DAGSTER_VERSION} \
        dagster-postgres==${DAGSTER_VERSION} \
        dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
        dagster-aws==${DAGSTER_VERSION} \
        dagster-k8s==${DAGSTER_VERSION} \
        dagster-celery-k8s==${DAGSTER_VERSION} \
# Cleanup
    &&  rm -rf /var \
    &&  rm -rf /root/.cache  \
    &&  rm -rf /usr/lib/python2.7 \
    &&  rm -rf /usr/lib/x86_64-linux-gnu/guile

# ==> Add user code layer
# Example pipelines
COPY build_cache/ /

要は build_cache 配下にパイプラインのスクリプトを構築しておけば、パイプラインのイメージが作成できます。今回はrepo.pyの中身をA More Complex DAGの内容で上書きしてビルドします。

import csv
import requests

from dagster import pipeline, solid


@solid
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@solid
def find_highest_calorie_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["calories"])
    )
    return sorted_cereals[-1]["name"]


@solid
def find_highest_protein_cereal(cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: cereal["protein"])
    )
    return sorted_cereals[-1]["name"]


@solid
def display_results(context, most_calories, most_protein):
    context.log.info(f"Most caloric cereal: {most_calories}")
    context.log.info(f"Most protein-rich cereal: {most_protein}")


@pipeline
def complex_pipeline():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )

Dockerイメージのビルド先ですが、今回はDocker HubにPublicで上げて進めていきます。Docker Hubにログイン後、Create Repositoryをクリックします。

Publicを選択し、ポジトリ名をdagster-k8s-demoとして作成します。

BASE_IMAGEDAGSTER_VERSIONを引数に渡しながらビルドします。pushすればDockerHubにイメージが上がっているはずです。

$ docker build -t tharuta/dagster-user-code:0.1 . --build-arg BASE_IMAGE=python:3.7.8-slim --build-arg DAGSTER_VERSION=0.12.9

$ docker push tharuta/dagster-user-code:0.1
Using default tag: latest
The push refers to repository [docker.io/tharuta/dagster-user-code]
d8c09038a574: Pushed 
c0eae5bac8b2: Pushed 
880d8e38a8e4: Mounted from dagster/dagster-celery-k8s 
23746fb81c22: Mounted from dagster/dagster-celery-k8s 
2de5ba74fd9a: Mounted from dagster/dagster-celery-k8s 
2b99e2403063: Mounted from dagster/dagster-celery-k8s 
d0f104dc0a1f: Mounted from dagster/dagster-celery-k8s 
latest: digest: sha256:8ee1e4a2d8292e41e78e5f5610bea9de4346df3b023afb20544837f2f31fb832 size: 1789

続いてhelmの設定ファイルの user-deployments の箇所を、先ほどのDockerイメージで上書きします。現在のvalues.yamlの内容を以下のコマンドで出力させます。

$ helm show values dagster/dagster > values.yaml

values.yamlの内、以下のrepositorytagを書き換えます。

# values.yaml

dagster-user-deployments:
  # Creates a workspace file with the gRPC servers hosting your user code.
  enabled: true

  # If you plan on deploying user code in a separate Helm release, set this to false.
  enableSubchart: true

  # List of unique deployments
  deployments:
    - name: "k8s-example-user-code-1"
      image:
        # When a tag is not supplied, it will default as the Helm chart version.
        repository: "docker.io/tharuta/dagster-user-code"
        tag: latest

values.yamlを使用して、helmでpodsをアップデートします。podsが正常に稼働していることを確認したら、環境変数とポートフォワードを再起動します。

$ helm upgrade --install dagster dagster/dagster -f values.yaml
Release "dagster" has been upgraded. Happy Helming!
NAME: dagster
LAST DEPLOYED: Thu Sep  9 15:43:10 2021
NAMESPACE: default
STATUS: deployed
REVISION: 4
NOTES:
Launched. You can access Dagit by running the following commands:

export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to open Dagit"
kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80

$ kubectl get pods
NAME                                                              READY   STATUS    RESTARTS   AGE
dagster-daemon-5dbc58d6fc-6k76j                                   1/1     Running   0          29m
dagster-dagit-5dc77c6f98-s5wps                                    1/1     Running   0          7m39s
dagster-dagster-user-deployments-k8s-example-user-code-1-b2ps7s   1/1     Running   0          7m39s
dagster-postgresql-0                                              1/1     Running   0          29m

$ export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}")
$ kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80

無事意図したパイプラインに更新できました!

この仕組みによって、様々なユーザーがそれぞれパイプラインを記述しても、DagsterのUser Code Deploymentの部分だけ更新すればいい点で、サービス全体を疎結合に保つことができます。これは便利!

次のステップ

次回はこの辺りをやってみようかなと考えています。

  • dbtとインテグレーションしてみる
  • 本番のワークロードを踏まえた上で、AWSのEKSにデプロイ
  • 本番のワークロードを踏まえた上で、GCPのGKEでデプロイ

参照