SageMakerの「パイプモード」を使ってTensorFlowの処理を高速化する:Amazon SageMaker Advent Calendar 2018
概要
こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の4日目の記事となります。
目次
1.やること
SageMakerの「パイプモード」という機能を調べたことがなかったので調べてみました。
また、チュートリアルが公開されていたので、このチュートリアルの内容をベースに紹介していきたいと思います。
2.「パイプモード」について
まず、「パイプモード」とはなんぞや、というところから調べました。
こちらを見てみると、下記のような機能のようです。
- データセットを学習コンテナにダウンロードするのではなく、トレーニングインスタンスにS3から直接ストリーミングする。
- その結果、学習の開始が早まり、必要なディスク容量も小さくて済む
この記述だけを見てもシンプルに良さそうな機能です。
また「通常のファイル読み込みよりもS3からのストリーミングの方が読み込みが早く」、「S3をソースにしているので実質無制限な量のデータを処理」できます。どうやら、大規模なデータを処理する時にはパイプモードを使った方が良さそうですね。
下記は、AWSがパイプモードのメリットを示すために、通常の「ファイルモード」とのパフォーマンスを比較したものです。
ニューヨーク市のタクシートリップレコードのデータセットを使ってPCAを学習させた際のパフォーマンスだそうです。
引用:
Amazon SageMaker アルゴリズムのパイプ入力モードを使用する
JOB開始時間、IOのいずれもパフォーマンスが向上しており、結果として合計の学習時間が削減されています。
3.TensorFlowの自作コードで学習をする方法
続いて、「TensorFlow」の自作コードで学習をする方法について説明します。
今回のチュートリアルで用意したスクリプトは下記の通りです。
(「pipemode.py」)
import numpy as np import os import tensorflow as tf from sagemaker_tensorflow import PipeModeDataset from tensorflow.contrib.data import map_and_batch PREFETCH_SIZE = 10 BATCH_SIZE = 64 NUM_PARALLEL_BATCHES = 2 DIMENSION = 1024 EPOCHS = 1 def estimator_fn(run_config, params): column = tf.feature_column.numeric_column('data', shape=(DIMENSION, )) return tf.estimator.LinearClassifier(feature_columns=[column], config=run_config) def train_input_fn(training_dir, params): """Returns input function that would feed the model during training""" return _input_fn('train') def eval_input_fn(training_dir, params): """Returns input function that would feed the model during evaluation""" return _input_fn('eval') def _input_fn(channel): """Returns a Dataset for reading from a SageMaker PipeMode channel.""" features = { 'data': tf.FixedLenFeature([], tf.string), 'labels': tf.FixedLenFeature([], tf.int64), } def parse(record): parsed = tf.parse_single_example(record, features) return ({ 'data': tf.decode_raw(parsed['data'], tf.float64) }, parsed['labels']) ds = PipeModeDataset(channel) if EPOCHS > 1: ds = ds.repeat(EPOCHS) ds = ds.prefetch(PREFETCH_SIZE) ds = ds.apply(map_and_batch(parse, batch_size=BATCH_SIZE, num_parallel_batches=NUM_PARALLEL_BATCHES)) return ds
「estimator_fn」がモデルの定義を、「train_input_fn」、「eval_input_fn」がトレーニング用/評価用データを読み込む処理を記述しています。
「train_input_fn」、「eval_input_fn」はそれぞれ必須ですが、「SageMaker TensorFlow docker image」を使う場合は「estimator_fn」ではなく、「model_fn」、「keras_model_fn」を使うこともできます。
引用:sagemaker-python-sdkのREADME
- Exactly one of the following:
- model_fn: defines the model that will be trained.
- keras_model_fn: defines the tf.keras model that will be trained.
- estimator_fn: defines the tf.estimator.Estimator that will train the model.
- train_input_fn: preprocess and load training data.
- eval_input_fn: preprocess and load evaluation data.
ちなみに、「estimator_fn」の基本骨格としては、下記のような関数を記述する必要があります。
(引用元にあったサンプルです)
def estimator_fn(run_config, hyperparameters): # Logic to the following: # 1. Defines the features columns that will be the input of the estimator # 2. Returns an instance of a ``tensorflow.estimator`` passing in, the input run_config in the # constructor. # Defines the features columns that will be the input of the estimator feature_columns = [tf.feature_column.numeric_column(INPUT_TENSOR_NAME, shape=[4])] # Returns the instance of estimator. return tf.estimator.DNNClassifier(feature_columns=feature_columns, hidden_units=[10, 20, 10], n_classes=3, config=run_config)
引用:sagemaker-python-sdkのREADME
「train_input_fn」、「eval_input_fn」については、下記のような処理を記述する必要があります。
def train_input_fn(training_dir, hyperparameters): # Logic to the following: # 1. Reads the **training** dataset files located in training_dir # 2. Preprocess the dataset # 3. Return 1) a dict of feature names to Tensors with # the corresponding feature data, and 2) a Tensor containing labels return features, labels
引用:sagemaker-python-sdkのREADME
また、「_input_fn」では「パイプモードでデータを読み込むための特別な処理」をしています。
具体的には、「PipeModeDataset」というデータとして変換する処理が必要になります。
詳細はsagemaker-tensorflow-extensionをご参照いただきたいのですが、ここでは「recordio」形式のファイルを「PipeModeDataset」形式に変換しています。
(2018年11月末時点では、こちらのドキュメントを見ると「TFRecord」、「RecordIO」、「txt」の3種類のファイルしか対応できていないようです)
def _input_fn(channel): """Returns a Dataset for reading from a SageMaker PipeMode channel.""" features = { 'data': tf.FixedLenFeature([], tf.string), 'labels': tf.FixedLenFeature([], tf.int64), } def parse(record): parsed = tf.parse_single_example(record, features) return ({ 'data': tf.decode_raw(parsed['data'], tf.float64) }, parsed['labels']) ds = PipeModeDataset(channel) # 「PipeModeDataset」に変換 if EPOCHS > 1: ds = ds.repeat(EPOCHS) ds = ds.prefetch(PREFETCH_SIZE) ds = ds.apply(map_and_batch(parse, batch_size=BATCH_SIZE, num_parallel_batches=NUM_PARALLEL_BATCHES)) return ds
SageMakerでTensorFlowの自作コードを利用する方法の詳細については、下記もご参照ください。
Amazon SageMakerでTensorFlowを使ってIris分類してみた
sagemaker-python-sdkのREADME
続いて、学習用のパラメータを設定します。
「input_mode」が「Pipe」になっているのがポイントです。
from sagemaker.tensorflow import TensorFlow # https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/tf-example1-train.html tensorflow = TensorFlow(entry_point='pipemode.py', role=role, framework_version='1.11.0', input_mode='Pipe', # 「Pipe」にすることでパイプモードに設定可能。「File」にするとファイルモード。 output_path=model_artifacts_location, # 学習した結果が格納されるS3パス code_location=custom_code_upload_location, # 実行したスクリプトを保存するS3パス train_instance_count=1, training_steps=1000, evaluation_steps=100, train_instance_type='ml.c4.xlarge')
最後に利用するデータセットのS3パスを指定して学習を開始すればOKです!!!
%%time import boto3 # use the region-specific sample data bucket region = boto3.Session().region_name train_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/train'.format(region) eval_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/eval'.format(region) tensorflow.fit({'train':train_data, 'eval':eval_data})
4.まとめ
パイプモードを使うことで、TensorFlowを使ったモデル開発を効率化することができそうですが、そのためには下記の対応をする必要があるようです。
- 1.データを「PipeModeDataset」形式に変換する(2018年11月末時点では、「TFRecord」、「RecordIO」、「txt」のいずれかからのみ可能)
- 2.エントリーポイントファイルの作成
- 3.学習パラメータ指定時に「input_mode」パラメータに「Pipe」を指定
学習にかかる時間、金額等のコスト削減を考えている方がいらっしゃったら是非「パイプモード」を試してみてください。