[Step Functions + Glue] Glueジョブ終了時の出力を次のステートに流す。エラーハンドリングもできるよ!

Glue PythonShellをLambdaみたいに使うための努力。
2023.04.24

Step FunctionsのステートマシンではいくつかのAPIについては独自のサービス統合がなされています。 例えばLambda関数を実行したい場合、ユーザは難しいことを考えずとも

  • 同期的な実行(Lambda関数の処理が終わるまで次のステートに進まないで待ってる)
  • 関数からreturnされた情報を次のステートに流す
  • Lambda関数内で発生したエラーの内容をステートマシンに渡す

などが行えるようになっています。 これらはステートマシンからLambdaを呼ぶ時の直感的な動きかと思いますので、 この辺の機能が作り込まなくても利用できるのはありがたいですね。

さて、一方本題のGlueジョブの実行について見てみます。 Glueジョブ実行についてもサービス統合があるのですが、こちらは残念ながらLambdaほどの統合がされていません。 (Glueは分散スケールしての大規模データ処理が本分なので、 サーバレスでのコード実行を本分とするLambdaの方が、餅は餅屋的な最適化がされている)

これだとStep Functions + Glueという構成を使う上でいくつか問題があるので、 今回はサービス統合を用いないでステートマシンへのコールバックを返す形で問題を回避してみました!

検証環境

検証については全てGlue PythonShellを用いています。

PythonShellは(PySparkを使った分散処理環境を立ち上げるまでもないような)簡単な処理をする時に使いますが、 裏でインスタンスが立ち上がって動く仕組みとしては同一と考えられるため、 以下の検証内容はPySparkのGlueジョブでも同じになるかと思います。

というか、この記事で検証している動作自体は、 ステートマシンからコールバックを待つ形で何かしら処理をした時全てに共通している動きのはずです。

サービス統合での動きの確認

まずはサービス統合での動きについて見ていきます。 Workflow Studioだとこの画面から作れます。 「タスクが完了するまで待機」にチェックを入れると同期処理となり、 ステートマシンはGlueジョブが終わるまで待機して、終わったら次のステートへ進みます。

{
  "Comment": "A description of my state machine",
  "StartAt": "Glue StartJobRun",
  "States": {
    "Glue StartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "cm-hirano-stepfunctions-glue-output-sync"
      },
      "End": true
    }
  }
}

この場合の問題点について見ていきます。

問題点

サービス統合で呼び出されたGlueジョブの場合、その出力結果をコントロールすることができません。

これはGlueは「変数を受け取って処理を実行するが、それの結果については成功or失敗の情報しかステートマシンに伝えない」ということを意味しています。 「詳細情報を伝えるのはGlueの本来の仕事じゃないよ」と言われればその通りですが、 例えば処理した結果のファイルを「どこどこに置いたよ」という情報を次に伝えることができないのは、 ステートマシンで処理を繋いでいくことを考えるとちょっと不便です。

ジョブ成功時の挙動

実際にジョブ実行の結果を見てみます。 成功したGlueジョブの出力を見てみると、その出力は

{
  "AllocatedCapacity": 0,
  "Attempt": 0,
  "CompletedOn": 1681980548339,
  "ExecutionClass": "STANDARD",
  "ExecutionTime": 16,
  "GlueVersion": "3.0",
  "Id": "jr_f1d5b4e731e1ee9666e6c6c400f18d9a7f4bae48b2158f67a8562eb27164df00",
  "JobName": "cm-hirano-stepfunctions-glue-output-sync",
  "JobRunState": "SUCCEEDED",
  "LastModifiedOn": 1681980548339,
  "LogGroupName": "/aws-glue/python-jobs",
  "MaxCapacity": 0.0625,
  "PredecessorRuns": [],
  "StartedOn": 1681980526349,
  "Timeout": 2880
}

のようになっており、見た感じ、任意の情報を埋め込むことができる枠はなさそうです。 (ちなみにPredecessorは、「前任者」と言う意味だそうです。知らなかった) Glueが出力したファイルの場所を後続の処理に知らせたりはできなさそうです。

余談ですが、Lambda関数はlambda_handlerメソッドを書くのでreturnを書くことができますが、 Glueは起点が__main__なのでreturnを書くことができず、 元々値を返すという概念がありません。

ジョブ失敗時の挙動

次にジョブが失敗した時の出力を見てみます。

if True:
    raise ValueError("とりあえず失敗させちゃうね")

こんなPythonコードで失敗させてみます。

