Amazon SageMakerでの学習や推論時に使用するストレージ上のデータを暗号化する – Amazon SageMaker Advent Calendar 2018

こんにちは、大阪DI部の大澤です。 この記事は「クラスメソッド Amazon SageMaker Advent Calendar」の18日目の記事です。

今回はAWS Key Management Service(KMS)のCustomer Managed Key(CMK)を使って、Amazon SageMakerでのモデルの学習やホスティングにおけるストレージ上のデータを暗号化するという内容のノートブックを試してみます。

AWS Key Management Service

AWS Key Management Service(KMS)は暗号化キーの作成と管理が簡単に行えるマネージドサービスです。

KMSについてはこちらの記事が詳しいです。よろしければご覧ください。

やってみる

概要

  • 暗号化キーの作成とIAMロールの設定
  • データの読み込みと前処理、S3へのアップロード
  • XGBoostによる回帰モデルの作成
  • モデルのホスティングして推論
  • バッチ推論

暗号化キーの作成とIAMロールの設定

今回はKMSのCMKを使って、学習時やモデルのホスティング時のストレージの暗号化を行います。CMKがない場合は新たに作成する必要がありますが、マネージドコンソールのKMSのページから簡単に作成できます。

その際、キーへのアクセス/使用権限管理を行うキーポリシー(CMKの設定)と各リソースに対するアクセス/操作権限管理を行うIAMポリシー(IAMロールの設定)に注意が必要です。

  • ノートブックインスタンスに関連づけているIAMロールがKMS:CreateGrantの権限を持っている必要があります。
    • 今回使用するCMKもそのIAMロールに対してKms:CreateGrant操作を許可している必要があります。
    • CMKのキーポリシーの条件設定でkms:GrantIsForAWSResourceはfalseである必要があります。
    • trueになっている場合は学習時等にパーミッションエラーが発生します。(私はここでハマりました...)
    • IsForAWSResourceとなってますが、これはEBSやRDS、RedShift、ACMが対象であり、SageMakerは含まれないためです。
    • AWS KMS でのポリシー条件の使用 - AWS Key Management Service
  • 学習やホスティングなどに使用するIAMロールがkms:Encryptkms:Decryptの権限を持っている必要があります。
    • 今回の場合はノートブックインスタンスに関連づけているIAMロールです。
    • 今回使用するCMKもそのIAMロールに対してkms:Encryptkms:Decryptを許可している必要があります。

※ 学習などでCMKに関するパーミッションエラーが出る場合にはキーポリシーによる許可もしくはIAMポリシーの権限が不足している可能性があります。それらを緩めに設定し何度か試してみて、上手くいったら不要そうな許可や権限を削除するという流れをお勧めします。

セットアップ

モデルの学習時やホスティング時に使用するIAMロールや使用するCMKのキー、データを保存するS3の場所などを指定します。

%%time

import os
import io
import boto3
import pandas as pd
import numpy as np
import re
from sagemaker import get_execution_role

region = boto3.Session().region_name

role = get_execution_role()

kms_key_arn = 'arn:aws:kms:ap-northeast-1:account_id:key/key_id'

bucket='bucket_name'
prefix = 'sagemaker/DEMO-kms'
bucket_path = 's3://{}'.format(bucket)

データ準備

今回はデータ分析の入門としてよく使われる、ボストンの住宅価格データを使います。

データの読み込みと前処理

scikit-learnにデータセットが組み込まれているので、そこから読み込みます。その後扱いやすいようにデータを変換します。 今回はXGBoostを使った回帰を行います。目的変数としてMEDV(住宅価格の中央値、単位は千ドル)、説明変数はそれ以外の特徴量です。

from sklearn.datasets import load_boston
boston = load_boston()
X = boston['data']
y = boston['target']
feature_names = boston['feature_names']
data = pd.DataFrame(X, columns=feature_names)
target = pd.DataFrame(y, columns={'MEDV'})
data['MEDV'] = y
local_file_name = 'boston.csv'
data.to_csv(local_file_name, header=False, index=False)

用途毎(チャネル毎)にデータを分割します。

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=1)
X_test, X_val, y_test, y_val = train_test_split(X_test, y_test, test_size=0.5, random_state=1)

データをCSVとして書き出すための関数を定義します。

def write_file(X, y, fname, include_labels=True):
    feature_names = boston['feature_names']
    data = pd.DataFrame(X, columns=feature_names)
    if include_labels:
        data.insert(0, 'MEDV', y)
    data.to_csv(fname, header=False, index=False)

先ほどの関数を使って、各用途毎のデータをCSVに書き出します。

