AWS Bedrockのストリーミング応答を実装してみた - converse_streamのPythonサンプル

AWS Bedrockのストリーミング応答を実装してみた - converse_streamのPythonサンプル

AWS Bedrockのconverse_stream APIを使ったPythonサンプルコードを解説。ストリーミング形式でAIモデルからの応答を受け取る実装方法と、必要な権限設定について詳しく説明します。
Clock Icon2025.03.18

タイオフィスの三並です。

AWS Bedrockを使ったアプリケーション開発を進める中で、converse_stream APIを使ったPythonのサンプルコードがネット上になかなか見つからず苦労しました。そこで今回は、自分で実装したサンプルコードを共有したいと思います。

はじめに

AWS Bedrockは、Amazon Web Servicesが提供する生成AIサービスです。Claude、Llama 2などの大規模言語モデルを簡単に利用できるAPIを提供しています。特にconverse_stream APIは、AIモデルからの応答をストリーミング形式で受け取ることができるため、ユーザー体験を向上させることができます。

しかし、このAPIを使ったPythonのサンプルコードがネット上に見つからなかったため、自分で実装してみました。

前提条件

サンプルプログラムを動かすためには、以下の条件を満たす必要があります。

Bedrockの権限設定

Bedrockを利用するには適切な権限が必要です。以下のいずれかの方法で権限を設定してください。

  1. AWS Management Consoleログイン

    • Bedrockの権限を持つユーザーでログインし、CloudShellで実行する方法
    • 開発・テスト用に手軽に試せます
  2. IAM Roleを使用

    • EC2インスタンス用にBedrockの権限を持つIAM Roleを作成
    • インスタンスプロファイルとして設定
    • 本番環境での推奨方法
  3. IAM User + ACCESS KEY

    • Bedrockの権限を持つIAM Userを作成し、ACCESS KEYを発行
    • Python実行環境に設定
    • ※セキュリティリスクがあるため、推奨しません

Bedrockでモデルの有効化

今回のサンプルではOregonRegionで、 Claude 3.7 Sonet を使っています。事前にこのモデルを有効化してください。

サンプルプログラムの解説

以下に実装したサンプルプログラムの主要部分を解説します。
完全なサンプルプログラムのブログの最後の方に記載します。

1. Bedrockクライアントの初期化

bedrock_runtime = boto3.client(
    service_name='bedrock-runtime',
    region_name='us-west-2'
)

Bedrockのランタイムクライアントを初期化しています。リージョンは必要に応じて変更してください。

2. 会話メッセージの設定

messages = [
    {
        "role": "user",
        "content": [
            {
                "text": "AIの最近の進歩について教えてください。"
            }
        ]
    }
]

ユーザーからの質問を設定しています。複数のメッセージを配列として渡すことで、会話の履歴を含めることも可能です。

3. モデルの選択

model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"

使用するモデルを指定します。この例ではClaude 3.7 Sonnetを使用していますが、他のモデルも利用可能です。

4. ストリーミングリクエストの実行

response = bedrock_runtime.converse_stream(
    modelId=model_id,
    messages=messages,
    inferenceConfig={
        "maxTokens": 1000,
        "temperature": 0.0,
        "topP": 0.9
    }
)

converse_stream APIを呼び出し、ストリーミングレスポンスを取得します。inferenceConfigでは、生成するトークン数やTemparatureなどのパラメータを設定できます。

5. ストリーミングレスポンスの処理

for event in response['stream']:
    # イベントタイプに基づいて処理
    if 'messageStart' in event:
        role = event['messageStart']['role']
        print(f"\n[{role}の応答開始]")

    elif 'contentBlockDelta' in event:
        if 'delta' in event['contentBlockDelta'] and 'text' in event['contentBlockDelta']['delta']:
            text = event['contentBlockDelta']['delta']['text']
            print(text, end="", flush=True)
            full_response += text

ストリーミングレスポンスは複数のイベントとして返されます。主なイベントタイプは以下の通りです:

  • messageStart: 応答の開始
  • contentBlockStart: コンテンツブロックの開始
  • contentBlockDelta: 実際のテキスト内容
  • contentBlockStop: コンテンツブロックの終了
  • messageStop: 応答の終了

特に重要なのはcontentBlockDeltaで、ここに実際のテキスト内容が含まれています。

完全なサンプルコード

以下が完全なサンプルコードです。

import boto3
from botocore.exceptions import ClientError

def bedrock_converse_stream():
    # Bedrock クライアントの初期化
    bedrock_runtime = boto3.client(
        service_name='bedrock-runtime',
        region_name='us-west-2'
    )

    # 会話のメッセージを設定
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "text": "AIの最近の進歩について教えてください。"
                }
            ]
        }
    ]

    # モデルとリクエストパラメータの設定
    model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"

    try:
        # ストリーミングレスポンスのリクエスト
        response = bedrock_runtime.converse_stream(
            modelId=model_id,
            messages=messages,
            inferenceConfig={
                "maxTokens": 1000,
                "temperature": 0.0,
                "topP": 0.9
            }
        )

        # ストリーミングレスポンスの処理
        print("AIからの応答:")
        full_response = ""

        # デバッグ用に各イベントの構造を出力
        for event in response['stream']:
            # イベントタイプに基づいて処理
            if 'messageStart' in event:
                role = event['messageStart']['role']
                print(f"\n[{role}の応答開始]")  # 必要に応じてコメント解除

            elif 'contentBlockStart' in event:
                content_type = event['contentBlockStart']['contentType']
                print(f"\n[{content_type}コンテンツ開始]", end="")  # 必要に応じてコメント解除

            elif 'contentBlockDelta' in event:
                # 修正: delta.text から取得
                if 'delta' in event['contentBlockDelta'] and 'text' in event['contentBlockDelta']['delta']:
                    text = event['contentBlockDelta']['delta']['text']
                    print(text, end="", flush=True)
                    full_response += text

            elif 'contentBlockStop' in event:
                print(f"\n[コンテンツ終了]")  # 必要に応じてコメント解除
                pass

            elif 'messageStop' in event:
                print(f"\n[応答終了]")  # 必要に応じてコメント解除
                pass

            elif 'error' in event:
                print(f"\nエラーが発生しました: {event['error']}")

        print("\n\n完全な応答:")
        print(full_response)

    except ClientError as err:
        print(f"エラーが発生しました: {err}")

if __name__ == "__main__":
    bedrock_converse_stream()

実行結果

このプログラムを実行すると、AIからの応答がリアルタイムで表示されます。各トークンが順次表示され、最終的に完全な応答が表示されます。

まとめ

AWS Bedrockのconverse_stream APIを使うことで、AIモデルからの応答をストリーミング形式で受け取ることができます。これにより、ユーザーはAIが考えている過程をリアルタイムで見ることができ、より自然な対話体験を提供できます。
また、converseメソッドで READ TIMEOUT のエラーが出る場合の対策としても使えます。

今回紹介したサンプルコードは基本的な実装ですが、実際のアプリケーションでは、会話履歴の管理や、エラーハンドリングの強化、UIとの連携などを考慮する必要があります。

AWS Bedrockは比較的新しいサービスですが、生成AIを活用したアプリケーション開発において強力なツールとなります。ぜひ、このサンプルコードを参考に、独自のアプリケーション開発に挑戦してみてください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.