RAGできるSlackチャットボットを作ってみた

Bedrock, OpenSearch Serverless, App Runner, Slack Bolt, LangChainを利用してRAGを実行できるSlackチャットボットを作成しました。
2024.06.14

こんにちは。たにもんです。

生成AIを活用したアプリケーションの代表例としてRAG (Retrieval-Augmented Generation; 検索拡張生成) があります。 LLMが生成する文章にはもっともらしい嘘(ハルシネーション)が含まれることがありますが、RAGを用いることでハルシネーションを抑える効果が期待できます。

ハルシネーションはLLMが学習していない知識に関する文章を生成する際に発生する可能性が高まりますが、RAGではユーザーの入力に関連する情報を外部から検索してLLMの知識を補ってあげることで精度向上を目指します。

今回はRAGを実行できるSlackチャットボットを作ってみたので紹介します。 実際に動かしてみた様子は以下のとおりです。LLMが学習していないであろう 生成AI環境構築サービス「AI-Starter」 について正しく説明してくれています。

今回作成したコード全体は以下のGitHubリポジトリで公開しているので、興味のある方はご参照ください。

https://github.com/tanimon/ai-chatbot-slack/tree/v2024.06.13

システム構成

Slack Appの作成

まずはインターフェースとなるSlack App(ボット)を作成しましょう。 作成方法の具体的な手順は以下のブログが参考になるかと思います。

[Slack API] Bolt for JavaScript を使用した Getting Started を試してみた | DevelopersIO

今回は以下のようなマニフェストを利用してSlack Appを作成しました。

display_information:
  name: AIチャットボット
  description: RAGができるAIチャットボット
  background_color: "#000000"
features:
  bot_user:
    display_name: ai-chatbot
    always_online: false
oauth_config:
  scopes:
    bot:
      - app_mentions:read
      - chat:write
      - channels:history
      - groups:history
      - im:history
settings:
  event_subscriptions:
    request_url: https://xxxxxxxxxx.ap-northeast-1.awsapprunner.com/slack/events
    bot_events:
      - app_mention
  org_deploy_enabled: false
  socket_mode_enabled: false
  token_rotation_enabled: false

アプリの作成が完了したら、以下の認証情報を控えておいてください。 CDKデプロイ時にパラメータとして設定する必要があります。

  • Basic Information > App Credentialsに記載されているSigning Secret
  • OAuth & Permissions > OAuth Tokens for Your Workspaceに記載されているBot User OAuth Token

アプリケーションの実装

次はSlack Appのロジックを実装しましょう。 今回は Slack Bolt を用いて次のようなコードを書きました。

import logging
import os
import re

from slack_bolt import App, Say

from server.rag import llm_chain, rag_chain

logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)

# ボットトークンと署名シークレットを使ってアプリを初期化する
app = App(
    token=os.environ.get("SLACK_BOT_TOKEN"),
    signing_secret=os.environ.get("SLACK_SIGNING_SECRET"),
)

# ボットへのメンションに対するイベントリスナー
@app.event("app_mention")
def handle_app_mention(event, say: Say, logger: logging.Logger):
    logger.debug(f"app_mention event: {event}")

    text = event["text"]
    channel = event["channel"]
    thread_ts = event.get("thread_ts") or event["ts"]

    say(channel=channel, thread_ts=thread_ts, text="考え中です...少々お待ちください...")

    payload = remove_mention(text)
    logger.debug(f"payload: {payload}")

    result = (
        rag_chain.invoke(payload) if is_rag_enabled() else llm_chain.invoke(payload)
    )
    logger.debug(f"result: {result}")

    say(channel=channel, thread_ts=thread_ts, text=result)

@app.error
def handle_error(error, event, say: Say, logger: logging.Logger):
    logger.exception(f"エラーが発生しました: {error}")

    channel = event["channel"]
    thread_ts = event.get("thread_ts") or event["ts"]
    say(channel=channel, thread_ts=thread_ts, text=f"エラーが発生しました: {error}")


def remove_mention(text: str) -> str:
    """メンションを除去する"""

    mention_regex = r"<@.*>"
    return re.sub(mention_regex, "", text).strip()


