SageMakerの推論パイプラインについて調べてみた

概要

SageMakerでの推論パイプライン構築を試してみたので、備忘録的に記述しておきます。
「推論パイプラインってどんなものかよくわからない」、「やってみたいけどイマイチ内容を理解できていない」、といった方向けです。

まずは「推論パイプラインとは」といったあたりから整理して、その後にAWSが用意してくれているサンプルを使って理解を深めていきます。

目次

推論パイプラインとは

機械学習で推論するためには、生のデータをそのまま使うのではなく「前処理」が必要になることがとても多いです。
また、推論した結果を、後続のシステムが使いやすい形に「後処理」することも多いです。

そこで、「前処理」、「推論」、「後処理」等の処理を一連の流れとして実行できるようにしておく必要があるのですが、このように「複数の処理を指定した順番で一連の流れで処理させる」機能をSageMakerでは「推論パイプライン」として実装できます。

推論パイプラインを使用して、事前処理、予測、および後処理のデータサイエンスタスクを組み合わせることができます。

参照:推論パイプラインのデプロイ

推論パイプラインを使うと何が嬉しいのか

機械学習モデルは、前処理、推論、後処理のプロセスが変わり得るものです。
機械学習におけるそれぞれのプロセスが変わっても、「機械学習に付随するリソース以外」の修正範囲をできるだけ小さくしておけるように設計しておくことは、マイクロサービス化を考える上では重要でしょう。

上記のようなマイクロサービス的な設計を考慮する際に、SageaMakerの推論パイプラインを利用することは1つの方法として選択肢にいれることができます。 (機械学習の何かしらの処理が変わった際に、推論エンドポイントのパイプラインモデルをアップデートすればOK、という設計が可能になるので)

ただし、パイプラインの一部分のみを変更することはできないため、変更するならパイプライン全体をアップデートする必要がある、という点には注意してください。

パイプラインモデルは変更不可能ですが、UpdateEndpoint オペレーションを使用して新しいモデルをデプロイすることにより、推論パイプラインを更新できます。このモジュール性により、実験中の柔軟性が高まります。

参照:推論パイプラインのデプロイ

また、SageMakerの推論パイプラインを使うと「1つのインスタンスで処理を完結」させることができるので、「処理ごとにインスタンスが異なる」場合と比べると余計な通信が減るので処理も早くなる可能性があり、料金も削減できそうです。

パイプラインモデルをデプロイすると、Amazon SageMaker はエンドポイントまたは変換ジョブの各 Amazon Elastic Compute Cloud (Amazon EC2) インスタンスにすべてのコンテナをインストールし、実行します。機能の処理と推測は低レイテンシーで実行されます。これは、コンテナが同じ EC2 インスタンスに配置されているためです。

参照:推論パイプラインのデプロイ

ざっくりとした実装の流れ

ここまでで、SageMakerで推論パイプラインがどのような機能を果たしてくれるのか、について整理しました。
続いて、推論パイプラインの実装の流れをざっくりと整理していきます。

まず、「推論パイプライン」は2〜5個のコンテナを、

推論パイプラインは、データに対する推論要求を処理する 2〜5 個のコンテナの順番で構成される Amazon SageMaker モデルです。

参照:推論パイプラインのデプロイ

1つの推論エンドポイントにデプロイし、指定した順番で処理をしてくれる機能です。

パイプラインモデルをデプロイすると、Amazon SageMaker はエンドポイントまたは変換ジョブの各 Amazon Elastic Compute Cloud (Amazon EC2) インスタンスにすべてのコンテナをインストールし、実行します。機能の処理と推測は低レイテンシーで実行されます。これは、コンテナが同じ EC2 インスタンスに配置されているためです。

参照:推論パイプラインのデプロイ

なので、パイプライン内の処理内容ごとにそれぞれコンテナを用意する必要があります。
しかしながら、コンテナについては全部が全部自前で用意する必要はありません。
例えば、「SparkML Serving」のコンテナはAWSが公開している物を利用することもできます。
下記はAWSがリージョンごとに公開している「SparkML Serving」のECRのイメージ一覧です。

