feat(ai_chat): 添加Markdown渲染和图片生成功能

支持将AI回复的Markdown内容转换为HTML并渲染为美观的图片格式返回,提升聊天体验
```

```msg
feat(knowledge_base): 扩展知识库支持个人和群聊独立记忆

- 新增个人知识库功能,支持独立记忆
- 添加清除个人/群聊记忆命令
- 优化知识搜索逻辑,优先搜索个人记忆
- 更新插件帮助信息
This commit is contained in:
2026-03-24 15:17:50 +08:00
parent fbeceb4dc9
commit fde808b819
3 changed files with 384 additions and 26 deletions

View File

@@ -4,9 +4,12 @@ AI 聊天插件,支持向量数据库记忆功能
"""
import time
import uuid
import markdown
from core.managers.command_manager import matcher
from models.events.message import GroupMessageEvent, PrivateMessageEvent
from models.message import MessageSegment
from core.managers.vectordb_manager import vectordb_manager
from core.managers.image_manager import image_manager
from core.utils.logger import ModuleLogger
from core.config_loader import global_config
@@ -113,7 +116,38 @@ async def chat_command(event: GroupMessageEvent | PrivateMessageEvent, args: lis
user_message = " ".join(args)
user_id = event.user_id
group_id = getattr(event, 'group_id', 0)
user_name = event.sender.nickname or event.sender.card or str(user_id)
await event.reply("正在思考中...")
reply = await get_ai_response(user_id, group_id, user_message)
await event.reply(reply)
# 将 Markdown 转换为 HTML
try:
# 启用扩展以支持代码块、表格等
html_reply = markdown.markdown(reply, extensions=['fenced_code', 'tables', 'nl2br'])
except Exception as e:
logger.error(f"Markdown 转换失败: {e}")
html_reply = reply.replace('\n', '<br>')
# 渲染图片
try:
template_data = {
"user_name": user_name,
"user_message": user_message,
"ai_reply": html_reply
}
base64_img = await image_manager.render_template_to_base64(
template_name="ai_chat.html",
data=template_data,
output_name=f"chat_{user_id}_{int(time.time())}.png",
image_type="png"
)
if base64_img:
await event.reply(MessageSegment.image(f"base64://{base64_img}"))
else:
await event.reply("图片生成失败,返回文本:\n" + reply)
except Exception as e:
logger.error(f"渲染聊天图片失败: {e}")
await event.reply("图片生成失败,返回文本:\n" + reply)

View File

@@ -5,7 +5,7 @@
import time
import uuid
from core.managers.command_manager import matcher
from models.events.message import GroupMessageEvent
from models.events.message import GroupMessageEvent, PrivateMessageEvent
from core.managers.vectordb_manager import vectordb_manager
from core.utils.logger import ModuleLogger
from core.permission import Permission
@@ -13,24 +13,62 @@ from core.permission import Permission
logger = ModuleLogger("GroupKnowledgeBase")
__plugin_meta__ = {
"name": "群聊知识库",
"description": "基于向量数据库的群聊知识库,支持语义检索",
"usage": "/kb_add <问题> <答案> - 添加知识库条目 (仅管理员)\n/kb_search <关键词> - 搜索知识库"
"name": "知识库",
"description": "基于向量数据库的知识库,支持个人和群聊独立记忆",
"usage": "/kb_add <问题> <答案> - 添加个人知识库\n/kb_add_group <问题> <答案> - 添加群聊知识库 (仅管理员)\n/kb_search <关键词> - 搜索知识库\n/kb_remove_person - 清除个人所有记忆\n/kb_remove_group - 清除群聊所有记忆 (仅管理员)"
}
@matcher.command("kb_add", permission=Permission.ADMIN)
async def kb_add_command(event: GroupMessageEvent, args: list[str]):
"""添加知识库条目"""
@matcher.command("kb_add")
async def kb_add_person_command(event: GroupMessageEvent | PrivateMessageEvent, args: list[str]):
"""添加个人知识库条目"""
if len(args) < 2:
await event.reply("用法: /kb_add <问题> <答案>")
return
question = args[0]
answer = " ".join(args[1:])
user_id = event.user_id
try:
collection_name = f"knowledge_base_user_{user_id}"
doc_id = str(uuid.uuid4())
text_to_embed = f"问题: {question}\n答案: {answer}"
metadata = {
"user_id": user_id,
"question": question,
"answer": answer,
"timestamp": int(time.time())
}
success = vectordb_manager.add_texts(
collection_name=collection_name,
texts=[text_to_embed],
metadatas=[metadata],
ids=[doc_id]
)
if success:
await event.reply(f"个人知识库条目添加成功!\n问题: {question}")
else:
await event.reply("个人知识库条目添加失败,请查看日志。")
except Exception as e:
logger.error(f"添加个人知识库失败: {e}")
await event.reply(f"添加失败: {str(e)}")
@matcher.command("kb_add_group", permission=Permission.ADMIN)
async def kb_add_group_command(event: GroupMessageEvent, args: list[str]):
"""添加群聊知识库条目"""
if len(args) < 2:
await event.reply("用法: /kb_add_group <问题> <答案>")
return
question = args[0]
answer = " ".join(args[1:])
group_id = event.group_id
try:
collection_name = f"knowledge_base_{group_id}"
collection_name = f"knowledge_base_group_{group_id}"
doc_id = str(uuid.uuid4())
text_to_embed = f"问题: {question}\n答案: {answer}"
@@ -50,43 +88,109 @@ async def kb_add_command(event: GroupMessageEvent, args: list[str]):
)
if success:
await event.reply(f"知识库条目添加成功!\n问题: {question}")
await event.reply(f"群聊知识库条目添加成功!\n问题: {question}")
else:
await event.reply("知识库条目添加失败,请查看日志。")
await event.reply("群聊知识库条目添加失败,请查看日志。")
except Exception as e:
logger.error(f"添加知识库失败: {e}")
logger.error(f"添加群聊知识库失败: {e}")
await event.reply(f"添加失败: {str(e)}")
@matcher.command("kb_search")
async def kb_search_command(event: GroupMessageEvent, args: list[str]):
"""搜索知识库条目"""
async def kb_search_command(event: GroupMessageEvent | PrivateMessageEvent, args: list[str]):
"""搜索知识库条目(优先搜索个人,再搜索群聊)"""
if not args:
await event.reply("用法: /kb_search <关键词>")
return
query = " ".join(args)
group_id = event.group_id
user_id = event.user_id
group_id = getattr(event, 'group_id', None)
try:
collection_name = f"knowledge_base_{group_id}"
reply_msg = f"为您找到以下相关知识:\n"
found = False
results = vectordb_manager.query_texts(
collection_name=collection_name,
# 1. 搜索个人知识库
person_collection = f"knowledge_base_user_{user_id}"
person_results = vectordb_manager.query_texts(
collection_name=person_collection,
query_texts=[query],
n_results=3
n_results=2
)
if not results or not results.get("documents") or not results["documents"][0]:
if person_results and person_results.get("documents") and person_results["documents"][0]:
reply_msg += "\n【个人记忆】"
for i, metadata in enumerate(person_results["metadatas"][0], 1):
question = metadata.get("question", "")
answer = metadata.get("answer", "")
reply_msg += f"\n{i}. Q: {question}\n A: {answer}"
found = True
# 2. 搜索群聊知识库
if group_id:
group_collection = f"knowledge_base_group_{group_id}"
group_results = vectordb_manager.query_texts(
collection_name=group_collection,
query_texts=[query],
n_results=2
)
if group_results and group_results.get("documents") and group_results["documents"][0]:
reply_msg += "\n\n【群聊记忆】"
for i, metadata in enumerate(group_results["metadatas"][0], 1):
question = metadata.get("question", "")
answer = metadata.get("answer", "")
reply_msg += f"\n{i}. Q: {question}\n A: {answer}"
found = True
if not found:
await event.reply("未找到相关的知识库条目。")
return
reply_msg = f"为您找到以下相关知识:\n"
for i, metadata in enumerate(results["metadatas"][0], 1):
question = metadata.get("question", "")
answer = metadata.get("answer", "")
reply_msg += f"\n{i}. Q: {question}\n A: {answer}"
await event.reply(reply_msg)
except Exception as e:
logger.error(f"搜索知识库失败: {e}")
await event.reply(f"搜索失败: {str(e)}")
@matcher.command("kb_remove_person")
async def kb_remove_person_command(event: GroupMessageEvent | PrivateMessageEvent):
"""清除个人所有记忆"""
user_id = event.user_id
collection_name = f"knowledge_base_user_{user_id}"
try:
# ChromaDB 不支持直接删除整个 collection 的所有数据,最简单的方法是删除 collection
if vectordb_manager._client:
try:
vectordb_manager._client.delete_collection(collection_name)
if collection_name in vectordb_manager._collections:
del vectordb_manager._collections[collection_name]
await event.reply("已成功清除您的所有个人记忆。")
except ValueError:
await event.reply("您还没有任何个人记忆。")
else:
await event.reply("向量数据库未初始化。")
except Exception as e:
logger.error(f"清除个人记忆失败: {e}")
await event.reply(f"清除失败: {str(e)}")
@matcher.command("kb_remove_group", permission=Permission.ADMIN)
async def kb_remove_group_command(event: GroupMessageEvent):
"""清除群聊所有记忆"""
group_id = event.group_id
collection_name = f"knowledge_base_group_{group_id}"
try:
if vectordb_manager._client:
try:
vectordb_manager._client.delete_collection(collection_name)
if collection_name in vectordb_manager._collections:
del vectordb_manager._collections[collection_name]
await event.reply("已成功清除本群的所有群聊记忆。")
except ValueError:
await event.reply("本群还没有任何群聊记忆。")
else:
await event.reply("向量数据库未初始化。")
except Exception as e:
logger.error(f"清除群聊记忆失败: {e}")
await event.reply(f"清除失败: {str(e)}")