Amazon Connect AIエージェントのセルフサービスに質問リストを一括送信して回答とレイテンシをCSV出力するスクリプトを作成してみた
はじめに
Amazon Connect AIエージェントのセルフサービスを利用し、自動回答の精度検証を行うにあたり、以下のようなテスト手順を実施している方も多いのではないでしょうか。
- 20〜30個の想定質問を用意する。
- チャットインターフェースにて1問ずつ手動で入力し、回答を得る。
- 得られた回答の精度を人間が目視で評価する。
- プロンプトやナレッジベースを修正し、再度1〜3の手順を繰り返す。
この「手動での入力・再評価」の工数は非常に大きく、検証のボトルネックになりがちです。
そこで本記事では、AWS SDK (Boto3) と WebSocket を活用したスクリプトを用いて、用意した複数の質問リストに対して一括で回答を取得する方法を紹介します。
また、一括回答取得を行う際、各質問に対するレスポンス時間(レイテンシ)もあわせて計測し、パフォーマンス評価にも役立てます。
前提条件
本記事では、テスト実行用のスクリプト作成に焦点を当てています。そのため、以下の環境はすでに構築済みであることを前提とします。
- Amazon Connect インスタンスが作成済みであること。
- Amazon Connect AIエージェントを組み込んだチャット用のコンタクトフローが作成済みであること。
- AIエージェント(ナレッジベース等)が設定され、チャットで質問すれば回答が返ってくる状態であること。
CloudShellで実行
今回は環境構築の手間を省くため、AWS CloudShell 上で実行します。
CloudShell を開き、WebSocket 通信に必要なライブラリをインストールします。
pip3 install websocket-client
テストスクリプトの作成
Python の ThreadPoolExecutor を使用して並列処理を行うスクリプトを作成します。
挨拶メッセージのスキップ処理
本スクリプトでは、チャット開始時にボットから自動送信される挨拶(例:「お問い合わせ内容をお伝え下さい」)を無視するために、以下の定数を使用しています。
# このメッセージが含まれていたら「挨拶」とみなして無視する
WAIT_FOR_MESSAGE = "お問い合わせ内容をお伝え下さい"
この WAIT_FOR_MESSAGE に設定する文字列は、Amazon Connect のコンタクトフロー内で設定されている初期プロンプトの文言と一致させる必要があります(本スクリプトでは「部分一致」で判定しています)。
具体的には、Amazon Connect フローデザイナーの 「顧客の入力を取得する (Get customer input)」 ブロックにおいて、 「テキスト読み上げまたはチャットテキスト」に入力されたテキストです。

