Amazon DynamoDBを使って予測モデルのレプリカを作ってみた

2015.11.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

平田です。最近は頑張ってPythonを覚えようとしています。

Amazon MLの予測問い合わせ用API を利用する場合、要件によっては問い合わせの応答速度が課題となる場合があります。

特にAmazon MLは現在(2015年11月時点)では、リージョン対応がヴァージニアとアイルランドのみとなっているため、日本から問い合わせた場合には応答時間がそれなりにかかってしまいます。

今回は応答速度を改善するために、DynamoDBを使って東京リージョン内にAmazon MLの予測モデルのレプリカを作ってみました。

構成図

大まかな構成は以下の図のようになります。

aml-replica

まず、事前に予測モデルに対してバッチ予測を実行し、入力値と予測結果のペアを取得しておきます。そして、このバッチ予測の結果をもとに、入力値がキー、予測結果がバリューとなるようにDynamoDB上のテーブルにデータを格納していきます。

クライアントに対しては、予測問い合わせの窓口としてLambda Functionを用意しておき、その中でDynamoDBやAmazon MLとの橋渡しを行います。まずは入力値をキーとして、DynamoDBにすでに予測値が格納されていないか確認を行います。すでに予測値があればその結果を返し、なければ予測モデルに対して再度結果を問い合わせ、その値を返却します。

非常に単純な仕組みのレプリカですが、基本的に予測モデルに入力される値と予測結果は一意であるため、予測モデルの代用品として十分に働いてくれます。

作成方法

ここでは、入力値と予測結果をCSVの形で取得したという前提で、その取得結果からDynamoDBへ結果を格納し、Lambda Functionを繋げるまでをスクリプトで作成します。

サンプルとなるCSVは以下のような形式のものを使用します。

var1,var2,result
1,2,10
2,3,23
...

まずはDynamoDBにテーブルを作成し、CSVのデータを格納していきます。以下の設定ファイルにCSVのパスやテーブルの定義などをまとめておき、スクリプトに読み込ませることでテーブルの作成とCSVデータの格納を行います。

# config.yaml
csv_path: './data/prediction_data.csv'

dynamo:
table_definition:
AttributeDefinitions:
-
AttributeName: 'key'
AttributeType: 'S'
TableName: 'predictionReplica'
KeySchema:
-
AttributeName: 'key'
KeyType: 'HASH'
ProvisionedThroughput:
ReadCapacityUnits : 1
WriteCapacityUnits : 1
# create_table.py
import boto3
import yaml

config_path = './config/config.yaml'
definition = yaml.load(open(config_path,'r'))['dynamo']['table_definition']
client = boto3.client('dynamodb')

client.create_table(**definition) # テーブルの作成
# insert_csv_data.py
import boto3
import yaml

config_path = './config/config.yaml'
definition = yaml.load(open(config_path,'r'))['dynamo']['table_definition']
csv_path = yaml.load(open(config_path,'r'))['csv_path']

table_name = definition['TableName']

table = boto3.resource('dynamodb').Table(table_name)

reader = csv.reader(open(csv_path))
header = next(reader)

keys = set()
with table.batch_writer() as batch:
for row in reader:
key = '' + row[0] + '#' + row[1]

if key in keys: continue # keyの重複チェック
keys.add(key)

item = {'key': key, 'result': row[2]}
batch.put_item(Item=item) # 予測データの追加

データ格納後は、アクセスするためのlambda functionを作成します。関数に渡された入力値からkeyを作成し、DynamoDB上のレプリカに問い合わせます。結果が取得できればそのまま返し、取得できなければ再度MLに問い合わせます。

import json
import boto3

def lambda_handler(event, context):
key = '' + event[‘var1'] + '#' + event[‘var2']
table = boto3.resource('dynamodb').Table('predictionReplica')
data = table.get_item(Key={'key' : key}) # レプリカに問い合わせ

if 'Item' in data:
result = { 'Prediction': {'predictedValue': float(data['Item']['result']) }}
return trim_output(result) # レプリカ上にあれば、その結果を返す

ml = boto3.client('machinelearning', region_name='us-east-1')
params = {
'MLModelId': 'ml-xxxxxxxx',
'PredictEndpoint': 'https://realtime.machinelearning.us-east-1.amazonaws.com',
'Record': convert_params_for_ml(event)
};

result = ml.predict(**params) # レプリカ上になければ、結果をMLに問い合わせる
return trim_output(result)

def trim_output(data):
result = data['Prediction']

if 'details' in result: del result['details']

result['predictedValue'] = round(result['predictedValue'])
return result

結果について

LambdaからAmazon MLに直接問い合わせた場合、私の環境では応答におおむね1500ms程度かかる印象でしたが、今回のレプリカを用いることで、これが200ms〜400msに改善することができました。

もし予測の問い合わせ速度が問題になった場合には、対策の一つとしてご一考いただければと思います。

注意点

この方法ですが、事前に問い合わせがくると思われる入力値の組み合わせの分だけ事前にバッチ予測にかける必要があります。 例えば、入力値として100パターンを持つ変数3つの組み合わせの場合は、100^3で100万通りの入力値に対して問い合わせを行う必要があります。AMLは予測 1,000 件当たり 0.10 USDなので、100万回問い合わせを行う場合には100 USDがかかる計算となります。

これが4変数、5変数の場合には、さらにバッチ予測にかかる費用がスケールしていくので、レプリカを作成する際には、変数の組み合わせ数から費用を概算で出しておく必要があります。