Amazon SageMakerのDeepARを使って電力消費量を予測する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。

当エントリではAmazon SageMakerの組み込みアルゴリズムの1つである「DeepAR」についての解説を書かせていただきます。

目次

DeepARとは

DeepARはRNNを使った時系列予測の手法です。 RNNについては、Sequence to Sequenceの際に簡単な解説を記載していますのでそちらをご覧ください。

RNNでは系列データを扱うことが可能ですので、それを文字列の並びではなく時間の並びに対してのモデル作成に利用するのがDeepARとなっています。 詳細については、参考文献をご参照ください。

Exampleを実行してみる

では、Exampleベースでどのような動きをするのか見ていきましょう。

利用するExample

今回利用するExampleはノートブックの上部メニューから SageMaker Examples > Introduction to Amazon algorithms > DeepAR-Electricity.ipynb とたどったものになります。

データの準備

まずは、いつも通り学習をするための準備から始めます。 最初は必要なライブラリのインポートやロールの設定などを行います。

%matplotlib inline

import sys
from urllib.request import urlretrieve
import zipfile
from dateutil.parser import parse
import json
from random import shuffle
import random
import datetime
import os

import boto3
import s3fs
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox
# 結果の再現性を担保するために乱数のシードを設定する
# 元のExampleではシードを42にしてるが、縁起のいい71に変更しておくw
np.random.seed(71)
random.seed(71)
sagemaker_session = sagemaker.Session()
# sagemaker.Session().default_bucket()ではSageMaker-xxxxxxxのような名前のバケットが生成される
# 都合が悪い場合はここの値を書き換える
s3_bucket = sagemaker.Session().default_bucket()
s3_prefix = 'deepar-electricity-demo-notebook'

role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)
# 学習用コンテナイメージの設定
image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

この辺りはいつも通りな感じなので、特段解説は必要ないかと思います。 続いて、今回のExampleで利用するデータの取得を行います。

DATA_HOST = "https://archive.ics.uci.edu"
DATA_PATH = "/ml/machine-learning-databases/00321/"
ARCHIVE_NAME = "LD2011_2014.txt.zip"
# 解凍したテキストファイルの名前
FILE_NAME = ARCHIVE_NAME[:-4]
def progress_report_hook(count, block_size, total_size):
    mb = int(count * block_size // 1e6)
    if count % 500 == 0:
        sys.stdout.write("\r{} MB downloaded".format(mb))
        sys.stdout.flush()

if not os.path.isfile(FILE_NAME):
    print("downloading dataset (258MB), can take a few minutes depending on your connection")
    urlretrieve(DATA_HOST + DATA_PATH + ARCHIVE_NAME, ARCHIVE_NAME, reporthook=progress_report_hook)

    print("\nextracting data archive")
    zip_ref = zipfile.ZipFile(ARCHIVE_NAME, 'r')
    zip_ref.extractall("./")
    zip_ref.close()
else:
    print("File found skipping download")

まずは、データの取得先の設定を行なっています。 今回は、UCI Machine Learning Repositoryにあるデータを利用しています。

このデータは電力消費量の時系列データになっています。 内容は以下のようになっており、列ごとに1つのクライアントとなっています。

URLを設定してのち、データがダウンロードされていない状態であれば取得します。 取得したファイルはzip形式なので、これを解凍してtextファイルを取得します。

このデータでは15分間隔でデータを取得しています。 今回は2時間間隔でのデータを利用するため、変換を行なっています。

data = pd.read_csv(FILE_NAME, sep=";", index_col=0, parse_dates=True, decimal=',')
num_timeseries = data.shape[1]
data_kw = data.resample('2H').sum() / 8
timeseries = []
for i in range(num_timeseries):
    timeseries.append(np.trim_zeros(data_kw.iloc[:,i], trim='f'))

2時間間隔のデータに変換するため、Pandasのresampleメソッドを使って2時間ごとの平均値を計算しています。 その後、この後の処理で利用するため、列ごとのSeriesを入れたlitに変換しています。

続いてはデータの確認を行います。

fig, axs = plt.subplots(5, 2, figsize=(20, 20), sharex=True)
axx = axs.ravel()
for i in range(0, 10):
    timeseries[i].loc["2014-01-01":"2014-01-14"].plot(ax=axx[i])
    axx[i].set_xlabel("date")    
    axx[i].set_ylabel("kW consumption")   
    axx[i].grid(which='minor', axis='x')

先頭10列のデータの一定期間をグラフ表示します。 結果は以下のようになっています。

続いて、Estimatorで学習するための設定やデータの変換を行います。

# データの時間間隔
freq = '2H'

# どのくらい先まで予測するか
# 今回は時間間隔が2Hなので、1日で12個x7日
prediction_length = 7 * 12

# 学習時にどの範囲のデータを利用するか
# 先ほどのprediction_lengthに対して短すぎてはいけない
context_length = 7 * 12

データの時間間隔や学習・推論の際にどの範囲まで行うのかといった設定を行います。

start_dataset = pd.Timestamp("2014-01-01 00:00:00", freq=freq)
end_training = pd.Timestamp("2014-09-01 00:00:00", freq=freq)

この値を元に学習データ、テストデータを設定します。 学習データは先ほど指定したstart, endの範囲のデータにします。

training_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training - 1].tolist()  # We use -1, because pandas indexing includes the upper bound 
    }
    for ts in timeseries
]
print(len(training_data))

