SageMakerの「パイプモード」を使ってTensorFlowの処理を高速化する:Amazon SageMaker Advent Calendar 2018

概要

こんにちは、データインテグレーション部のyoshimです。
この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の4日目の記事となります。

目次

1.やること

SageMakerの「パイプモード」という機能を調べたことがなかったので調べてみました。
また、チュートリアルが公開されていたので、このチュートリアルの内容をベースに紹介していきたいと思います。

2.「パイプモード」について

まず、「パイプモード」とはなんぞや、というところから調べました。
こちらを見てみると、下記のような機能のようです。

  • データセットを学習コンテナにダウンロードするのではなく、トレーニングインスタンスにS3から直接ストリーミングする。
  • その結果、学習の開始が早まり、必要なディスク容量も小さくて済む

この記述だけを見てもシンプルに良さそうな機能です。
また「通常のファイル読み込みよりもS3からのストリーミングの方が読み込みが早く」、「S3をソースにしているので実質無制限な量のデータを処理」できます。どうやら、大規模なデータを処理する時にはパイプモードを使った方が良さそうですね。

下記は、AWSがパイプモードのメリットを示すために、通常の「ファイルモード」とのパフォーマンスを比較したものです。
ニューヨーク市のタクシートリップレコードのデータセットを使ってPCAを学習させた際のパフォーマンスだそうです。
引用: Amazon SageMaker アルゴリズムのパイプ入力モードを使用する

JOB開始時間、IOのいずれもパフォーマンスが向上しており、結果として合計の学習時間が削減されています。

3.TensorFlowの自作コードで学習をする方法

続いて、「TensorFlow」の自作コードで学習をする方法について説明します。
今回のチュートリアルで用意したスクリプトは下記の通りです。
(「pipemode.py」)

import numpy as np
import os
import tensorflow as tf
from sagemaker_tensorflow import PipeModeDataset

from tensorflow.contrib.data import map_and_batch

PREFETCH_SIZE = 10
BATCH_SIZE = 64
NUM_PARALLEL_BATCHES = 2
DIMENSION = 1024
EPOCHS = 1

def estimator_fn(run_config, params):
    column = tf.feature_column.numeric_column('data', shape=(DIMENSION, ))
    return tf.estimator.LinearClassifier(feature_columns=[column], config=run_config)

def train_input_fn(training_dir, params):
    """Returns input function that would feed the model during training"""
    return _input_fn('train')

def eval_input_fn(training_dir, params):
    """Returns input function that would feed the model during evaluation"""
    return _input_fn('eval')

