SageMakerのバッチ変換でPCAを実行してみた

バッチ変換用のインスタンスを起動して変換が実行できるのでシステム化するときにとても便利でした。
2023.06.12

データアナリティクス事業本部の鈴木です。

SageMakerのバッチ変換を使ってみたかったので、SageMaker Studioから試してみました。

バッチ変換とは

データセットを前処理や、大規模なデータセットに対する推論、永続的な推論エンドポイントが不要なときに使う機能になります。

よくJupyter Notebookなどでモデルの訓練を手動で実行し、テスト用の入力を使って推論結果を出力してみたりしますが、それをそのままSageMakerでしたいときに使うものですね。

バッチ変換に関しては以下のドキュメントに詳細の説明があります。

今回は『PCA および DBSCAN ムービークラスターを使用したバッチ変換』のサンプルを試してみました。上記ドキュメントのバッチ変換のサンプルノートブックにて紹介されているものになります。

バッチ変換など、SageMakerの推論の仕組みの使い分けは、以下のドキュメントに整理されているのでご確認ください。

前提

今回はSageMaker Studioから実行しました。実験に必要なドメイン・ユーザー・実行ロールは準備ができているものとします。

訓練およびテストデータはサンプルに記載の通り、Amazon Customer Reviews Datasetを使用しました。このデータはパブリックなS3バケット上から取得できます。推論結果などは自アカウントのバケットに保存したいため、そのためのバケットは自分で作成しました。

実行のイメージ

やってみた

1. Notebookの作成

SageMaker Studioへアクセスしました。

SageMaker Studioへのアクセス

FileよりNotebookを新規作成しました。

ノートの新規作成

カーネルを起動するために、リソースの設定を聞かれました。以下のデフォルト設定のままSelectを押すと起動しました。

起動設定

2. ライブラリおよびデータの読み込み

以降は、基本的に記事執筆時点で『PCA および DBSCAN ムービークラスターを使用したバッチ変換』に紹介されているコードを引用しています。試した結果など補足部分をあわせて記載します。

ライブラリの読み込み

まずライブラリをインポートしました。出力先のバケットは自分で指定したかったので、『Amazon SageMakerでS3のどこに何が出力されるかをちゃんと制御したい | DevelopersIO』を参考にsagemaker.Sessionで指定しました。

import sagemaker

sess = sagemaker.Session(default_bucket="作成した出力先S3バケット名")
bucket = sess.default_bucket()
prefix = "sagemaker/DEMO-batch-transform"
role = sagemaker.get_execution_role()


import boto3
import sagemaker
import sagemaker.amazon.common as smac
from sagemaker import image_uris
from sagemaker.transformer import Transformer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import CSVSerializer
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import scipy.sparse
import os
import json

なお、sagemakerライブラリのバージョンは2.145.0でした。

sagemaker.__version__
# 以下は表示結果
# '2.145.0'

データの取得と読み込み

Amazon Customer Reviews DatasetをパブリックのS3バケットから取得しました。

!mkdir /tmp/reviews/
!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz /tmp/reviews/

取得したデータをデータフレームとして読み込み、データの概要を確認しました。

df = pd.read_csv(
    "/tmp/reviews/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz",
    delimiter="\t",
    error_bad_lines=False,
)
df.head()

df = df[["customer_id", "product_id", "star_rating", "product_title"]]

df.head()では以下のように表示されました。

データ例

データの傾向の確認

customer_idproduct_idについてはユニークな値の個数のパーセンタイルを取得して分布について確認していました。

customers = df["customer_id"].value_counts()
products = df["product_id"].value_counts()

quantiles = [
    0,
    0.1,
    0.25,
    0.5,
    0.75,
    0.8,
    0.85,
    0.9,
    0.95,
    0.96,
    0.97,
    0.98,
    0.99,
    0.995,
    0.999,
    0.9999,
    1,
]
print("customers\n", customers.quantile(quantiles))
print("products\n", products.quantile(quantiles))

# 以下は表示結果
# customers
#  0.0000       1.0
# 0.1000       1.0
# 0.2500       1.0
# 0.5000       1.0
# 0.7500       2.0
# 0.8000       2.0
# 0.8500       3.0
# 0.9000       4.0
# 0.9500       5.0
# 0.9600       6.0
# 0.9700       7.0
# 0.9800       9.0
# 0.9900      13.0
# 0.9950      18.0
# 0.9990      37.0
# 0.9999      97.0
# 1.0000    2704.0
# Name: customer_id, dtype: float64
# products
#  0.0000        1.000
# 0.1000        1.000
# 0.2500        1.000
# 0.5000        3.000
# 0.7500        9.000
# ...
# 0.9990     1993.901
# 0.9999     7522.637
# 1.0000    32790.000
# Name: product_id, dtype: float64

