Amazon SageMaker Processingを試してみた #reinvent

Amazon SageMaker Processingを使ってみた様子を紹介

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どうも、DA事業本部の大澤です。

昨日発表されたAmazon SageMaker Processingを使ってみたので、その様子をご紹介します。

やってみた

今回は公式ブログで紹介されていた内容をもとに試してみました。

Irisデータセットを読み込み、分割し、出力するというシンプルな内容です。

準備

前処理実行時にSageMakerが使用するIAMロールのARNや処理後データの保存場所を決めておきます。

import sagemaker
from os import path
from sklearn import datasets
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import pandas as pd

bucket = 'osawa-test-ml'
key_prefix = 'sagemaker-processing-test'
prefix = f's3://{bucket}/{key_prefix}'

destination = {
    'train': path.join(prefix, 'train'),
    'validation': path.join(prefix, 'validation'),
    'test': path.join(prefix, 'test')
}

role = 'role_arn'
# role = sagemaker.get_execution_role()

データ準備

Irisのデータを読み込んで、CSVとして保存します。

iris = datasets.load_iris()
pd.DataFrame(iris.data, columns=iris.feature_names).to_csv('dataset.csv', index=False)

前処理用スクリプト

データを読み込み、ただデータを分割し、保存するだけの処理です。 ファイルの入出力場所については、inputsoutputsとして後ほどProcessorのrun時に設定するパスと合わせます。

preprocessing.py

import pandas as pd
import os
from sklearn.model_selection import train_test_split

# ローカルにデータを読み込む
df = pd.read_csv('/opt/ml/processing/input/dataset.csv')

# データ分割
train, test = train_test_split(df, test_size=0.2)
train, validation = train_test_split(train, test_size=0.2)

# データの出力先ディレクトリを作成
os.makedirs('/opt/ml/processing/output/train', exist_ok=True)
os.makedirs('/opt/ml/processing/output/validation', exist_ok=True)
os.makedirs('/opt/ml/processing/output/test', exist_ok=True)

# データをローカルに保存
train.to_csv("/opt/ml/processing/output/train/train.csv")
train.to_csv("/opt/ml/processing/output/train/train.csv", index=False)
validation.to_csv("/opt/ml/processing/output/validation/validation.csv", index=False)
test.to_csv("/opt/ml/processing/output/test/test.csv", index=False)

print('Finished running processing job')

前処理ジョブの作成

SKLearnProcessorを使って、環境情報を設定します。

設定可能な引数は次の通りです。

  • framework_version (str): 使用するscikit-learnのバージョン
  • role (str): SageMakerが使用するIAMロールのARN
  • instance_type (str): インスタンスタイプ
  • instance_count (int): インスタンス数
  • command ([str]): 実行時に使用するコマンド 例: ["python3", "-v"]
  • volume_size_in_gb (int): 処理実行時に使用するEBSのサイズ
  • volume_kms_key (str): 処理実行時に使用するストレージボリュームを暗号化するKMSキーのARN
  • output_kms_key (str): ProcessingOutputsを保存する際に使用するKMSキーのARN
  • max_runtime_in_seconds (int): タイムアウト秒数
  • base_job_name (str): 処理ジョブ
  • sagemaker_session (sagemaker.session.Session): SageMakerのセッション
  • env (dict): ジョブ実行時に設定する環境変数
  • tags ([dict]): 処理ジョブに指定するタグ
  • network_config (sagemaker.network.NetworkConfig): ネットワーク情報
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_count=1,
                                     instance_type='ml.m5.xlarge')

runによって、codeに指定したスクリプトファイルがSageMaker上で実行されます。

指定可能な引数は次の通りです。

  • code (str): 実行するスクリプトファイルのパス。ローカル or S3
  • inputs ([sagemaker.processing.ProcessingInput]): 入力ファイル情報
  • outputs ([str or sagemaker.processing.ProcessingOutput]): 出力ファイル情報
  • arguments ([str]): 実行時の引数
  • wait (bool): ジョブの完了を待つかどうか
  • logs (bool): ジョブ実行時のログを出力するかどうか(wait=Trueの時に有効)
  • job_name (str):ジョブ名
  • experiment_config (dict[str, str]): 実験管理情報。キーは'ExperimentName', 'TrialName','TrialComponentDisplayName'
sklearn_processor.run(
    code='preprocessing.py',
    inputs=[ProcessingInput(
        source='dataset.csv',
        destination='/opt/ml/processing/input')],
    outputs=[ProcessingOutput(source='/opt/ml/processing/output/train', destination=destination['train']),
        ProcessingOutput(source='/opt/ml/processing/output/validation', destination=destination['validation']),
        ProcessingOutput(source='/opt/ml/processing/output/test', destination=destination['test'])]
)

5分弱ほどでジョブが完了しました

ジョブの確認

ジョブ情報はjobsを参照することで確認できます。

sklearn_processor.jobs[0].describe().keys()

次のような情報が格納されています。

S3からデータを取ってきて、データが出力されているか確認します。

for output in sklearn_processor.jobs[0].outputs:
    dest = output.destination
    basename = path.basename(dest)
    !aws s3 cp {dest} ./{basename} --recursive

無事出力されているようです。

さいごに

Amazon SageMaker Processingを使って処理してみた内容をお伝えしました。思っていたより簡単に実行でき、便利そうです。これからどんどん活用していきたいと思います。

参考