train_file = 'train.csv'
validation_file = 'val.csv'
test_file = 'test.csv'
test_no_labels_file = 'test_no_labels.csv'
write_file(X_train, y_train, train_file)
write_file(X_val, y_val, validation_file)
write_file(X_test, y_test, test_file)
write_file(X_test, y_test, test_no_labels_file, False)

アップロード

データをS3にアップロードします。S3上ではデータがCMKで暗号化されるように設定します。

s3 = boto3.client('s3')

data_train = open(train_file, 'rb')
key_train = '{}/train/{}'.format(prefix,train_file)
kms_key_id = kms_key_arn.split(':key/')[1]

print("Put object...")
s3.put_object(Bucket=bucket,
              Key=key_train,
              Body=data_train,
              ServerSideEncryption='aws:kms',
              SSEKMSKeyId=kms_key_id)
print("Done uploading the training dataset")

data_validation = open(validation_file, 'rb')
key_validation = '{}/validation/{}'.format(prefix,validation_file)

print("Put object...")
s3.put_object(Bucket=bucket,
              Key=key_validation,
              Body=data_validation,
              ServerSideEncryption='aws:kms',
              SSEKMSKeyId=kms_key_id)

print("Done uploading the validation dataset")

data_test = open(test_no_labels_file, 'rb')
key_test = '{}/test/{}'.format(prefix,test_no_labels_file)

print("Put object...")
s3.put_object(Bucket=bucket,
              Key=key_test,
              Body=data_test,
              ServerSideEncryption='aws:kms',
              SSEKMSKeyId=kms_key_id)

print("Done uploading the test dataset")

モデルの学習

SageMakerでXGBoostの回帰モデルの学習を行います。

まずは今回の学習と推論時に使用するコンテナイメージを取得します。

from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost')

今回はboto3のSageMakerクライアントを使って学習ジョブを作成します。

ResourceConfigVolumeKmsKeyIdでCMKのARNを設定することで、学習時のストレージ上のデータを暗号化することができます。 

%%time
from time import gmtime, strftime
import time

job_name = 'DEMO-xgboost-single-regression' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", job_name)

create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role, # 学習時に使用するIAMロール
    "OutputDataConfig": { # モデルアーティファクトの出力先
        "S3OutputPath": bucket_path + "/"+ prefix + "/output" 
    },
    "ResourceConfig": { # 学習にどういうリソースで学習するか
        "InstanceCount": 1,
        "InstanceType": "ml.m4.4xlarge",
        "VolumeSizeInGB": 5,
        "VolumeKmsKeyId": kms_key_arn # ここでCMKのARNを設定
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth":"5",
        "eta":"0.2",
        "gamma":"4",
        "min_child_weight":"6",
        "subsample":"0.7",
        "silent":"0",
        "objective":"reg:linear",
        "num_round":"5"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 86400
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/"+ prefix + '/train',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/"+ prefix + '/validation',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "csv",
            "CompressionType": "None"
        }
    ]
}

client = boto3.client('sagemaker')
client.create_training_job(**create_training_params)

try:
    # wait for the job to finish and report the ending status
    client.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
    training_info = client.describe_training_job(TrainingJobName=job_name)
    status = training_info['TrainingJobStatus']
    print("Training job ended with status: " + status)
except:
    print('Training failed to start')
     # if exception is raised, that means it has failed
    message = client.describe_training_job(TrainingJobName=job_name)['FailureReason']
    print('Training failed with the following error: {}'.format(message))

数分ほどで学習が終了します。IAMポリシーやCMKのキーポリシーが正しく設定されていない場合は学習を開始する途中でパーミッションエラーが出て停止します。その場合は改めて設定を確認し、修正する必要があります。

モデルのホスティング

エンドポイントを作成し、先ほど学習させたモデルをホスティングします。

そのためにまずはモデルアーティファクトからモデルを作成します。

%%time
import boto3
from time import gmtime, strftime

model_name=job_name + '-model'
print(model_name)

# 学習ジョブデータからモデルアーティファクトの場所を取得
info = client.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)

# コンテナ情報の定義
primary_container = {
    'Image': container, # モデルをホストする際に使用するコンテナイメージ(学習時と同じ)
    'ModelDataUrl': model_data # 学習させたモデルアーティファクトのS3パス
}

# モデルを作成
create_model_response = client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

エンドポイントを作るときに使用するエンドポイントコンフィグを作成します。エンドポイントのストレージ上のデータを暗号化するために、KmsKeyIdにCMKのARNを設定します。

from time import gmtime, strftime

endpoint_config_name = 'DEMO-XGBoostEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    KmsKeyId = kms_key_arn,
    ProductionVariants=[{
        'InstanceType':'ml.m4.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName':model_name,
        'VariantName':'AllTraffic'}])

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