{
  "resourceType": "glue",
  "resource": "startJobRun.sync",
  "error": "States.TaskFailed",
  "cause": {
    "AllocatedCapacity": 0,
    "Attempt": 0,
    "CompletedOn": 1681981780354,
    "ErrorMessage": "ValueError: とりあえず失敗させちゃうね",
    "ExecutionClass": "STANDARD",
    "ExecutionTime": 13,
    "GlueVersion": "3.0",
    "Id": "jr_92a124ba87d392f1a6f4bb8b75c651126acb8c9443fbcac31490efb76cfb6103",
    "JobName": "cm-hirano-stepfunctions-glue-output-sync",
    "JobRunState": "FAILED",
    "LastModifiedOn": 1681981780354,
    "LogGroupName": "/aws-glue/python-jobs",
    "MaxCapacity": 0.0625,
    "PredecessorRuns": [],
    "StartedOn": 1681981759361,
    "Timeout": 2880
  }
}

失敗の場合の出力を見ると、errorStates.TaskFailedになっています。 Glueジョブにおいてどんなエラーで終了したとしても、 ステートとしてのerrorコードとしてはこれになってしまうようです。

そしてこのerrorの部分は、ステートマシンがリトライやエラーのキャッチをする際のErrorEqualsと突き合わせる部分となります。 つまり、この部分が固定値になってしまうということは、 失敗の内容によって処理を分けるような挙動をさせることができないということになります。

ということで、まとめとして、 サービス統合で同期的に起動したGlueジョブは、

  • ジョブの中から任意の出力内容を次のステートに流すことができない
  • ジョブ失敗の内容によって処理を分岐させることができない

という課題があります。

これらを解決するために、Glueジョブからステートマシンにコールバックを返すように実装していきます。

サービス統合を用いない形

ということで、今度はサービス統合を使わず、 ステートマシンに終了したことを明示的に送ってあげる構成にしたいと思います。

具体的には、Step FunctionsのAPIを叩いてジョブの終了を告げてあげる必要があります。

ステートマシン定義

終了のコールバックを待つステートマシンです。 Workflow Studioだと下のような感じで「AWS SDK」を選ぶと、 特にステートマシンから呼ぶという特別考慮はなく、普通にAPIを叩くだけというイメージになります。

今度のステートマシン定義はこんな感じになります。

{
  "Comment": "A description of my state machine",
  "StartAt": "Glue StartJobRun",
  "States": {
    "Glue StartJobRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:startJobRun.waitForTaskToken",
      "Parameters": {
        "JobName": "cm-hirano-stepfunctions-glue-output",
        "Arguments":{
          "--TaskToken.$": "$$.Task.Token",
          "--Comment.$": "$.Comment"
        }
      },
      "TimeoutSeconds": 600,
      "End": true
    }
  }
}

startJobRun.waitForTaskTokenというのが明示的にコールバックを待つ呼び出しパターンになります。 続いて、Workflow Studioでは特にお世話してくれないポイントを2点解説します。

TaskTokenの入力

TaskTokenを入力に入れています。

"--TaskToken.$": "$$.Task.Token",

これはステートマシンからGlueジョブを呼び出した時のIDで、 このIDに対して「終わったよ」と連絡して上げることで次のステートに処理が進むことになります。 この指定は明示的に書いてあげないといけない(「コールバックを待つ」をポチッとしただけでは書かれない)ので 明示的に書いてあげる必要があります。

TimeoutSecondsを設定する

そしてもう一点、TimeoutSecondsとして、値を設定してあげる必要があります。

これはコールバックを待つステートに限らず指定可能なものですが、 コールバックを待つ場合は(実質的に)必ず明示的に指定する必要があります。 TimeoutSecondsはデフォルトでは99999999秒(=1157日)になっているらしく、 ステートマシン全体の最大実行時間である1年を上回ってしまいます。 つまり、実質タイムアウトなし(先にステートマシン自体がタイムアウトする)で動作します。

非同期でタスクを実行する時は常に「コールバックをきちんと返せない可能性」を考える必要があります。 例えばGlueジョブの中で、できるだけ早くtry-exceptで囲んだとしても、 引数からTaskTokenを取得する処理などでエラーが起きた場合は当然コールバックを返すことはできませんし、 他にも、AWSの内部的なエラーでジョブの起動自体が失敗することもあり得ますので、 コールバックが受け取れないことを必ず考慮する必要があります。

短すぎてGlueでの処理が終わる前にタイムアウトしてしまっても困りますので、 実際にかかる最大時間にいくらか余裕を加えた時間などに設定するなど、 現実的なタイムアウト時間の調整が必要になります。

Glueの実装

次にGlueジョブのPythonスクリプトです。

import json
import sys

import boto3

sfn_client = boto3.client("stepfunctions")

print(sys.argv)

