[アップデート]Amazon Bedrock AgentCore Runtime の双方向ストリーミングを試してみた

[アップデート]Amazon Bedrock AgentCore Runtime の双方向ストリーミングを試してみた

2025.12.07

はじめに

こんにちは、スーパーマーケットが好きなコンサルティング部の神野です。re:Inventも終わって無事帰国できてホッとしております。

先日、Amazon Bedrock AgentCore Runtime (以下AgentCore Runtime)が双方向ストリーミング(Bidirectional Streaming)に対応したというアップデートが発表されました!

https://aws.amazon.com/about-aws/whats-new/2025/12/bedrock-agentcore-runtime-bi-directional-streaming/

概要(翻訳)

Amazon Bedrock AgentCore Runtime が双方向ストリーミングをサポートし、エージェントが会話中に中断やコンテキストの変化に対応しながら、リアルタイムで同時に対話できるようになりました。この機能により、インタラクション全体を通してコンテキストが維持される継続的な双方向コミュニケーションが可能になり、会話における摩擦が解消されます。

従来のエージェントでは、ユーザーはエージェントの応答が完了するまで明確化や修正を提供できず、会話の流れが途切れ、特に音声アプリケーションでは不自然に感じられる、停止と開始を繰り返すようなインタラクションが発生していました。双方向ストリーミングは、継続的なコンテキスト処理を可能にすることで、この制限に対処します。これにより、ユーザーが会話中に中断したり、明確化したり、方向転換したりできる、自然な会話体験を提供する音声エージェントを強化するとともに、応答性の向上を通じてテキストベースのインタラクションも強化します。AgentCore Runtime に組み込まれているこの機能により、リアルタイムストリーミング機能を構築するために必要だった数ヶ月のエンジニアリング作業が不要になるため、開発者は複雑なストリーミングインフラストラクチャの管理ではなく、革新的なエージェントエクスペリエンスの構築に集中できます。

この機能は、Amazon Bedrock AgentCore Runtime が利用可能なすべての 9 つの AWS リージョン(米国東部(バージニア北部)、米国東部(オハイオ)、米国西部(オレゴン)、アジアパシフィック(ムンバイ)、アジアパシフィック(シンガポール)、アジアパシフィック(シドニー)、アジアパシフィック(東京)、ヨーロッパ(フランクフルト)、ヨーロッパ(アイルランド))で利用可能です。

AgentCore Runtime の双方向ストリーミングの詳細については、ブログ記事をお読みいただき、AgentCore のドキュメントを参照して、AgentCore Starter Toolkit を使い始めてください。AgentCore Runtime の従量課金制の料金体系では、エージェントの実行中に消費されたアクティブなリソースに対してのみ料金が発生し、アイドル時間や初期費用はかかりません。

WebSocket を使ったリアルタイムな双方向通信が可能になったということですね!
音声会話エージェントやインタラクティブなチャットアプリケーションの実装が楽になりそうです!今回は AgentCore Runtime の双方向ストリーミング機能を使って、リアルタイム会話エージェントを作成・デプロイして挙動を確認してみました!

双方向ストリーミング

従来の HTTP 通信との違い

従来の AgentCore Runtime では、/invocations エンドポイントを使った HTTP リクエスト/レスポンス型の通信が基本でした。
この方式では、クライアントがリクエストを送信し、エージェントの処理が完了するまで待ってからレスポンスを受け取る、という流れになります。もちろんレスポンスをストリーミングで受け取ることは可能でした。

一方、双方向ストリーミングでは WebSocket を使用することで、以下のような特徴があります。

  • クライアントとエージェントが同時にデータを送受信できる
  • コネクションを維持したまま通信するため、接続のオーバーヘッドが少ない
  • ユーザーが会話の途中でエージェントを中断したり、方向転換したりできる

特に音声エージェントのようなリアルタイム性が求められるアプリケーションでは、この双方向ストリーミングが便利ですね!

AgentCore Runtime での WebSocket サポート

AgentCore Runtime は、コンテナのポート 8080/ws パスで WebSocket エンドポイントを待ち受ける仕様になっています。

wss://bedrock-agentcore.<region>.amazonaws.com/runtimes/<EncodedRuntimeArn>/ws

