サンプルコードで学ぶAmazon Forecast – 機械学習 on AWS Advent Calendar 2019

こんにちは、Mr.Moです。

当エントリは『機械学習 on AWS Advent Calendar 2019』の8日目のエントリです。

下記のForecast関連の記事を見てAmazon Forecastに非常に興味が湧いたので、早速触ってみました。

https://dev.classmethod.jp/referencecat/amazon-forecast/

今回は「Amazon Forecast Samples」にあるノートブックを参考に触っていきます。

さっそくノートブックを進めていく

下記はAmazon Forecast APIを詳しく知るために用意されているものです。 その中でもアイテムメタデータセットを使用している下記のノートブックをベースに作業を進めて行きます。

https://github.com/aws-samples/amazon-forecast-samples/blob/master/notebooks/7.Incorporating_Item_Metadata_Dataset_to_your_Predictor.ipynb

また、前提としてAWS SageMaker上で作業をしています。

ライブラリのインポート

最初にboto3を最新のバージョンにしておきます。

!pip uninstall boto3 -y
!pip install boto3
!pip show boto3

image.png

必要なライブラリや設定をしておきましょう。 util.fcst_utilsはgithub上に置いてあるpythonファイルです。

%load_ext autoreload
%autoreload 2
from util.fcst_utils import *
import warnings
import boto3
import s3fs
plt.rcParams['figure.figsize'] = (15.0, 5.0)
warnings.filterwarnings('ignore')


region = 'us-west-2'
bucket = 'item-metadata-demo'
version = 'prod'

session = boto3.Session(region_name='us-west-2') 
forecast = session.client(service_name='forecast') 
forecast_query = session.client(service_name='forecastquery')

role_arn = get_or_create_role_arn()


freq = "H"
forecast_horizon = 24
timestamp_format = "yyyy-MM-dd HH:mm:ss"
delimiter = ','

データセットの準備

UCI machine learning repositoryで公開されているデータセットの中から、各家庭の電力消費量データセットを使います。使用状況データは時間単位で集計されているようです。

time_series_df = pd.read_csv("./data/item-demand-time.csv", dtype = object, names = ['timestamp', 'target_value', 'item'])
time_series_df.head(3)

image.png

今回はメタデータ(アイテムメタデータ)のデータセットも扱います。アイテムメタデータはターゲットの時系列データに適用可能なカテゴリの情報を提供します。たとえば、特定の商品の売上を予測している場合、ブランド、色、ジャンルなどの商品の属性は アイテムメタデータの一部になります。今回はアイテムのメタデータとしてregionを使用します。

item_metadata_df = pd.read_csv("./data/item-meta.csv", dtype = object, names = ['item', 'region'])
item_metadata_df.head(3)

image.png

ちなみにアイテムメタデータデータセットを使用するには、ターゲットの時系列データのアイテム項目のセットとアイテムメタデータに存在するアイテム項目のセットが同じである必要があります。

target_items = set(time_series_df["item"].tolist())
metadata_items = set(item_metadata_df["item"].tolist())
print(target_items)
print(metadata_items)
assert len(target_items - metadata_items) == 0, "items do not match"

image.png

s3にデータをアップロード

Amazon Forecast はトレーニングデータのインポートにCSVファイルを利用できますが、そのCSVファイルはS3のバケットに保存されている必要があります。

下記は参考情報です。

s3 = session.client('s3')
account_id = boto3.client('sts').get_caller_identity().get('Account')

bucket_name = f"amazon-forecast-data-{account_id}"
key = "item_metadata_demo_small"

s3.upload_file(Filename="./data/item-demand-time.csv", Bucket = bucket_name, Key = f"{key}/target.csv")
s3.upload_file(Filename="./data/item-meta.csv", Bucket = bucket_name, Key = f"{key}/metadata.csv")

データセットおよびデータセットグループの作成

データセットをグルーピングするためのデータセットグループを作成します。

dataset_group = f"{project}_gp_{idx}"
dataset_arns = []
create_dataset_group_response = forecast.create_dataset_group(DatasetGroupName=dataset_group,
                                                              DatasetArns=dataset_arns,
                                                              Domain="CUSTOM")

logging.info(f'Creating dataset group {dataset_group}')

dataset_group_arn = create_dataset_group_response['DatasetGroupArn']

forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)

データセットには下記のようにJSON形式でマッピングされたスキーマ定義が必要です。 詳細は下記を参考にしてください。

<br />ts_dataset_name = f"{project}_ts_{idx}"
print(ts_dataset_name)

ts_schema_val = [{"AttributeName": "timestamp", "AttributeType": "timestamp"},
                {"AttributeName": "target_value", "AttributeType": "float"},
                {"AttributeName": "item_id", "AttributeType": "string"}]
ts_schema = {"Attributes": ts_schema_val}

logging.info(f'Creating target dataset {ts_dataset_name}')


