Amazon SageMakerでPipelineを使って推論を行う #reinvent

こんにちは、小澤です。

re:Invent 2018期間中に、SageMakerのPlipelineModelという機能が公開されました。 今回は、これがどのような仕組みでどう動くのかをサンプルを見ながら確認していきたいと思います。

PipelineModelとは

このPipelineModelとはどういったものなのかを最初に簡単に説明したいと思います。

SageMakerでBuilt-inアルゴリズムを利用する際のフローを例に考えてみましょう。 通常、利用する場合は以下のような流れになるかと思います。

  • データに対する前処理や特徴抽出、フォーマットの変換などを行いS3に保存する
  • Built-inアルゴリズムを選択して学習処理を実行
  • 作成されたモデルを使った推論エンドポイントの作成

これらの処理の前段にはデータの収集、後段にはエンドポイントを使って推論を行う処理などが入りますが割愛しています。

機械学習の本体となる学習処理以外の部分はすべて、別途処理を実行している状態となっています。 これは、ライブラリのインターフェースの設計としては汎用的なものになっていますが、実際に機械学習をしようと思うと困る点もあります。 推論を行う際にAPIに与えるデータに対して学習時と同じ加工を事前に行っておく必要がある、という場面は多々あります。 それらの処理を学習用のコードと推論APIを実行するコードで2重に書いてしまうと問題がありますし、中には、データの平均や分散など同じ処理を実行するために必要なパラメータを持つものもありその管理も煩雑になりがちです。

scikit-learnやSparkのMLLibといった機械学習ライブラリではそれらの一連の流れをPipelineという形で定義しておくことが可能になっています。

一連処理のフローとしては、以下のような形式になります。

この図において、モデルのみを保存している状態だと処理A, Bは推論時も別途実装する必要が生じますが、処理フローそのものを保存しておけば推論時にもデータを与えて呼び出すのみとなります。 このフローそのもの一連の流れとして定義してを保存しておくのがPipelineとなります。

SageMakerにおけるPipeline

さて、このPipelineのような仕組みをSageMakerで実現するにはどうすればいいでしょうか?

Built-in以外の手法であれば、処理内容を記述したスクリプトにまとめてしまえばよさそうです。 それこそ、sckit-learnコンテナであればsckit-learnのPipelineがそのまま使えるでしょう。 しかしSageMakerの性質を考えるとそれでは、汎用性に欠けます。

SageMakerでは"ノートブックインスタンスとは異なるコンテナで学習する"という特徴をいかした仕組みでPipeline機能を実現しています。

先ほどの図でいうと、処理Aを実行するモデル、処理Bを実行するモデル、機械学習処理を実行するモデル、という3つのモデルを用意します。 それに対して、PipelineModelというかたちでそれぞれの処理のためのコンテナを立ち上げて、一つのエンドポイントでそれらを順に実行していくという仕組みで実現しているわけです。

サンプルで実際の処理フローを見てみる

では、実際のサンプルでその流れを見てみましょう。 今回見ていくサンプルは SAGEMAKER PYTHON SDK > Inference Pipeline with Scikit-learn and Linear Learner.ipynbとなります。

ノートブックの内容

このノートブックでは、scikit-learnの機能を使って特徴抽出を行うコンテナとBuilt-inのLinearLearnerを実行する2つを利用しています。

まずはいつもの準備から始めます。

# S3 prefix
s3_bucket = '< ENTER BUCKET NAME HERE >'
prefix = 'Scikit-LinearLearner-pipeline-abalone-example'

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

次にデータの取得をしています。

!wget --directory-prefix=./abalone_data https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

ここで利用するデータはXGBoostのサンプルでも使われているアワビの年齢予測データになります。

取得したデータはSageMakerで利用するためにそのままS3にアップロードしています。

WORK_DIRECTORY = 'abalone_data'

train_input = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'abalone.csv'),
    bucket=s3_bucket,
    key_prefix='{}/{}'.format(prefix, 'train'))

続いて、sckit-learnを使って特徴変換の処理を行ったのち、バッチ変換ジョブとしてデータの一括での変換処理を行っています。

from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_abalone_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session)

sklearn_preprocessor.fit({'train': train_input})

# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1,
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')

# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

具体的な処理内容はentry_pointで指定したスクリプトの解説の際に行います。 この処理によって生成されるデータは後続のLinearLearnerの入力となるので、出力先のS3のパスをpreprocessed_trainに格納しています。

さて、この処理は機械学習を行ってモデルを作成するわけではありません。 先ほどの図でいうところの処理Aや処理Bに該当します。 ではfitでの学習は何を行っているのでしょうか? ここではコンテナから利用するモデルデータの生成を行っています。

scikit-learnコンテナを使った機械学習処理そのものについては以下の記事のような流れになっています。