def get_callback_token(args):
    for i in range(len(args)):
        if args[i] == "--TaskToken":
            return args[i + 1]
    raise Exception("TaskTokenが見つかりませんでした")

def send_success_callback_to_step_functions(task_token, output):
    response = sfn_client.send_task_success(taskToken=task_token, output=output)
    print("成功したよ", response)

def send_fail_callback_to_step_functions(task_token, error=None, cause=None):
    response = sfn_client.send_task_failure(
        taskToken=task_token, error=error, cause=cause
    )
    print("失敗したよ!", response)

print("job start!!!!!!!!!!!")
callback_token = get_callback_token(sys.argv)
try:
    print(f"callback_token = {callback_token}")
    output = json.dumps(sys.argv)
    print(f"output = {output}")
    send_success_callback_to_step_functions(callback_token, output)
except Exception:
    error = "エラーの内容!"
    cause = "エラーの詳細!"
    send_fail_callback_to_step_functions(callback_token, error=error, cause=cause)

get_callback_tokenメソッドでTaskTokenを取得しています。 ステートマシン定義でTaskTokenという引数に渡していたので、 sys.argvから取得できます。1

あとは基本的にsfn_client.send_task_successsfn_client.send_task_failureに渡すだけです。

ジョブ成功の場合

successの場合はoutputに出力となる文字列を渡すことができます。 これがステートマシンでは次に渡される情報になりますので、 ここに任意の情報を入れることで、Glueジョブからでも次のステートに情報伝達が可能です!

なおここちょっと注意が必要ですが、渡せるのは文字列です。 dictで渡すと怒られますので、json.dumpsしたオブジェクトを渡すようにしましょう。

ジョブ失敗の場合

一方failureの場合はerrorcauseを渡すことができます。 このerrorこそが前述のように、ステートマシンのリトライやエラーキャッチにおける ErrorEqualsの対象になる部分となります。 ステートマシンのErrorEqualsに指定するものとerrorに流す文字列を揃えることで、 エラー内容によるステートマシンの処理分岐が可能になります。

また、causeに関しては、任意のエラー情報を格納することができます。 こちらもoutputと同様に渡せるのは文字列となります。

Glueジョブの入出力例

入力例

{
  "JobName": "cm-hirano-stepfunctions-glue-output",
  "Arguments": {
    "--Comment": "Insert your JSON here",
    "--TaskToken": "AQCUAAAAKgAAAAMAAAAAAAAAAbw6nIc55xKGLBN2IoI4qCG2VhEtVeuEr9TrpBCb9CFZ14FZk0ajx35nlU+Q6w+hzINsnbea98UoNslPk4wDH4JNwdSCElC+KS7gacFH39gS8rQaR7ipVicwlBNbBjg=CAjPF1sXJ4ndbcG3lb8rUjNVF6HrrVZWc8g3HZGQvMvwatLEjV5Nr+1jP49j0f3NdH3ELNvwKWcuxKW7YUeLx3Xa+YC0sCFi8O2T86GJk5EpQuqgeObSmAWz2+JrU38qH9Tkt3oR8vJ22hbAMsIGlXS0QpgmpJyZkvokDQaoORCwM8WvElA6qjs6RFdEtd+C657B6zSsTxzPr0UVgj6RbucXKFgm4KEUt2hf6dl/EMFUbX/I8lewlk1CML5PNn10ZrrIuclR983z1fiWs1sWG0X0Kf07SwYlEcCfw9PxAdHi2UjL0E+ENj5oxnM55f+z6RL8ofBut0BvqjaSRD9LRfVoJOVu4LqaNQ9hNUK3kF3sNh10DTN+Nhf2GuU+W19dCoXdsQ+aOrz+3q8uJHIA0q72yuu+VjrbmyXEZuiGzje7pC7wF9/isAa8lpEZduJFsa/CQx7M5hEJHhR/RxlAfoHvpgX1nPr/poVXzE1BR5r8nQTEglf1/f31hpGuqL8iEXMHom9/+S5tirSgXFkv"
  }
}

出力例