def is_rag_enabled() -> bool:
    """RAGが有効かどうかを返す"""

    return os.environ.get("RAG_ENABLED", "false").lower() == "true"


app.start(port=int(os.environ.get("PORT", 3000)))

今回はRAGを利用するか素のLLMを呼び出すかを環境変数から設定できるようにしました。 以下の部分で環境変数の値に応じてRAGもしくはLLMを利用して回答を生成し、その結果をSlackのスレッドに返信しています。

    result = (
        rag_chain.invoke(payload) if is_rag_enabled() else llm_chain.invoke(payload)
    )
    logger.debug(f"result: {result}")

    say(channel=channel, thread_ts=thread_ts, text=result)

RAGは以下に示したコードのとおり、LangChainを用いて実装しています。 今回は次のような構成でRAGを構築しました。

import os

import boto3
from langchain import hub
from langchain_aws import BedrockEmbeddings
from langchain_aws.chat_models import ChatBedrock
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnablePassthrough
from opensearchpy import RequestsHttpConnection
from requests_aws4auth import AWS4Auth  # type: ignore

embedding = BedrockEmbeddings(
    model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1", client=None
)

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
    refreshable_credentials=credentials,
    region="ap-northeast-1",
    service="aoss",
)

vectorstore = OpenSearchVectorSearch(
    opensearch_url=os.environ["AOSS_ENDPOINT_URL"],
    index_name=os.environ["AOSS_INDEX_NAME"],
    embedding_function=embedding,
    http_auth=aws_auth,
    timeout=300,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    engine="faiss",
)

retriever = vectorstore.as_retriever()


def format_docs(docs: list[Document]) -> str:
    return "\n\n".join([doc.page_content for doc in docs])


prompt = hub.pull("rlm/rag-prompt")

llm = ChatBedrock(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
    client=None,
    model_kwargs={
        "temperature": 0,
    },
)

rag_chain: Runnable = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

llm_chain: Runnable = llm | StrOutputParser()

プロンプトに関してはLangChain Hubから取得したrlm/rag-promptを利用しました。 こちらのプロンプトの中身は以下のようになっています。

You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.
Question: {question} 
Context: {context} 
Answer:

インフラをCDKで構築

インフラを以下のCKDコードで構築しました。 OpenSearch Serverlessコレクション、App Runnerサービスおよびそれらに紐付くリソースを作成しています。

import * as apprunner from "@aws-cdk/aws-apprunner-alpha";
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";