script modeでの動作となっているため、Estimatorのfit関数を呼び出した際に学習とモデル保存の処理が実行されます。

今回ほしいのは学習結果のモデルではなく、変換処理の内容をシリアライズしたものですが、この仕組みの上にそれを乗せるために、fit関数でそれを生成しているというわけです。

また、バッチ変換の処理を行っている部分についても解説します。 SageMakerでPipelineModelを利用する場合、推論時にそれぞれのコンテナの処理を順に実行していく、と最初に解説しました。 推論時は全体を1つにまとめて処理が実行可能ですが、学習時には1つ1つのコンテナで実行する処理と利用するモデルを定義してやる必要があります。

そのため

  • 処理A用のモデルをfit関数で作成 → 入力データをバッチ変換したものをS3に出力
  • 処理Aの出力を入力として、処理B用のモデルをfit関数でモデルを作成 -> バッチ変換したものをS3に出力
  • 処理Bの出力を入力として、機械学習処理のモデルをfit関数で作成

のような流れを順に行う必要があるわけです。 そのため、ここではscikit-learnコンテナで行う特徴抽出処理のfitとtransform処理を行っているわけです。

では、続いてこの処理の出力を使ってLinearLearnerでの学習処理を実行しましょう。 この部分は、通常の使い方と同様になります。

import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri

# LinearLearnerのコンテナ名取得
ll_image = get_image_uri(boto3.Session().region_name, 'linear-learner')

# モデルの出力先を設定
s3_ll_output_key_prefix = "ll_training_output"
s3_ll_output_location = 's3://{}/{}/{}/{}'.format(s3_bucket, prefix, s3_ll_output_key_prefix, 'll_model')

# Estimatorインスタンスの作成
ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role,
    train_instance_count=1,
    train_instance_type='ml.m4.2xlarge',
    train_volume_size = 20,
    train_max_run = 3600,
    input_mode= 'File',
    output_path=s3_ll_output_location,
    sagemaker_session=sagemaker_session)

# ハイパーパラメータの設定
ll_estimator.set_hyperparameters(feature_dim=10, predictor_type='regressor', mini_batch_size=32)

# 先ほどのバッチ変換の出力先を入力として指定
ll_train_data = sagemaker.session.s3_input(
    preprocessed_train,
    distribution='FullyReplicated',
    content_type='text/csv',
    s3_data_type='S3Prefix')

# 学習処理の実行
data_channels = {'train': ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)

これで変換後のデータを使った学習処理が実行されます。

ここまでで、PipelineModel作成に必要ものは整いました。 scikit-learnコンテナでの変換処理とLinearLearnerでの推論処理を順に行うPipelineの作成をします。

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# PipelineModelを作成するには各コンテナのモデルオブジェクトを渡す必要がある。
# モデルオブジェクトはEstimatorのcreate_model関数で取得可能。
scikit_learn_inferencee_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model = PipelineModel(
    name=model_name,
    role=role,
    # model引数にリストでモデルオブジェクトを渡す
    models=[
        scikit_learn_inferencee_model,
        linear_learner_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

Pipelineの処理の流れを定義したPipelineModelを作成したのちは通常のモデルと同様にdeploy関数で推論エンドポイントが作成可能です。

最後にこのエンドポイントを使って推論を行ってみましょう。

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = 'M, 0.44, 0.365, 0.125, 0.516, 0.2155, 0.114, 0.155'
actual_rings = 10
predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload))

payloadとして指定している値はもちろん変換処理を行っていない最初のデータと同じ形式のものです。

こちらもエンドポイント名を指定してRealTimePredictorを作成するという他と変わらないやり方で推論エンドポイントを取得しています。 注意すべき点は入出力の形式として与えるcontent_type引数とaccept引数です。 これらは最初に実行されるコンテナの入力をcontent_type、最後のコンテナの出力をacceptと指定します。

出力結果は以下のようになりました。

b'{"predictions": [{"score": 9.528051376342773}]}'

特徴変換処理のスクリプト

では、scikit-learnコンテナ側で行っている学習処理についてみてみましょう。

まず、必要なライブラリをインポートします。

from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.externals import joblib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

最初にmain部分の処理を行う前にデータの各列名と型を定義しています。 今回はCSVとして受け取りますが、これはヘッダなしのものとなるためここで定義しているわけです。

# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height', # with meat in shell
    'whole_weight', # whole abalone
    'shucked_weight', # weight of meat
    'viscera_weight', # gut weight (after bleeding)
    'shell_weight'] # after being dried

label_column = 'rings'