[
  "/tmp/glue-python-scripts-gfcr/cm-hirano-stepfunctions-glue-output",
  "--enable-job-insights",
  "true",
  "--Comment",
  "Insert your JSON here",
  "--enable-glue-datacatalog",
  "true",
  "library-set",
  "analytics",
  "--scriptLocation",
  "s3://aws-glue-assets-123456789012-ap-northeast-1/scripts/cm-hirano-stepfunctions-glue-output",
  "--python-version",
  "3.9",
  "--job-language",
  "python",
  "--TempDir",
  "s3://aws-glue-assets-123456789012-ap-northeast-1/temporary/",
  "--TaskToken",
  "AQCUAAAAKgAAAAMAAAAAAAAAAbw6nIc55xKGLBN2IoI4qCG2VhEtVeuEr9TrpBCb9CFZ14FZk0ajx35nlU+Q6w+hzINsnbea98UoNslPk4wDH4JNwdSCElC+KS7gacFH39gS8rQaR7ipVicwlBNbBjg=CAjPF1sXJ4ndbcG3lb8rUjNVF6HrrVZWc8g3HZGQvMvwatLEjV5Nr+1jP49j0f3NdH3ELNvwKWcuxKW7YUeLx3Xa+YC0sCFi8O2T86GJk5EpQuqgeObSmAWz2+JrU38qH9Tkt3oR8vJ22hbAMsIGlXS0QpgmpJyZkvokDQaoORCwM8WvElA6qjs6RFdEtd+C657B6zSsTxzPr0UVgj6RbucXKFgm4KEUt2hf6dl/EMFUbX/I8lewlk1CML5PNn10ZrrIuclR983z1fiWs1sWG0X0Kf07SwYlEcCfw9PxAdHi2UjL0E+ENj5oxnM55f+z6RL8ofBut0BvqjaSRD9LRfVoJOVu4LqaNQ9hNUK3kF3sNh10DTN+Nhf2GuU+W19dCoXdsQ+aOrz+3q8uJHIA0q72yuu+VjrbmyXEZuiGzje7pC7wF9/isAa8lpEZduJFsa/CQx7M5hEJHhR/RxlAfoHvpgX1nPr/poVXzE1BR5r8nQTEglf1/f31hpGuqL8iEXMHom9/+S5tirSgXFkv"
]

ステートマシンのResultPathを指定していないので、 ステート全体の出力がGlueの出力に置き換えられています。 この辺の仕組みについては、下記ブログを参照してください。

[StepFunctions]ParametersやらResultPathやら…。ステート間のパラメータ受け渡しって結局どうなってるの?を1つの図にしてみた。

失敗時の例

Glue.ValueErrorerrorに渡した文字列で、エラーの詳細!causeに渡した文字列です。

この場合だと、リトライさせたいエラーを記述する所(ErrorEquals)にGlue.ValueErrorと書けば、 Glue.ValueErrorでエラーになった場合のみリトライさせるような設定が可能になります。

なお、Glue.というプレフィックスをつけていますが、 これに何か特殊な意味があるわけではありません。 ErrorEqualsは、単純に文字列の完全一致を確認するだけですので、好きな文字列で大丈夫です。

エラーハンドリングしたステートマシン定義の例はこんな感じになります。

{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:glue:startJobRun.waitForTaskToken",
  "Parameters": {
    "JobName": "cm-hirano-stepfunctions-glue-output",
    "Arguments": {
      "--TaskToken.$": "$$.Task.Token",
      "--Comment.$": "$.Comment"
    }
  },
  "TimeoutSeconds": 600,
  "Retry": [
    {
      "ErrorEquals": [
        "Glue.ValueError"
      ],
      "BackoffRate": 2,
      "IntervalSeconds": 1,
      "MaxAttempts": 2
    }
  ],
  "Catch": [
    {
      "ErrorEquals": [
        "States.ALL"
      ],
      "Next": "Fail"
    }
  ],
  "Next": "次の処理"
}

Glue.ValueErrorの場合は2回リトライして、 それ以外のエラーの場合はFailステートに進むようにしています。

インフラでの変更点

コールバック自体はStep FunctionsのAPIを呼びますので、インフラ的な考慮も必要です。

  • GlueジョブにアタッチされるIAMロールにStep FunctionsへのAPIをコールする権限追加が必要
  • Glueがプライベートサブネットに起動する構成の場合、VPC Endpointを設置する必要がある

どちらもAWSサービスから他サービスのAPIを叩く際の標準的な処置ですね。

まとめ

サービス統合で同期的に起動したGlueジョブの呼び出しでは

  • ジョブの中から任意の出力内容を次のステートに流すことができない
  • ジョブ失敗の内容によって処理を分岐させることができない

という課題がありましたが、 サービス統合を用いずにコールバックを待つ形で実装すれば上記の課題をクリアできることがわかりました。

タイムアウト関連の所だけちょっと考える必要がありますが、 基本的にこのやり方の方がステートマシンから何か処理を呼ぶという視点での取り回しが素直になりますので、 ステートマシンからGlueジョブを呼ぶ場合はこの方法を取るのが良さそうです!

以上、誰かのお役に立てば幸いです。


  1. もう少し丁寧にやるなら、awsglue.utils.getResolvedOptionsを使います。