Amazon Forecastに再入門してSDKを使って時系列予測

Amazon Forecast再入門ブログ第二弾!SDKを使って時系列予測をしてみました。
2023.02.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

皆さん、こんにちは。

クルトンです!

前回のブログに引き続き、Amazon Forecastの再入門ブログです。 今回は前回のブログでやっていた内容を「SDKを使って動かすには」という事で実行していきます。

一連の流れを実行する際の前提

時系列予測に使っているファイル

公式ドキュメントで配布されている個々の家庭の電力消費のデータセットを使用します。ローカルマシンへダウンロードして、zipファイルを解凍してください。

utilモジュールについて

公式GitHubからフォルダごとローカルへ持ってくるようお願いいたします。

実行環境

ローカルマシンに~/.aws/credentials~/.aws/configを設定しています。 設定方法についてはこちらをご確認ください。

本ブログの一連の流れ

前回のブログと同じく、S3バケットの作成からスタートして、作ったリソースの削除までを行ないます。 前回とは少し流れが違うところもありますが、作成するAWSリソースは同じです。

  1. モジュールのインポート
  2. Clientオブジェクトの作成
  3. IAM roleの作成
  4. ローカルにある時系列データをS3バケットへアップロード
  5. Datasetの作成
  6. Dataset Import Jobの作成
  7. Dataset Groupを作成
  8. AutoPredictorを使って予測子の学習
  9. 予測子のメトリクスを確認
  10. 時系列予測の実行と結果の確認
  11. 予測データをエクスポート
  12. 作ったリソースの全削除

注意点

MFAを設定しているアカウントを使っていると実行途中でRefreshWithMFAUnsupportedError: Cannot refresh credentials: MFA token required.のようなエラーが出るかもしれません。

このエラーが出た場合、実行が不完全に終わっているかどうかをマネージメントコンソールで確認すると、実行が完了している事もございます。念のためそちらもご確認ください。(特に予測子の学習と予測結果のエクスポートは時間が掛かるので、出てくる可能性があります。)

マネージメントコンソールで確認して実行が正常に完了している場合は、もう一度Clientオブジェクト作成のコード部分を実行して新しくセッションを作成し直して実行してください。

モジュールのインポート

import json
import util
import boto3
import pandas as pd
import dateutil

今回必要なモジュールを最初にインポートしておきます。ここで最後の削除まで必要なモジュールをインポートしています。

Clientオブジェクトの作成

region = 'ap-northeast-1'
session = boto3.Session(region_name=region)
forecast = session.client(service_name='forecast')
forecastquery = session.client(service_name='forecastquery')
s3 = session.client(service_name='s3')

# 作ったセッションで繋がるか確認
assert forecast.list_predictors()

最初にregionという変数で、どのリージョンに対して実行していくのかを格納しています。こちらの変数はS3バケットを作成などに使用しています。

boto3.Sessionオブジェクト作成やclientメソッドを実行する際にアクセスキーとシークレットアクセスキーをaws_access_key_idとaws_secret_access_keyという引数で渡す事が可能ではありますが、誤ってリポジトリに上げてしまうなどの事故の元ですので、今回はこちらを書かないように実装しています。

IAM roleの作成

既に同じ名前で存在すれば、IAMロールのArnを変数へ格納のみ行ないます。

role_name = <お好きな名前を設定ください>
print(f"{role_name}という名前でIAMロール作成中...")
role_arn = util.get_or_create_iam_role( role_name = role_name )

print(f"IAMロール名{role_arn.split('/')[1]}を作成完了です。")

ローカルにある時系列データをS3バケットへアップロード

resourceオブジェクトであれば、Bucketオブジェクトを作成し、creation_dateというメンバ変数にアクセスして、作成済みかどうかを確認できます。

しかし、resourceオブジェクトは今後のメンテナンスがされないというアナウンスがあるため、Clientオブジェクトで確認出来るように全バケット名を取得する関数を作成しています。

# 全バケットの名前を取得
def list_bucket_names(s3_client)->list:
    buckets_name = []
    for bucket in [*s3_client.list_buckets()['Buckets']]:
        buckets_name.append(bucket['Name'])
    return buckets_name

次に、S3バケットの作成とファイルのアップロードを行ないます。 ここでやっている処理としては、設定したバケット名が存在しているかチェックして、存在していなければバケットの作成を行ない、その後にファイルのアップロードをupload_fileメソッドを使って行ないます。

key="electricityusagedata.csv"