def _input_fn(channel):
    """Returns a Dataset for reading from a SageMaker PipeMode channel."""
    features = {
        'data': tf.FixedLenFeature([], tf.string),
        'labels': tf.FixedLenFeature([], tf.int64),
    }

    def parse(record):
        parsed = tf.parse_single_example(record, features)
        return ({
            'data': tf.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    ds = PipeModeDataset(channel)
    if EPOCHS > 1:
        ds = ds.repeat(EPOCHS)
    ds = ds.prefetch(PREFETCH_SIZE)
    ds = ds.apply(map_and_batch(parse, batch_size=BATCH_SIZE,
                                num_parallel_batches=NUM_PARALLEL_BATCHES))

    return ds

「estimator_fn」がモデルの定義を、「train_input_fn」、「eval_input_fn」がトレーニング用/評価用データを読み込む処理を記述しています。
「train_input_fn」、「eval_input_fn」はそれぞれ必須ですが、「SageMaker TensorFlow docker image」を使う場合は「estimator_fn」ではなく、「model_fn」、「keras_model_fn」を使うこともできます。
引用:sagemaker-python-sdkのREADME

  • Exactly one of the following:
    • model_fn: defines the model that will be trained.
    • keras_model_fn: defines the tf.keras model that will be trained.
    • estimator_fn: defines the tf.estimator.Estimator that will train the model.
  • train_input_fn: preprocess and load training data.
  • eval_input_fn: preprocess and load evaluation data.

ちなみに、「estimator_fn」の基本骨格としては、下記のような関数を記述する必要があります。
(引用元にあったサンプルです)

def estimator_fn(run_config, hyperparameters):
  # Logic to the following:
  # 1. Defines the features columns that will be the input of the estimator
  # 2. Returns an instance of a ``tensorflow.estimator`` passing in, the input run_config in the
  #    constructor.

  # Defines the features columns that will be the input of the estimator
  feature_columns = [tf.feature_column.numeric_column(INPUT_TENSOR_NAME, shape=[4])]
  # Returns the instance of estimator.
  return tf.estimator.DNNClassifier(feature_columns=feature_columns,
                                      hidden_units=[10, 20, 10],
                                      n_classes=3,
                                      config=run_config)

引用:sagemaker-python-sdkのREADME

「train_input_fn」、「eval_input_fn」については、下記のような処理を記述する必要があります。

def train_input_fn(training_dir, hyperparameters):
  # Logic to the following:
  # 1. Reads the **training** dataset files located in training_dir
  # 2. Preprocess the dataset
  # 3. Return 1)  a dict of feature names to Tensors with
  # the corresponding feature data, and 2) a Tensor containing labels
  return features, labels

引用:sagemaker-python-sdkのREADME

また、「_input_fn」では「パイプモードでデータを読み込むための特別な処理」をしています。
具体的には、「PipeModeDataset」というデータとして変換する処理が必要になります。

詳細はsagemaker-tensorflow-extensionをご参照いただきたいのですが、ここでは「recordio」形式のファイルを「PipeModeDataset」形式に変換しています。
(2018年11月末時点では、こちらのドキュメントを見ると「TFRecord」、「RecordIO」、「txt」の3種類のファイルしか対応できていないようです)

def _input_fn(channel):
    """Returns a Dataset for reading from a SageMaker PipeMode channel."""
    features = {
        'data': tf.FixedLenFeature([], tf.string),
        'labels': tf.FixedLenFeature([], tf.int64),
    }

    def parse(record):
        parsed = tf.parse_single_example(record, features)
        return ({
            'data': tf.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    ds = PipeModeDataset(channel) # 「PipeModeDataset」に変換
    if EPOCHS > 1:
        ds = ds.repeat(EPOCHS)
    ds = ds.prefetch(PREFETCH_SIZE)
    ds = ds.apply(map_and_batch(parse, batch_size=BATCH_SIZE,
                                num_parallel_batches=NUM_PARALLEL_BATCHES))

    return ds

SageMakerでTensorFlowの自作コードを利用する方法の詳細については、下記もご参照ください。
Amazon SageMakerでTensorFlowを使ってIris分類してみた
sagemaker-python-sdkのREADME

続いて、学習用のパラメータを設定します。
「input_mode」が「Pipe」になっているのがポイントです。

from sagemaker.tensorflow import TensorFlow

# https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/tf-example1-train.html
tensorflow = TensorFlow(entry_point='pipemode.py',
                        role=role,
                        framework_version='1.11.0',
                        input_mode='Pipe', # 「Pipe」にすることでパイプモードに設定可能。「File」にするとファイルモード。
                        output_path=model_artifacts_location, # 学習した結果が格納されるS3パス
                        code_location=custom_code_upload_location, # 実行したスクリプトを保存するS3パス
                        train_instance_count=1,
                        training_steps=1000,
                        evaluation_steps=100,
                        train_instance_type='ml.c4.xlarge')

最後に利用するデータセットのS3パスを指定して学習を開始すればOKです!!!

%%time
import boto3

# use the region-specific sample data bucket
region = boto3.Session().region_name

train_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/train'.format(region)
eval_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/eval'.format(region)

tensorflow.fit({'train':train_data, 'eval':eval_data})

4.まとめ

パイプモードを使うことで、TensorFlowを使ったモデル開発を効率化することができそうですが、そのためには下記の対応をする必要があるようです。

  • 1.データを「PipeModeDataset」形式に変換する(2018年11月末時点では、「TFRecord」、「RecordIO」、「txt」のいずれかからのみ可能)
  • 2.エントリーポイントファイルの作成
  • 3.学習パラメータ指定時に「input_mode」パラメータに「Pipe」を指定

学習にかかる時間、金額等のコスト削減を考えている方がいらっしゃったら是非「パイプモード」を試してみてください。