はじめに
社内のバックオフィス業務に関する情報(ナレッジ)をGPTによるチャットインターフェイスで検索できるようにしたいという要望があり、PoCの実装を行いました。今回の課題として、ナレッジとなるデータソースが分散していたり、場合によっては人手でナレッジを追加することが想定されました。そのため、ナレッジとなるデータソースの管理を行うCMSがあると良いと考えました。
Contentfulとは
Contentfulについてご存じの方は多いと思いますが、簡単に紹介します。ContentfulはヘッドレスCMSと呼ばれるサービスで、Webサイトやモバイルアプリなどに配信するコンテンツを管理するためのサービスです。コンテンツのスキーマを定義し、そのコンテンツを登録・編集・削除することができます。また、コンテンツの取得や管理を行うAPIや、コンテンツのイベントを通知するWebhook機能など、デベロッパーフレンドリーなサービスです。
参考:
今回の用途としては、機能的にも容量的にも無料版でも十分であると判断し、Contentfulを利用することにしました。(無料枠が大きいのは嬉しいですね。)
全体像
先に全体像を示します。中央付近にContentfulがあり、データは左側のデータソースから右側のベクトルデータベースへと流れます。Contentfulでスキーマを定義することで、どのようなデータソースであってもContentfulをインターフェイスとしてデータを蓄積することができます。もちろん、Contentfulのコンソールから直接データを管理することもできます。
実装
一例として、Notionの情報をContentfulに同期してベクトルデータベースに同期する仕組みを紹介します。
Notionにはイベントを通知する機能がないため、定期的に情報をクロールする必要があります。前回のクロールの結果と最新の結果を突合して差分をベクトルデータベースに反映するわけですが、ベクトルデータベースは一般的なRDSのような検索クエリや操作を十分に備えていないものもあり、この点でも中間データベースとしてContentfulがあることが役立ちました。
Modelの定義
Knowledgeという Content modelを以下のように定義しました。
databaseId
: データベースのレコードを一意に特定するid。更新不可。DataLoader側で生成する。externalId
: データソースを一意に特定するid。更新不可。前方一致などで絞り込みができるようにする。区切り文字として/
を使う。(例:database_id/page_id
)question
: 質問文answer
: 回答文sourceType
: データソースの種類(例:notion
)sourceUrl
: データソースのURL
Data Loaderの実装
データソースからContentfulにデータを取り込む機能をData Loaderと呼んでいます。この実装では、Notionのデータベース単位で、データベースにある各ページをKnowledgeの1レコードとして登録します。
Notionのクロールについては、Llama HubにNotion LoaderというLoaderが公開されていたので、このソースコードを参考にしました。
この方法でデータを取得した後、Notion側のタイムスタンプとContentful側のタイムスタンプを比較してデータを同期します。また、Notionから削除されてしまったページの検出については、データベース単位でContentfulにある全てのデータとNotionのデータを比較して、NotionにないデータをContentfulから削除することで対応しています。
from datetime import datetime, timezone
import os
from notion_client import Client
import contentful_management
NOTION_DATABASE_IDS = [
# ここにクロール対象のDatabaseのIDを設定する
]
notion = Client(auth=os.environ["NOTION_SECRET"])
space_id = os.environ.get("CONTENTFUL_SPACE_ID")
environment_id = os.environ.get("CONTENTFUL_ENVIRONMENT_ID")
contentful_client = contentful_management.Client(
os.environ.get("CONTENTFUL_MANAGEMENT_API_TOKEN"), default_locale='ja-JP')
def read_block(block_id: str, num_tabs: int = 0) -> str:
print(f"Page reading... {block_id}, {num_tabs}")
"""Read a block."""
done = False
result_lines_arr = []
cur_block_id = block_id
while not done:
data = notion.blocks.children.list(
cur_block_id,
page_size=50,
)
for result in data["results"]:
result_type = result["type"]
# AI BlcokはBlock APIに対応していないのでスキップする
if result_type == "unsupported":
print(f"Skipping unsupported block.")
continue
result_obj = result[result_type]
cur_result_text_arr = []
if "rich_text" in result_obj:
for rich_text in result_obj["rich_text"]:
# skip if doesn't have text object
if "text" in rich_text:
text = rich_text["text"]["content"]
prefix = "\t" * num_tabs
cur_result_text_arr.append(prefix + text)
result_block_id = result["id"]
has_children = result["has_children"]
if has_children:
children_text = read_block(
result_block_id, num_tabs=num_tabs + 1
)
cur_result_text_arr.append(children_text)
cur_result_text = "\n".join(cur_result_text_arr)
result_lines_arr.append(cur_result_text)
if data["next_cursor"] is None:
done = True
break
else:
cur_block_id = data["next_cursor"]
result_lines = "\n".join(result_lines_arr)
return result_lines
def sync_entries(database_id: str) -> None:
existing_entries = contentful_client.entries(
space_id, environment_id).all({
'content_type': 'knowledge',
'fields.sourceType[match]': 'notion',
'fields.externalId[match]': database_id
})
print(f"database_id: {database_id} ({len(existing_entries)} pages)")
results = notion.databases.query(database_id=database_id)
for page in results["results"]:
# 名前が設定されていない場合はスキップ
if len(page["properties"]["名前"]['title']) == 0:
print(
f"Skipping page because it doesn't have a title. {page['url']}")
continue
page_id = page["id"]
external_id = f'{database_id}/{page_id}'
question = page["properties"]["名前"]["title"][0]["plain_text"]
source_url = page["url"]
# existing_entriesから、idがpage_idと一致するものを取り出す
existing_entry = next(
filter(lambda e: e.fields()['external_id'] == external_id, existing_entries), None)
# existing_entriesから一致したレコードを除外する
existing_entries = list(
filter(lambda e: e.fields()['external_id'] != external_id, existing_entries))
if existing_entry is None:
try:
# Create
answer = read_block(page_id)
entry = contentful_client.entries(space_id, environment_id).create(
None, {
'content_type_id': 'knowledge',
'fields': {
'databaseId': {
'ja-JP': page_id # UUIDであれば何でも良い
},
'externalId': {
'ja-JP': external_id
},
'question': {
'ja-JP': question
},
'answer': {
'ja-JP': answer
},
'sourceType': {
'ja-JP': 'notion'
},
'sourceUrl': {
'ja-JP': source_url
}
}
}
)
entry.publish()
print(f'Created entry: {entry.id}')
except Exception as e:
print(f'Failed to create entry: {e}')
else:
try:
last_edited_time = datetime.strptime(
page['last_edited_time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
# Update
# last_edited_time が existing_entry のUpdatedAtよりも新しい場合のみ更新する
if existing_entry.sys['updated_at'] >= last_edited_time:
print(f'Skipped entry: {existing_entry.id}')
continue
answer = read_block(page_id)
existing_entry.question = question
existing_entry.answer = answer
existing_entry.save()
print(f'Updated entry: {existing_entry.id}')
except Exception as e:
print(f'Failed to update entry: {e}')
# existing_entriesに残ったものをUnpublishする
for entry in existing_entries:
try:
entry.unpublish()
# すぐにdeleteしてしまうと、ベクトルDB側を削除しようとしたときに情報が無くなってしまうので、削除はしない
# entry.delete()
print(f'Deleted entry: {entry.id}')
except Exception as e:
print(f'Failed to delete entry: {e}')
if __name__ == '__main__':
for database_id in NOTION_DATABASE_IDS:
sync_entries(database_id)
Contentfulからベクトルデータベースへの同期
ContentfulのコンテンツがPublish/Unpublishedされたイベントをトリガーとして、WebhookでCloud Functionsに情報を送信します。トリガーはこのように細かく条件を指定することができます。
ベクトルデータベースとしてはWeaviateを使っていますが、他のベクトルデータベースに置き換えても問題ないような設計にしています。
import functions_framework
import weaviate
import os
import contentful_management
space_id = os.environ.get("CONTENTFUL_SPACE_ID")
environment_id = os.environ.get("CONTENTFUL_ENVIRONMENT_ID")
contentful_client = contentful_management.Client(
os.environ.get("CONTENTFUL_MANAGEMENT_API_TOKEN"))
auth_config = weaviate.AuthApiKey(
api_key=os.environ.get("WEAVIATE_API_KEY", "")) # ローカル環境は未認証のため、空文字列を渡す
weaviate_client = weaviate.Client(
url=os.environ.get("WEAVIATE_URL"),
auth_client_secret=auth_config,
additional_headers={
"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")
}
)
def delete_data_object(entry_id: str) -> None:
try:
# Unpublishイベントの場合、dataにfieldsが含まれていないので、Entryを取得する
res = contentful_client.entries(
space_id, environment_id).find(entry_id)
weaviate_client.data_object.delete(
res.fields('ja-JP')['database_id'],
class_name="Knowledge"
)
print(f'deleted: entry_id: {entry_id}')
except Exception as e:
print(f'error: {e}')
@functions_framework.http
def sync_database(request):
data = request.get_json()
entry_id = data['sys']['id']
# Unpublish or Archiveされたとき
if data['sys']['type'] == 'DeletedEntry':
delete_data_object(entry_id)
return 'OK'
# Publishedされたとき
fields = data['fields']
# すでにWeaviateに存在するか確認する
exists = weaviate_client.data_object.exists(
fields['databaseId']['ja-JP'],
class_name="Knowledge"
)
new_data_object = {
"question": fields['question']['ja-JP'],
"answer": fields['answer']['ja-JP'],
"url": fields['sourceUrl']['ja-JP'],
}
if exists:
weaviate_client.data_object.replace(
new_data_object,
"Knowledge",
fields['databaseId']['ja-JP']
)
print(f'replaced: entry_id: {entry_id}')
else:
weaviate_client.data_object.create(
new_data_object,
"Knowledge",
uuid=fields['databaseId']['ja-JP']
)
print(f'created: entry_id: {entry_id}')
return 'OK'
おわりに
NotionをデータソースとしてContentfulに登録して、ベクトルデータベースへと同期する仕組みを紹介しました。
本当はリポジトリごと公開しても良かったのですが、手間とセキュリティの観点から断片的なソースコードの公開にとどめました。もし社内で活用したいという人がいましたらお声がけください。