テストデータはそからかさらに先のデータまで利用します。

num_test_windows = 4

test_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training + k * prediction_length].tolist()
    }
    for k in range(1, num_test_windows + 1) 
    for ts in timeseries
]
print(len(test_data))

DeepARでは、入力がJSON Lines形式となるので、その形式でデータを保存します。

def write_dicts_to_file(path, data):
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode('utf-8'))
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

最後にこれをS3にアップロードして準備が完了となります。

s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)
    
    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print('File s3://{}/{} already exists.\nSet override to upload anyway.\n'.format(s3_bucket, s3_path))
            return
        else:
            print('Overwriting existing file')
    with open(local_file, 'rb') as data:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=data)
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json")
copy_to_s3("test.json", s3_data_path + "/test/test.json")

S3のデータの一部を確認すると以下のようになっています。

s3filesystem = s3fs.S3FileSystem()
with s3filesystem.open(s3_data_path + "/train/train.json", 'rb') as fp:
    print(fp.readline().decode("utf-8")[:100] + "...")

学習の実行

さて、ではいよいよ学習を行なっていきましょう。 まずは、Estimatorのインスタンスを作成します。

estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_name=image_name,
    role=role,
    train_instance_count=1,
    train_instance_type='ml.c4.2xlarge',
    base_job_name='deepar-electricity-demo',
    output_path=s3_output_path
)

続いて、ハイパーパラメータの設定を行います。

hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length)
}
estimator.set_hyperparameters(**hyperparameters)

先ほど設定してたcontent_length, prediction_lengthもここで指定しています。 この辺りは他の手法と変わらずといった感じですね。

利用可能なハイパーパラメータについてはドキュメントをご参照ください。

あとはfitで学習処理を実行するのみです。

%%time
data_channels = {
    "train": "{}/train/".format(s3_data_path),
    "test": "{}/test/".format(s3_data_path)
}

estimator.fit(inputs=data_channels, wait=True)

エンドポイント作成

学習が完了したら、続いてはエンドポイントの作成を行います。

まずは、Predictorのクラスを作成しています。 これは、何をしているかというと、通常DeepARではJSON形式でデータをやりとりします。 ここで作成したPredictorをエンドポイント作成時に指定しておくことで、PandasのSeriesを受け取ってDataFrameで結果を返すような仕組みにしています。 その相互変換をこのクラスの内部で行なっています。 クラス内部の実装については、

  • 受け取ったデータをJSONにエンコードする
  • エンコードしたデータで推論を行う
  • 行なった結果をデコードする

という流れをpredictメソッド内で実行するような流れとなっています。 推論時にpredictメソッドが呼び出される、という部分のみがSageMakerの仕様となっており、それ以外は通常のPythonコードなので詳細については割愛します。

class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, content_type=sagemaker.content_types.CONTENT_TYPE_JSON, **kwargs)
        
    def predict(self, ts, cat=None, dynamic_feat=None, 
                num_samples=100, return_samples=False, quantiles=["0.1", "0.5", "0.9"]):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.
        
        ts -- `pandas.Series` object, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])
        
        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        prediction_time = ts.index[-1] + 1
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)
    
    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles
        }
        
        http_request_data = {
            "instances": [instance],
            "configuration": configuration
        }
        
        return json.dumps(http_request_data).encode('utf-8')
    
    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode('utf-8'))['predictions'][0]
        prediction_length = len(next(iter(predictions['quantiles'].values())))
        prediction_index = pd.DatetimeIndex(start=prediction_time, freq=freq, periods=prediction_length)        
        if return_samples:
            dict_of_samples = {'sample_' + str(i): s for i, s in enumerate(predictions['samples'])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(data={**predictions['quantiles'], **dict_of_samples}, index=prediction_index)

    def set_frequency(self, freq):
        self.freq = freq
        
def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]        