3. 訓練・テストデータ作成

データを加工し、PCAを適用するのに適した形にしました。

customers = customers[customers >= 35]
products = products[products >= 20]

reduced_df = df.merge(pd.DataFrame({"customer_id": customers.index})).merge(
    pd.DataFrame({"product_id": products.index})
)

customers = reduced_df["customer_id"].value_counts()
products = reduced_df["product_id"].value_counts()

test_products = products.sample(frac=0.005)
train_products = products[~(products.index.isin(test_products.index))]

customer_index = pd.DataFrame(
    {"customer_id": customers.index, "user": np.arange(customers.shape[0])}
)
train_product_index = pd.DataFrame(
    {"product_id": train_products.index, "item": np.arange(train_products.shape[0])}
)
test_product_index = pd.DataFrame(
    {"product_id": test_products.index, "item": np.arange(test_products.shape[0])}
)

train_df = reduced_df.merge(customer_index).merge(train_product_index)
test_df = reduced_df.merge(customer_index).merge(test_product_index)

S3に加工したデータをアップロードしました。

train_sparse = scipy.sparse.csr_matrix(
    (
        np.where(train_df["star_rating"].values >= 4, 1, 0),
        (train_df["item"].values, train_df["user"].values),
    ),
    shape=(train_df["item"].nunique(), customers.count()),
)

test_sparse = scipy.sparse.csr_matrix(
    (
        np.where(test_df["star_rating"].values >= 4, 1, 0),
        (test_df["item"].values, test_df["user"].values),
    ),
    shape=(test_df["item"].nunique(), customers.count()),
)

np.savetxt("/tmp/reviews/train.csv", train_sparse.todense(), delimiter=",", fmt="%i")
np.savetxt("/tmp/reviews/test.csv", test_sparse.todense(), delimiter=",", fmt="%i")

train_s3 = sess.upload_data(
    "/tmp/reviews/train.csv", bucket=bucket, key_prefix="{}/pca/train".format(prefix)
)
test_s3 = sess.upload_data(
    "/tmp/reviews/test.csv", bucket=bucket, key_prefix="{}/pca/test".format(prefix)
)

ここまででS3バケットにデータが出力されたことを確認できました。

作成した訓練データ1

作成した訓練データ2

4. モデルのトレーニング

Use a SageMaker estimator to run a training jobも参考にしつつ、EstimatorのAPIを使ってモデルを訓練しました。

train_inputs = sagemaker.inputs.TrainingInput(train_s3, content_type="text/csv;label_size=0")

container = image_uris.retrieve(framework='pca',region=boto3.Session().region_name)

pca = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path="s3://{}/{}/pca/output".format(bucket, prefix),
    sagemaker_session=sess,
)

pca.set_hyperparameters(
    feature_dim=customers.count(),
    num_components=100,
    subtract_mean=True,
    algorithm_mode="randomized",
    mini_batch_size=500,
)

pca.fit({"train": train_inputs})

# 以下は表示結果
# INFO:sagemaker:Creating training-job with name: pca-2023-05-21-05-18-47-173
# 2023-05-21 05:18:47 Starting - Starting the training job...
# 2023-05-21 05:19:12 Starting - Preparing the instances for training.........
# 2023-05-21 05:20:30 Downloading - Downloading input data...
# 2023-05-21 05:21:00 Training - Downloading the training image......
# 2023-05-21 05:22:05 Training - Training image download completed. Training in progress..Docker entrypoint called with argument(s): train
# Running default environment configuration script
# (略)
# ...
# 2023-05-21 05:22:52 Uploading - Uploading generated training model
# 2023-05-21 05:22:52 Completed - Training job completed
# Training seconds: 142
# Billable seconds: 142

モデルアーティファクトがS3バケットに出力されました。

アーティファクト

トレーニングジョブから個別の訓練の詳細を確認することができました。スクロールするともっと多くの情報を確認できました。

トレーニングジョブの詳細

ここまでの操作で、訓練データに対して適用するPCAの具体的な変換を定めることができました。

5. 訓練データへのPCAの実行

訓練データに対して、先ほど作成した変換を適用します。ここで初めてバッチ変換が登場します。