フローの文言を修正した際は、必ずスクリプト側の WAIT_FOR_MESSAGE も合わせて更新してください。
コード
以下のコマンドをコピー&ペーストして 、AWS CloudShellにてtest_chat.py を作成してください。
以下の変数はご自身の環境に合わせて変更してください。
INSTANCE_ID: ConnectインスタンスIDCONTACT_FLOW_ID: フローIDREGION: リージョンWAIT_FOR_MESSAGE: フローで設定している最初の挨拶メッセージQUESTIONS: 質問内容
cat << 'EOF' > test_chat.py
import boto3
import json
import time
import websocket
import csv
import sys
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- 設定値 (環境に合わせて変更してください) ---
INSTANCE_ID = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
CONTACT_FLOW_ID = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
REGION = 'ap-northeast-1'
# このメッセージが含まれていたら「挨拶」とみなして無視する
# ※Amazon Connectのフローブロック「顧客の入力の取得」に設定しているメッセージです
WAIT_FOR_MESSAGE = "お問い合わせ内容をお伝え下さい"
# テストしたい質問リスト
QUESTIONS = [
"クラスメソッドという会社について教えてください",
"クラスメソッドはどのような会社ですか?",
"株式会社クラスメソッドの概要を知りたいです"
]
# テスト実行時の設定
MAX_WORKERS = 5 # 同時実行数
ANSWER_IDLE_SEC = 2.0 # 最後のメッセージからこの秒数経過したら「回答完了」とみなす
OVERALL_TIMEOUT = 60.0 # 1問あたりの最大待機時間
# -------------
def test_one_question(question_text):
# APIレートリミット回避のためのジッター
time.sleep(random.uniform(0.5, 2.0))
# Boto3クライアントをスレッドごとに作成
connect_client = boto3.client('connect', region_name=REGION)
participant_client = boto3.client('connectparticipant', region_name=REGION)
ws = None
connection_token = None
contact_id = None
phase = "init"
try:
# 1. チャット開始
phase = "StartChatContact"
start_resp = connect_client.start_chat_contact(
InstanceId=INSTANCE_ID,
ContactFlowId=CONTACT_FLOW_ID,
ParticipantDetails={'DisplayName': 'TestUser'},
Attributes={'test_mode': 'true'}
)
token = start_resp['ParticipantToken']
contact_id = start_resp['ContactId']
# 2. 接続情報取得
phase = "CreateParticipantConnection"
connection_resp = participant_client.create_participant_connection(
Type=['WEBSOCKET', 'CONNECTION_CREDENTIALS'],
ParticipantToken=token
)
ws_url = connection_resp['Websocket']['Url']
connection_token = connection_resp['ConnectionCredentials']['ConnectionToken']
# 3. WebSocket接続
phase = "WebSocketConnect"
ws = websocket.create_connection(ws_url)
ws.send(json.dumps({"topic": "aws/subscribe", "content": {"topics": ["aws/chat"]}}))
# --- フェーズ1: 挨拶メッセージ待ち ---
phase = "WaitGreeting"
ws.settimeout(5.0)
try:
while True:
result = ws.recv()
data = json.loads(result)
if 'content' in data and isinstance(data['content'], str):
content_json = json.loads(data['content'])
# 挨拶文が含まれていたらループを抜けて次へ
if WAIT_FOR_MESSAGE in content_json.get('Content', ''):
break
except Exception:
pass # 挨拶が来なくても(タイムアウトしても)次へ進む
# --- フェーズ2: 質問送信 ---
phase = "SendMessage"
time.sleep(0.5)
# 送信直前の時刻を記録(ここを基準にレイテンシを計測)
send_time = time.time()
participant_client.send_message(
ContentType='text/plain',
Content=question_text,
ConnectionToken=connection_token
)
# --- フェーズ3: 回答受信(バッファリング方式) ---
phase = "ReceiveAnswer"
received_messages = []
last_msg_time = None # 最後のメッセージ受信時刻
while True:
# 全体タイムアウト判定
if (time.time() - send_time) > OVERALL_TIMEOUT:
raise TimeoutError("Overall timeout exceeded")
# Idle判定(最後のメッセージから一定時間経過したら完了とみなす)
if last_msg_time and (time.time() - last_msg_time) > ANSWER_IDLE_SEC:
break
# WebSocket受信(短いタイムアウトでポーリング)
ws.settimeout(1.0)
try:
result = ws.recv()
data = json.loads(result)
if 'content' in data and isinstance(data['content'], str):
content_json = json.loads(data['content'])
# メッセージタイプ判定
if (content_json.get('Type') == 'MESSAGE' and
content_json.get('ContentType') == 'text/plain'):
content = content_json.get('Content', '')
role = content_json.get('ParticipantRole', '')
display_name = content_json.get('DisplayName', '')
# 自分の送信メッセージを除外するフィルタ
# AWS仕様に基づき、Roleが'CUSTOMER'のものは除外
if role == 'CUSTOMER': continue
# 念のためDisplayNameや内容でもガード
if display_name == 'TestUser': continue
if content == question_text: continue
# 挨拶文の再送などが混ざった場合も除外
if WAIT_FOR_MESSAGE in content: continue
# メッセージをバッファに追加
received_messages.append(content)
# 時刻記録
last_msg_time = time.time()
except websocket.WebSocketTimeoutException:
# 受信なし(沈黙)。ループを回してIdle判定へ
continue
except Exception:
break
# 結果の集計
if received_messages:
bot_answer = "\n".join(received_messages)
# Latency: 最後の文字を受信するまでの時間
latency = last_msg_time - send_time
else:
bot_answer = "No Response"
latency = 0.0
return bot_answer, latency
except Exception as e:
return f"Error ({phase}): {str(e)}", 0.0
finally:
# クリーンアップ処理
if ws:
try: ws.close()
except: pass
if connection_token:
try: participant_client.disconnect_participant(ConnectionToken=connection_token)
except: pass
# コンタクトの強制終了 (StopContact)
if contact_id:
try:
connect_client.stop_contact(
ContactId=contact_id,
InstanceId=INSTANCE_ID
)
except Exception: pass
if __name__ == "__main__":
results = []
print(f"=== 並行テスト開始 (全{len(QUESTIONS)}問 / {MAX_WORKERS}並列) ===")
print(f"※ タイムアウト設定: {OVERALL_TIMEOUT}秒 / Idle判定: {ANSWER_IDLE_SEC}秒")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Futureと「質問インデックス」を紐付ける
future_to_info = {
executor.submit(test_one_question, q): (i, q)
for i, q in enumerate(QUESTIONS, 1)
}
for future in as_completed(future_to_info):
index, question = future_to_info[future]
try:
ans, lat = future.result()
lat_rnd = round(lat, 2)
# 画面表示用に見やすく整形
clean_ans = ans.replace('\n', ' ').replace('\r', '')
display_ans = (clean_ans[:40] + '...') if len(clean_ans) > 40 else clean_ans
print(f"[No.{index} 完了] Latency:{lat_rnd}s -> {display_ans}")
results.append([index, question, ans, lat_rnd])
except Exception as exc:
print(f"[No.{index} エラー] {question} : {exc}")
results.append([index, question, f"Error: {exc}", 0.0])
# インデックス順(No.1, No.2...)にソート
results.sort(key=lambda x: x[0])
filename = 'result.csv'
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# CSVヘッダー
writer.writerow(['No', 'Question', 'Answer', 'Latency'])
writer.writerows(results)
print("=== 完了 ===")
print(f"結果ファイル '{filename}' が作成されました(質問順にソート済み)。")
EOF
処理の概要
このスクリプトは、以下のフローを質問の数だけ並列で実行します。
- チャット開始 (
StartChatContact):- Amazon Connect とのセッションを開始し、トークンとコンタクトIDを取得します。
- APIレート制限(スロットリング)を回避するため、各スレッドの開始タイミングをランダムにずらしています。
- 接続確立 (
CreateParticipantConnection):- WebSocket の接続 URL を取得し、接続します。
- 挨拶のスキップ:
- 前述の
WAIT_FOR_MESSAGEで設定した定型文を受信した場合、それを無視して次の処理へ進みます。
- 前述の
- 質問送信 (
SendMessage):- 用意した質問テキストを送信します。
- 回答受信と計測:
- ボットからの回答を受信します。生成AIは回答を分割して送信する場合があるため、「メッセージ受信後、一定時間(2秒)沈黙したら回答完了」とみなすロジックを採用しています。
- AWSの仕様に基づき、
ParticipantRoleがCUSTOMER(自分)のメッセージは除外し、ボットの回答のみを抽出します。 - 回答完了までの時間(レイテンシ)を計測します。
- 終了処理 (
StopContact):- テスト終了後、サーバー側のセッションを確実に終了させ、同時接続数のクォータを解放します。
- 結果のソート:
- 並列実行のため完了順序はバラバラになりますが、最終的に質問リストの順番(No順)にソートしてCSVに出力します。
実行結果
スクリプトを実行すると、以下のようにレイテンシと回答が表示されます。
並列実行のため完了順序はバラバラですが、最終的なCSVファイルは質問順(No.1〜)にソートされて出力されます。
python3 test_chat.py
=== 並行テスト開始 (全3問 / 5並列) ===
※ タイムアウト設定: 60.0秒 / Idle判定: 2.0秒
[No.2 完了] Latency:15.06s -> クラスメソッドの本社は日比谷本社オフィスで、〒105-000...
[No.1 完了] Latency:13.75s -> クラスメソッド株式会社は、AWS総合支援サービス「クラスメソ...
[No.3 完了] Latency:13.79s -> 株式会社クラスメソッドは、AWS総合支援サービス「クラスメソ...
=== 完了 ===
結果ファイル 'result.csv' が作成されました(質問順にソート済み)。
CSV出力例
作成された result.csv は以下のようになります。
質問番号(No)順にソートされており、回答内容とレイテンシ(秒)が記録されています。
No,Question,Answer,Latency
1,クラスメソッドという会社について教えてください,"クラスメソッド株式会社は、AWS総合支援サービス「クラスメソッドメンバーズ」を提供する会社です。AWSプレミアティアサービスパートナーとして2014年から継続認定されており...(省略)...",13.75
2,クラスメソッドはどのような会社ですか?,"クラスメソッドの本社は日比谷本社オフィスで、〒105-0003 東京都港区西新橋1-1-1 日比谷フォートタワー26階に所在しています。...(省略)...",15.06
3,株式会社クラスメソッドの概要を知りたいです,"株式会社クラスメソッドは、AWS総合支援サービス「クラスメソッドメンバーズ」を提供する企業です。AWSプレミアティアサービスパートナーとして...(省略)...",13.79
参考
StartChatContact
CreateParticipantConnection
SendMessage