export class MainStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const knowledgeBaseCollection =
      new cdk.aws_opensearchserverless.CfnCollection(
        this,
        "KnowledgeBaseCollection",
        {
          name: "knowledge-base",
          type: "VECTORSEARCH",
          standbyReplicas: "DISABLED",
        },
      );

    const knowledgeBaseCollectionEncryptionPolicy =
      new cdk.aws_opensearchserverless.CfnSecurityPolicy(
        this,
        "KnowledgeBaseCollectionEncryptionPolicy",
        {
          name: "knowledge-base-encryption-policy",
          type: "encryption",
          policy: JSON.stringify({
            Rules: [
              {
                ResourceType: "collection",
                Resource: [`collection/${knowledgeBaseCollection.name}`],
              },
            ],
            AWSOwnedKey: true,
          }),
        },
      );

    // NOTE: コレクションを作成する前に、コレクションの名前と一致するリソースパターンを含む暗号化ポリシーを作成しておく必要がある
    // 暗号化ポリシー作成後にコレクションが作成されるように依存関係を設定する
    // @see https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create
    // @see https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/serverless-encryption.html
    knowledgeBaseCollection.addDependency(
      knowledgeBaseCollectionEncryptionPolicy,
    );

    new cdk.aws_opensearchserverless.CfnSecurityPolicy(
      this,
      "KnowledgeBaseCollectionNetworkPolicy",
      {
        name: "knowledge-base-network-policy",
        type: "network",
        policy: JSON.stringify([
          {
            Rules: [
              {
                ResourceType: "collection",
                Resource: [`collection/${knowledgeBaseCollection.name}`],
              },
              {
                ResourceType: "dashboard",
                Resource: [`collection/${knowledgeBaseCollection.name}`],
              },
            ],
            AllowFromPublic: true,
          },
        ]),
      },
    );

    const serverServiceInstanceRole = new cdk.aws_iam.Role(
      this,
      "ServerServiceInstanceRole",
      {
        assumedBy: new cdk.aws_iam.ServicePrincipal(
          "tasks.apprunner.amazonaws.com",
        ),
      },
    );
    const aossIndexingPrincipalArn =
      cdk.aws_ssm.StringParameter.valueForStringParameter(
        this,
        "AossIndexingPrincipalArn",
      );

    new cdk.aws_opensearchserverless.CfnAccessPolicy(
      this,
      "KnowledgeBaseCollectionAccessPolicy",
      {
        name: "knowledge-base-access-policy",
        type: "data",
        policy: JSON.stringify([
          {
            Rules: [
              {
                ResourceType: "collection",
                Resource: [`collection/${knowledgeBaseCollection.name}`],
                Permission: ["aoss:*"],
              },
              {
                ResourceType: "index",
                Resource: [`index/${knowledgeBaseCollection.name}/*`],
                Permission: ["aoss:*"],
              },
            ],
            Principal: [
              serverServiceInstanceRole.roleArn,
              aossIndexingPrincipalArn, // インデックス操作を行うプリンシパルのARN
            ],
          },
        ]),
      },
    );

    const aossIndexName = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "AossIndexName",
    );
    const ragEnabled = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "RagEnabled",
    );
    const slackBotToken = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "SlackBotToken",
    );
    const slackSignSecret = cdk.aws_ssm.StringParameter.valueForStringParameter(
      this,
      "SlackSignSecret",
    );

    const serverService = new apprunner.Service(this, "ServerService", {
      source: apprunner.Source.fromAsset({
        imageConfiguration: {
          port: 3000,
          environmentVariables: {
            AOSS_ENDPOINT_URL: knowledgeBaseCollection.attrCollectionEndpoint,
            AOSS_INDEX_NAME: aossIndexName,
            RAG_ENABLED: ragEnabled,
            SLACK_BOT_TOKEN: slackBotToken,
            SLACK_SIGNING_SECRET: slackSignSecret,
          },
        },
        asset: new cdk.aws_ecr_assets.DockerImageAsset(this, "ImageAsset", {
          directory: "../server",
          platform: cdk.aws_ecr_assets.Platform.LINUX_AMD64,
        }),
      }),
      cpu: apprunner.Cpu.ONE_VCPU,
      memory: apprunner.Memory.TWO_GB,
      healthCheck: apprunner.HealthCheck.tcp({}),
      instanceRole: serverServiceInstanceRole,
      autoDeploymentsEnabled: true,
    });
    serverService.addToRolePolicy(
      new cdk.aws_iam.PolicyStatement({
        effect: cdk.aws_iam.Effect.ALLOW,
        actions: [
          "bedrock:InvokeModel",
          "bedrock:InvokeModelWithResponseStream",
          "aoss:*",
        ],
        resources: ["*"],
      }),
    );
  }
}

OpenSearch Serverlessコレクションの作成に関しては以下のブログで詳しく説明しています。

OpenSearch ServerlessをCDKで構築してみた | DevelopersIO

このCDKスタックをデプロイするには以下のSSMパラメーターが必要なので、デプロイ前にパラメーターを作成しておいてください。

  • AossIndexingPrincipalArn: OpenSearch Serverlessコレクションのインデックス操作を行うプリンシパルのARN(インデックス操作に関しては後述)
  • AossIndexName: RAGで参照するOpenSearch Serverlessコレクションのインデックス名
  • RagEnabled: RAGを有効化するかどうかを表すフラグ(true or false を設定する)
  • SlackBotToken: Bot User OAuth Tokenの値
  • SlackSignSecret: Signing Secretの値

デプロイが完了したら、Slack AppのEvent Subscriptions > Enable EventsのRequest URLのドメインをApp Runnerサービスのデフォルトドメインに変更してください。 このURLのパス部分は/slack/eventsを指定する必要がある点にご注意ください。