electricity_df = pd.read_csv(key, dtype = object, names=['timestamp','target_value', 'item_id'])

print(electricity_df.head(3))

bucket_name = <お好きな名前を設定ください> # bucketの名前は大文字やスペースを受け付けない
print(f"\nS3バケット'{bucket_name}'へ'{key}'ファイルをアップロード中...")

buckets_name = list_bucket_names(s3)
if bucket_name not in buckets_name:
    if region != "us-east-1":
        s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
    else:
        s3.create_bucket(Bucket=bucket_name)

s3.upload_file(Filename='./electricityusagedata.csv',Bucket=bucket_name,Key=key)
ts_s3_path = f"s3://{bucket_name}/{key}"

print(f"\nS3バケットの{ts_s3_path}へアップロード完了です。")

ここで、リージョンがus-east-1であればcreate_bucketメソッドにデフォルトで設定されていますが、それ以外のリージョンであればCreateBucketConfiguration引数を使って明示的に設定する必要があります。

Datasetの作成

前回ブログとはここが少し手順が異なります。その理由としては、Dataset GroupとDatasetを自分で対応づけしないといけないからです。

update_dataset_groupというメソッドを使えば、Dataset GroupとDatasetをそれぞれ作成した後で対応付けも可能です。

対応付けには他の方法もあり、Dataset Groupを作成する際に引数でDataset Arnを渡す事が可能です。そちらの方がコード量が少し減るので今回はDatasetから作成を行ないます。

DATASET_FREQUENCY = "H" # 時間単位を設定(Hはhourのこと)
TS_DATASET_NAME = "ELECTRICITY_USAGE" # お好きな名前に変更ください
TS_SCHEMA = {
   "Attributes":[
      {
         "AttributeName":"timestamp",
         "AttributeType":"timestamp"
      },
      {
         "AttributeName":"target_value",
         "AttributeType":"float"
      },
      {
         "AttributeName":"item_id",
         "AttributeType":"string"
      }
   ]
}

create_dataset_response = forecast.create_dataset(Domain="CUSTOM",
                                                  DatasetType='TARGET_TIME_SERIES',
                                                  DatasetName=TS_DATASET_NAME,
                                                  DataFrequency=DATASET_FREQUENCY,
                                                  Schema=TS_SCHEMA)

ts_dataset_arn = create_dataset_response['DatasetArn']
describe_dataset_response = forecast.describe_dataset(DatasetArn=ts_dataset_arn)

print(f"DatasetのArnは{ts_dataset_arn}です。現在のステータスは {describe_dataset_response['Status']}です。")

Dataset Import Jobの作成

ここでは上で作成したDatasetに対して、時系列データのインポートを行ないます。 Datasetへのインポートには、データのフォーマット指定やどのS3バケットにファイルを保存しているのかも含めて記載する必要があります。

TIMESTAMP_FORMAT = "yyyy-MM-dd hh:mm:ss"
TS_IMPORT_JOB_NAME = "ELECTRICITY_USAGE_IMPORT" # お好きな名前に変更ください

ts_dataset_import_job_response = \
    forecast.create_dataset_import_job(DatasetImportJobName=TS_IMPORT_JOB_NAME,
                                       DatasetArn=ts_dataset_arn,
                                       DataSource= {
                                         "S3Config" : {
                                             "Path": ts_s3_path,
                                             "RoleArn": role_arn
                                         } 
                                       },
                                       TimestampFormat=TIMESTAMP_FORMAT
                                       )

ts_dataset_import_job_arn = ts_dataset_import_job_response['DatasetImportJobArn']
describe_dataset_import_job_response = forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn)

print(f"Arn {ts_dataset_import_job_arn}のDatasetImportJobがアクティブになるのを待ちます。しばらくお待ちください。\n\n現在のステータス:")

status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn))

describe_dataset_import_job_response = forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn)
print(f"\n\nArn {ts_dataset_import_job_arn}のDataset Import Job のステータスが {describe_dataset_import_job_response['Status']}になりました。")

Dataset Groupを作成

ここでDataset Groupを作成します。先に作成したDatasetとの対応付けには、create_dataset_groupメソッドの引数DatasetArnsにDatasetのArnを指定します。これで対応付けが出来ます。

DATASET_GROUP_NAME = "ELECTRICITY_USAGE_DATSET_GROUP" # お好きな名前に変更ください
DATASET_ARNS = [ts_dataset_arn]