先ほど作成したエンドポイントコンフィグを使用して、エンドポイントを作成します。 数分ほどでエンドポイントの作成は完了します。

%%time
import time

endpoint_name = 'DEMO-XGBoostEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)

# エンドポイントの作成リクエスト
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])


print('EndpointArn = {}'.format(create_endpoint_response['EndpointArn']))


# エンドポイントの状態を確認
response = client.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print('EndpointStatus = {}'.format(status))


# エンドポイントの作成が完了するまで待機
client.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)


# 改めてエンドポイントの状態確認
endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))

if status != 'InService':
    raise Exception('Endpoint creation failed.')

推論

先ほど作成したエンドポイントに推論リクエストを投げて動作確認します。

推論時にはこれまでと異なりSageMakerのランタイムクライアントを使用します。

runtime_client = boto3.client('runtime.sagemaker')

テストデータを読み込んで推論リクエストを投げます。

import sys
import math

# 推論処理用関数
def do_predict(data, endpoint_name, content_type):
    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name, 
                                   ContentType=content_type, 
                                   Body=data)
    result = response['Body'].read()
    result = result.decode("utf-8")
    return result

# 推論テスト用データ読み込み
with open('test.csv') as f:
    first_line = f.readline()
    features = first_line.split(',')[1:]
    feature_str = ','.join(features)

# 推論
prediction = do_predict(feature_str, endpoint_name, 'text/csv')
print('Prediction: ' + prediction)

推論できているようです。

エンドポイントの削除

確認が終了し、不要になったのでエンドポイントを削除します。

client.delete_endpoint(EndpointName=endpoint_name)

バッチ変換

次はバッチ変換の機能を使って、S3上にあるデータをまとめて推論します。この場合には推論用エンドポイント一時的に立ち上がって、処理が終わると自動的に停止します。

今回もバッチ変換処理を行うインスタンスのストレージ上のデータを暗号化するために、TransformResourcesVolumeKmsKeyIdにCMKのARNを設定します。

%%time
transform_job_name = 'DEMO-xgboost-batch-prediction' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Transform job", transform_job_name)

transform_params = \
{
    "TransformJobName": transform_job_name,
    "ModelName": model_name, # 使用するモデル名
    "TransformInput": { # バッチ変換の対象となるデータ設定
        "ContentType": "text/csv",
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": bucket_path + "/"+ prefix + '/test'
            }
        },
        "SplitType": "Line"
    },
    "TransformOutput": { # バッチ変換後の出力設定
        "AssembleWith": "Line",
        "S3OutputPath": bucket_path + "/"+ prefix + '/predict'
    },
    "TransformResources": { # バッチ変換を行う際のリソース設定
        "InstanceCount": 1,
        "InstanceType": "ml.c4.xlarge",
        "VolumeKmsKeyId": kms_key_arn # ストレージ上のデータをCMKを使って暗号化する
    }
}

# バッチ変換ジョブの作成
client.create_transform_job(**transform_params)

# 処理が終了するまで待機
while True:
    response = client.describe_transform_job(TransformJobName=transform_job_name)
    status = response['TransformJobStatus']
    if status == 'InProgress':
        time.sleep(15)
    elif status == 'Completed':
        print("Transform job completed!")
        break
    else:
        print("Unexpected transform job status: " + status)
        break

バッチ変換が終了したらS3から推論結果をダウンロードしてきて、各点における答えと推論結果の誤差割合の中央値を計算します。

print("Downloading prediction object...")
s3.download_file(Bucket=bucket,
                 Key=prefix + '/predict/' + test_no_labels_file + '.out',
                 Filename='./predictions.csv')

preds = np.loadtxt('predictions.csv')
print('\nMedian Absolute Percent Error (MdAPE) = ', np.median(np.abs(y_test - preds) / y_test))

さいごに

今回は 「クラスメソッド Amazon SageMaker Advent Calendar」 の18日目として、 SageMakerにおけるモデルの学習時やホスティング時のストレージをAWS KMSの暗号化キーで暗号化するという内容をお伝えしました。 セキュリティ的な制約によりストレージ上のデータは常に暗号化が必要というケースもあると思います。そういった場合でも簡単に対応できることが分かりました。IAMポリシーにしてもキーポリシーにしても、緩くしすぎるとセキュリティ的に問題ですし、厳しくしすぎるとうまく動作しない、もしくは不便になったり、設定が面倒になってしまいます。丁度いい塩梅に設定するってのは難しいですね。

お読みくださりありがとうございました〜!明日もお楽しみに〜!