feature_columns_dtype = {
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

label_column_dtype = {'rings': np.float64} # +1.5 gives the age in years

では、main内の処理を見ていきましょう。 まずはデータを読み込みます。

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [ pd.read_csv(
        file,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files ]
    concat_data = pd.concat(raw_data)

argparseを使って引数を取得し、S3からCSVファイルをDataFrameとして取得する流れはscikit-learn単体で学習するときと同じです。

DataFrameのヘッダと型を定義するのに先ほど定義したものを利用してnamesおよびdtypeに与えています。 dypeで使っているmerge_two_dicts関数は以下のように定義されており、特徴量のdictとラベルのdictをマージしています。

def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

続いて、scikit-learnの機能を使って特徴変換を行っています。

    # 数値型の列情報の設定
    # すべての列の名前を取った後に文字列型であるsex列を取り除く
    numeric_features = list(feature_columns_names)
    numeric_features.remove('sex')
    # 数値型に対する特徴変換
    # 欠損値を中央値で保管するSimpleImputerと
    #スケールを平均0、標準偏差1に変換するStandardScalerを利用
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    # カテゴリカルな列情報の設定
    categorical_features = ['sex']
    # カテゴリカル変数に対する特徴変換
    # 欠損値には"missing"を入れ、
    # One-Hotなベクトルに変換する
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    # 定義した2つの特徴変換処理を行うColumnTransformerの作成
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    # 特徴変換処理の実行
    preprocessor.fit(concat_data)

    # 推論コンテナで利用するモデルデータとして保存
    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))

scikit-learnの機能をそのまま記述するという点では、通常のscikit-learnコンテナを使った学習処理と違いありません。 機械学習モデルのそのものではなく、Transeformerを保存しているのが特徴となります。

続いて、各種必要な関数を定義します。 まずは、scikit-learnコンテナを使う場合、model_fn関数でモデルをデシリアライズする方法を定義する必要がありました。

def model_fn(model_dir):
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

こちらはモデルを保存した際と同様読み込んで返しているだけですね。

scikit-learnコンテナではこれ以外に以下の3つの関数を任意で定義できます。

  • input_fn
  • predict_fn
  • output_fn

今回の場合、これらを個別に定義する必要があるので実装していきたいと思います。

input_fn関数

input_fn関数は推論時の入力データの受け取り方を定義する関数になります。 script modeでは、学習時には引数で受け取ったパスを利用して好きなように読み込めばいいのですが、推論時にもデータの受け取り方を指定したい場合この関数でそれを定義する必要があります。

処理内容は以下のようになっています。

# 引数として実際に受け取った値と
# データの形式が渡される
def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    # 受け取った形式ごとに処理を分岐
    # 今回はcsv形式のみを受け取るがjsonなどほかの形式にも対応したい場合はif-elseで分岐してそれぞれのパース方法を定義する
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), header=None)

        # 受け取る可能性のあるデータの列数は2種類ある
        # バッチ変換処理時は正解ラベルを含むデータ、
        # 推論時には含まないデータが与えられる
        # 変換を行う際に数値型かカテゴリカル型を列名で指定しているため、ヘッダなしで受け取ったcsvに対して列名を付与する必要がある
        # DataFrameの列名を設定する際に列数で判断して実際に与える値を決めている
        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))

受け取ったデータをColumnTransformerが適切に処理可能なDataFrameに変換しています。

predict_fn関数

こちらは推論処理を行う処理を定義します。 モデルのpredict関数を呼び出すだけでいい場合は必要ないのですが、今回は変換処理なのでpredict関数ではなく、transform関数を利用する必要があるため定義しています。

# input_data引数は対象となるデータ(input_fnの戻り値)
# model引数は推論を行うモデル(mainの戻り値)
def predict_fn(input_data, model):
    """Preprocess input data

    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:

        rest of features either one hot encoded or standardized
    """
    # ColumnTransformerのtransform処理を実行
    features = model.transform(input_data)

    # LinerLearnerでは学習時に先頭列が正解ラベルである必要があるので、順番を入れ替える
    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features

こちらの処理内容自体はあまり難しいことはないかと思います。

output_fn関数

最後はoutput_fn関数となります。 acceptに指定されたフォーマットに合わせた形式でデータを返すように設定する関数です。 デフォルトではJSONとなっていますが、バッチ変換した結果はCSVとして出力してほしいのでその定義をしています。

def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """

    # 出力としてjsonが求められている場合はデータをjson形式に整形
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), mimetype=accept)
    # csv形式が求められている場合はcsvで出力
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))

json出力を作成するのが人力ですが、if-elseで分岐しているだけということがわかるかと思います。

レスポンスはsagemaker_containers.beta.framework.workerを使って行えます。

おわりに

今回は、SageMakerのサンプルでPipelineModelを作成する方法を見ていきました。

処理する内容が多くなるので、どうしても全体として長いプログラムのように見えますが、1つ1つはSageMakerの基本的な機能の組み合わせであることが確認できるかと思います。 また、今回は解説していませんが、シリアライズしたオブジェクトをモデルデータとして出力できれば独自コンテナを作るなどして任意の特徴抽出ライブラリを利用することも可能です。