us-west-1 = 746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2
us-west-2 = 246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2
us-east-1 = 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2
us-east-2 = 257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-sparkml-serving:2.2
ap-northeast-1 = 354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2
ap-northeast-2 = 366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2
ap-southeast-1 = 121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2
ap-southeast-2 = 783357654285.dkr.ecr.ap-southeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2
ap-south-1 = 720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-sparkml-serving:2.2
eu-west-1 = 141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2
eu-west-2 = 764974769150.dkr.ecr.eu-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2
eu-central-1 = 492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2
ca-central-1 = 341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2
us-gov-west-1 = 414596584902.dkr.ecr.us-gov-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2

参考:With AWS SDK

例えば、「botoのcreate_model」APIの「PrimaryContainer」、もしくは「Containers」引数に上記のイメージを指定して利用することが可能です。

詳細についてはこちらをご参照ください。

If you are using AWS Java SDK or Boto to call SageMaker APIs, then you can pass the SageMaker provided Docker images for this container in all region as part of the CreateModel API call in the PrimaryContainer or Containers field.

参考:With AWS SDK

また、「SparkML Serving」のコンテナを「SageMaker Python SDK」で利用する場合は、「PipelineModel」の中の1つの処理として「SparkMLModel」を組み込むことでパイプラインの処理の1つとして指定することも可能です。 (この辺りはイメージがつきずらいかと思いますが、後ほどサンプルを実行していく過程で理解を深めていきます)

If you are using SageMaker Python SDK, then you need to pass an instance of SparkMLModel as one of the models in the PipelineModel instance that you will create.

参考:Using it in an inference pipeline

さて、実装の流れをざっくりとですが確認しました。
続いて、サンプルを実際にやってみて理解を深めていきます。

やってみた

SageMakerで推論パイプラインを実装するために、今回実施していることについてざっくりと整理します。

1.前処理をするMLeapバンドルファイルをS3にアップロード

まずは、Glueスクリプトを実行するところからです。

このGlueスクリプトでは、「データの前処理」、「前処理のMLeapバンドルファイルを作成」、「S3にMLeapバンドルファイルをアップロード」しています。
このMLeapバンドルファイルは、推論パイプラインに利用しますので後ほどまた出てきます。
(なので、一度MLeapバンドルファイルを作成した後はGlueは使いません。そのため、このMLeapバンドルファイルが用意できていればGlueを使う必要もないです)

また、このGlueスクリプトでは「MLeap」を利用しているので、こちらの通りに、依存関係性のあるパッケージをS3に事前にアップロードしています。

こんな感じ

!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

python_dep_location = sess.upload_data(path='python.zip', bucket=default_bucket, key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=default_bucket, key_prefix='dependencies/jar')

Glueの処理内容の要点についても少しだけ言及しておきます。

Glue上で複数の処理を束ねてパイプラインとしてモデルを生成して...

# The pipeline comprises of the steps added above
pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

# This step trains the feature transformers. We need to serialize this model with MLeap and save to S3
model = pipeline.fit(total_df)

モデルをバンドルファイル化して...

# Serialize and store the model via MLeap  
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", validation_df)

S3にこのバンドルファイルを出力しています。

s3 = boto3.resource('s3') 
file_name = os.path.join(args['S3_MODEL_KEY_PREFIX'], 'model.tar.gz')
s3.Bucket(args['S3_MODEL_BUCKET']).upload_file('/tmp/model.tar.gz', file_name)

2.上記で前処理したデータを使ってxgboostで学習したモデル(ファイル)をS3にアップロード

「1.前処理をするMLeapバンドルファイルをS3にアップロード」で前処理をしたデータをS3に準備できているので、このデータを使ってビルトインアルゴリズムのxgboostを使ってモデルを学習させます。

この点については、特に言及する点はありません。

xgb_model.fit(inputs=data_channels, logs=True)

