Amazon EventBridgeでAmazon Forecastのジョブをイベントパターンとして使用できるようになりました

EventBridgeのアップデートにより、Forecastのジョブ実行を制御できるようになりました。パッと見は地味なアップデートですが、実はかなりのビッグアップデートです。
2021.04.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の貞松です。 ブログのネタだけ溜まっていって放出できていないのを猛省する日々です。

EventBridgeのアップデートと言うべきかForecastのアップデートと言うべきか悩みどころですが、EventBridgeのイベントパターンとしてForecastのジョブを使用できるようになりました。 これにより、Forecastで時系列予測を作成するための一連のワークフローについて、比較的簡単に依存関係のあるジョブの実行を制御することができるようになります。

本記事では、アップデート内容の解説と、実際にEventBridgeを介してForecastのジョブ実行を制御する手順について解説します。
予め記載しておきますが、AWS公式のブログや開発者ガイドを見ながらそのまま設定・実行すると確実にハマります。それらのハマリポイントについても記載しますので、どちらかというとそこを重点的にご確認いただけると幸いです。

EventBridgeのアップデート内容

概要

EventBridgeアップデートにより、イベントルールとしてForecastのジョブに関するイベントを条件として、その他の処理(Lambdaなど)をトリガーすることができるようになりました。 Forecastでは、各ジョブ間に依存関係があり、データセットインポートジョブのステータスが完了(ACTIVE)になっていないと予測子の作成(予測モデルの学習)ジョブが実行できないなど、他のジョブの状態を確認してからジョブを実行するような制御が必要になります。
EventBridgeのイベントルールでForecastのジョブ制御ができるようになることで、AWS SDKでジョブのステータスをポーリングして完了になったら次のジョブを呼び出すような無駄な自前の処理を実装しなくても済むようになります。
また、Forecastの実行制御と処理実行の呼び出し部分を分離しつつワークフローを構成することができる上に、ジョブの失敗を受け取ってログ出力、通知に処理を流すようなこともできるので、実運用に必要な一連な処理を備えた真っ当なサーバーレス構成を組むこともできるようになりました。

EventBridgeで対応しているForecastのイベントパターン

具体的に利用可能なForecastのジョブイベントは以下の通りです。

  • Forecast Dataset Deletion State Change
  • Forecast Dataset Import Job Deletion State Change
  • Forecast Dataset Import Job State Change
  • Forecast Forecast Creation State Change
  • Forecast Forecast Deletion State Change
  • Forecast Forecast Export Job State Change
  • Forecast Predictor Backtest Export Job State Change
  • Forecast Predictor Creation State Change
  • Forecast Predictor Deletion State Change

EventBridgeを介してForecastのジョブ実行を制御する

ここから実際にEventBridgeを介してForecastのジョブ実行を制御する処理を作成していきます。

ここでは、以下の流れで処理をするワークフローを作成します。

  • Forecastのデータセットインポートジョブを実行する。
  • データセットインポートジョブのステータスがACTIVEになったイベントをEventBridgeのイベントルールで受け取る。
  • EventBridgeのイベントルールからLambda Functionをトリガーして、Forecastで予測子の作成処理を実行する。

Forecastの準備

上述の通り、Forecastのデータセットグループとデータセットを作成するジョブに関するイベントについては、EventBridgeのイベントルールで対象とするイベントパターンには含まれていないので、ここでは割愛します。
以降、Forecastはデータセットインポートの実行ができる状態であるという前提で進めていきます。

Forecastジョブを実行するLambda Functionの作成

EventBridgeからトリガーされてForecastの予測子作成ジョブを実行する為のLambda Functionを作成します。 ここでは、LambdaのランタイムはPython3.8、以下のようなコードでデプロイしておきます。

import json
import boto3

