この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
個人的に気になっていたデータオーケストレーションツールのDagster。
このブログでは、以下の2つを実施していきます。
- サンプルコードのまま、DagsterをKubernetes環境で構築する
- 独自のパイプラインを定義したイメージをデプロイし、Dagsterの環境を更新する
準備
DagsterのKubernetesデプロイには、パッケージマネージャーのHelmを使用しています。
この公式ドキュメントの中にインストール方法についてのREADMEがあったので、こちらを参考に構築を進めていきます。
- 環境
- macOS Big Sur 11.5.1
- Docker Desktop v3.5.2
- Kubernetes on Docker Desktop v1.21.2
Helmの環境がなかったため、Homebrewでインストールしておきます。
$ brew install helm
$ helm version
version.BuildInfo{Version:"v3.6.3", GitCommit:"d506314abfb5d21419df8c7e7e68012379db2354", GitTreeState:"dirty", GoVersion:"go1.16.5"}
手順
Helmを使えば、GitHubに上がっているソースからそのままインストールできるみたいです。やってみます。
$ helm repo add dagster https://dagster-io.github.io/helm
"dagster" has been added to your repositories
$ helm install dagster-test dagster/dagster \
--namespace dagster \
--create-namespace
NAME: dagster-test
LAST DEPLOYED: Thu Sep 9 14:46:34 2021
NAMESPACE: dagster
STATUS: deployed
REVISION: 1
NOTES:
Launched. You can access Dagit by running the following commands:
export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to open Dagit"
kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80
Launched. You can access Dagit by running the following commands:
とあるので、実行してhttp://127.0.0.1:8080にアクセスします。
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace dagster -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-test,component=dagit" -o jsonpath="{.items[0].metadata.name}")
$ echo "Visit http://127.0.0.1:8080 to open Dagit"
$ kubectl --namespace dagster port-forward $DAGIT_POD_NAME 8080:80
立ち上がりました!コンテナ環境でも爆速でデプロイできました。
Dagsterのチュートリアルが気になる方は、こちらの記事をご参照ください。
コンポーネントの構成
先のデプロイで使用した各種パラメータは、values.yamlというHelmで使用するYAMLに記載されています。このパラメータを調節することで、導入したい環境に合わせたDagsterを構築することができます。下図のコンポーネントアーキテクチャ別に分類すると、以下のように分けられそうです。
※出典: Deploying with Helm | Dagster
Daemon
- Dagster Daemon
- キューに実行を作成したり、スケジュールやセンサーを稼働させるデーモン?
- Scheduler
- ジョブのスケジューラー
- Run Launcher
- 実行状況を制御し、Run Workerのジョブを作成する
Dagit
- Dagit
- User Code DeploymentのgRPCサーバーとやり取りするWebサーバー
Database
- PostgreSQL
- 実行状態や履歴などを保管するDB
Run Worker
- Pipeline Run
- ジョブの実行を担う?
User Code Deployment
- User Code Deployments
- DagitやDagsterがリポジトリ情報や現在のイメージ情報を取得するためのgRPCサーバーを構築する
- ここにユーザーが定義するパイプラインやジョブが含まれている
その他
- Compute Log Manager
- Dagsterは処理中の中間データ・ファイルをS3等に配置できるが、それを管理する機能?
- 先のデプロイでは使用されていない
- RabbitMQ
- Celeryのバックエンドとなるメッセージブローカー
- Redis
- Celeryのバックエンドとなるメッセージブローカー
- Flower
- Celery用のWebインターフェイス
- Ingress
- DagitやFlower用のIngress
- busybox
- 接続状態の監視を行うbusybox
上の概観的に、ユーザーはUser Code Deploymentに対して、ジョブやパイプラインの定義を更新してい流れになりそう。
というわけで、話はBuild Docker image for User Code | Dagsterに繋がってきます。
独自のパイプラインのデプロイ
パイプラインデプロイの前に、kubectlにconfigを設定します。
$ kubectl config set-context dagster --namespace default --cluster docker-desktop --user=docker-desktop
Context "dagster" created.
$ kubectl config get-contexts
CURRENT NAME CLUSTER AUTHINFO NAMESPACE
dagster docker-desktop docker-desktop default
* docker-desktop docker-desktop docker-desktop
$ kubectl config use-context dagster
Switched to context "dagster".
$ kubectl config current-context
dagster
自身で実装したパイプラインをデプロイするには、まずそのパイプラインコードが含まれているDagsterリポジトリのDockerイメージをビルドする必要があります。
サンプルで用意されているDockerfileはリポジトリの以下の位置にいます。
k8s-example/
├ build_cache/
│ └ example_project/
│ ├ example_repo/
| | └ repo.py
│ ├ run_config/
| | ├ celery_k8s.yaml
| | ├ celery_k8s_grpc.yaml
| | └ pipeline.yaml
│ └ workspace.yaml
├ Dockerfile <- ココ
├ last_updated.yaml
└ versions.yaml
このDockerfile
ではPythonライブラリのインストールや、build_cache
ディレクトリをコピーする処理を行なっています。
ARG BASE_IMAGE
FROM "${BASE_IMAGE}"
ARG DAGSTER_VERSION
# ==> Add Dagster layer
RUN \
# Cron
apt-get update -yqq \
&& apt-get install -yqq cron \
# Dagster
&& pip install \
dagster==${DAGSTER_VERSION} \
dagster-postgres==${DAGSTER_VERSION} \
dagster-celery[flower,redis,kubernetes]==${DAGSTER_VERSION} \
dagster-aws==${DAGSTER_VERSION} \
dagster-k8s==${DAGSTER_VERSION} \
dagster-celery-k8s==${DAGSTER_VERSION} \
# Cleanup
&& rm -rf /var \
&& rm -rf /root/.cache \
&& rm -rf /usr/lib/python2.7 \
&& rm -rf /usr/lib/x86_64-linux-gnu/guile
# ==> Add user code layer
# Example pipelines
COPY build_cache/ /
要は build_cache
配下にパイプラインのスクリプトを構築しておけば、パイプラインのイメージが作成できます。今回はrepo.py
の中身をA More Complex DAGの内容で上書きしてビルドします。
import csv
import requests
from dagster import pipeline, solid
@solid
def download_cereals():
response = requests.get("https://docs.dagster.io/assets/cereal.csv")
lines = response.text.split("\n")
return [row for row in csv.DictReader(lines)]
@solid
def find_highest_calorie_cereal(cereals):
sorted_cereals = list(
sorted(cereals, key=lambda cereal: cereal["calories"])
)
return sorted_cereals[-1]["name"]
@solid
def find_highest_protein_cereal(cereals):
sorted_cereals = list(
sorted(cereals, key=lambda cereal: cereal["protein"])
)
return sorted_cereals[-1]["name"]
@solid
def display_results(context, most_calories, most_protein):
context.log.info(f"Most caloric cereal: {most_calories}")
context.log.info(f"Most protein-rich cereal: {most_protein}")
@pipeline
def complex_pipeline():
cereals = download_cereals()
display_results(
most_calories=find_highest_calorie_cereal(cereals),
most_protein=find_highest_protein_cereal(cereals),
)
Dockerイメージのビルド先ですが、今回はDocker HubにPublicで上げて進めていきます。Docker Hubにログイン後、Create Repository
をクリックします。
Public
を選択し、ポジトリ名をdagster-k8s-demo
として作成します。
BASE_IMAGE
とDAGSTER_VERSION
を引数に渡しながらビルドします。pushすればDockerHubにイメージが上がっているはずです。
$ docker build -t tharuta/dagster-user-code:0.1 . --build-arg BASE_IMAGE=python:3.7.8-slim --build-arg DAGSTER_VERSION=0.12.9
$ docker push tharuta/dagster-user-code:0.1
Using default tag: latest
The push refers to repository [docker.io/tharuta/dagster-user-code]
d8c09038a574: Pushed
c0eae5bac8b2: Pushed
880d8e38a8e4: Mounted from dagster/dagster-celery-k8s
23746fb81c22: Mounted from dagster/dagster-celery-k8s
2de5ba74fd9a: Mounted from dagster/dagster-celery-k8s
2b99e2403063: Mounted from dagster/dagster-celery-k8s
d0f104dc0a1f: Mounted from dagster/dagster-celery-k8s
latest: digest: sha256:8ee1e4a2d8292e41e78e5f5610bea9de4346df3b023afb20544837f2f31fb832 size: 1789
続いてhelmの設定ファイルの user-deployments
の箇所を、先ほどのDockerイメージで上書きします。現在のvalues.yaml
の内容を以下のコマンドで出力させます。
$ helm show values dagster/dagster > values.yaml
values.yaml
の内、以下のrepository
とtag
を書き換えます。
# values.yaml
dagster-user-deployments:
# Creates a workspace file with the gRPC servers hosting your user code.
enabled: true
# If you plan on deploying user code in a separate Helm release, set this to false.
enableSubchart: true
# List of unique deployments
deployments:
- name: "k8s-example-user-code-1"
image:
# When a tag is not supplied, it will default as the Helm chart version.
repository: "docker.io/tharuta/dagster-user-code"
tag: latest
values.yaml
を使用して、helmでpodsをアップデートします。podsが正常に稼働していることを確認したら、環境変数とポートフォワードを再起動します。
$ helm upgrade --install dagster dagster/dagster -f values.yaml
Release "dagster" has been upgraded. Happy Helming!
NAME: dagster
LAST DEPLOYED: Thu Sep 9 15:43:10 2021
NAMESPACE: default
STATUS: deployed
REVISION: 4
NOTES:
Launched. You can access Dagit by running the following commands:
export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to open Dagit"
kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
dagster-daemon-5dbc58d6fc-6k76j 1/1 Running 0 29m
dagster-dagit-5dc77c6f98-s5wps 1/1 Running 0 7m39s
dagster-dagster-user-deployments-k8s-example-user-code-1-b2ps7s 1/1 Running 0 7m39s
dagster-postgresql-0 1/1 Running 0 29m
$ export DAGIT_POD_NAME=$(kubectl get pods --namespace default -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagit" -o jsonpath="{.items[0].metadata.name}")
$ kubectl --namespace default port-forward $DAGIT_POD_NAME 8080:80
無事意図したパイプラインに更新できました!
この仕組みによって、様々なユーザーがそれぞれパイプラインを記述しても、DagsterのUser Code Deploymentの部分だけ更新すればいい点で、サービス全体を疎結合に保つことができます。これは便利!
次のステップ
次回はこの辺りをやってみようかなと考えています。
- dbtとインテグレーションしてみる
- 本番のワークロードを踏まえた上で、AWSのEKSにデプロイ
- 本番のワークロードを踏まえた上で、GCPのGKEでデプロイ