通知機能を試してみた | Luigi Advent Calendar 2016 #21

2016.12.21

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』21日目の内容となります。
ジョブの通知機能について試してみたいと思います。

先日20日目はRedshiftへのCopyをIAM ROLEを用いる形にしてみたでした。

下準備

Luigiの通知機能について何があるか調べてみたところ、SES / SNSといった文言がLuigiのソースコードから確認できました。
今回は両方共試してみたいと思います。 なお、SES SNSについてはすでに構築済みのものとします。

また今回は通知が目的のため、わざと例外が発生するタスクをつくってみました。

NotificationTest.py

import luigi


class RaiseException(luigi.Task):

    def run(self):
        ans = 10 / 0
        print(ans)


if __name__ == '__main__':
    luigi.run()

通知設定を行なってみた SES編

LuigiのSES通知はboto3を用いているので、導入する必要があります。

$ pip install boto3

SESへの通知ですが実際に行うこととしては、Luigiの設定ファイルの記載のみです。

luigi.cfg

[core]
error-email: (送信元アドレス)
email-sender: (送信先アドレス)
email-prefix: [luigi]

[email]
force-send: true
type: ses
[/bash]

7行目のforce-send: trueの記載ですが、これは通知のテストを行う際にはtrueにしておく必要があります。
これに関しては以下のような記載がドキュメントにあります。

If true, e-mails are sent in all run configurations (even if stdout is connected to a tty device). Defaults to False. Configuration — Luigi 2.4.0 documentation

標準出力が出るような状態はtty deviceだよ、とのことらいしので今回はtrueにして送信されるようにします。
上記を実行する際に2点注意が必要です。

設定ファイルのなかでは、Credentialの情報は記載しません。
そのため、SES送付に用いるCredentialの情報は別途 ~/.aws/credential ファイルに記載する・IAM roleを用いる等をする必要があります。

Regionの指定もLuigi上で行なえません。
そのためコチラも環境変数等で指定しておく必要があります。

これらの注意点を踏まえて実行すると以下のようになります。
下記の例では、SESはVarginia(us-east-1)を用いているため、AWS_DEFAULT_REGIONでRegion指定を行なっています。

$ AWS_DEFAULT_REGION=us-east-1 python ./NotificationTest.py --local-scheduler RaiseException
DEBUG: Checking if RaiseException() is complete
/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py:305: UserWarning: Task RaiseException() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   RaiseException__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 47873] Worker Worker(salt=106145114, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=47873) running   RaiseException()
ERROR: [pid 47873] Worker Worker(salt=106145114, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=47873) failed    RaiseException()
Traceback (most recent call last):
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 192, in run
    new_deps = self._run_get_new_deps()
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
    task_gen = self.task.run()
  File "./NotificationTest.py", line 7, in run
    ans = 10 / 0
ZeroDivisionError: integer division or modulo by zero
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Sending warning email to ['(送信先アドレス)']
DEBUG: Message sent to SES.
MessageId: 01000158fa0904e1-2cbd6e95-d068-43c3-8c9c-ed3d5943fa6f-000000,
RequestId: 13cec3f3-c179-11e6-a87f-9d709b5c7333,
HTTPSStatusCode: 200
INFO: Informed scheduler that task   RaiseException__99914b932b   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=106145114, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=47873) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 RaiseException()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

通知メールとしては以下のような内容が送付されてます。

件名:[luigi] Luigi: RaiseException() FAILED. Host: HL00088.local
本文:A task failed when running. Most likely run() raised an exception.

Name: RaiseException

Parameters:


Command line:
  /Users/kajiwarayutaka/Project/FR/luigiStudy/HelloWorld.py --local-scheduler RaiseException

Runtime error:
Traceback (most recent call last):
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 192, in run
    new_deps = self._run_get_new_deps()
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
    task_gen = self.task.run()
  File "/Users/kajiwarayutaka/Project/FR/luigiStudy/HelloWorld.py", line 19, in run
    ans = 10 / 0
ZeroDivisionError: integer division or modulo by zero

通知設定を行なってみた SNS編

SNSを用いた通知もSESと基本変わりません。

luigi.cfg

[core]
error-email: arn:aws:sns:ap-northeast-1:123456789012:your-use-sns-arn
email-prefix: [luigi]

[email]
force-send: true
type: sns

実行する際にRegionを環境変数として指定することもSESで送るときと同様です。
今回はap-northeast-1(東京リージョン)のSNSを用いているので、環境変数ではそれを渡して上げる必要があります。

$ AWS_DEFAULT_REGION=ap-northeast-1 python ./NotificationTest.py --local-scheduler RaiseException
DEBUG: Checking if RaiseException() is complete
/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py:305: UserWarning: Task RaiseException() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   RaiseException__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 80552] Worker Worker(salt=090357848, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=80552) running   RaiseException()
ERROR: [pid 80552] Worker Worker(salt=090357848, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=80552) failed    RaiseException()
Traceback (most recent call last):
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 192, in run
    new_deps = self._run_get_new_deps()
  File "/Users/kajiwarayutaka/.pyenv/versions/luigiStudy/lib/python2.7/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
    task_gen = self.task.run()
  File "./NotificationTest.py", line 7, in run
    ans = 10 / 0
ZeroDivisionError: integer division or modulo by zero
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Sending warning email to ['arn:aws:sns:ap-northeast-1:123456789012:your-use-sns-arn']
DEBUG: Message sent to SNS.
MessageId: 64832294-f515-57be-bc66-725fa305a55c,
RequestId: bb12eff7-4c57-5bcf-aea9-bf3c1a47392d,
HTTPSStatusCode: 200
INFO: Informed scheduler that task   RaiseException__99914b932b   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=090357848, workers=1, host=HL00088.local, username=kajiwarayutaka, pid=80552) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 RaiseException()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

通知内容はSESと同様のため省略します。

まとめ

明日はCentral Schedulerを試してみたいと思います。