認証方式としては、以下の3つがサポートされています。

  • SigV4 ヘッダー
    • AWS 認証情報を使った署名付きリクエスト
  • SigV4 事前署名 URL
    • 署名をクエリパラメータに含めた URL
  • OAuth 2.0
    • Bearer トークンによる認証

その他

AgentCore Runtime でWebsocketを使用する場合は下記要件を満たす必要があります。
Websocketの場合は/wsエンドポイントを用意するのが特徴ですね。

  • プラットフォーム
    • ARM64(linux/arm64)
  • ポート
    • 8080 を公開
  • エンドポイント
    • /ws - WebSocket エンドポイント
    • /ping - ヘルスチェックエンドポイント(必須)

前提条件

本記事の手順を実行するにあたり、以下の環境を使用しました。

  • Python
    • 3.13.6
  • uv
  • AWS アカウント
    • AgentCore Runtime および Bedrock モデルを使用するための適切な IAM 権限
  • リージョン
    • us-west-2(オレゴン)
  • 使用モデル
    • Claude Haiku 4.5(us.anthropic.claude-haiku-4-5-20251001-v1:0

実際にやってみる

それでは、実際に Strands Agents を使った双方向ストリーミング対応のエージェントを作成していきます。

プロジェクトのセットアップ

まずは、uv を使ってプロジェクトを作成し、必要なパッケージをインストールします。

# プロジェクトの作成と初期化
uv init agentcore-websocket-strands
cd agentcore-websocket-strands

# 必要なパッケージのインストール
uv add bedrock-agentcore strands-agents bedrock-agentcore-starter-toolkit

これで pyproject.toml が自動生成され、依存関係が追加されます。

Strands エージェントのコード作成

次に、WebSocket 双方向ストリーミングに対応した Strands エージェントのコードを作成します。

websocket_strands_agent.py という名前でファイルを作成します。

import asyncio
import json
from bedrock_agentcore import BedrockAgentCoreApp
from strands import Agent
from strands.models.bedrock import BedrockModel
from starlette.websockets import WebSocketDisconnect

app = BedrockAgentCoreApp()

# Bedrock モデルの設定(Claude Haiku 4.5)
model = BedrockModel(
    model_id="us.anthropic.claude-haiku-4-5-20251001-v1:0",
    region_name="us-west-2"
)

# Strands エージェントの初期化
agent = Agent(
    model=model,
    system_prompt="""あなたは親切で知識豊富なAIアシスタントです。
ユーザーの質問に対して、簡潔かつ分かりやすく回答してください。
日本語で回答してください。""",
    callback_handler=None
)

@app.websocket
async def websocket_handler(websocket, context):
    """割り込み対応の WebSocket ハンドラー"""
    await websocket.accept()
    print("WebSocket 接続を受け入れました")

    # 現在実行中のストリーミングタスク
    current_task: asyncio.Task | None = None

    async def stream_response(user_message: str):
        """ストリーミングレスポンスを送信(キャンセル可能)"""
        full_response = ""
        try:
            async for event in agent.stream_async(user_message):
                if "data" in event:
                    chunk = event["data"]
                    full_response += chunk
                    await websocket.send_json({
                        "type": "chunk",
                        "data": chunk
                    })

            # 正常完了
            await websocket.send_json({
                "type": "complete",
                "fullResponse": full_response
            })
            print("レスポンス完了")

        except asyncio.CancelledError:
            # 割り込みによるキャンセル
            await websocket.send_json({
                "type": "interrupted",
                "partialResponse": full_response
            })
            print("割り込みでキャンセルされました")
            raise  # CancelledError は必ず再送出

    try:
        while True:
            # クライアントからのメッセージを受信
            raw_data = await websocket.receive_text()
            data = json.loads(raw_data)
            user_message = data.get("inputText", "")

            if not user_message:
                await websocket.send_json({"error": "inputText が見つかりません"})
                continue

            print(f"受信: {user_message}")

            # 既存のタスクがあればキャンセル(割り込み)
            if current_task and not current_task.done():
                print("割り込み: 現在の回答をキャンセルします")
                current_task.cancel()
                try:
                    await current_task  # キャンセル完了を待機
                except asyncio.CancelledError:
                    pass  # 期待通りの例外

            # 新しいストリーミングタスクをバックグラウンドで開始
            current_task = asyncio.create_task(stream_response(user_message))

    except WebSocketDisconnect:
        print("クライアントが切断しました")
    except Exception as e:
        print(f"エラー: {e}")
        try:
            await websocket.send_json({"error": str(e)})
        except Exception:
            pass
    finally:
        # クリーンアップ: 残存タスクをキャンセル
        if current_task and not current_task.done():
            current_task.cancel()
            try:
                await current_task
            except asyncio.CancelledError:
                pass

if __name__ == "__main__":
    app.run(log_level="info")

このコードでは以下のことを行っています。

  • バックグラウンドタスク
    • asyncio.create_task() でストリーミング処理をバックグラウンドで実行
  • 割り込み対応
    • 新しいメッセージを受信したら、既存のタスクを cancel() してから新しいタスクを開始
  • キャンセル時の通知
    • asyncio.CancelledError をキャッチして interrupted メッセージを送信

また特徴としては@app.websocketデコレータでwebsocketのエントリーポイントを指定するだけで難しいことを考えずとも、処理が待ち受けできるのはメリットの1つかと思います。要件に記載した/wsエンドポイントが実装されるイメージです。

ローカルでのテスト

デプロイする前に、ローカルで動作確認を行います。

エージェントの起動

ターミナルを開いて、エージェントを起動します。

uv run websocket_strands_agent.py

正常に起動すると、以下のようなログが出力されます。

INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

テストクライアントの作成

別のターミナルを開いて、WebSocket クライアントを作成します。割り込み機能が期待通り動くかテストしてみます。websocket_client_local.py という名前でファイルを作成します。

websocket_client_local.py
import asyncio
import websockets
import json

async def test_interruption():
    """割り込みテスト用クライアント"""
    uri = "ws://localhost:8080/ws"

    async with websockets.connect(uri) as websocket:

        async def receive_responses():
            """レスポンス受信ループ"""
            while True:
                response = await websocket.recv()
                data = json.loads(response)

                if data.get("type") == "chunk":
                    print(data["data"], end="", flush=True)
                elif data.get("type") == "complete":
                    print("\n\n[完了]")
                elif data.get("type") == "interrupted":
                    print(f"\n\n[割り込み中断] 途中経過: {data['partialResponse'][:50]}...")
                elif "error" in data:
                    print(f"\n[エラー] {data['error']}")

        # 受信タスクをバックグラウンドで開始
        receive_task = asyncio.create_task(receive_responses())

        try:
            # 最初の質問を送信(長い回答が返ってくる質問)
            print("送信: 日本の歴史について詳しく教えてください")
            print("-" * 50)
            await websocket.send(json.dumps({
                "inputText": "日本の歴史について詳しく教えてください"
            }))

            # 2秒後に割り込み
            await asyncio.sleep(2)

            print("\n" + "=" * 50)
            print(">>> 割り込み送信: やっぱりPythonについて教えて")
            print("=" * 50 + "\n")
            await websocket.send(json.dumps({
                "inputText": "やっぱりPythonについて教えて"
            }))

            # 回答完了を待つ(サンプルのため固定時間で待機)
            await asyncio.sleep(15)

        finally:
            receive_task.cancel()
            try:
                await receive_task
            except asyncio.CancelledError:
                pass

if __name__ == "__main__":
    asyncio.run(test_interruption())

このクライアントの実装ポイントは3つです。

  • 並行送受信

    • asyncio.create_task() で受信ループをバックグラウンドで実行
  • 割り込みテスト

    • 最初の質問の回答中に、2秒後に別の質問を送信して割り込み
  • メッセージタイプの処理

    • chunkcompleteinterrupted を適切に処理
      • Runtime側で取り決めたレスポンスタイプを分類して処理
        • chunkならそのまま表示、completeなら[完了]と表示、interruptedなら割り込み中断して今までの途中メッセージを送信して50文字取り出して表示

テストクライアントを実行します。

uv run websocket_client_local.py

以下のような出力が得られれば成功です!

送信: 日本の歴史について詳しく教えてください
--------------------------------------------------
# 日本の歴史について

日本の歴史を主要な時代別にご説明します。

## 主な時代区分

**古代**
- 縄文時代(約12,000年前~):狩猟採集社会
- 弥生時代(紀元前3世紀~3世紀):稲作農業の開始
- 古墳時代~飛鳥時代:国家形
==================================================
>>> 割り込み送信: やっぱりPythonについて教えて
==================================================

[割り込み中断] 途中経過: # 日本の歴史について

日本の歴史を主要な時代別にご説明します。

## 主な時代区分

**古代...

エージェントの回答途中で割り込みができていますね![割り込み中断] メッセージが表示され、途中までの回答が返ってきています。

AgentCore Runtime へのデプロイ

ローカルでの動作確認ができたら、次は AgentCore Runtime にデプロイします。
デプロイには、セットアップ時にインストールした Amazon Bedrock AgentCore Starter Toolkit を使用します。

設定とデプロイ

エージェントの設定を行います。

uv run agentcore configure -e websocket_strands_agent.py

このコマンドを実行すると、対話形式でいくつかの質問に回答します。
基本的にはデフォルト値で問題ありません。設定情報は .bedrock_agentcore.yaml に保存されます。

次に、デプロイを実行します。

uv run agentcore launch

デプロイが完了すると、以下のような ARN が出力されます。

arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_strands_agent-abc123

この ARN は後で使用するので、メモしておいてください!

デプロイしたエージェントの呼び出し

デプロイが完了したら、実際にデプロイしたエージェントに接続してみましょう。

環境変数の設定

まず、エージェントの ARN を環境変数に設定します。

export AGENT_ARN="arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/websocket_strands_agent-abc123"

クライアントの実装

client.generate_ws_connectionでRuntimeへの接続URLを作成し、WebSocketで接続します。
接続して、2秒後に異なる質問をして割り込みを行います。

websocket_client_deployed.py
import asyncio
import json
import os

import websockets
from bedrock_agentcore.runtime import AgentCoreRuntimeClient

async def main():
    # 環境変数から ARN を取得
    runtime_arn = os.getenv("AGENT_ARN")
    if not runtime_arn:
        raise ValueError("AGENT_ARN 環境変数が設定されていません")

    # AgentCoreRuntimeClient の初期化
    client = AgentCoreRuntimeClient(region="us-west-2")

    # 接続URL、ヘッダーを作成
    ws_url, headers = client.generate_ws_connection(
        runtime_arn=runtime_arn, session_id=None
    )

    print("AgentCore Runtime に接続します...\n")

    try:
        async with websockets.connect(ws_url, additional_headers=headers) as ws:

            async def receive_responses():
                """レスポンス受信ループ"""
                while True:
                    response = await ws.recv()
                    data = json.loads(response)

                    if data.get("type") == "chunk":
                        print(data["data"], end="", flush=True)
                    elif data.get("type") == "complete":
                        print("\n\n[完了]")
                    elif data.get("type") == "interrupted":
                        print(
                            f"\n\n[割り込み中断] 途中経過: {data['partialResponse'][:50]}..."
                        )
                    elif "error" in data:
                        print(f"\n[エラー] {data['error']}")

            # 受信タスクをバックグラウンドで開始
            receive_task = asyncio.create_task(receive_responses())

            try:
                # 最初の質問を送信
                print("送信: AWSの主要サービスについて詳しく教えてください")
                print("-" * 50)
                await ws.send(
                    json.dumps(
                        {"inputText": "AWSの主要サービスについて詳しく教えてください"}
                    )
                )

                # 2秒後に割り込み
                await asyncio.sleep(2)

                print("\n" + "=" * 50)
                print(">>> 割り込み送信: やっぱりEC2だけ教えて")
                print("=" * 50 + "\n")
                await ws.send(json.dumps({"inputText": "やっぱりEC2だけ教えて"}))

                # 回答完了を待つ(サンプルのため固定時間で待機)
                await asyncio.sleep(15)

            finally:
                receive_task.cancel()
                try:
                    await receive_task
                except asyncio.CancelledError:
                    pass

    except websockets.exceptions.InvalidStatus as e:
        print(f"WebSocket ハンドシェイクに失敗しました: {e.response.status_code}")
    except Exception as e:
        print(f"接続エラー: {e}")

if __name__ == "__main__":
    asyncio.run(main())

クライアントを実行します。実行する際はbedrock-agentcore:InvokeAgentRuntimeWithWebSocketStreamの権限が必要なので、注意しましょう。

uv run websocket_client_deployed.py

以下のような出力が得られれば成功です!

CleanShot 2025-12-07 at 15.53.36

uv run websocket_client_deployed.py
AgentCore Runtime に接続します...

送信: AWSの主要サービスについて詳しく教えてください
--------------------------------------------------
# AWSの主要サービス

AWSには200以上のサービスがありますが、主要なものを以下に整理しました。

## **コンピューティング**
- **EC2** - 仮想サーバー
- **Lambda** - サーバーレスコンピューティング
- **ECS/EKS** - コンテナオーケストレーション

## **ストレージ**
- **S3** - オブジェクトストレージ(最も利用されるサービス)
- **EBS** - ブロックストレージ
- **EFS** - ファイルストレージ

## **データベース**
- **RDS** - 関係型データベース(MySQL、PostgreSQL等
==================================================
>>> 割り込み送信: やっぱりEC2だけ教えて
==================================================

)
- **DynamoDB** - NoSQLデータベース
- **ElastiCache** - キャッシュサービス

## **ネットワーク・

[割り込み中断] 途中経過: # AWSの主要サービス

AWSには200以上のサービスがありますが、主要なものを以下に整理しまし...
# AWS EC2について

## EC2とは
**Elastic Compute Cloud** の略で、AWSのクラウド上で仮想サーバーを利用できるサービスです。

## 主な特徴

| 項目 | 内容 |
|------|------|
| **スケーラビリティ** | 必要に応じてサーバー数を増減できる |
| **料金体系** | 使用した分だけ支払う(従量課金) |
| **OS選択** | Linux、Windows等から選択可能 |
| **インスタンスタイプ** | 用途に応じて性能を選べる |

## よく使われるインスタンスタイプ
- **t2/t3** - 小~中規模アプリケーション
- **m5/m6** - 汎用サーバー
- **c5/c6** - 計算集約型処理
- **g4/p3** - GPU利用

## 基本的な流れ
1. インスタンスタイプを選択
2. インスタンスを起動
3. セキュリティグループ設定
4. アプリケーションをデプロイ
5. 必要に応じてスケール

シンプルに始められるので、クラウド初心者にもおすすめです。

何か具体的に知りたい点はありますか?

[完了]

AgentCore Runtime にデプロイしたエージェントでも、割り込みが正しく動作していますね!
割り込み後は新しい質問(EC2について)に回答しています。

双方向ストリーミングでも今までと同じく session_id(runtimeSessionId)を指定できます。同じ session_id を指定して接続すると、その WebSocket 接続が継続している間は会話コンテキストを維持しやすくなります。なお、切断後の長期的な記憶を保持したい場合は別途 Memory 等の仕組みと組み合わせる設計が必要になります。こちらも今までと同じですね。

おわりに

今回は、Amazon Bedrock AgentCore Runtime の双方向ストリーミング機能を使ってエージェントを作成してみました!

実際に使ってみて感じたのは、WebSocket の複雑な実装を意識することなく、ライブラリを使えばデコレータを使うだけで双方向通信が実現できる点が良いなと思いました。
この双方向性を活かしたリアルタイム音声会話も面白そうですね。たかくにさんが書いたブログと組み合わせてAgentCore Runtimeでホストするのも面白そうです。

https://dev.classmethod.jp/articles/build-voice-interface-agent-bidiagent-openai-experimental/

今後は、より実践的なユースケースでの活用を試していきたいと思います!本記事が少しでも参考になりましたら幸いです。
最後までご覧いただきありがとうございましたー!!

補足

実際にWebSocketを活用したAIエージェントを作成する際は下記公式ドキュメントが参考になるので、ぜひ必要に応じてご参照ください。

https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/runtime-get-started-websocket.html

この記事をシェアする

FacebookHatena blogX

関連記事