Cloudflare Workers上でLangChainを動かしストリームで回答を返すようにしてみる

2023.05.30

どうも!オペレーション部の西村祐二です。

以前、下記のようなCloudflare Workers上でLangChain動かすブログを書きました。

Cloudflare Workers上でLangChainを動かし指定したWebサイトについて質問できるか試してみた | DevelopersIO

実際に動かしてみると、質問に対する回答がすべて揃ってからデータが返されるため、応答までの時間が気になり、UXとしてはあまり良くないかなと思っていました。

LangChainのドキュメントをみているとChatGPTのように回答を逐次返す機能があったので、以前作ったWorkersを修正してストリームで回答を返せるか試してしみました。

Events / Callbacks | ?️? Langchain

成果物

修正して動作確認したものが下記になります。想定通り、待ち時間が少なく、逐次回答が表示されているのがわかるかと思います。

レスポンスがUint8Arrayで返ってくる都合上日本語の場合は文字化けしてしまうので英語で回答するようにしています。

実装

今回、実装したCloudflare Workersのコードは下記の様になっています。

ストリームで回答を返す処理については解説のコメントも記載しています。

処理の内容としては

  • LangChainとCheerioWebBaseLoaderを使って弊社クラスメソッドのカルチャーのページを読み込んで、このWebページについて質問できるようにする

という処理を行っています。

src/workers.js

import { CheerioWebBaseLoader } from 'langchain/document_loaders/web/cheerio';
import { OpenAIEmbeddings } from 'langchain/embeddings/openai';
import { MemoryVectorStore } from 'langchain/vectorstores/memory';
import { OpenAI } from 'langchain/llms/openai';
import { RetrievalQAChain } from 'langchain/chains';

export default {
  async fetch(request, env, ctx) {
    const { searchParams } = new URL(request.url); // URLからパラメータを取得します
    const question = searchParams.get('question') ?? 'この記事は何ですか?CLPとはなんですか?'; // questionの情報を取得

    // TransformStream
    // https://developer.mozilla.org/ja/docs/Web/API/TransformStream
    // TransformStreamは、読み取り可能なストリームと書き込み可能なストリームの間のパイプラインを表します
    // これは、読み取り可能なストリームから読み取られたデータを変換し、書き込み可能なストリームに書き込むトランスフォームストリームです
    const { readable, writable } = new TransformStream();

    // TransformStreamのwritableに書き込むと、readableから読み取れるようになるため
    // getWriterを使って、TransformStreamの書き込み可能なストリームに書き込むためのTransformStreamDefaultWriterを返してもらいます
    const writer = writable.getWriter();

    // waitUntil
    // https://developer.mozilla.org/ja/docs/Web/API/FetchEvent/waitUntil
    // waitUntil()メソッドは、FetchEventが終了するまで、イベントの処理を遅延させることができます。
    // ここでは、getChatResponse()の処理が終了するまで、イベントの処理を遅延させます。
    ctx.waitUntil(getChatResponse(writer, env, question));

    // writablerに書き込むと、readableにデータが流れてくるので、fetchの戻り値として返されるResponseのbodyに指定します。
    // つまり、readableにデータが流れてくると、クライアントにデータが送信されます。
    return new Response(readable, {
      status: 200,
      headers: {
        // Content-Typeがtext/event-stream
        // https://developer.mozilla.org/ja/docs/Web/API/Server-sent_events/Using_server-sent_events
        // SSEでは、Content-Typeはtext/event-streamである必要があります。
        'Content-Type': 'text/event-stream',
        // no-cache
        // https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Cache-Control
        // Cache-Controlヘッダーは、リクエスト/レスポンスチェーン全体にわたってキャッシュの動作を変更するために使用されます。
        // ここでは、no-cacheが指定されているので、キャッシュを使用しない挙動になります。
        // 
        // no-transform
        // no-transformは、キャッシュまたはプロキシが、レスポンスの本文を変更しないことを指定します
        // つまり、レスポンスの本文を変更しない挙動になります。
        'Cache-Control': 'no-cache, no-transform',
        // keep-alive
        // https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Connection
        // Connectionヘッダーは、特定の接続に関するオプションを指定するために使用されます。
        // ここでは、keep-aliveが指定されているので、TCP接続を維持する挙動になります。
        Connection: 'keep-alive',
      },
    });
  },
};

async function getChatResponse(writer, env, query) {
  // TextEncoder
  // https://developer.mozilla.org/ja/docs/Web/API/TextEncoder
  // TextEncoderインターフェイスは、UTF-8などの文字エンコーディングを使用して文字列をバイト配列に変換するための機能を提供します。
  const encoder = new TextEncoder();
  writer.write(encoder.encode('[START]\n'));

  // 質問したいWebページのURLを指定します。
  const url = 'https://classmethod.jp/english/leadership-principles/';
  const loader = new CheerioWebBaseLoader(url);
  const docs = await loader.load();
  const store = await MemoryVectorStore.fromDocuments(docs, new OpenAIEmbeddings({ openAIApiKey: env.OPENAI_API_KEY }));


  // OpenAIのAPIを使用して、文章を生成するためにOpenAIのLLMを作成します。
  const model = new OpenAI({
    openAIApiKey: env.OPENAI_API_KEY,
    // streamingをtrueにすると、文章を生成するたびにCallbackを実行する挙動になります。
    streaming: true,
    // 文章を生成するたびに、handleLLMNewTokenコールバックを呼び出します。
    // 渡ってきた回答の一部(token)をwriterに書き込みます。
    // readableにデータが流れるため、クライアントにデータが送信されます。
    callbacks: [
      {
        async handleLLMNewToken(token) {
          console.log(encoder.encode(token));
          await writer.write(encoder.encode(token));
        },
      },
    ],
  });


  const chain = RetrievalQAChain.fromLLM(model, store.asRetriever());

  await chain.call({
    query,
  });

  // 回答が終わったことがわかるように最後に[DONE]を書き込みます。
  writer.write(encoder.encode('\n[DONE]'));
  return writer.close();
}

これで、Cloudflare Workers上にデプロイし、払い出されてURLに?question=tell me CLPなどのパラメータを付与するとはじめに掲載した挙動になります。

ただ、日本語は文字化してしまうので、Next.jsやRemixなどのクライアントサイドでレスポンスのデータをTextDecoderしてあげる必要があります。

こちらについてはまた別のブログでまとめたいと思います。

さいごに

Cloudflare Workers上でLangChainを動かしストリームで回答を返すようにしてみました。

調べてもLangChain.jsを使った情報があまり出てこなかったのでブログにまとめてみました。

誰かの参考になれば幸いです。