create_dataset_group_response = \
    forecast.create_dataset_group(Domain="CUSTOM",
                                  DatasetGroupName=DATASET_GROUP_NAME,
                                  DatasetArns=DATASET_ARNS)

dataset_group_arn = create_dataset_group_response['DatasetGroupArn']
describe_dataset_group_response = forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)

print(f"Arn {dataset_group_arn}のDataset Groupは現在{describe_dataset_group_response['Status']}になりました。")

AutoPredictorを使って予測子の学習

AutoPredictorを使って学習するにはcreate_auto_predictorメソッドを使用します。 学習時にはデータの頻度(時間単位で集めたものなのか)などを設定する必要があります。

また、ForecastHorizonという引数で予測をどこまで行なうかを設定できます。

PREDICTOR_NAME = "ELECTRICITY_USAGE_PREDICTOR" # お好きな名前に変更ください
FORECAST_HORIZON = 24
FORECAST_FREQUENCY = "H"

create_auto_predictor_response = \
    forecast.create_auto_predictor(PredictorName = PREDICTOR_NAME,
                                   ForecastHorizon = FORECAST_HORIZON,
                                   ForecastFrequency = FORECAST_FREQUENCY,
                                   DataConfig = {
                                       'DatasetGroupArn': dataset_group_arn, 
                                    },
                                    )

predictor_arn = create_auto_predictor_response['PredictorArn']
print(f"Arn {predictor_arn}の予測子がACTIVE状態になるのを待ちます。 学習に使っているデータサイズによって掛かる時間が変わります。しばらくお待ちください。\n\n現在のStatus:")

status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn))

describe_auto_predictor_response = forecast.describe_auto_predictor(PredictorArn=predictor_arn)
print(f"\n\nArn {predictor_arn}の予測子は現在 {describe_auto_predictor_response['Status']}状態になりました。")

学習にはしばらく時間が掛かるため、冒頭の方でも書いたようにMFA認証を有効化している場合、セッションが切れる可能性があります。マネージメントコンソールで確認すると次のように、作成できている事が確認出来ます。

予測子のメトリクスを確認

マネージメントコンソールでも確認出来ますが、コードからでも確認出来ます。

get_accuracy_metrics_response = forecast.get_accuracy_metrics(PredictorArn=predictor_arn)
wql = get_accuracy_metrics_response['PredictorEvaluationResults'][0]['TestWindows'][0]['Metrics']['WeightedQuantileLosses']
accuracy_scores = get_accuracy_metrics_response['PredictorEvaluationResults'][0]['TestWindows'][0]['Metrics']['ErrorMetrics'][0]

print(f"Weighted Quantile Loss (wQL): {json.dumps(wql, indent=2)}\n\n")

print(f"Root Mean Square Error (RMSE): {accuracy_scores['RMSE']}\n\n")

print(f"Weighted Absolute Percentage Error (WAPE): {accuracy_scores['WAPE']}\n\n")

print(f"Mean Absolute Percentage Error (MAPE): {accuracy_scores['MAPE']}\n\n")

print(f"Mean Absolute Scaled Error (MASE): {accuracy_scores['MASE']}\n")

メトリクスの結果は次のようになりました。

Weighted Quantile Loss (wQL): [
  {
    "Quantile": 0.9,
    "LossValue": 0.056634522209731405
  },
  {
    "Quantile": 0.5,
    "LossValue": 0.14265278464297812
  },
  {
    "Quantile": 0.1,
    "LossValue": 0.10436949161604908
  }
]

Root Mean Square Error (RMSE): 383.5410134250451
Weighted Absolute Percentage Error (WAPE): 0.1433429522943954
Mean Absolute Percentage Error (MAPE): 0.7292837731438584
Mean Absolute Scaled Error (MASE): 1.7022411288860237

前回マネージメントコンソールで実行した時と大きな差は無く、上手く学習が出来ていそうです。

時系列予測の実行と結果の確認

学習させた予測子を使って、予測の実行とその結果の確認を行ないます。 コードが長くなるので、予測の実行と予測結果の確認の2つにコードを分けています。

時系列予測の実行

FORECAST_NAME = "ELECTRICITY_USAGE_FORECAST" # お好きな名前に変更ください

create_forecast_response = \
    forecast.create_forecast(ForecastName=FORECAST_NAME,
                             PredictorArn=predictor_arn)

forecast_arn = create_forecast_response['ForecastArn']
print(f"Arn {forecast_arn}の予測のステータスがACTIVEになるまで待ちます。 データの大きさによって時間が掛かりますので、しばらくお待ちください。\n\n現在のStatus:")

