Digdag オペレーター実践:外部スクリプトの実行 #digdag

treasure-data-logo

Digdagを理解してみるシリーズ、当エントリからは『オペレータ』に関するものを幾つか見て行きたいと思います。Digdagにおける『オペレータ』とは、タスクを実行する事が出来る仕組みとなります。

実行環境について

環境については下記エントリで状況を整え、Python及びRubyの環境がある事を前提とします。

$ python --version
Python 3.5.2
$ ruby --version
ruby 2.0.0p648 (2015-12-16) [x86_64-linux]

そして下記内容より当エントリで紹介する『オペレータ』の紹介となります。

sh>: - シェルスクリプトの実行

シェルスクリプト及びコマンドを実行する際に使用します。

以下の形でコマンドを実行する事が出来ます。

+step1:
  sh>: echo "hello world"

また、以下の形でシェルスクリプトファイルを指定する事で処理を実行させる事も可能です。

+step1:
  sh>: tasks/step1.sh
+step2:
  sh>: tasks/step2.sh

シェル実行は、デフォルトでは/bin/shを使いますが、もしzshを使いたいという要望がある場合は以下の形で_exportセクションで設定を行っておく事で利用が可能となります。

_export:
  sh:
    shell: [/usr/bin/zsh]

py>: - Pythonスクリプトの実行

Pythonスクリプトを、pythonコマンドを使って実行します。キーワード引数への変数マッピングを含む詳細については、Python APIドキュメントを参照してください。

+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2

Pythonの場合、py>: [PACKAGE.CLASS.]METHODの形で指定を行う事で所定スクリプトの指定メソッドを実行させる事も可能となります。以下の様な構成で

exec-python-method.dig
pythonutils/
  CmUtil.py

Pythonスクリプトをこのような形で用意していた場合、

#!/usr/bin/env python
# -*- coding: utf-8 -*-

def printMessage():
  print("Hello, Python 3!");

digファイルを以下のような設定としておくと、

timezone: Asia/Tokyo

+exec-python-method:
  py>: pythonutils.CmUtil.printMessage

以下のような結果を得る事が出来ます。

$ digdag run exec-python-method.dig 
2016-11-13 07:39:44 +0900: Digdag v0.8.18
2016-11-13 07:39:46 +0900 [WARN] (main): Using a new session time 2016-11-13T00:00:00+09:00.
2016-11-13 07:39:46 +0900 [INFO] (main): Using session /home/ec2-user/cm-digdag/cm1/.digdag/status/20161113T000000+0900.
2016-11-13 07:39:46 +0900 [INFO] (main): Starting a new session project id=1 workflow name=exec-python-method session_time=2016-11-13T00:00:00+09:00
2016-11-13 07:39:47 +0900 [INFO] (0016@+exec-python-method+exec-python-method): py>: pythonutils.CmUtil.printMessage
Hello, Python 3!
Success. Task state is saved at /home/ec2-user/cm-digdag/cm1/.digdag/status/20161113T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

ちなみに、以下のような形でPythonスクリプトを用意して

#!/usr/bin/env python
# -*- coding: utf-8 -*-

print("Hello, Python 3 World!");

下記の様にファイルを実行する形を試してみようとしましたが、

timezone: Asia/Tokyo

+exec-python-script:
  py>: /home/ec2-user/cm-digdag/cm1/python-process.py

エラーが出てしまい実行出来ませんでした。ちょっとまだPython力の理解が足りないからなのかもしれませんが、上記ドキュメントでもファイル実行に関する記載が無かったのでもしかしたらこの形での実行は想定していないのかもしれません。

$ digdag run exec-python-script.dig 
2016-11-13 07:47:38 +0900: Digdag v0.8.18
2016-11-13 07:47:39 +0900 [WARN] (main): Using a new session time 2016-11-13T00:00:00+09:00.
2016-11-13 07:47:39 +0900 [INFO] (main): Using session /home/ec2-user/cm-digdag/cm1/.digdag/status/20161113T000000+0900.
2016-11-13 07:47:40 +0900 [INFO] (main): Starting a new session project id=1 workflow name=exec-python-script session_time=2016-11-13T00:00:00+09:00
2016-11-13 07:47:41 +0900 [INFO] (0016@+exec-python-script+exec-python-script): py>: /home/ec2-user/cm-digdag/cm1/python-process.py
Traceback (most recent call last):
  File "<stdin>", line 85, in digdag_inspect_command
ImportError: No module named '/home/ec2-user/cm-digdag/cm1/python-process'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 136, in <module>
  File "<stdin>", line 92, in digdag_inspect_command
ValueError: Empty module name
2016-11-13 07:47:41 +0900 [ERROR] (0016@+exec-python-script+exec-python-script): Task failed with unexpected error: Python command failed with code 1
java.lang.RuntimeException: Python command failed with code 1
	at io.digdag.standards.operator.PyOperatorFactory$PyOperator.runCode(PyOperatorFactory.java:145)
	at io.digdag.standards.operator.PyOperatorFactory$PyOperator.runTask(PyOperatorFactory.java:89)
	at io.digdag.util.BaseOperator.run(BaseOperator.java:51)
	at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:313)
	at io.digdag.cli.Run$OperatorManagerWithSkip.callExecutor(Run.java:678)
	at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:258)
	at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:141)
	at io.digdag.core.agent.LocalWorkspaceManager.withExtractedArchive(LocalWorkspaceManager.java:25)
	at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:139)
	at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:123)
	at io.digdag.cli.Run$OperatorManagerWithSkip.run(Run.java:660)
	at io.digdag.core.agent.MultiThreadAgent.lambda$run$0(MultiThreadAgent.java:95)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2016-11-13 07:47:42 +0900 [INFO] (0016@+exec-python-script^failure-alert): type: notify
error: 
  * +exec-python-script+exec-python-script:
    Python command failed with code 1

Task state is saved at /home/ec2-user/cm-digdag/cm1/.digdag/status/20161113T000000+0900 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

rb>: - Rubyスクリプトの実行

Rubyスクリプトをrubyコマンドを使って実行します。

_export: require:を使ってワークフローを設定する方法のベストプラクティスについては下記をご参照ください。

_export:
  rb:
    require: tasks/my_workflow

+step1:
  rb>: my_step1_method
+step2:
  rb>: Task::MyWorkflow.step2

ちなみにRubyの場合、Python同様にメソッド指定を行うパターンと、

rb>: Task::MyWorkflow.my_task

もう1つ、ファイル名を指定するパターンが使えるようです。

require: task/my_workflow

まとめ

という訳でDigdagにおける『外部スクリプトの実行』に関する手順のご紹介でした。現行ではシェル、Python、Rubyの3つが利用可能となっていますが最低限この3種類があれば大抵の処理は実現出来そうなので特に心配は要らないですね。あとは用途に応じてどの様なスクリプトを用意しておき、また効率的なソースコード管理を行い、共通プログラム的なものを用意出来るかといった所が肝になってくるのではないでしょうか。こちらからは以上です。

参考情報

AWS Cloud Roadshow 2017 福岡