3.パイプライン処理するモデルを作成する

「1」、「2」でS3にアップロードされたファイルを使って、SageMakerの「推論パイプライン」用の「PipelineModel」を作成します。 (最後の「sm_model」変数)

from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel


## 「1.前処理をするMLeapバンドルファイルをS3にアップロード」で作成したMLeapバンドルファイルのS3パス
sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key_prefix, 'model.tar.gz')

## 「MLeapバンドルファイル」、「INPUT&OUTPUTのスキーマ情報」を指定した上で、SparkMLModelをイニシャライズ
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})

## xgboostのモデル(sagemaker.model.Model)
xgb_model = Model(model_data=xgb_model.model_data, image=training_image)

## パイプラインのモデル名
model_name = 'inference-pipeline-' + timestamp_prefix

## パイプラインモデルの初期化(models引数にList形式でパイプライン処理したいモデルを順番に指定する)
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])

ここで、推論パイプラインを使う上では「複数のコンテナをエンドポイントにデプロイ」とあるにも関わらず、「sparkml_model」については特にコンテナイメージを指定していない点が気になる方もいるかと思いますが、これは「SparkMLModel」でイニシャライズした際に、コンテナのイメージが自動的に指定されているためです。
(xgboostの方は、ちゃんと「image=training_image」コンテナイメージが指定されていますね)

        region_name = (sagemaker_session or Session()).boto_region_name
        image = "{}/{}:{}".format(registry(region_name, framework_name), repo_name, spark_version)
        super(SparkMLModel, self).__init__(
            model_data,
            image,
            role,
            predictor_cls=SparkMLPredictor,
            sagemaker_session=sagemaker_session,
            **kwargs
        )

参考:model.py

また、処理実行の順番は「PipelineModel」作成時に「models」引数で指定した通りですね。

4.推論

リアルタイム推論、バッチ変換のいずれにおいても、「3」のモデルを利用することができます。
(SageMakerの「モデル」として利用できるので)
いずれの場合においても、1つのエンドポイント内で処理が完結します。

例えばリアルタイム推論の場合は、こんな感じですね。

## エンドポイントのデプロイ
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

## 推論
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

バッチ変換の場合はこんな感じ

from sagemaker.content_types import CONTENT_TYPE_CSV
import sagemaker

## セッション
sess = sagemaker.Session()


## 変数の指定
input_data_path  = 's3://<your-input-data-path>'
output_data_path = 's3://<your-output-data-path>'
job_name         = '<your-batch-transform-job-name>'
model_name       = '<your-pipeline-model-name>'


## バッチ変換JOB用Classの作成
transformer = sagemaker.transformer.Transformer(
    # This was the model created using PipelineModel and it contains feature processing and XGBoost
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m4.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept = CONTENT_TYPE_CSV
)


## バッチ変換JOBの実行
transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = CONTENT_TYPE_CSV, 
                      split_type = 'Line')

## 終わるのを待つ
transformer.wait()

今回の確認内容については、以上で終了です。

まとめ

推論パイプラインについて調べてみました。
今までもなんとなく存在は認識していたのですが、ちゃんと自分の手を動かして調べることでより理解を深めることができました。

推論パイプラインはとても便利そうですが、標準化等のステートフルな前処理が必要な場合は少し注意が必要ですね。

そのような場合のベストプラクティスはまだ整理しきれていないのですが、一番単純な対応としては「開発時に必要なパラメータを確認して、パイプライン作成時に値を指定して実装する」あたりになるのかもしれません。

いい方法があるよ、という方がいたら教えていただけると嬉しいです。
長くなってしまいましたが、ここまで読んでくれてありがとうございます。

参考

今回実施したサンプル
推論パイプラインのデプロイ
With AWS SDK
botoのcreate_model
PipelineModel
SparkMLModel
Using it in an inference pipeline
Glueで依存パッケージを利用する MLeap
Amazon SageMakerでPipelineを使って推論を行う #reinvent