とりあえずexportしない!Digdag変数・パラメータの使い方全種

こんにちは。DA事業本部の春田です。

Digdagで変数やパラメータを定義する時は _export するのが一般的かと思いますが、他にも色々やり方があるのでご紹介していきます。環境はGitHubに上げました。

もくじ

Digdagの変数・パラメータ

Digdag公式ドキュメントのConcepts — Export and store parametersによると、標準的なパラメータは3種類あります。

  • local
    • (digファイルで + をつけて定義する)タスクに直接セットするパラメータ
  • export
    • 親タスクから出力されるパラメータ
  • store
    • 前のタスクで保存されたパラメータ

このうち、 local が最も優先度が高いパラメータで、同名の exportstore パラメータは上書きされます。 exportstore は同じ優先度で、後に定義したもので上書きします。 export は親タスク配下にしか影響が及ばない一方、 store は後続のタスクにもパラメータが適応されます。また、 store はグローバルな変数ではないため、同じ名前の store パラメータを持つ2つのタスクが並列で実行されても、タスク間のパラメータで影響はありません。この場合、最後に終了したタスクのパラメータが、後続のタスクに適応されます。

また、 digdag serverdigdag run を起動するときに --params-file オプションで指定できるJSON/YAMLファイル形式のパラメータもあります。こちらはコマンドを叩く時にオプションとして設定するものなので、上の3種のパラメータの外のスコープにあります。digファイル内の上記のパラメータによって値を上書くことはできますが、JSON/YAMLファイルの中身自体は上書きされません。

その他にも、 Param Server という外部DBにパラメータを格納させるタイプもあります。こちらはTTLが90日と設定されている範囲でパラメータを保存しておくことが可能です。使用にはPostgreSQLかRedisのセットアップが必要です。

param_set>: Set a value into a ParamServer as persistent data — Digdag 0.9.41 documentation

パラメータの取得・保存方法

digファイル上

digファイル上でパラメータを取得するには、 ${variable} のように ${ } を使います。パラメータをlocalとして定義する場合は、 operators> と同じ階層で定義します。exportパラメータとして定義する場合は、 +taskoperators> と同じ階層に _export を記述します。一方、storeパラメータは operators> の処理の中で定義します。Pythonプログラムの場合は、 digdag.env.store() メソッドを使用します。

# 例
+print_var_parent:
  _export:
    var1: Apple
    var2: Orange
  +print_var_child1:
    echo> ${var1}
  +print_var_child2:
    py>: print_var.hello_world
    var1: Updated Apple by local
# https://docs.digdag.io/workflow_definition.html#using-api
import digdag

class MyWorkflow(object):
  def prepare(self):
    digdag.env.store({"my_param": 2})

--params-file オプションはその名の通り、JSON/YAMLファイルにパラメータを定義し、 digdag コマンドを叩く際にオプションで指定します。上と同様、 ${variable} で取得できます。

Param Server は、専用のParam operators param_get>param_set> を用いてパラメータを保存・取得します。一度 param_get> したパラメータはstoreパラメータとして、後続ジョブにおいて ${variable} で取得することができます。

+print_var_parent:
  _export:
    var1: Apple
  +set_value:
    # Key=var2 Value=OrangeでDBに保存
    param_set>:
    var2: Orange
  +get_value:
    # DB側でKey=var2のパラメータを、storeパラメータとして定義
    param_get>:
    any_key_name: var2 # any_key_name自体はダミーでしかなく、パラメータに対して何ら影響を及ぼさない
  +print_var_child:
    py>: print_var.hello_world

Pythonファイル上

Pythonファイルで変数を取得する方法は2つあります。一つは、 py> で呼び出す関数の 引数 に、渡したい同名パラメータを定義する方法です。もう一つは、Python Digdag APIの digdag.env.params メソッドを使う方法です。前者で呼び出すと、Pythonファイルでdigdagライブラリをimportする必要がなくなり、PythonファイルとDigdagをゆるやかに結合することができます。一方後者は、 session_id などのビルトイン変数もまとめて取得することができるので、ワークフローのメタ情報をPythonで扱いたい場合に便利です。どちらも一長一短ですね。

