Amazon Forecastに再入門してSDKを使って時系列予測
皆さん、こんにちは。
クルトンです!
前回のブログに引き続き、Amazon Forecastの再入門ブログです。 今回は前回のブログでやっていた内容を「SDKを使って動かすには」という事で実行していきます。
一連の流れを実行する際の前提
時系列予測に使っているファイル
公式ドキュメントで配布されている個々の家庭の電力消費のデータセットを使用します。ローカルマシンへダウンロードして、zipファイルを解凍してください。
utilモジュールについて
公式GitHubからフォルダごとローカルへ持ってくるようお願いいたします。
実行環境
ローカルマシンに~/.aws/credentials
と~/.aws/config
を設定しています。
設定方法についてはこちらをご確認ください。
本ブログの一連の流れ
前回のブログと同じく、S3バケットの作成からスタートして、作ったリソースの削除までを行ないます。 前回とは少し流れが違うところもありますが、作成するAWSリソースは同じです。
- モジュールのインポート
- Clientオブジェクトの作成
- IAM roleの作成
- ローカルにある時系列データをS3バケットへアップロード
- Datasetの作成
- Dataset Import Jobの作成
- Dataset Groupを作成
- AutoPredictorを使って予測子の学習
- 予測子のメトリクスを確認
- 時系列予測の実行と結果の確認
- 予測データをエクスポート
- 作ったリソースの全削除
注意点
MFAを設定しているアカウントを使っていると実行途中でRefreshWithMFAUnsupportedError: Cannot refresh credentials: MFA token required.
のようなエラーが出るかもしれません。
このエラーが出た場合、実行が不完全に終わっているかどうかをマネージメントコンソールで確認すると、実行が完了している事もございます。念のためそちらもご確認ください。(特に予測子の学習と予測結果のエクスポートは時間が掛かるので、出てくる可能性があります。)
マネージメントコンソールで確認して実行が正常に完了している場合は、もう一度Clientオブジェクト作成のコード部分を実行して新しくセッションを作成し直して実行してください。
モジュールのインポート
import json import util import boto3 import pandas as pd import dateutil
今回必要なモジュールを最初にインポートしておきます。ここで最後の削除まで必要なモジュールをインポートしています。
Clientオブジェクトの作成
region = 'ap-northeast-1' session = boto3.Session(region_name=region) forecast = session.client(service_name='forecast') forecastquery = session.client(service_name='forecastquery') s3 = session.client(service_name='s3') # 作ったセッションで繋がるか確認 assert forecast.list_predictors()
最初にregion
という変数で、どのリージョンに対して実行していくのかを格納しています。こちらの変数はS3バケットを作成などに使用しています。
boto3.Session
オブジェクト作成やclient
メソッドを実行する際にアクセスキーとシークレットアクセスキーをaws_access_key_idとaws_secret_access_keyという引数で渡す事が可能ではありますが、誤ってリポジトリに上げてしまうなどの事故の元ですので、今回はこちらを書かないように実装しています。
IAM roleの作成
既に同じ名前で存在すれば、IAMロールのArnを変数へ格納のみ行ないます。
role_name = <お好きな名前を設定ください> print(f"{role_name}という名前でIAMロール作成中...") role_arn = util.get_or_create_iam_role( role_name = role_name ) print(f"IAMロール名{role_arn.split('/')[1]}を作成完了です。")
ローカルにある時系列データをS3バケットへアップロード
resourceオブジェクトであれば、Bucketオブジェクトを作成し、creation_dateというメンバ変数にアクセスして、作成済みかどうかを確認できます。
しかし、resourceオブジェクトは今後のメンテナンスがされないというアナウンスがあるため、Clientオブジェクトで確認出来るように全バケット名を取得する関数を作成しています。
# 全バケットの名前を取得 def list_bucket_names(s3_client)->list: buckets_name = [] for bucket in [*s3_client.list_buckets()['Buckets']]: buckets_name.append(bucket['Name']) return buckets_name
次に、S3バケットの作成とファイルのアップロードを行ないます。
ここでやっている処理としては、設定したバケット名が存在しているかチェックして、存在していなければバケットの作成を行ない、その後にファイルのアップロードをupload_file
メソッドを使って行ないます。
key="electricityusagedata.csv" electricity_df = pd.read_csv(key, dtype = object, names=['timestamp','target_value', 'item_id']) print(electricity_df.head(3)) bucket_name = <お好きな名前を設定ください> # bucketの名前は大文字やスペースを受け付けない print(f"\nS3バケット'{bucket_name}'へ'{key}'ファイルをアップロード中...") buckets_name = list_bucket_names(s3) if bucket_name not in buckets_name: if region != "us-east-1": s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region}) else: s3.create_bucket(Bucket=bucket_name) s3.upload_file(Filename='./electricityusagedata.csv',Bucket=bucket_name,Key=key) ts_s3_path = f"s3://{bucket_name}/{key}" print(f"\nS3バケットの{ts_s3_path}へアップロード完了です。")
ここで、リージョンがus-east-1であればcreate_bucket
メソッドにデフォルトで設定されていますが、それ以外のリージョンであればCreateBucketConfiguration
引数を使って明示的に設定する必要があります。
Datasetの作成
前回ブログとはここが少し手順が異なります。その理由としては、Dataset GroupとDatasetを自分で対応づけしないといけないからです。
update_dataset_groupというメソッドを使えば、Dataset GroupとDatasetをそれぞれ作成した後で対応付けも可能です。
対応付けには他の方法もあり、Dataset Groupを作成する際に引数でDataset Arnを渡す事が可能です。そちらの方がコード量が少し減るので今回はDatasetから作成を行ないます。
DATASET_FREQUENCY = "H" # 時間単位を設定(Hはhourのこと) TS_DATASET_NAME = "ELECTRICITY_USAGE" # お好きな名前に変更ください TS_SCHEMA = { "Attributes":[ { "AttributeName":"timestamp", "AttributeType":"timestamp" }, { "AttributeName":"target_value", "AttributeType":"float" }, { "AttributeName":"item_id", "AttributeType":"string" } ] } create_dataset_response = forecast.create_dataset(Domain="CUSTOM", DatasetType='TARGET_TIME_SERIES', DatasetName=TS_DATASET_NAME, DataFrequency=DATASET_FREQUENCY, Schema=TS_SCHEMA) ts_dataset_arn = create_dataset_response['DatasetArn'] describe_dataset_response = forecast.describe_dataset(DatasetArn=ts_dataset_arn) print(f"DatasetのArnは{ts_dataset_arn}です。現在のステータスは {describe_dataset_response['Status']}です。")
Dataset Import Jobの作成
ここでは上で作成したDatasetに対して、時系列データのインポートを行ないます。 Datasetへのインポートには、データのフォーマット指定やどのS3バケットにファイルを保存しているのかも含めて記載する必要があります。
TIMESTAMP_FORMAT = "yyyy-MM-dd hh:mm:ss" TS_IMPORT_JOB_NAME = "ELECTRICITY_USAGE_IMPORT" # お好きな名前に変更ください ts_dataset_import_job_response = \ forecast.create_dataset_import_job(DatasetImportJobName=TS_IMPORT_JOB_NAME, DatasetArn=ts_dataset_arn, DataSource= { "S3Config" : { "Path": ts_s3_path, "RoleArn": role_arn } }, TimestampFormat=TIMESTAMP_FORMAT ) ts_dataset_import_job_arn = ts_dataset_import_job_response['DatasetImportJobArn'] describe_dataset_import_job_response = forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn) print(f"Arn {ts_dataset_import_job_arn}のDatasetImportJobがアクティブになるのを待ちます。しばらくお待ちください。\n\n現在のステータス:") status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn)) describe_dataset_import_job_response = forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn) print(f"\n\nArn {ts_dataset_import_job_arn}のDataset Import Job のステータスが {describe_dataset_import_job_response['Status']}になりました。")
Dataset Groupを作成
ここでDataset Groupを作成します。先に作成したDatasetとの対応付けには、create_dataset_group
メソッドの引数DatasetArns
にDatasetのArnを指定します。これで対応付けが出来ます。
DATASET_GROUP_NAME = "ELECTRICITY_USAGE_DATSET_GROUP" # お好きな名前に変更ください DATASET_ARNS = [ts_dataset_arn] create_dataset_group_response = \ forecast.create_dataset_group(Domain="CUSTOM", DatasetGroupName=DATASET_GROUP_NAME, DatasetArns=DATASET_ARNS) dataset_group_arn = create_dataset_group_response['DatasetGroupArn'] describe_dataset_group_response = forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn) print(f"Arn {dataset_group_arn}のDataset Groupは現在{describe_dataset_group_response['Status']}になりました。")
AutoPredictorを使って予測子の学習
AutoPredictorを使って学習するにはcreate_auto_predictor
メソッドを使用します。
学習時にはデータの頻度(時間単位で集めたものなのか)などを設定する必要があります。
また、ForecastHorizon
という引数で予測をどこまで行なうかを設定できます。
PREDICTOR_NAME = "ELECTRICITY_USAGE_PREDICTOR" # お好きな名前に変更ください FORECAST_HORIZON = 24 FORECAST_FREQUENCY = "H" create_auto_predictor_response = \ forecast.create_auto_predictor(PredictorName = PREDICTOR_NAME, ForecastHorizon = FORECAST_HORIZON, ForecastFrequency = FORECAST_FREQUENCY, DataConfig = { 'DatasetGroupArn': dataset_group_arn, }, ) predictor_arn = create_auto_predictor_response['PredictorArn'] print(f"Arn {predictor_arn}の予測子がACTIVE状態になるのを待ちます。 学習に使っているデータサイズによって掛かる時間が変わります。しばらくお待ちください。\n\n現在のStatus:") status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn)) describe_auto_predictor_response = forecast.describe_auto_predictor(PredictorArn=predictor_arn) print(f"\n\nArn {predictor_arn}の予測子は現在 {describe_auto_predictor_response['Status']}状態になりました。")
学習にはしばらく時間が掛かるため、冒頭の方でも書いたようにMFA認証を有効化している場合、セッションが切れる可能性があります。マネージメントコンソールで確認すると次のように、作成できている事が確認出来ます。
予測子のメトリクスを確認
マネージメントコンソールでも確認出来ますが、コードからでも確認出来ます。
get_accuracy_metrics_response = forecast.get_accuracy_metrics(PredictorArn=predictor_arn) wql = get_accuracy_metrics_response['PredictorEvaluationResults'][0]['TestWindows'][0]['Metrics']['WeightedQuantileLosses'] accuracy_scores = get_accuracy_metrics_response['PredictorEvaluationResults'][0]['TestWindows'][0]['Metrics']['ErrorMetrics'][0] print(f"Weighted Quantile Loss (wQL): {json.dumps(wql, indent=2)}\n\n") print(f"Root Mean Square Error (RMSE): {accuracy_scores['RMSE']}\n\n") print(f"Weighted Absolute Percentage Error (WAPE): {accuracy_scores['WAPE']}\n\n") print(f"Mean Absolute Percentage Error (MAPE): {accuracy_scores['MAPE']}\n\n") print(f"Mean Absolute Scaled Error (MASE): {accuracy_scores['MASE']}\n")
メトリクスの結果は次のようになりました。
Weighted Quantile Loss (wQL): [ { "Quantile": 0.9, "LossValue": 0.056634522209731405 }, { "Quantile": 0.5, "LossValue": 0.14265278464297812 }, { "Quantile": 0.1, "LossValue": 0.10436949161604908 } ] Root Mean Square Error (RMSE): 383.5410134250451 Weighted Absolute Percentage Error (WAPE): 0.1433429522943954 Mean Absolute Percentage Error (MAPE): 0.7292837731438584 Mean Absolute Scaled Error (MASE): 1.7022411288860237
前回マネージメントコンソールで実行した時と大きな差は無く、上手く学習が出来ていそうです。
時系列予測の実行と結果の確認
学習させた予測子を使って、予測の実行とその結果の確認を行ないます。 コードが長くなるので、予測の実行と予測結果の確認の2つにコードを分けています。
時系列予測の実行
FORECAST_NAME = "ELECTRICITY_USAGE_FORECAST" # お好きな名前に変更ください create_forecast_response = \ forecast.create_forecast(ForecastName=FORECAST_NAME, PredictorArn=predictor_arn) forecast_arn = create_forecast_response['ForecastArn'] print(f"Arn {forecast_arn}の予測のステータスがACTIVEになるまで待ちます。 データの大きさによって時間が掛かりますので、しばらくお待ちください。\n\n現在のStatus:") status = util.wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn)) describe_forecast_response = forecast.describe_forecast(ForecastArn=forecast_arn) print(f"\n\nArn {forecast_arn}の予測が現在{describe_forecast_response['Status']}です。")
予測結果をグラフ化して確認
予測結果の可視化を行ないます。forecastqueryのClientを使ってquery_forecast
メソッドの引数Filters
にアイテムのIDを設定する事で任意のアイテムに対する予測結果を取得する事が可能です。
ITEM_ID = "client_48" forecast_response = forecastquery.query_forecast( ForecastArn=forecast_arn, Filters={"item_id": ITEM_ID} ) forecasts_p10_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p10']) forecasts_p50_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p50']) forecasts_p90_df = pd.DataFrame.from_dict(forecast_response['Forecast']['Predictions']['p90']) results_df = pd.DataFrame(columns=['timestamp', 'value', 'source']) for index, row in forecasts_p10_df.iterrows(): clean_timestamp = dateutil.parser.parse(row['Timestamp']) results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p10'} , ignore_index=True) for index, row in forecasts_p50_df.iterrows(): clean_timestamp = dateutil.parser.parse(row['Timestamp']) results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p50'} , ignore_index=True) for index, row in forecasts_p90_df.iterrows(): clean_timestamp = dateutil.parser.parse(row['Timestamp']) results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p90'} , ignore_index=True) pivot_df = results_df.pivot(columns='source', values='value', index="timestamp") pivot_df.plot(figsize=(15, 7)) # 予測結果をエクスポートするためローカルに保存 results_df.to_csv(f'./forecast_{ITEM_ID}.csv')
上のコードで予測結果を確認した結果が次の画像です。マネージメントコンソールで確認していた時と同じように予測の結果をグラフ化して確認する事ができました。
予測データをエクスポート
作成し、ローカルマシンへ保存した予測結果をS3へアップロード(エクスポート)します。DBなどに格納する事で分析が出来ます。
export_key=f"export_data/forecast_{ITEM_ID}.csv" s3.upload_file(Filename=f'./forecast_{ITEM_ID}.csv',Bucket=bucket_name,Key=export_key)
ここでは、エクスポート用の階層としてexport_data
というものを用意しています。指定した階層が無い場合は新規作成されます。
作ったリソースの全削除
最後に、今回作ったAWSリソースを全て削除していきましょう。ここまで進めてきて作成したAWSリソースをマネージメントコンソールで確認します。
作成したリソースの確認
Dataset Group
Dataset
Dataset Import Job
Predictor(予測子)
Forecast(予測結果)
S3
IAMロール
サービスの検索窓からIAM
を検索して自身で設定したIAMロールの名前を左側の検索窓で検索ください。
リソースの削除
delete_resource_tree
メソッドでは関連するリソースも含めて削除できます。
# Amazon Forecast関連の削除 forecast.delete_resource_tree(ResourceArn = dataset_group_arn) forecast.delete_resource_tree(ResourceArn = ts_dataset_arn) # IAMロール削除 util.delete_iam_role(role_name) # util.delete_iam_roleメソッドで削除出来なかった場合はこちらをコメントアウト外して実行 # iam = session.client('iam') # iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonS3FullAccess", RoleName = role_name ) # iam.detach_role_policy( PolicyArn = "arn:aws:iam::aws:policy/AmazonForecastFullAccess", RoleName = role_name ) # iam.delete_role(RoleName=role_name) # 全オブジェクトを削除 s3.delete_objects( Bucket=bucket_name, Delete={ 'Objects': [ { 'Key': key, # 予測子の学習に使用したファイル }, { 'Key':export_key, # 予測結果をエクスポートしたファイル }, ], } ) # バケットを削除 s3.delete_bucket(Bucket=bucket_name)
実行完了後、削除出来ているかマネージメントコンソールでご確認ください。表示されていない(つまり削除済みである)事が確認出来るかと思います。
終わりに
今回はSDKを使ってAmazon Forecastを操作してみました。簡単に操作ができますし、各種設定をする必要があるので、サービスへの理解を深めるという勉強のためにやってみるのも良いかもしれません。
今回はここまで。
それでは、また!