def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pands.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat        
    return obj

このPredictorを使ってエンドポイントを作成します。

predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    predictor_cls=DeepARPredictor)

predictor_clsを指定している以外は他の手法と同様です。

結果の確認

では、エンドポイントが作成されたので推論を行なってみましょう。

predictor.predict(ts=timeseries[120], quantiles=[0.10, 0.5, 0.90]).head()

結果は以下のようになります。

quantiles引数によって区間推定を行うことも可能になっています。

さて、時系列といえば、グラフで予測結果を出して欲しいですね? Jupyter Notebookの機能を使ってそれを実現しています。

def plot(
    predictor, 
    target_ts, 
    cat=None, 
    dynamic_feat=None, 
    forecast_date=end_training, 
    show_samples=False, 
    plot_history=7 * 12,
    confidence=80
):
    print("calling served model to generate predictions starting from {}".format(str(forecast_date)))
    assert(confidence > 50 and confidence < 100)
    low_quantile = 0.5 - confidence * 0.005
    up_quantile = confidence * 0.005 + 0.5
        
    # we first construct the argument to call our model
    args = {
        "ts": target_ts[:forecast_date],
        "return_samples": show_samples,
        "quantiles": [low_quantile, 0.5, up_quantile],
        "num_samples": 100
    }


    if dynamic_feat is not None:
        args["dynamic_feat"] = dynamic_feat
        fig = plt.figure(figsize=(20, 6))
        ax = plt.subplot(2, 1, 1)
    else:
        fig = plt.figure(figsize=(20, 3))
        ax = plt.subplot(1,1,1)
    
    if cat is not None:
        args["cat"] = cat
        ax.text(0.9, 0.9, 'cat = {}'.format(cat), transform=ax.transAxes)

    # call the end point to get the prediction
    prediction = predictor.predict(**args)

    # plot the samples
    if show_samples: 
        for key in prediction.keys():
            if "sample" in key:
                prediction[key].plot(color='lightskyblue', alpha=0.2, label='_nolegend_')
                
                
    # plot the target
    target_section = target_ts[forecast_date-plot_history:forecast_date+prediction_length]
    target_section.plot(color="black", label='target')
    
    # plot the confidence interval and the median predicted
    ax.fill_between(
        prediction[str(low_quantile)].index, 
        prediction[str(low_quantile)].values, 
        prediction[str(up_quantile)].values, 
        color="b", alpha=0.3, label='{}% confidence interval'.format(confidence)
    )
    prediction["0.5"].plot(color="b", label='P50')
    ax.legend(loc=2)    
    
    # fix the scale as the samples may change it
    ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)
    
    if dynamic_feat is not None:
        for i, f in enumerate(dynamic_feat, start=1):
            ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)
            feat_ts = pd.Series(
                index=pd.DatetimeIndex(start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)),
                data=f
            )
            feat_ts[forecast_date-plot_history:forecast_date+prediction_length].plot(ax=ax, color='g')
style = {'description_width': 'initial'}
@interact_manual(
    customer_id=IntSlider(min=0, max=369, value=91, style=style), 
    forecast_day=IntSlider(min=0, max=100, value=51, style=style),
    confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),
    history_weeks_plot=IntSlider(min=1, max=20, value=1, style=style),
    show_samples=Checkbox(value=False),
    continuous_update=False
)
def plot_interact(customer_id, forecast_day, confidence, history_weeks_plot, show_samples):
    plot(
        predictor,
        target_ts=timeseries[customer_id],
        forecast_date=end_training + datetime.timedelta(days=forecast_day),
        show_samples=show_samples,
        plot_history=history_weeks_plot * 12 * 7,
        confidence=confidence
    )

このプログラムを実行すると以下のような出力が得られます。

上部のフォームから値を変更して、「Run Interact」を押すことで、その値でのグラフが表示されます。 Jupyterの機能となりますので、内容に関しての詳細は割愛しますが、変更された値を受け取ってその値で予測するような仕組みになっています。

おわりに

今回はSageMakerの組み込みアルゴリズムのうち、DeepARについてExampleの流れを追ってみました。 時系列分析は需要の多い領域であるため、興味を持たれた方も多いのではないでしょうか? 一方で扱いが難しいような場面も多くあったりするので、注意が必要な領域でもあります。

実は、今回はExampleの全てを解説しきれていません。 そのため、この後の処理に関してはPart2という形で紹介できればと思います。

参考文献