pca_transformer = pca.transformer(
    instance_count=1,
    instance_type="ml.m4.xlarge",
    strategy="MultiRecord",
    assemble_with="Line",
    output_path="s3://{}/{}/pca/transform/train".format(bucket, prefix),
)

# 以下は表示結果
# INFO:sagemaker:Creating model with name: pca-2023-05-21-05-23-31-581

pca_transformer.transform(train_s3, content_type="text/csv", split_type="Line")
pca_transformer.wait()

# INFO:sagemaker:Creating transform job with name: pca-2023-05-21-05-23-39-772
# ........................................Docker entrypoint called with argument(s): serve
# Docker entrypoint called with argument(s): serve
# Running default environment configuration script
# Running default environment configuration script
# [05/21/2023 05:30:24 INFO 139673570338624] loaded entry point class algorithm.serve.server_config:config_api
# [05/21/2023 05:30:24 INFO 139673570338624] loaded entry point class algorithm.serve.server_config:config_api
# [05/21/2023 05:30:24 INFO 139673570338624] nvidia-smi: took 0.032 seconds to run.
# (略)
# ...
# [05/21/2023 05:30:33 INFO 139673570338624] The default executor is <PCAModel on cpu(0)>.
# [05/21/2023 05:30:33 INFO 139673570338624] <PCAModel on cpu(0)> is assigned to batch slice from 0 to 1300.
# #metrics {"StartTime": 1684647032.8784883, "EndTime": 1684647033.9421868, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", # "Operation": "scoring"}, "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}}
# #metrics {"StartTime": 1684647033.1706922, "EndTime": 1684647034.0191453, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", # "Operation": "scoring"}, "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}}

以下のようにS3バケットに変換結果が出力されました。

訓練データの変換結果

実行したバッチ変換は、バッチ変換ジョブ画面から確認できました。以下の画像は訓練データおよびテストデータ向けに実行した後なので2件分載っています。

バッチ変換ジョブ画面

6. テストデータへのPCAの実行

テストデータに対して、先ほど作成した変換を適用します。ここではsagemaker.transformer.TransformerのAPIを使いました。

pca_model = sess.create_model_from_job(
    pca._current_job_name, name="{}-test".format(pca._current_job_name)
)

pca_test_transformer = Transformer(
    pca_model,
    1,
    "ml.m4.xlarge",
    output_path="s3://{}/{}/pca/transform/test".format(bucket, prefix),
    sagemaker_session=sess,
    strategy="MultiRecord",
    assemble_with="Line",
)
pca_test_transformer.transform(test_s3, content_type="text/csv", split_type="Line")
pca_test_transformer.wait()

# 以下は表示結果
# INFO:sagemaker:Creating transform job with name: pca-2023-05-21-05-31-25-244
# ......................................Docker entrypoint called with argument(s): serve
# Running default environment configuration script
# Docker entrypoint called with argument(s): serve
# Running default environment configuration script
# [05/21/2023 05:37:45 INFO 140519090681664] loaded entry point class algorithm.serve.server_config:config_api
# (略)
# ...
# [05/21/2023 05:37:48 INFO 140519090681664] The default executor is <PCAModel on cpu(0)>.
# [05/21/2023 05:37:48 INFO 140519090681664] <PCAModel on cpu(0)> is assigned to batch slice from 0 to 93.
# #metrics {"StartTime": 1684647468.3064656, "EndTime": 1684647468.5905228, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", "Operation": "scoring"}, # # # "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}}
# 2023-05-21T05:37:48.320:[sagemaker logs]: MaxConcurrentTransforms=4, MaxPayloadInMB=6, BatchStrategy=MULTI_RECORD

以下のようにS3バケットに変換結果が出力されました。

テストデータの変換結果

7. 片づけ

処理を実行するインスタンスを停止しました。

これはStudioのRUNNING INSTANCESからか、

削除1

ユーザーの詳細から削除できました。

削除2

削除されると、ステータスがDeletedになりました。

削除後

Experimentsの確認

Experimentsでは、今回のサンプル通りに処理を実行すると、 Unassigned runsに処理が記録されました。

Experimentsからの確認1

パラメータなども確認できました。

Experimentsからの確認2

最後に

今回は『PCA および DBSCAN ムービークラスターを使用したバッチ変換』に沿って、PCAを例にSageMakerのバッチ変換を試してみました。

バッチ変換を使うと、SageMaker Studioなどから変換用のインスタンスを起動し、そのインスタンス上で変換を実行できるのでとても便利ですね。

バッチ変換の実行のされ方のイメージを掴みたい方の参考になりましたら幸いです。