response = forecast.create_dataset(Domain="CUSTOM",
                               DatasetType='TARGET_TIME_SERIES',
                               DatasetName=ts_dataset_name,
                               DataFrequency=freq,
                               Schema=ts_schema
                              )

ts_dataset_arn = response['DatasetArn']

forecast.describe_dataset(DatasetArn=ts_dataset_arn)

メタデータの方も同様に。

meta_dataset_name = f"{project}_meta_{idx}"
print(meta_dataset_name)

meta_schema_val = [{"AttributeName": "item_id", "AttributeType": "string"},
              {"AttributeName": "region_id", "AttributeType": "string"}]
meta_schema = {"Attributes": meta_schema_val}

logging.info(f'Creating item_metadata dataset {meta_dataset_name}')

response = forecast.create_dataset(Domain="CUSTOM",
                               DatasetType='ITEM_METADATA',
                               DatasetName=meta_dataset_name,
                               Schema=meta_schema
                              )

meta_dataset_arn = response['DatasetArn']

forecast.describe_dataset(DatasetArn = meta_dataset_arn)

データセットグループにデータをインポート

s3にアップロードしたターゲットデータとアイテムデータをデータセットに追加するためのインポートジョブを作成します。これによりAmazon Forecast に学習用のデータをインポートできるようになります。

dataset_arns = []
dataset_arns.append(ts_dataset_arn)
dataset_arns.append(meta_dataset_arn)
forecast.update_dataset_group(DatasetGroupArn=dataset_group_arn, DatasetArns=dataset_arns)

forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn)
ts_s3_data_path = f"{s3_data_path}/target.csv"

ts_dataset_import_job_response = forecast.create_dataset_import_job(DatasetImportJobName=dataset_group,
                                                             DatasetArn=ts_dataset_arn,
                                                             DataSource= {
                                                                 "S3Config" : {
                                                                     "Path": ts_s3_data_path,
                                                                     "RoleArn": role_arn
                                                                 } 
                                                             },
                                                             TimestampFormat=timestamp_format)


ts_dataset_import_job_arn=ts_dataset_import_job_response['DatasetImportJobArn']

status = wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=ts_dataset_import_job_arn))
assert status
meta_s3_data_path = f"{s3_data_path}/metadata.csv"

meta_dataset_import_job_response = forecast.create_dataset_import_job(DatasetImportJobName=dataset_group,
                                                             DatasetArn=meta_dataset_arn,
                                                             DataSource= {
                                                                 "S3Config" : {
                                                                     "Path": meta_s3_data_path,
                                                                     "RoleArn": role_arn
                                                                 } 
                                                             })

meta_dataset_import_job_arn=meta_dataset_import_job_response['DatasetImportJobArn']

status = wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=meta_dataset_import_job_arn))
assert status

予測子の作成

Amazon Forecast では、予測子と呼ばれる予測モデルをトレーニングしていきます。 予測子を作成する際に、アルゴリズムを選択しますがここでは DeepAR+ アルゴリズムを使用します。これは、カテゴリ機能をサポートするアルゴリズムになります。

algorithm_arn = 'arn:aws:forecast:::algorithm/'

algorithm = 'Deep_AR_Plus'
algorithm_arn_deep_ar_plus = algorithm_arn + algorithm
predictor_name_deep_ar = f'{project}_{algorithm.lower()}_{idx}'

logging.info(f'[{predictor_name_deep_ar}] Creating predictor {predictor_name_deep_ar} ...')

create_predictor_response = forecast.create_predictor(PredictorName=predictor_name_deep_ar,
                                                  AlgorithmArn=algorithm_arn_deep_ar_plus,
                                                  ForecastHorizon=forecast_horizon,
                                                  PerformAutoML=False,
                                                  PerformHPO=False,
                                                  InputDataConfig= {"DatasetGroupArn": dataset_group_arn},
                                                  FeaturizationConfig= {"ForecastFrequency": freq}
                                                 )

predictor_arn_deep_ar = create_predictor_response['PredictorArn']

status = wait(lambda: forecast.describe_predictor(PredictorArn=predictor_arn_deep_ar))
assert status

forecast.describe_predictor(PredictorArn=predictor_arn_deep_ar)

ちなみに上記の設定でPerformAutoMLにTrueを指定すると、AutoML機能を利用することができます。 詳しくは下記の記事もご覧ください。

AutoMLを利用した場合は全アルゴリズムのメトリクスを取得できるので例えば下記のように比較することも容易ですね。

acc = forecast.get_accuracy_metrics(PredictorArn=predictor_arn_deep_ar)

import seaborn as sns
import pprint
import json

scores = pd.DataFrame(columns=['predictor', 'RMSE'])

for a in acc['PredictorEvaluationResults']:
    key, item = a.items()
    score = a['TestWindows'][0]['Metrics']['RMSE']
    scores = scores.append(pd.DataFrame({'predictor':[key[1].split('/')[1]], 'RMSE':[score]}), ignore_index=True)