def create_predictor(event, context):
    session = boto3.Session(region_name='ap-northeast-1')
    forecast = session.client(service_name='forecast')
    
    create_predictor_response = \
    forecast.create_predictor(PredictorName="electricity_usage_predictor_deep_ar_plus",
                              AlgorithmArn='arn:aws:forecast:::algorithm/Deep_AR_Plus',
                              ForecastHorizon=48,
                              PerformAutoML=False,
                              PerformHPO=False,
                              InputDataConfig= {"DatasetGroupArn": "arn:aws:forecast:ap-northeast-1:XXXXXXXXXXXX:dataset-group/electricity_usage_dataset_group"},
                              FeaturizationConfig= {"ForecastFrequency": "H"}
                             )
    predictor_arn_deep_ar = create_predictor_response['PredictorArn']
    if type(event) == str:
        e = json.loads(event)
    return {
        'statusCode': 200,
        'body': json.dumps('Start create predictor.\nPredictorArn:{}\nInput:{}'.format(predictor_arn_deep_ar, e))
    }

EventBridgeルールの設定

EventBridgeルールの画面で「Create rule」をクリックしてイベントルールの作成を開始します。

イベントパターンの設定

まずはどういったイベントを受け取ってトリガーするのかというイベントパターンを設定します。
ここでアップデートにより追加になったForeacastのイベントパターンを選択します。

ちなみにコンソール画面上でForecastに関するイベントタイプの一覧を見ると、他のイベントタイプでも同様に存在する「All Events」や「AWS API Call via CloudTrail」を持っていることが確認できます。

イベントパターン設定のハマりポイント

プリセットの設定だけだと全く足りない

AWS公式の機械学習ブログで本機能についての解説がされており、実際にEventBridgeの設定手順が記載されているのですが、その中で、

この投稿では、データセットのインポートがいつ完了するかを知りたいので、Forecast Dataset Import Job StateChangeを選択します。 イベントを選択すると、適切なイベントパターンが[イベントパターン]セクションに入力されます。

と記載されている箇所があります。
実際のところプリセットのイベントタイプを選択しても「データセットのインポートがいつ完了するか」を引っ掛ける為の設定はイベントパターンに入ってはくれません。実際にイベントパターンに入る設定は以下の通りです。

{
  "source": ["aws.forecast"],
  "detail-type": ["Forecast Dataset Import Job State Change"]
}

このままだと、対象アカウントで実行される全てのForecast Dataset Import Job StateChangeが対象になりますし、ステータスの変更が「ACTIVE」でなくても(IN PROGRESSなど)対象になってしまいます。
よって自力でイベントパターンを記述する必要があります。

コンソール画面上のSample eventも開発者ガイドも両方イベントパターンのサンプルが間違っている

前述の通り、自力でイベントパターンを記述するのですが、プリセットの設定ではどう設定するかの手がかりすら無いので、コンソールの画面にある「Sample event」を見てみます。

{
  "version": "0",
  "id": "017fcb6d-7ca3-ebf8-819e-3e0fa956ee17",
  "detail-type": "Forecast Dataset Import Job State Change",
  "source": "aws.forecast",
  "account": "123456789012",
  "time": "2021-02-19T05:45:51Z",
  "region": "us-east-1",
  "resources": ["arn:aws:forecast:us-west-2:123456789012:dataset/example_data"],
  "detail": {
    "Arn": "arn:aws:forecast:us-west-2:123456789012:dataset/example_data",
    "Duration": 7200,
    "Status": "ACTIVE"
  }
}

要はこの形式でForecastからのイベントを受け取るので、これに対してマッチする条件を記述することでイベントパターンを設定できそうです。 ちなみにAWS公式の開発者ガイドにも同一のサンプルが記載されています。

ということで、上記のサンプルに従って以下をイベントパターンに記載しました。特定のデータセットに対するデータセットインポートジョブがACTIVEになったらイベントルールが発火するという想定です。

{
  "source": ["aws.forecast"],
  "detail-type": ["Forecast Dataset Import Job State Change"],
  "detail": {
    "Arn": ["arn:aws:forecast:ap-northeast-1:XXXXXXXXXXXX:dataset/electricity_usage_dataset/"],
    "Status": ["ACTIVE"]
  }
}

