Amazon SageMakerのScript modeを使ってKerasを動かしてみた #reinvent

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、小澤です。

re:Invent 2019で大々的に発表されたわけではありませんが、 非常に便利になった機能として、TensorflowとMXNetのScript mode対応があります。

今回は、Script modeを使ってTensorflowコンテナでKerasを使ってみたいと思います。

Script modeとは

SageMakerではBuilt-inアルゴリズムを使う、 Tensorflowなどのあらかじめ用意されているコンテナを利用して学習処理を記述する、 独自コンテナを利用して、用意されていないライブラリを利用する、という3種類に使い方があります。

これまで、あらかじめ用意されているコンテナを利用する場合は、SageMakerの書き方に則っていくつかの関数を用意する必要がありました。 Tensorflowを利用する際に用意する関数やその記述方法については以下をご覧ください。

これに対して、Script modeを利用すると、学習の処理の一連の流れをif __name__ == '__main__'の中で記述する形式になります。

具体的な流れとしては、

  1. ハイパーパラメータや学習データのパスなどをコマンドライン引数として受け取る
  2. 従来のライブラリ・フレームワークでの学習処理をそのまま記述する
  3. モデルを保存する

となります。

お気づきの方もいるかもしれませんが、先日書いたScikit-learnコンテナを使った処理もこのScript modeを使ったものとなっています。

従来の関数を定義する書き方(公式ドキュメントではLegacy Modeと記載されている)と比較して、既存のスクリプトをそのまま利用しやすい形式になっています。

実際に処理を行ってみる

では、このScript modeを使ってKerasを使ってみたいと思います。 用意するものは従来通り、学習処理を記述するPythonスクリプトと時データを渡して処理を実行するノートブックの2つとなります。

学習処理を記述したスクリプト

まずは、学習処理を記述したスクリプトを見ていきましょう。 処理で利用する以下のライブラリをインポートしています。

import argparse
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import keras

from keras.layers import Dense, Dropout
from keras.models import Sequential
from keras.optimizers import RMSprop
from keras import backend as K

最初に、各種パラメータをコマンドライン引数として受け取ります。

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # ハイパーパラメータとして与える値を引数で受け取る
    parser.add_argument('--batch-size', type=int, default=128)
    parser.add_argument('--epochs', type=int, default=20)
    parser.add_argument('--num-classes', type=int, default=10)

    # SageMakerの動作に関する引数
    # モデルの出力先パスを指定
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    # 学習データの入力パスを指定
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))

    args, _ = parser.parse_known_args()

通常のPythonコードとして、argparseを利用しており、ほとんどは上記のScikit-learnで紹介したものと同じですね。

ハイパーパラメータとして渡しているものと、validationチャネルも利用してみまた、という点が異なっています。

なお、今回は、validationという名前のチャネルにしていますが、チャネル名は何でも問題ありません。 環境変数SM_CHANNEL_<チャネル名>は指定したものに合わせて自動で作成されます。

続いて、学習に利用するデータを読み込みます。

    # データをPandasのDataFrameとして取得
    # ディレクトリに複数ファイルある場合にも対応
    train_inputs = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    train_raw_data = [ pd.read_csv(file, header=None, engine="python") for file in train_inputs ]
    train_data = pd.concat(train_raw_data)

    # 読み込んだデータを学習データと正解ラベルに分ける
    train_y = train_data.ix[:,0]
    train_y = keras.utils.to_categorical(train_y, args.num_classes)
    train_x = train_data.ix[:,1:]

    valid_inputs = [ os.path.join(args.validation, file) for file in os.listdir(args.validation) ]
    valid_raw_data = [ pd.read_csv(file, header=None, engine="python") for file in valid_inputs ]
    valid_data = pd.concat(valid_raw_data)

    valid_y = valid_data.ix[:,0]
    valid_y = keras.utils.to_categorical(valid_y, args.num_classes)
    valid_x = valid_data.ix[:,1:]

こちらも引数で与えられた、train, validationとチャネルごとのデータのパスを受け取って読み込んでいます。 また、読み込んだのち、正解ラベルをone-hotなベクトルに変換しています。

ここまで準備が整えば、あとは学習処理を行うのみにとなります。

    model = Sequential()
    model.add(Dense(512, activation='relu', name='inputs', input_shape=(784,)))
    model.add(Dropout(0.2))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(10, activation='softmax'))

    model.compile(loss='categorical_crossentropy',
                  optimizer=RMSprop(),
                  metrics=['accuracy'])

    model.fit(train_x, train_y,
              batch_size=args.batch_size,
              epochs=args.epochs,
              verbose=1,
              validation_data=(valid_x, valid_y))

