Amazon Forecastのデータセットにデータをインポートするステートマシンを作成してみた

Step Functionsと組み合わせることで、Amazon Forecastのデータインポートと後続の処理をまとめて管理・実行できました。
2023.07.03

機械学習チームの鈴木です。

Amazon Forecastを運用するために、Step Functionsを使ったデータインポートの実行について検証をしました。

やりたいこと

Step FunctionsのステートマシンからCreateDatasetImportJobアクションを実行して、指定したデータセットにデータの取り込みをしたいです。

データの取り込み

ステートマシンからはarn:aws:states:::aws-sdk:forecast:createDatasetImportJobによってデータインポートの実行が可能です。これは非同期実行なのですが、例えば『Optimized integrations for Step Functions』の『Supported service integrations』表を見ると、記事執筆時点ではAmazon Forecastの.syncによる同期実行の対応はなかったため、どう実装しようかと思っていたところ、以下のブログをみつけたので参考に作成してみました。

リソースを作成してみた

1. データの作成

以下のブログで紹介されている方法で、データを作成しました。今回はtarget.csvのみ使いました。

作成したデータ

2. データセットグループおよびデータセットの作成

データセットグループおよびデータセットはCloudFormationで作成してみました。以下のテンプレートをコンソールからデプロイしました。

AWSTemplateFormatVersion: "2010-09-09"
Description: Forecast Dataset Group And TargetTimeSeries Dataset 

Parameters:
  Env:
    Description: Environment
    Type: String
    Default: Dev
    AllowedValues:
      - Dev
      - Prod
  ProjectName:
    Description: ProjectName
    Type: String

Resources:
    ForeCastDatasetGroup:
        Type: AWS::Forecast::DatasetGroup
        Properties: 
          DatasetArns: 
            - !GetAtt TargetTimeSeriesDataset.Arn
          DatasetGroupName: !Sub ForeCastDatasetGroup${ProjectName}${Env}
          Domain: CUSTOM
    
    TargetTimeSeriesDataset:
        Type: AWS::Forecast::Dataset
        Properties: 
          DataFrequency: D
          DatasetName: !Sub TargetTimeSeriesDataset${ProjectName}${Env}
          DatasetType: TARGET_TIME_SERIES
          Domain: CUSTOM
          Schema: 
            Attributes: 
              - AttributeName: item_id
                AttributeType: string
              - AttributeName: timestamp
                AttributeType: timestamp
              - AttributeName: target_value
                AttributeType: integer

以下のようなデータセットグループおよびデータセットを作成できました。(データをインポートしてから画像を撮ったのでLatest import statusActiveになっています。)

作成したデータセット

3. データインポート用のIAMロールの作成

今回は2種類のIAMロールが必要になります。

  • Forecastがデータのインポートに使うIAMロール
  • ステートマシンがデータのインポートを実行するために使うIAMロール

ここでは『Forecastがデータのインポートに使うIAMロール』を作成した方法を説明します。『ステートマシンがデータのインポートを実行するために使うIAMロール』は次で説明します。

検証に使ったIAMロールは、Forecastのデータセットインポートの作成より作成されるIAMロールを使いました。

データセットインポートの作成

ロールの作成

定義は以下のように作成したため、同様のIAMロールを作成しても実行が可能です。

信頼ポリシー

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "forecast.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

許可ポリシー

バケットは特に指定しませんでしたが、運用する際はResourceで限られたバケットに制限することをお勧めします。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        }
    ]
}

4. ワークフローの作成

Workflow Studioを使って作成しました。

Workflow Studioでのステートマシン作成

作成した定義は以下のようになります。インポート対象のデータのS3パスForecastがデータのインポートに使うロールのARNデータをインポートするデータセットのARNは自身の環境のものに置き換えて下さい。

MyStateMachine.asl.json

