Digdagにおけるrequireオペレータ使用時のretry挙動を理解する

2022.10.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部のueharaです。

今回はDigdagでrequireオペレータを使用したときのretryの挙動が気になったので、その確認をしてみたいと思います。

DigdagのWebUIでいうと、下記イメージの赤枠部分になります。

requireオペレータとは?

Digdagの公式ドキュメントを確認すると次のように説明されています。

require> operator requires completion of another workflow. This operator is similar to call> operator, but this operator doesn’t start the other workflow if it’s already running or has done for the same session time of this workflow. If the workflow is running or newly started, this operator waits until it completes. In addition, require operator can kick the another project’s workflow.

まとめると、requireオペレータ使用時の挙動は次の通りになります。

  • 別のワークフローの完了を要求する
  • ワークフローが既に実行されていたり完了済であれば開始しない(callオペレータとの違い)
  • ワークフローが実行中、または新しく開始した場合は完了するまで待機する

requireオペレータでretryがカウントされていく理由

結論から言うと、ポーリングをしているからになります。

先程requireオペレータの挙動として「ワークフローが実行中、または新しく開始した場合は完了するまで待機する」と記載しましたが、Digdagではこれをポーリングすることにより確認をしています。

したがって、完了確認のためポーリングをした回数がそのまま「retry」としてカウントされることになります。

ポーリングのタイミング

ただし、このretry回数を見ていると、どうやらポーリングの間隔が一定ではないように見えます。

具体的には、requireオペレータでの実行直後はポーリング間隔が短く、10秒程度経つとその間隔が一定になっています。

これも結論から言うと、ポーリング間隔に「Exponential Backoff(指数バックオフ)」というアルゴリズムを取り入れているからになります。

簡単に言うと、徐々にポーリング間隔を長く設定するアルゴリズムです。

古くから主に通信制御で用いられることが多い手法で、AWS SDKで実装されていたりもします。

実際にDigdagのソースを確認すると、ポーリング間隔は次のように設定されています。

int interval = (int) Math.min(1 * Math.pow(2, iteration), MAX_TASK_RETRY_INTERVAL);

ここでMAX_TASK_RETRY_INTERVAL10で設定されているため、ポーリング間隔は1秒、2秒、4秒、8秒と指数関数的に増えていき、以後は10秒で固定となります。

実際に確認してみる

実際にプログラムを書いて確認してみます。

構成としては、 sample.dig から mytask.dig を呼び、 mytask.dig は60秒待機するPythonスクリプト test_task.py を呼び出すとします。

sample.dig

timezone: Asia/Tokyo

+do_mytask:
  require>: mytask

mytask.dig

timezone: Asia/Tokyo

+do_test_task:
  py>: test_task.do_task

test_task.py

import time

def do_task():
    print("start sleep")
    time.sleep(60)
    print("done")

60秒で完了するプログラムであれば、理論値としては8回のretryがカウントされるはずです。

実行結果

sample.dig を実行したところ、理論値の通り、8回のretryを確認することができました!

最後に

今回はrequireオペレータを使用したときのretryの挙動について確認を行いました。

Digdagの公式ドキュメンでは(私が確認した限りでは)この辺の仕様については記載が無かったため、同じように疑問をもった方の参考になると幸いです。

参考

digdag/RequireOperatorFactory.java

Digdag 0.10.4 documentation