import digdag
import json

def hello_world(var1, var2):
    print("Hello World, {}".format(var1))
    print("Hello World, {}".format(var2))
    param = digdag.env.params
    print(param['session_id'])

では、実際に確認していきます。

各種パラメータの検証

今回、基本となるPythonプログラムは以下です。 var1 var2 と、Digdag APIで取得できる全パラメータを出力します。 print_and_store_vars では、関数の最後で var2 にstoreパラメータを保存します。

import digdag
import json

def print_vars(var1, var2):
    print("Hello World, {}".format(var1))
    print("Hello World, {}".format(var2))
    print(json.dumps(digdag.env.params, indent=4))

def print_and_store_vars(var1, var2):
    print("Hello World, {}".format(var1))
    print("Hello World, {}".format(var2))
    print(json.dumps(digdag.env.params, indent=4))
    digdag.env.store({"var2": "Updated Orange by store"})

export local store

export、local、storeパラメータのそれぞれの挙動を確認します。 child2 では親タスクで _export されていたパラメータが、それぞれ上書きされて出力されましたが、 child3 のパラメータには影響を与えていません。また、 child3 で変更を加えた var2 は、 child4 で変更が維持されたままになっています。 child4 では、同じパラメータ var1 に対して、exportとlocalで上書きしていますが、結果的に最も優先度の高いlocalが出力されました。

+parent:
  _export:
    var1: Apple
    var2: Orange
  +child1:
    py>: print_var.print_vars
  +child2:
    _export:
      var2: Updated Orange by export
    py>: print_var.print_vars
    var1: Updated Apple by local
  +child3:
    py>: print_var.print_and_store_vars
  +child4:
    _export:
      var1: Updated Apple by export
    py>: print_var.print_vars
    var1: Updated Apple by local