{
  "Comment": "A description of my state machine",
  "StartAt": "ImportData",
  "States": {
    "ImportData": {
      "Type": "Task",
      "Parameters": {
        "DataSource": {
          "S3Config": {
            "Path": "インポート対象のデータのS3パス",
            "RoleArn": "Forecastがデータのインポートに使うロールのARN"
          }
        },
        "DatasetArn": "データをインポートするデータセットのARN",
        "DatasetImportJobName.$": "$.DataImportJobName",
        "ImportMode": "FULL",
        "TimestampFormat": "yyyy-MM-dd"
      },
      "Resource": "arn:aws:states:::aws-sdk:forecast:createDatasetImportJob",
      "Next": "Check-Data-Import",
      "ResultPath": "$.createDatasetImportJobResult"
    },
    "Check-Data-Import": {
      "Type": "Task",
      "Parameters": {
        "DatasetImportJobArn.$": "$.createDatasetImportJobResult.DatasetImportJobArn"
      },
      "Resource": "arn:aws:states:::aws-sdk:forecast:describeDatasetImportJob",
      "Next": "Fork-Data-Import",
      "ResultPath": "$.describeDatasetImportJobResult"
    },
    "Fork-Data-Import": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.describeDatasetImportJobResult.Status",
          "StringEquals": "ACTIVE",
          "Next": "Done-Data-Import"
        }
      ],
      "Default": "Wait-Data-Import"
    },
    "Done-Data-Import": {
      "Type": "Pass",
      "End": true
    },
    "Wait-Data-Import": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "Check-Data-Import"
    }
  }
}

作成する上で注意した点は以下になります。

ImportModeの指定

ImportModeはFULLとしました。

CreateDatasetImportJobアクションのドキュメントのImportModeの箇所に記載があるように、現状ImportModeにはFULLINCREMENTALがあります。

増分モードでインポートしたい場合はINCREMENTALでよいですが、それほどデータポイントが多くないケースを想定していたのでFULLにしました。

Forecastは記事執筆時点ではデータ1GBあたり0.088USDのインポートしたデータ量に対しても課金が発生するため、洗い替えによる運用のしやすさを取るか、増分モードによるコスト節約を取るかはデータ量を見て判断になると思います。

ステートマシンがデータのインポートを実行するために使うIAMロールの作成

ステートマシンがデータのインポートを実行するために使うIAMロールはWorkflow Studioにてステートマシンを作成する際に自動作成されますが、Forecastを利用するための権限については後から自分で追加しました。

以下のように、IAMロールに追加でポリシーをアタッチしました。Forecastがデータのインポートに使うロールのARNは自身の環境のものに置き換えて下さい。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "forecast:CreateDatasetImportJob",
                "forecast:DescribeDatasetImportJob"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "Forecastがデータのインポートに使うロールのARN"
            ]
        }
    ]
}

特にiam:PassRoleのアクションは忘れていると以下のようなエラーが発生します。

インポートジョブ名

今回はインポートジョブ名は入力から渡すようにしました。インポートジョブ名が過去に実行したものと重複が許されないためです。

実際に運用する際は、後続の予測子の学習などと合わせてEventBridgeなどで呼び出されると思うので、入力として日付などを埋め込んだ文字列をインポートジョブ名として渡すとよいように思っています。

タイムアウト設定

ステートマシン側のタイムアウト設定はできますが、Forecastのインポートジョブのタイムアウト設定が分かりませんでした。ただし、現状はForecastのクォータが最悪の場合でも料金表と見比べてそれほど影響がない値に設定されているので、ひとまずステートマシン側のタイムアウト設定だけを考えれば十分そうかなと思いました。

動かしてみた

以下の入力でステートマシンを実行してみました。

{
    "DataImportJobName": "SampleImportJob"
}

何回かインポートジョブが終わるのを待った後、無事成功しました!

実行したステートマシン

実行履歴

以下のようにジョブが正常に完了したことを確認できました。

ジョブの完了

最後に

Amazon Forecastのデータセットにデータをインポートするステートマシンを作成してみました。

IAMロールの設定など少し注意が必要な点がありましたが、Workflow Studioを使ってとても簡単に作成ができました。

参考になりましたら幸いです。