Kerasの使い方がわかる方には本当にそのまま記述しているだけ、というがわかっていただけるかと思います。

Kerasがよくわからないよー>< って方は以下をご参照ください。

なお、スクリプト内で記述したprint文はCloudWatch Logsに出力されるので、validationデータに対して以下のように評価をしておくことも可能です。

    score = model.evaluate(valid_x, valid_y)
    print('loss : {}'.format(score[0]))
    print('acc : {}'.format(score[1]))

最後にモデルの保存を行います。 Script modeでは、このモデルの保存を行わないと学習コンテナの破棄とともにモデルも失われてしまうので注意しましょう。

今回、Tensorflowコンテナを使っているのでTensorflowのモデルとして保存しておく必要があります。

    sess = K.get_session()
    tf.saved_model.simple_save(
        sess,
        os.path.join(args.model_dir, 'model/1'),
        inputs={'inputs': model.input},
        outputs={t.name: t for t in model.outputs})

処理を実行するノートブック

続いて、ノートブック側の処理を見てみましょう。 とはいえ、こちらは既存のLegacy Modeとほとんど変わりません。

まずはSageMakerを使う時の"いつもの"準備をします。

import os
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow
from tensorflow.python.keras.datasets import mnist
from tensorflow.python import keras

sagemaker_session = sagemaker.Session()
role = get_execution_role()

学習データの用意をします。

(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255

# S3に保存するためにデータを正解ラベルを1つにまとめる
joined_data = np.insert(x_train, 0, y_train, axis=1)

# 学習データをtrainチャネル用とvalidationチャネル用に分ける
df = pd.DataFrame(joined_data)
train_df = df[:50000]
valid_df = df[50000:]
train_df.to_csv('./train/train.csv', index=False, header=False)
valid_df.to_csv('./valid/valid.csv', index=False, header=False)

# それぞれをS3にアップロード
train_s3 = sagemaker_session.upload_data(path='./train', bucket='<S3 Bucket>', key_prefix='data/mnist_train')
valid_s3 = sagemaker_session.upload_data(path='./valid', bucket='<S3 Bucket>', key_prefix='data/mnist_valid')

あとは、学習処理を実行します。

batch_size = 128
num_classes = 10
epochs = 20

estimator = TensorFlow(entry_point='mnist_script.py',
                       role=role,
                       framework_version='1.12.0',
                       hyperparameters={
                           'batch-size' : batch_size,
                           'epochs' : epochs,
                           'num-classes' : num_classes
                       },
                       train_instance_count=1,
                       train_instance_type='ml.m5.xlarge',
                       script_mode=True,
                       py_version='py3')

estimator.fit({'train' : train_s3, 'validation' : valid_s3})

これで学習処理が実行されます。 学習が完了したのちは、他で作成したモデルと同様エンドポイントの作成が可能です。

今回は、バッチ変換ジョブを使って1000件のデータで評価してみましょう。

# train, validationと同じ方法で、テストデータを1000件S3にアップロードする
df = pd.DataFrame(x_test)
df = df.iloc[:1000, :]
df.to_csv('./test/test.csv', index=False, header=False)
test_s3 = sagemaker_session.upload_data(path='./test', bucket='<S3 Bucket>', key_prefix='data/mnist_test')

# transformerを使ってバッチ変換ジョブを実行
transformer = estimator.transformer(instance_count=1, instance_type='ml.m5.xlarge')
transformer.transform(test_s3, content_type='text/csv')
transformer.wait()

# 出力結果をS3から取得する
import json
import boto3
from urllib import parse

parsed_url = parse.urlparse(transformer.output_path)
bucket_name = parsed_url.netloc
file_name = parsed_url.path[1:] + "/test.csv.out"

s3 = boto3.resource('s3')
output_obj = s3.Object(bucket_name, file_name)
output = output_obj.get()["Body"].read().decode('utf-8')

# 出力結果はエンドポイント推論の際と同じフォーマットのJSON
# パースして最も確率の高い推論結果のみを取得する
result = json.loads(output)['predictions']
predict = np.array([np.array(r) for r in result]).argmax(axis=1)

# 正解率を出力
result_df = pd.DataFrame({'predict' : predict, 'label' : y_test[:1000]})
acc = sum(result_df.apply(lambda x: x['predict'] == x['label'], axis=1)) / 1000
print(acc)

手元の環境では0.984となりました。 これでテストデータ1000件に対しての正解率が求まりました。

おわりに

今回はTensorflowコンテナのScript modeでKerasを使ってみました。 学習処理は入出力部分のみをつけ足せば既存のスクリプトがそのままSageMakerに移植可能な状態になっており非常に便利ですね。