[INFO] (8807@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Apple
Hello World, Orange
{
    "session_unixtime": 1583989342, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple", 
    "var2": "Orange", 
    "attempt_id": 23, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:02:22+00:00", 
    "session_id": 23, 
    "session_local_time": "2020-03-12 05:02:22", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child1"
}
[INFO] (8807@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Updated Apple by local
Hello World, Updated Orange by export
{
    "session_unixtime": 1583989342, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Updated Apple by local", 
    "var2": "Updated Orange by export", 
    "attempt_id": 23, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:02:22+00:00", 
    "session_id": 23, 
    "session_local_time": "2020-03-12 05:02:22", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child2"
}
[INFO] (8807@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars
Hello World, Apple
Hello World, Orange
{
    "session_unixtime": 1583989342, 
    "py>": "print_var.print_and_store_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple", 
    "var2": "Orange", 
    "attempt_id": 23, 
    "_command": "print_var.print_and_store_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:02:22+00:00", 
    "session_id": 23, 
    "session_local_time": "2020-03-12 05:02:22", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child3"
}
[INFO] (8807@[0:new-project]+print_var+parent+child4) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Updated Apple by local
Hello World, Updated Orange by store
{
    "session_unixtime": 1583989342, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Updated Apple by local", 
    "var2": "Updated Orange by store", 
    "attempt_id": 23, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:02:22+00:00", 
    "session_id": 23, 
    "session_local_time": "2020-03-12 05:02:22", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child4"
}

--params-file

digdag server を立ち上げる際に、 --params-file で以下のYAMLファイルを渡しておきます。

var1: 'Apple from params file'
var2: 'Orange from params file'

実行するワークフローは、上のワークフローで親タスクで定義しているexportパラメータを抜いたものです。

+parent:
  +child1:
    py>: print_var.print_vars
  +child2:
    _export:
      var2: Updated Orange by export
    py>: print_var.print_vars
    var1: Updated Apple by local
  +child3:
    py>: print_var.print_and_store_vars
  +child4:
    _export:
      var1: Updated Apple by export
    py>: print_var.print_vars
    var1: Updated Apple by local

--params-fileで定義したパラメータは、ワークフロー横断的で、exportよりもグローバルになっていることが確認できます。

[INFO] (0174@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Apple from params file
Hello World, Orange from params file
{
    "session_unixtime": 1583990259, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple from params file", 
    "var2": "Orange from params file", 
    "attempt_id": 26, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:17:39+00:00", 
    "session_id": 26, 
    "session_local_time": "2020-03-12 05:17:39", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child1"
}
[INFO] (0174@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Updated Apple by local
Hello World, Updated Orange by export
{
    "session_unixtime": 1583990259, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Updated Apple by local", 
    "var2": "Updated Orange by export", 
    "attempt_id": 26, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:17:39+00:00", 
    "session_id": 26, 
    "session_local_time": "2020-03-12 05:17:39", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child2"
}
[INFO] (0174@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars
Hello World, Apple from params file
Hello World, Orange from params file
{
    "session_unixtime": 1583990259, 
    "py>": "print_var.print_and_store_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple from params file", 
    "var2": "Orange from params file", 
    "attempt_id": 26, 
    "_command": "print_var.print_and_store_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:17:39+00:00", 
    "session_id": 26, 
    "session_local_time": "2020-03-12 05:17:39", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child3"
}
[INFO] (0174@[0:new-project]+print_var+parent+child4) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Updated Apple by local
Hello World, Updated Orange by store
{
    "session_unixtime": 1583990259, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Updated Apple by local", 
    "var2": "Updated Orange by store", 
    "attempt_id": 26, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:17:39+00:00", 
    "session_id": 26, 
    "session_local_time": "2020-03-12 05:17:39", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child4"
}

Param Server

Param Serverにパラメータを保存・取得するには、 param_set>param_get> を用います。

+parent:
  +child1:
    py>: print_var.print_and_store_vars
  +set_values:
    param_set>:
    var1: Updated Apple by param_set
  +child2:
    py>: print_var.print_vars
  +get_values:
    param_get>:
    var1: var1
  +child3:
    py>: print_var.print_vars

child2 の前に var1param_set していますが、この段階ではワークフロー上の var1 への影響はありません。 child2 後に param_get をすることで、 var1 はstoreパラメータとしてワークフロー上の var1 に反映され、 child3 で変更が確認できます。

[INFO] (0176@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars
Hello World, Apple from params file
Hello World, Orange from params file
{
    "session_unixtime": 1583991151, 
    "py>": "print_var.print_and_store_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple from params file", 
    "var2": "Orange from params file", 
    "attempt_id": 27, 
    "_command": "print_var.print_and_store_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:32:31+00:00", 
    "session_id": 27, 
    "session_local_time": "2020-03-12 05:32:31", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child1"
}
[INFO] (0176@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Apple from params file
Hello World, Updated Orange by store
{
    "session_unixtime": 1583991151, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Apple from params file", 
    "var2": "Updated Orange by store", 
    "attempt_id": 27, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:32:31+00:00", 
    "session_id": 27, 
    "session_local_time": "2020-03-12 05:32:31", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child2"
}
[INFO] (0176@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars
Hello World, Updated Apple by param_set
Hello World, Updated Orange by store
{
    "session_unixtime": 1583991151, 
    "py>": "print_var.print_vars", 
    "last_executed_session_tz_offset": "+0000", 
    "last_executed_session_time": "", 
    "timezone": "UTC", 
    "last_executed_session_date": "", 
    "session_tz_offset": "+0000", 
    "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", 
    "project_id": 1, 
    "last_executed_session_date_compact": "", 
    "_type": "py", 
    "var1": "Updated Apple by param_set", 
    "var2": "Updated Orange by store", 
    "attempt_id": 27, 
    "_command": "print_var.print_vars", 
    "session_date": "2020-03-12", 
    "last_executed_session_unixtime": "", 
    "last_executed_session_local_time": "", 
    "session_time": "2020-03-12T05:32:31+00:00", 
    "session_id": 27, 
    "session_local_time": "2020-03-12 05:32:31", 
    "session_date_compact": "20200312", 
    "task_name": "+print_var+parent+child3"
}

最後に

普段Digdagを使う上ではあまり気にしない仕様ですが、知っておくと生成データやステータス情報を受け渡す時にかなり役立つかと思います。GitHubにDocker環境ごと上げてますので、ぜひ試してみてください。

参照