fig = sns.barplot(data=scores, x='predictor', y='RMSE').set_title('Root Mean Square Error')

image.png

予測の作成

Amazon Forecast の予測子を作成したら、CreateForecast オペレーションを呼び出して予測を作成します。

ForecastTypesに分位数(quantile)を設定できるので「'0.1', '0.3', '0.5', '0.7', '0.9'」と指定してみました。(設定しなかった場合のデフォルトは「"0.1", "0.5", "0.9"」)

後ほど出てくるグラフでも確認していきます。

logging.info(f"Done fetching accuracy numbers. Creating forecaster for DeepAR+ ...")


forecast_name_deep_ar = f'{project}_deep_ar_plus_{idx}'


create_forecast_response_deep_ar = forecast.create_forecast(ForecastName=forecast_name_deep_ar,
                                                            PredictorArn=predictor_arn_deep_ar,
                                                            ForecastTypes=['0.1', '0.3', '0.5', '0.7', '0.9'])

forecast_arn_deep_ar = create_forecast_response_deep_ar['ForecastArn']

status = wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn_deep_ar))
assert status


forecast.describe_forecast(ForecastArn=forecast_arn_deep_ar)

予測結果の確認

グラフの右側に表示されているP◯◯が今後の電力消費量の予測を表したものです。ちょっと同じ時間帯で見ていきましょう。 これまでの電力消費量(actual)の傾向を参考にするとP70のラインで電力消費を見込むのが良さそうでしょうか?電気は生活インフラなのでP50より下のラインで見込むと電気が足りなくなる可能性が出て危なそうです。

item_id = 'client_111'

forecastResponse = forecast_query.query_forecast(
    ForecastArn=forecast_arn_deep_ar,
    Filters={"item_id": item_id})
actual_df = pd.read_csv("../data/item-demand-time-validation.csv", names=['timestamp','value','item'])
actual_df.sort_values('timestamp', ascending=False).head(5)

actual_df = actual_df[(actual_df['timestamp'] >= '2014-11-28') & (actual_df['timestamp'] < '2014-11-29')]
actual_df = actual_df[(actual_df['item'] == item_id)]
actual_df.head()

prediction_df_p10 = pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions']['p10'])
prediction_df_p30 = pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions']['p30'])
prediction_df_p50 = pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions']['p50'])
prediction_df_p70 = pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions']['p70'])
prediction_df_p90 = pd.DataFrame.from_dict(forecastResponse['Forecast']['Predictions']['p90'])

results_df = pd.DataFrame(columns=['timestamp', 'value', 'source'])

from dateutil.parser import parse

for index, row in actual_df.iterrows():
    clean_timestamp = parse(row['timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['value'], 'source': 'actual'} , ignore_index=True)

# Now add the P10, P30, P50, P70 and P90 Values
for index, row in prediction_df_p10.iterrows():
    clean_timestamp = parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p10'} , ignore_index=True)
for index, row in prediction_df_p30.iterrows():
    clean_timestamp = parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p30'} , ignore_index=True)
for index, row in prediction_df_p50.iterrows():
    clean_timestamp = parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p50'} , ignore_index=True)
for index, row in prediction_df_p70.iterrows():
    clean_timestamp = parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p70'} , ignore_index=True)
for index, row in prediction_df_p90.iterrows():
    clean_timestamp = parse(row['Timestamp'])
    results_df = results_df.append({'timestamp' : clean_timestamp , 'value' : row['Value'], 'source': 'p90'} , ignore_index=True)

results_df.head()

pivot_df = results_df.pivot(columns='source', values='value', index="timestamp")

pivot_df.head()

pivot_df.plot()

image.png

予測のエクスポート

予測結果はCSVファイルにエクスポートする事が可能です。s3上にエクスポートされます。 出力された予測結果を見てみるとアイテムメタデータの「region」情報も入っていますね。

forecast_export_name_deep_ar = f'{project}_forecast_export_deep_ar_plus_{idx}'
forecast_export_name_deep_ar_path = f"{s3_data_path}/{forecast_export_name_deep_ar}"

create_forecast_export_response_deep_ar = forecast.create_forecast_export_job(ForecastExportJobName=forecast_export_name_deep_ar,
                                                        ForecastArn=forecast_arn_deep_ar,
                                                        Destination={
                                                            "S3Config" : {
                                                                "Path": forecast_export_name_deep_ar_path,
                                                                "RoleArn": role_arn
                                                            }
                                                        })
forecast_export_arn_deep_ar = create_forecast_export_response_deep_ar['ForecastExportJobArn']

forecast.describe_forecast_export_job(ForecastExportJobArn=forecast_export_arn_deep_ar)

image.png

まとめ

今回のようにメタデータを用いることで、カテゴリにより時系列データをグループ化することも可能ですね。そこからグループ同士を比較するといった見方もできそうです。 Amazon Forecast は非常に簡単に使うことができるので、色んなデータで試してみたくなりますね!