status = util.wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn))

describe_forecast_response = forecast.describe_forecast(ForecastArn=forecast_arn)
print(f"\n\nArn {forecast_arn}の予測が現在{describe_forecast_response['Status']}です。")

予測結果をグラフ化して確認

予測結果の可視化を行ないます。forecastqueryのClientを使ってquery_forecastメソッドの引数FiltersにアイテムのIDを設定する事で任意のアイテムに対する予測結果を取得する事が可能です。

ITEM_ID = "client_48"

forecast_response = forecastquery.query_forecast(
    ForecastArn=forecast_arn,
    Filters={"item_id": ITEM_ID}
)

forecasts_p10_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p10'])
forecasts_p50_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p50'])
forecasts_p90_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p90'])

results_df = pd.DataFrame(columns=['timestamp', 'value', 'source'])

for index, row in forecasts_p10_df.iterrows():
    clean_timestamp = dateutil.parser.parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p10'} , ignore_index=True)
for index, row in forecasts_p50_df.iterrows():
    clean_timestamp = dateutil.parser.parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p50'} , ignore_index=True)
for index, row in forecasts_p90_df.iterrows():
    clean_timestamp = dateutil.parser.parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p90'} , ignore_index=True)

pivot_df = results_df.pivot(columns='source', values='value', index="timestamp")

pivot_df.plot(figsize=(15, 7))

# 予測結果をエクスポートするためローカルに保存
results_df.to_csv(f'./forecast_{ITEM_ID}.csv')

上のコードで予測結果を確認した結果が次の画像です。マネージメントコンソールで確認していた時と同じように予測の結果をグラフ化して確認する事ができました。

amazon_forecast_result_used_sdk

予測データをエクスポート

作成し、ローカルマシンへ保存した予測結果をS3へアップロード(エクスポート)します。DBなどに格納する事で分析が出来ます。

export_key=f"export_data/forecast_{ITEM_ID}.csv"

s3.upload_file(Filename=f'./forecast_{ITEM_ID}.csv',Bucket=bucket_name,Key=export_key)

ここでは、エクスポート用の階層としてexport_dataというものを用意しています。指定した階層が無い場合は新規作成されます。 Amazon Forecast S3 export folder with sdk

作ったリソースの全削除

最後に、今回作ったAWSリソースを全て削除していきましょう。ここまで進めてきて作成したAWSリソースをマネージメントコンソールで確認します。

作成したリソースの確認

Dataset Group

Amazon Forecast Dataset Group with sdk

Dataset

Dataset Import Job

Amzon Forecast Dataset Import with sdk

Predictor(予測子)

Amazon Forecast Predictor with sdk

Forecast(予測結果)

Amazon Forecast Forecast with sdk

S3

Amazon Forecast final status S3 with sdk

IAMロール

サービスの検索窓からIAMを検索して自身で設定したIAMロールの名前を左側の検索窓で検索ください。 IAM Role Search text box

リソースの削除

delete_resource_treeメソッドでは関連するリソースも含めて削除できます。

# Amazon Forecast関連の削除
forecast.delete_resource_tree(ResourceArn = dataset_group_arn)
forecast.delete_resource_tree(ResourceArn = ts_dataset_arn)

# IAMロール削除
util.delete_iam_role(role_name)

# util.delete_iam_roleメソッドで削除出来なかった場合はこちらをコメントアウト外して実行
# iam = session.client('iam')
# iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonS3FullAccess", RoleName = role_name )
# iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonForecastFullAccess", RoleName = role_name )
# iam.delete_role(RoleName=role_name)


# 全オブジェクトを削除
s3.delete_objects(
    Bucket=bucket_name,
    Delete={
        'Objects': [
            {
                'Key': key, # 予測子の学習に使用したファイル
            },
            {
                'Key':export_key, # 予測結果をエクスポートしたファイル
            },
        ],
    }
)
# バケットを削除
s3.delete_bucket(Bucket=bucket_name)

実行完了後、削除出来ているかマネージメントコンソールでご確認ください。表示されていない(つまり削除済みである)事が確認出来るかと思います。

終わりに

今回はSDKを使ってAmazon Forecastを操作してみました。簡単に操作ができますし、各種設定をする必要があるので、サービスへの理解を深めるという勉強のためにやってみるのも良いかもしれません。

今回はここまで。

それでは、また!

参考にしたサイト