ところがこの設定ではForecastのデータセットインポートジョブを実行してもイベントルールが発火してくれませんでした。
当分悩んだ結果、思い立ってLambda Functionの中で、実際にEventBridgeに渡されたイベントの内容をそのまま取得して確認してみました。
以下がそのイベントの内容です。

{
  'version': '0', 
  'id': '114ed10e-5d3b-dafb-143a-75baab2deb33',
  'detail-type': 'Forecast Dataset Import Job State Change',
  'source': 'aws.forecast',
  'account': 'XXXXXXXXXXXX',
  'time': '2021-04-04T00:19:21Z',
  'region': 'ap-northeast-1',
  'resources': ['arn:aws:forecast:ap-northeast-1:XXXXXXXXXXXX:dataset-import-job/electricity_usage_dataset/electricity_usage_dataset_import'],
  'detail': {
    'duration': 3, 
    'arn': 'arn:aws:forecast:ap-northeast-1:XXXXXXXXXXXX:dataset-import-job/electricity_usage_dataset/electricity_usage_dataset_import',
    'status': 'ACTIVE'
  }
}

ポイントは2つです。

  • detail配下のkeyは先頭小文字
    • detail配下の「duration」「arn」「status」の先頭は大文字でなく小文字でした。先頭大文字では条件としてマッチしていませんでした。
  • resourcesおよびdetail配下のarnはデータセットでなくデータセットインポートのarn
    • 当たり前といえば当たり前なのですが、データセットインポートジョブのイベントで渡されるarnはデータセットでなくデータインポートのarnでした。
    • サンプルに沿って設定しても当然マッチしません
    • 更に実行したデータセットインポートジョブのarnはもちろん新しく作る(実行する)度に変わるので、prefix記法を使って記述する必要がありました。

最終的なイベントパターン設定

最終的に設定したイベントパターンは以下の通りです。こうやって載せてみると本当に大したことのない設定ですが上記諸々ハマったせいで細かい挙動の検証含めてめちゃめちゃ時間が掛かりました…。

{
  "source": ["aws.forecast"],
  "detail-type": ["Forecast Dataset Import Job State Change"],
  "detail": {
    "arn": [{
      "prefix": "arn:aws:forecast:ap-northeast-1:XXXXXXXXXXXX:dataset-import-job/electricity_usage_dataset/"
    }],
    "status": ["ACTIVE"]
  }
}

ターゲットの設定

イベントルールをトリガーとして実行されるターゲットを設定します。
ここでは前述で作成したForecastの予測子作成処理を実行する為のLambda Functionを指定します。

Forecastからデータセットインポートジョブの実行 → 予測子の作成実行を確認

Forecastのコンソール画面からEventBridgeで設定したイベントパターンに該当するデータセットのデータセットインポートジョブを実行します。
この段階ではまだ予測子が無いことも確認できます。

インポートジョブが実行中の状態です。まだ予測子はアクティブになっていません。

インポートジョブが完了しました。ここで予測子がアクティブになっていることに注目です。

予測子の詳細画面を開くと無事に予測子の作成ジョブが実行されていることを確認できました。

まとめ

EventBridgeのアップデートにより、Forecastのジョブ実行を制御できるようになりました。
本記事では、データセットインポートジョブの完了をトリガーして予測子の作成ジョブを実行するという部分的なフローのみを作成、解説しましたが、この機能を使用すればForecastに関する全てのジョブ実行をサーバーレスなワークフロー化することができ、ログ出力や通知といったハンドリングも可能になる為、実運用の環境構築がグッと現実的なものになったと言えます。
パッと見は地味なアップデートですが、実はかなりのビッグアップデートです。ぜひ本記事がForecastの環境構築の参考になれば幸いです(或いは本記事を見て環境構築についてご相談いただけると幸いです)