ドキュメントをインデックスする

RAGで参照するドキュメントをOpenSearch Serverlessコレクションのインデックスに追加するために以下のスクリプトを作成しました。

import os

import boto3
from dotenv import load_dotenv
from langchain_aws import BedrockEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_text_splitters import RecursiveCharacterTextSplitter
from opensearchpy import RequestsHttpConnection
from requests_aws4auth import AWS4Auth  # type: ignore

load_dotenv()

print("Loading started...")

loader = WebBaseLoader(
    web_paths=["https://classmethod.jp/services/generative-ai/ai-starter/"]
)
docs = loader.load()

print("Loading completed!")


print("Splitting started...")

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
embedding = BedrockEmbeddings(
    model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1", client=None
)

print("Splitting completed!")


print("Indexing started...")

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
    refreshable_credentials=credentials,
    region="ap-northeast-1",
    service="aoss",
)

vectorstore = OpenSearchVectorSearch.from_documents(
    documents=splits,
    embedding=embedding,
    opensearch_url=os.environ["AOSS_ENDPOINT_URL"],
    index_name=os.environ["AOSS_INDEX_NAME"],
    http_auth=aws_auth,
    timeout=300,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    engine="faiss",
)

print("Indexing completed!")

このスクリプトの処理概要は次のとおりです。

  1. 生成AI環境構築サービス「AI-Starter」 | サービス | クラスメソッド株式会社 のページ内容を取得する
  2. 1.の内容をチャンク分割する
  3. 2.の埋め込みを作成する
  4. 3.をOpenSearch Serverlessコレクションのインデックスに追加する

このスクリプトでは環境変数を読み込むため、スクリプトと同一ディレクトリに.env という名前で以下のようなファイルを作成しておいてください。

AOSS_ENDPOINT_URL='https://xxxxxxxxxxxxxxxxxxxx.ap-northeast-1.aoss.amazonaws.com' # OpenSearch ServerlessコレクションのエンドポイントURL
AOSS_INDEX_NAME='knowledge-base-index-name' # OpenSearch Serverlessコレクションのインデックス名

SSMパラメーター AossIndexingPrincipalArn で指定したプリンシパルの認証情報をセット(IAMロールをAssume Roleするなど)した状態で、以下のようにスクリプトを実行することでドキュメント内容がインデックスされます。 環境変数 AOSS_INDEX_NAME で指定したインデックスが未作成の場合は自動的に作成されます。

❯ poetry -C server run python server/scripts/index_documents.py
Loading started...
Loading completed!
Splitting started...
Splitting completed!
Indexing started...
Indexing completed!

動かしてみる

さて、これで準備が整ったので動かしてみましょう。 まずは環境変数 RAG_ENABLEDfalse を設定して、素のLLMに質問してみます。

AI-Sarter が何か聞いてみたのですが、AI学習プラットフォームとの回答が返ってきました。 これはハルシネーションです。

次に、環境変数 RAG_ENABLEDtrue を設定して、RAGを利用してみます。

先ほどと同じ質問をしてみたところ、今度は正しくAI-Starterについて説明してくれました!

つまずきポイント

LangChainを実行した際に以下のようなエラーが発生しました。

TypeError: ForwardRef._evaluate() missing 1 required keyword-only argument: 'recursive_guard'

どうやら、LangChainが内部で利用している Pydantic がPython v3.12.4で行われた変更に対して追従できてないことが原因のようでした。

python - langchain_community & langchain packages giving error: Missing 1 required keyword-only argument: 'recursive_guard' - Stack Overflow

Python v3.12.3を利用すればこのエラーを回避できるとのことだったので、Dockerfileのベースイメージを以下のように変更することでエラーが解消しました。

@@ -1,4 +1,6 @@
-FROM python:3.12-slim
+# NOTE: Python v3.12.4を利用するとlangchainをインポートする際にエラーが発生するため、v3.12.3を利用する
+# ref: https://stackoverflow.com/questions/78593700/langchain-community-langchain-packages-giving-error-missing-1-required-keywor
+FROM python:3.12.3-slim

 WORKDIR /app

参考