機械学習チームの鈴木です。
Amazon Forecastを運用するために、Step Functionsを使ったデータインポートの実行について検証をしました。
やりたいこと
Step FunctionsのステートマシンからCreateDatasetImportJobアクションを実行して、指定したデータセットにデータの取り込みをしたいです。
ステートマシンからはarn:aws:states:::aws-sdk:forecast:createDatasetImportJob
によってデータインポートの実行が可能です。これは非同期実行なのですが、例えば『Optimized integrations for Step Functions』の『Supported service integrations』表を見ると、記事執筆時点ではAmazon Forecastの.syncによる同期実行の対応はなかったため、どう実装しようかと思っていたところ、以下のブログをみつけたので参考に作成してみました。
リソースを作成してみた
1. データの作成
以下のブログで紹介されている方法で、データを作成しました。今回はtarget.csv
のみ使いました。
2. データセットグループおよびデータセットの作成
データセットグループおよびデータセットはCloudFormationで作成してみました。以下のテンプレートをコンソールからデプロイしました。
AWSTemplateFormatVersion: "2010-09-09"
Description: Forecast Dataset Group And TargetTimeSeries Dataset
Parameters:
Env:
Description: Environment
Type: String
Default: Dev
AllowedValues:
- Dev
- Prod
ProjectName:
Description: ProjectName
Type: String
Resources:
ForeCastDatasetGroup:
Type: AWS::Forecast::DatasetGroup
Properties:
DatasetArns:
- !GetAtt TargetTimeSeriesDataset.Arn
DatasetGroupName: !Sub ForeCastDatasetGroup${ProjectName}${Env}
Domain: CUSTOM
TargetTimeSeriesDataset:
Type: AWS::Forecast::Dataset
Properties:
DataFrequency: D
DatasetName: !Sub TargetTimeSeriesDataset${ProjectName}${Env}
DatasetType: TARGET_TIME_SERIES
Domain: CUSTOM
Schema:
Attributes:
- AttributeName: item_id
AttributeType: string
- AttributeName: timestamp
AttributeType: timestamp
- AttributeName: target_value
AttributeType: integer
以下のようなデータセットグループおよびデータセットを作成できました。(データをインポートしてから画像を撮ったのでLatest import status
はActive
になっています。)
3. データインポート用のIAMロールの作成
今回は2種類のIAMロールが必要になります。
- Forecastがデータのインポートに使うIAMロール
- ステートマシンがデータのインポートを実行するために使うIAMロール
ここでは『Forecastがデータのインポートに使うIAMロール』を作成した方法を説明します。『ステートマシンがデータのインポートを実行するために使うIAMロール』は次で説明します。
検証に使ったIAMロールは、Forecastのデータセットインポートの作成より作成されるIAMロールを使いました。
定義は以下のように作成したため、同様のIAMロールを作成しても実行が可能です。
信頼ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "forecast.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
許可ポリシー
バケットは特に指定しませんでしたが、運用する際はResource
で限られたバケットに制限することをお勧めします。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}
4. ワークフローの作成
Workflow Studioを使って作成しました。
作成した定義は以下のようになります。インポート対象のデータのS3パス
・Forecastがデータのインポートに使うロールのARN
・データをインポートするデータセットのARN
は自身の環境のものに置き換えて下さい。
MyStateMachine.asl.json
{
"Comment": "A description of my state machine",
"StartAt": "ImportData",
"States": {
"ImportData": {
"Type": "Task",
"Parameters": {
"DataSource": {
"S3Config": {
"Path": "インポート対象のデータのS3パス",
"RoleArn": "Forecastがデータのインポートに使うロールのARN"
}
},
"DatasetArn": "データをインポートするデータセットのARN",
"DatasetImportJobName.$": "$.DataImportJobName",
"ImportMode": "FULL",
"TimestampFormat": "yyyy-MM-dd"
},
"Resource": "arn:aws:states:::aws-sdk:forecast:createDatasetImportJob",
"Next": "Check-Data-Import",
"ResultPath": "$.createDatasetImportJobResult"
},
"Check-Data-Import": {
"Type": "Task",
"Parameters": {
"DatasetImportJobArn.$": "$.createDatasetImportJobResult.DatasetImportJobArn"
},
"Resource": "arn:aws:states:::aws-sdk:forecast:describeDatasetImportJob",
"Next": "Fork-Data-Import",
"ResultPath": "$.describeDatasetImportJobResult"
},
"Fork-Data-Import": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.describeDatasetImportJobResult.Status",
"StringEquals": "ACTIVE",
"Next": "Done-Data-Import"
}
],
"Default": "Wait-Data-Import"
},
"Done-Data-Import": {
"Type": "Pass",
"End": true
},
"Wait-Data-Import": {
"Type": "Wait",
"Seconds": 60,
"Next": "Check-Data-Import"
}
}
}
作成する上で注意した点は以下になります。
ImportModeの指定
ImportModeはFULL
としました。
CreateDatasetImportJobアクションのドキュメントのImportModeの箇所に記載があるように、現状ImportModeにはFULL
とINCREMENTAL
があります。
増分モードでインポートしたい場合はINCREMENTAL
でよいですが、それほどデータポイントが多くないケースを想定していたのでFULL
にしました。
Forecastは記事執筆時点ではデータ1GBあたり0.088USDのインポートしたデータ量に対しても課金が発生するため、洗い替えによる運用のしやすさを取るか、増分モードによるコスト節約を取るかはデータ量を見て判断になると思います。
ステートマシンがデータのインポートを実行するために使うIAMロールの作成
ステートマシンがデータのインポートを実行するために使うIAMロールはWorkflow Studioにてステートマシンを作成する際に自動作成されますが、Forecastを利用するための権限については後から自分で追加しました。
以下のように、IAMロールに追加でポリシーをアタッチしました。Forecastがデータのインポートに使うロールのARN
は自身の環境のものに置き換えて下さい。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"forecast:CreateDatasetImportJob",
"forecast:DescribeDatasetImportJob"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": [
"Forecastがデータのインポートに使うロールのARN"
]
}
]
}
特にiam:PassRole
のアクションは忘れていると以下のようなエラーが発生します。
インポートジョブ名
今回はインポートジョブ名は入力から渡すようにしました。インポートジョブ名が過去に実行したものと重複が許されないためです。
実際に運用する際は、後続の予測子の学習などと合わせてEventBridgeなどで呼び出されると思うので、入力として日付などを埋め込んだ文字列をインポートジョブ名として渡すとよいように思っています。
タイムアウト設定
ステートマシン側のタイムアウト設定はできますが、Forecastのインポートジョブのタイムアウト設定が分かりませんでした。ただし、現状はForecastのクォータが最悪の場合でも料金表と見比べてそれほど影響がない値に設定されているので、ひとまずステートマシン側のタイムアウト設定だけを考えれば十分そうかなと思いました。
動かしてみた
以下の入力でステートマシンを実行してみました。
{
"DataImportJobName": "SampleImportJob"
}
何回かインポートジョブが終わるのを待った後、無事成功しました!
以下のようにジョブが正常に完了したことを確認できました。
最後に
Amazon Forecastのデータセットにデータをインポートするステートマシンを作成してみました。
IAMロールの設定など少し注意が必要な点がありましたが、Workflow Studioを使ってとても簡単に作成ができました。
参考になりましたら幸いです。