#Sagemaker の推論エンドポイントに対してLambdaから負荷試験してX-Rayでレスポンスを監視してみた

2022.03.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

せーのでございます。

前回までのエントリではSagemakerの推論エンドポイントを立て、Sagemaker Notebookから叩いてみました。

今回はこのエンドポイントに対してLambdaから同時実行を行い、そのレスポンスをAWS X-Rayにて可視化してみました。

背景

前回のエントリを見ていただければ分かる通り、タスクとなっているのは「画像による人物の骨格推定」となります。つまりゴールとしてはカメラから映像をAWSにアップロードし、それをフレームごとにループしながら推論し続ける、という運用になります。
推論エンドポイントはインスタンスですので、立ち上げている間は料金がかかります。なので気持ちとしては1台で済ませたいところですが、カメラが複数ある場合、複数の場所から推論を叩くことになるので、負荷試験をしておく、というのは避けられないところかと思います。

せっかく負荷試験をするのであれば、ログを仕込んで数値で確認するよりも、X-Rayを使って視覚的に確認できたほうが便利、ということで今回のエントリとなりました。

やってみた

ではやってみます。

まずはLambda上でX-Rayを叩けるようにSDKを入れましょう。

Lambdaを立ち上げ、Python3.8でFunctionを立ち上げます。

立ち上がったらX-Rayへのアクセスを許可するようにIAMロールを変更します。

設定タブよりロール名をクリックするとIAMが開きます。

ポリシーを開いてAWSXrayWriteOnlyAccessを追加します。

これでXrayにアクセスできるようになりました。

次にテスト用の画像とXrayに必要なライブラリをLambdaにアップロードします。

まずはLambdaを立ち上げた際にデフォルトでできる関数ファイルをローカルにダウンロードします。

ダウンロードしたzipを開くとデフォルトの関数ファイルが入っているので、そのファイルと同じ階層のフォルダにX-RayのSDKとテスト用の画像を入れます。

pip install aws-xray-sdk -t .
pip install jsonpickle -t .

mkdir test
cp test.jpg test/

X-Rayへの書き込みに必要ないライブラリを削除します。最終的には aws_xray_sdk wrapt jsonpickleの3つとテスト用の画像、関数ファイルがあればOKです。削除したらこれらのファイル群をまとめて圧縮し、Lambdaにアップロードします。

Lambdaに戻り、「設定」を開きます。

現時点ではX-Rayによるトレースは有効になっていません。編集ボタンをクリックしてこれを有効にします。

これでX-Rayにつなげる準備は整いました。後は関数ファイルを書き直して、SagemakerへのリクエストとX-Rayの監視処理を加えます。

import boto3
import json
import time
import os
from concurrent.futures import ThreadPoolExecutor
import multiprocessing

from aws_xray_sdk.core import patch
patch(['boto3'])

client = boto3.client('sagemaker-runtime')
endpoint_name = "testInferenceEndpoint"

filename = 'test/test.jpg'

payload = None
with open(filename, 'rb') as f:
    payload = f.read()
    

    
def lambda_handler(event, context):
    # Request.
    
    start = time.time()

    process_list = []
    parent_connection_list = []
    for i in range(10):
        parent_conn, child_conn = multiprocessing.Pipe()
        parent_connection_list.append(parent_conn)
        process_list.append(multiprocessing.Process(target=child_process_func, args=(child_conn,)))

    for process in process_list:
        process.start()

    for parent_connection in parent_connection_list:
        print(parent_connection.recv())
        
    t = time.time() - start
    

    return {
        'statusCode': 200,
        'body': json.dumps(t)
    }
    
def child_process_func(conn):
    i = 0
    
    while i < 10:
        start = time.time()
        response = client.invoke_endpoint(
            EndpointName=endpoint_name, 
            ContentType='application/x-image',
            Accept='application/json',
            Body=payload
            )
        t = time.time() - start
        print(str(os.getpid()) + ": " + str(t))
        i += 1
        
        response_dict = json.loads(response['Body'].read().decode('utf-8'))
    
    conn.send(os.getpid())
    conn.close()

このコードでは10回連続でリクエストを送る処理をマルチプロセスで10回回しています。

このコードを実行すると、このようなログが出てきます。

Test Event Name
test

Response
{
  "statusCode": 200,
  "body": "9.150954961776733"
}

Function Logs
12: 0.3001236915588379
......
......
END RequestId: 42980e0a-5ff3-4af6-99ee-303840cee989
REPORT RequestId: 42980e0a-5ff3-4af6-99ee-303840cee989	Duration: 9158.87 ms	Billed Duration: 9159 ms	Memory Size: 128 MB	Max Memory Used: 128 MB	Init Duration: 323.16 ms	
XRAY TraceId: 1-62265301-22c549c31458a43601d4d4cc	SegmentId: 6e99b3d80be9ca4a	Sampled: true

Request ID
42980e0a-5ff3-4af6-99ee-303840cee989

このXRAY TraceIdをコピーします。

X-Rayを立ち上げ、検索窓にそのTraceIdを貼り付けると監視結果が表示されます。

平均のリクエスト時間や個々のリクエストの様子などが一覧になって表示されます。これは便利ですね!

まとめ

今回はSagemakerへのリクエストをLambdaで実装し、そのレスポンスをX-Rayで監視してみました。

機械学習をやっていると特にブラックボックスに感じる事が多いので、こういうロギング、可視化ツールが簡単に組み込めるのはとても安心材料になります。
Sagemaker内部のログと比較してLambda Sagemaker間のラグがどれくらいになるのか、初回とそれ以降でもタイムが違うなど、色々Tipsが隠されていますので、ぜひ皆さん実装してみてください。