Merge pull request #69 from Fairy-Oracle-Sanctuary/dev

Dev
This commit is contained in:
镀铬酸钾
2026-03-15 13:49:37 +08:00
committed by GitHub
25 changed files with 2457 additions and 116 deletions

173
adapters/discord_adapter.py Normal file
View File

@@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
"""
Discord 适配器 (Discord Adapter)
此模块负责与 Discord API 建立连接,接收 Discord 消息,
并将其转换为通用数据模型 (Universal Data Models)
同时提供将通用消息段发送回 Discord 的能力。
"""
import asyncio
import json
import os
import io
import requests
from typing import Union, List, Optional
try:
import discord
DISCORD_AVAILABLE = True
except ImportError:
DISCORD_AVAILABLE = False
from core.utils.logger import ModuleLogger
from .router import DiscordToOneBotConverter
from core.managers.redis_manager import redis_manager
from core.config_loader import global_config
class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
"""
Discord 客户端适配器。
继承自 discord.Client负责处理 Discord 的底层事件。
"""
def __init__(self, token: str):
if not DISCORD_AVAILABLE:
raise ImportError("discord.py 未安装,请运行 `pip install discord.py`")
# 必须声明 Intents否则无法读取消息内容
intents = discord.Intents.default()
intents.message_content = True
# 检查是否配置了代理
self.proxy = None
self.proxy_type = "http"
if global_config.discord.proxy:
self.proxy = global_config.discord.proxy
self.proxy_type = global_config.discord.proxy_type or "http"
super().__init__(intents=intents)
self.token = token
self.logger = ModuleLogger("DiscordAdapter")
self.send_channel = None
async def on_ready(self):
"""当 Bot 成功连接到 Discord 时触发"""
self.logger.success(f"Discord Bot 已登录: {self.user} (ID: {self.user.id})")
# 启动 Redis 订阅以处理跨平台消息
asyncio.create_task(self.start_redis_subscription())
async def on_message(self, message: 'discord.Message'):
"""当收到 Discord 消息时触发"""
# 忽略机器人自己的消息
if message.author.bot:
return
self.logger.info(f"[Discord 消息] {message.author}: {message.content}")
# 1. 将 discord.Message 伪装成 OneBot 事件模型
# 2. 触发业务逻辑
# 将伪装后的事件丢给现有的命令管理器 (matcher)
from core.managers.command_manager import matcher
# matcher.handle_event 需要 bot 实例和 event 实例
# 我们在 create_mock_event 中已经注入了一个假的 bot 对象
try:
mock_event = DiscordToOneBotConverter.create_mock_event(message, self)
await matcher.handle_event(mock_event.bot, mock_event)
except Exception as e:
self.logger.error(f"处理 Discord 消息时发生异常: {e}")
async def start_redis_subscription(self):
"""启动 Redis 订阅以处理跨平台消息发送"""
if redis_manager.redis is None:
self.logger.warning("[DiscordAdapter] Redis 未初始化,跳过订阅")
return
try:
channel_name = "neobot_discord_send"
pubsub = redis_manager.redis.pubsub()
await pubsub.subscribe(channel_name)
self.logger.success(f"[DiscordAdapter] 已订阅 Redis 频道: {channel_name}")
async for message in pubsub.listen():
if message["type"] == "message":
try:
data = json.loads(message["data"])
if data.get("type") == "send_message":
await self.handle_send_message(data)
except json.JSONDecodeError as e:
self.logger.error(f"[DiscordAdapter] 解析 Redis 消息失败: {e}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] 处理 Redis 消息失败: {e}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] Redis 订阅异常: {e}")
async def handle_send_message(self, data: dict):
"""处理来自 Redis 的消息发送请求"""
try:
channel_id = data.get("channel_id")
content = data.get("content", "")
attachments = data.get("attachments", [])
if channel_id is None:
self.logger.error("[DiscordAdapter] 缺少 channel_id")
return
channel = self.get_channel(channel_id)
if channel is None:
self.logger.error(f"[DiscordAdapter] 未找到频道: {channel_id}")
return
self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}")
# 发送内容和附件(合并为一条消息)
if content or attachments:
await channel.send(content=content, files=[discord.File(fp=io.BytesIO(requests.get(attachment_url).content), filename=os.path.basename(attachment_url)) for attachment_url in attachments if attachment_url.startswith('http')] if attachments else None)
self.logger.success(f"[DiscordAdapter] 消息已发送到频道 {channel_id}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送消息失败: {e}")
async def start_client(self):
"""启动 Discord 客户端(非阻塞方式)"""
if not DISCORD_AVAILABLE:
self.logger.error("无法启动 Discord 客户端discord.py 未安装")
return
try:
self.logger.info("正在连接 Discord...")
# 如果配置了代理,使用自定义的 ClientSession
if self.proxy:
import aiohttp
proxy_url = self.proxy
self.logger.info(f"[DiscordAdapter] 使用代理: {proxy_url} (类型: {self.proxy_type})")
connector = aiohttp.TCPConnector()
session = aiohttp.ClientSession(connector=connector)
# discord.py 2.0+ 使用 discord.Client 的 connector 参数
# 但 discord.Client 不直接支持自定义 connector
# 需要使用 discord.AutoShardedClient 或修改内部实现
# 这里我们使用 discord.Client 的 __init__ 传递 connector
# 但 discord.Client 的 __init__ 不支持 connector 参数
# 所以我们需要使用 discord.Client 的 _create_http_client 方法
# 简单方案:使用环境变量设置代理
import os
os.environ["HTTP_PROXY"] = proxy_url
os.environ["HTTPS_PROXY"] = proxy_url
self.logger.info("[DiscordAdapter] 代理已设置,正在连接 Discord...")
await self.start(self.token)
# 清理环境变量
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)
else:
await self.start(self.token)
except Exception as e:
self.logger.error(f"Discord 连接失败: {e}")

406
adapters/router.py Normal file
View File

@@ -0,0 +1,406 @@
# -*- coding: utf-8 -*-
"""
事件路由与转换器 (Event Router & Converter)
此模块负责在不同平台(如 Discord和 OneBot 业务逻辑之间进行数据转换。
核心目标是:**让现有的 OneBot 插件(如 bili.py在不修改任何代码的情况下能够处理 Discord 消息。**
实现原理:
1. 接收 Discord 消息 (`discord.Message`)。
2. 将其“伪装”成 OneBot 的 `GroupMessageEvent` 或 `PrivateMessageEvent`。
3. 拦截插件调用的 `event.reply()` 方法。
4. 将插件返回的 OneBot `MessageSegment` 转换为 Discord 格式并发送。
"""
import asyncio
from typing import Union, List, Any, Optional
try:
import discord
DISCORD_AVAILABLE = True
except ImportError:
DISCORD_AVAILABLE = False
from models.events.message import GroupMessageEvent, PrivateMessageEvent
from models.message import MessageSegment as OneBotMessageSegment
from models.sender import Sender
from core.utils.logger import ModuleLogger
logger = ModuleLogger("EventRouter")
class DiscordToOneBotConverter:
"""
将 Discord 消息转换为 OneBot 消息事件的转换器。
"""
@staticmethod
def create_mock_event(discord_message: 'discord.Message', adapter: Any) -> Union[GroupMessageEvent, PrivateMessageEvent]:
"""
将 discord.Message 伪装成 OneBot 的 MessageEvent。
Args:
discord_message: 原始的 Discord 消息对象
adapter: DiscordAdapter 实例,用于回调发送消息
Returns:
伪装后的 OneBot 事件对象
"""
# 1. 提取基础信息
user_id = discord_message.author.id
message_id = discord_message.id
# 处理 Discord 的 raw_message
# 如果消息是以 @机器人 开头Discord 的 content 会是 "<@机器人ID> /echo 1"
# 我们需要把前面的 @ 提及去掉,否则命令匹配器 (matcher) 无法识别以 "/" 开头的命令
raw_message = discord_message.content
# 添加附件信息到 raw_message
if discord_message.attachments:
for attachment in discord_message.attachments:
raw_message += f"\n{attachment.url}"
bot_mention = f"<@{adapter.user.id}>"
if raw_message.startswith(bot_mention):
raw_message = raw_message[len(bot_mention):].strip()
# 构造发送者信息
sender = Sender(
user_id=user_id,
nickname=discord_message.author.display_name,
card=getattr(discord_message.author, 'nick', ''), # 群名片
role="member" # 简化处理,默认都是普通成员
)
# 2. 判断是群聊还是私聊
is_private = isinstance(discord_message.channel, discord.DMChannel)
# 构造 message 列表 (将纯文本转换为 MessageSegment)
message_list = [OneBotMessageSegment.text(raw_message)]
import time
current_time = int(time.time())
self_id = adapter.user.id if adapter.user else 0
# 注入 Discord 特定信息(用于跨平台插件识别)
discord_channel_id = discord_message.channel.id if not isinstance(discord_message.channel, discord.DMChannel) else None
discord_username = discord_message.author.name
discord_discriminator = f"#{discord_message.author.discriminator}" if discord_message.author.discriminator != "0" else ""
if is_private:
# 构造私聊事件
event = PrivateMessageEvent(
time=current_time,
self_id=self_id,
message_type="private",
sub_type="friend",
message_id=message_id,
user_id=user_id,
raw_message=raw_message,
message=message_list,
sender=sender
)
else:
# 构造群聊事件
group_id = discord_message.channel.id
event = GroupMessageEvent(
time=current_time,
self_id=self_id,
message_type="group",
sub_type="normal",
message_id=message_id,
user_id=user_id,
group_id=group_id,
raw_message=raw_message,
message=message_list,
sender=sender
)
# 注入 Discord 特定属性(用于跨平台插件识别)
event._is_discord_message = True
event.discord_channel_id = discord_channel_id
event.discord_username = discord_username
event.discord_discriminator = discord_discriminator
# 3. 拦截并重写 reply 方法 (核心魔法)
# 插件调用 event.reply() 时,实际上会执行这个闭包
async def mock_reply(message: Union[str, OneBotMessageSegment, List[OneBotMessageSegment]], auto_escape: bool = False):
await DiscordToOneBotConverter.send_discord_reply(discord_message, message, adapter)
# 覆盖实例方法
event.reply = mock_reply
# 注入一个假的 bot 对象,防止插件调用 event.bot.xxx 时报错
# 这里只提供最基础的属性,如果插件调用了复杂的 API可能会报错
class MockBot:
def __init__(self):
self.self_id = adapter.user.id if adapter.user else 0
async def send(self, event, message, **kwargs):
await DiscordToOneBotConverter.send_discord_reply(discord_message, message, adapter)
async def send_forwarded_messages(self, target, nodes):
"""
模拟发送合并转发消息。
Discord 不支持像 QQ 那样的合并转发,所以我们将其转换为普通消息发送。
"""
content = ""
files = []
for node in nodes:
if node.get("type") == "node":
node_data = node.get("data", {})
node_content = node_data.get("content", [])
# 提取节点中的文本和图片
if isinstance(node_content, str):
# 尝试解析 CQ 码
import re
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, node_content))
if not matches:
content += f"{node_content}\n"
else:
last_end = 0
for match in matches:
if match.start() > last_end:
content += node_content[last_end:match.start()]
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type in ("image", "video"):
file_url = params.get("url") or params.get("file")
if file_url:
if str(file_url).startswith("http"):
content += f"\n{file_url}\n"
elif str(file_url).startswith("base64://"):
import base64
import io
b64_data = str(file_url)[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
content += "@everyone "
else:
content += f"<@{qq_id}> "
last_end = match.end()
if last_end < len(node_content):
content += node_content[last_end:]
content += "\n"
elif isinstance(node_content, list):
for seg in node_content:
if isinstance(seg, dict):
seg_type = seg.get("type")
seg_data = seg.get("data", {})
if seg_type == "text":
content += seg_data.get("text", "")
elif seg_type == "image" or seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
if isinstance(file_url, bytes):
import io
try:
files.append(discord.File(fp=io.BytesIO(file_url), filename="image.png"))
except Exception as e:
logger.error(f"解析 bytes 图片失败: {e}")
elif str(file_url).startswith("http"):
content += f"\n{file_url}\n"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
content += "\n"
try:
if content or files:
await discord_message.channel.send(content=content, files=files if files else None)
except Exception as e:
logger.error(f"发送 Discord 合并转发消息失败: {e}")
event.bot = MockBot()
return event
@staticmethod
async def send_discord_reply(
original_message: 'discord.Message',
message: Union[str, OneBotMessageSegment, List[OneBotMessageSegment]],
adapter: Any
):
"""
将 OneBot 的消息段转换为 Discord 格式并发送。
Args:
original_message: 触发此回复的原始 Discord 消息
message: 插件返回的 OneBot 消息内容 (字符串或 MessageSegment 列表)
adapter: DiscordAdapter 实例
"""
content = ""
files = []
# 统一转换为列表处理
if not isinstance(message, list):
message = [message]
import re
for segment in message:
if isinstance(segment, str):
# 尝试解析 CQ 码
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, segment))
if not matches:
content += segment
continue
last_end = 0
for match in matches:
# 添加 CQ 码之前的纯文本
if match.start() > last_end:
content += segment[last_end:match.start()]
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
# 解析参数
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type in ("image", "video"):
file_url = params.get("url") or params.get("file")
if file_url:
if str(file_url).startswith("http"):
content += f"\n{file_url}"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
content += "@everyone "
else:
content += f"<@{qq_id}> "
last_end = match.end()
# 添加最后一个 CQ 码之后的纯文本
if last_end < len(segment):
content += segment[last_end:]
elif isinstance(segment, OneBotMessageSegment):
# 解析 OneBot 的 MessageSegment
seg_type = segment.type
seg_data = segment.data
if seg_type == "text":
content += seg_data.get("text", "")
elif seg_type == "image" or seg_type == "video":
# OneBot 的图片/视频通常有 file (URL或本地路径) 或 url 字段
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
# 处理 bytes 类型
if isinstance(file_url, bytes):
import io
try:
files.append(discord.File(fp=io.BytesIO(file_url), filename="image.png"))
except Exception as e:
logger.error(f"解析 bytes 图片失败: {e}")
elif str(file_url).startswith("http"):
# 如果是网络 URL直接拼接到文本中Discord 会自动解析预览
content += f"\n{file_url}"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
# 处理 Base64 图片 (需要解码并作为文件上传)
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
# 假设是本地文件路径
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
content += "@everyone "
else:
# 尝试将 QQ 号映射回 Discord ID (这里简单处理,直接拼接)
content += f"<@{qq_id}> "
elif seg_type == "reply":
# 忽略回复段,或者你可以尝试映射 message_id
pass
# 发送消息到 Discord
try:
# 如果内容为空但有文件Discord 允许发送
if content or files:
await original_message.channel.send(content=content, files=files if files else None)
else:
logger.warning("尝试发送空消息到 Discord已拦截")
except Exception as e:
logger.error(f"发送 Discord 消息失败: {e}")

101
adapters/universal_model.py Normal file
View File

@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
"""
通用数据模型 (Universal Data Models)
此模块定义了平台无关的数据结构,用于在不同平台(如 OneBot, Discord
和业务逻辑层(如 Plugins之间传递数据。
"""
from dataclasses import dataclass, field
from typing import List, Optional, Union, Dict, Any
@dataclass
class UniversalMessageSegment:
"""
平台无关的通用消息段模型。
业务逻辑层只负责生成这个对象,由底层的 Adapter 负责将其翻译成特定平台的格式。
"""
type: str # 消息类型:'text', 'image', 'video', 'audio', 'at', 'reply' 等
data: Dict[str, Any] # 消息数据载荷
@staticmethod
def text(text: str) -> "UniversalMessageSegment":
return UniversalMessageSegment("text", {"text": text})
@staticmethod
def image(url: Optional[str] = None, base64: Optional[str] = None, file_path: Optional[str] = None) -> "UniversalMessageSegment":
"""
图片消息。
Discord 支持直接发 URL 或上传本地文件OneBot 支持 URL、Base64 或本地路径。
"""
return UniversalMessageSegment("image", {"url": url, "base64": base64, "file_path": file_path})
@staticmethod
def video(url: Optional[str] = None, file_path: Optional[str] = None) -> "UniversalMessageSegment":
"""
视频消息。
Discord 通常直接发 URL 或作为附件上传OneBot 支持 URL 或本地路径。
"""
return UniversalMessageSegment("video", {"url": url, "file_path": file_path})
@staticmethod
def at(user_id: str) -> "UniversalMessageSegment":
"""
@某人。
注意:为了兼容 Discord 的雪花 ID (Snowflake)user_id 必须是字符串。
"""
return UniversalMessageSegment("at", {"user_id": user_id})
@staticmethod
def reply(message_id: str) -> "UniversalMessageSegment":
"""
回复某条消息。
"""
return UniversalMessageSegment("reply", {"message_id": message_id})
@dataclass
class UniversalUser:
"""通用用户模型"""
id: str # 用户唯一ID (QQ号 或 Discord Snowflake ID)
name: str # 用户昵称/群名片
avatar_url: str # 头像URL
is_bot: bool # 是否是机器人
@dataclass
class UniversalChannel:
"""通用频道/群组模型"""
id: str # 频道/群组唯一ID (QQ群号 或 Discord Channel ID)
name: str # 频道/群组名称
type: str # 类型:'private' (私聊), 'group' (QQ群), 'guild_text' (Discord文字频道) 等
guild_id: Optional[str] = None # 仅 Discord 有效:服务器(Guild) ID
@dataclass
class UniversalMessageEvent:
"""
平台无关的通用消息事件模型。
这是传递给业务逻辑层(如 bili.py的最终对象。
"""
platform: str # 来源平台标识:'onebot' 或 'discord'
message_id: str # 消息唯一ID (QQ消息ID 或 Discord Message ID)
user: UniversalUser # 发送者信息
channel: UniversalChannel # 消息来源频道/群组信息
raw_message: str # 纯文本形式的消息内容(用于正则匹配、命令解析)
# 解析后的消息段列表(可选,如果你需要处理图文混排)
message: List[UniversalMessageSegment] = field(default_factory=list)
# 原始的底层事件对象(保留引用,方便高级操作)
# 例如OneBot 的原始 JSON 字典,或 discord.py 的 discord.Message 对象
raw_event: Any = field(repr=False, default=None)
async def reply(self, message: Union[str, UniversalMessageSegment, List[UniversalMessageSegment]]):
"""
统一的回复接口。
这个方法应该是一个抽象方法或由具体的 Adapter 注入实现。
业务逻辑层调用此方法时,不需要关心底层是调用 OneBot API 还是 Discord API。
"""
raise NotImplementedError("此方法应由具体的 Platform Adapter 实现")

View File

@@ -3,9 +3,9 @@
# NapCat WebSocket 配置 # NapCat WebSocket 配置
[napcat_ws] [napcat_ws]
uri = "ws://114.66.61.199:3001" uri = "ws://127.0.0.1:6700"
# WebSocket 连接地址 # WebSocket 连接地址
token = "KoIAF.mcEHzxrPYF" token = ""
# 重连间隔(秒) # 重连间隔(秒)
reconnect_interval = 5 reconnect_interval = 5
@@ -13,8 +13,8 @@ reconnect_interval = 5
[reverse_ws] [reverse_ws]
enabled = true # 是否启用 enabled = true # 是否启用
host = "0.0.0.0" # 监听地址 host = "0.0.0.0" # 监听地址
port = 3002 # 监听端口 port = 8095 # 监听端口
token = "" token = "U~jqzl-F8oUXtle-"
# Bot 基础配置 # Bot 基础配置
[bot] [bot]
@@ -96,5 +96,38 @@ dedeuserid = ""
# 用于下载远程文件到本地并提供本地访问,解决 NapCat 无法直接访问某些远程资源的问题 # 用于下载远程文件到本地并提供本地访问,解决 NapCat 无法直接访问某些远程资源的问题
[local_file_server] [local_file_server]
enabled = true # 是否启用 enabled = true # 是否启用
host = "101.36.126.55" # 监听地址 host = "0.0.0.0" # 监听地址0.0.0.0 表示监听所有网卡
port = 3003 # 监听端口 port = 3003 # 监听端口
base_url = "http://101.36.126.55:3003" # 外部访问的 URL
[discord]
enabled = true
token = "MTQ4MjQzODA1NzExNzYxODI4Nw.G9R6uR.ddxHn3pmUf7SyrrOBg_-_lc7Y62lsCitPxpdGM"
proxy = "http://127.0.0.1:7890"
proxy_type = "http"
# 跨平台消息互通配置
[cross_platform]
enabled = true # 是否启用跨平台互通
# 映射配置
# 格式: discord频道ID = {qq_group_id = QQ群ID, name = "显示名称"}
# 示例:
# [cross_platform.mappings.123456789012345678]
# qq_group_id = 123456789
# name = "主群"
# [cross_platform.mappings.987654321098765432]
# qq_group_id = 987654321
# name = "测试群"
[cross_platform.mappings.1130287250513592453]
qq_group_id = 542898825
name = "Paw"
# 日志配置
[logging]
# 控制台日志级别DEBUG, INFO, SUCCESS, WARNING, ERROR
console_level = "INFO"
# 文件日志级别DEBUG, INFO, SUCCESS, WARNING, ERROR
file_level = "DEBUG"
# 全局日志级别DEBUG, INFO, SUCCESS, WARNING, ERROR
level = "DEBUG"

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import tomllib import tomllib
from pydantic import ValidationError from pydantic import ValidationError
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel, ImageManagerModel, MySQLModel, ReverseWSModel, ThreadingModel, BilibiliModel, LocalFileServerModel from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel, ImageManagerModel, MySQLModel, ReverseWSModel, ThreadingModel, BilibiliModel, LocalFileServerModel, DiscordModel, LoggingModel
from .utils.logger import ModuleLogger from .utils.logger import ModuleLogger
from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError
@@ -157,6 +157,19 @@ class Config:
""" """
return self._model.local_file_server return self._model.local_file_server
@property
def discord(self) -> DiscordModel:
"""
获取 Discord 配置
"""
return self._model.discord
@property
def logging(self) -> LoggingModel:
"""
获取日志配置
"""
return self._model.logging
# 实例化全局配置对象 # 实例化全局配置对象

View File

@@ -107,6 +107,25 @@ class LocalFileServerModel(BaseModel):
port: int = 3003 port: int = 3003
class DiscordModel(BaseModel):
"""
对应 `config.toml` 中的 `[discord]` 配置块。
"""
enabled: bool = False
token: str = ""
proxy: Optional[str] = None
proxy_type: str = "http"
class LoggingModel(BaseModel):
"""
对应 `config.toml` 中的 `[logging]` 配置块。
"""
level: str = "DEBUG"
file_level: str = "DEBUG"
console_level: str = "INFO"
class ConfigModel(BaseModel): class ConfigModel(BaseModel):
""" """
顶层配置模型,整合了所有子配置块。 顶层配置模型,整合了所有子配置块。
@@ -121,5 +140,7 @@ class ConfigModel(BaseModel):
threading: ThreadingModel = Field(default_factory=ThreadingModel) threading: ThreadingModel = Field(default_factory=ThreadingModel)
bilibili: BilibiliModel = Field(default_factory=BilibiliModel) bilibili: BilibiliModel = Field(default_factory=BilibiliModel)
local_file_server: LocalFileServerModel = Field(default_factory=LocalFileServerModel) local_file_server: LocalFileServerModel = Field(default_factory=LocalFileServerModel)
discord: DiscordModel = Field(default_factory=DiscordModel)
logging: LoggingModel = Field(default_factory=LoggingModel)

View File

@@ -0,0 +1,57 @@
from typing import Dict, List, Optional, TYPE_CHECKING
import threading
from ..utils.logger import ModuleLogger
if TYPE_CHECKING:
from ..bot import Bot
class BotManager:
"""
Bot 实例管理器
负责统一管理所有活跃的 Bot 实例(包括正向 WS 和反向 WS 连接的 Bot
提供注册、注销和获取 Bot 实例的方法。
"""
def __init__(self):
self._bots: Dict[str, "Bot"] = {} # type: ignore[assignment] # key: bot_id (str), value: Bot instance
self._lock = threading.RLock()
self.logger = ModuleLogger("BotManager")
def register_bot(self, bot: "Bot") -> None:
"""
注册一个 Bot 实例
"""
if not bot or not bot.self_id:
self.logger.warning("尝试注册无效的 Bot 实例")
return
bot_id = str(bot.self_id)
with self._lock:
self._bots[bot_id] = bot
self.logger.info(f"Bot 实例已注册: {bot_id}")
def unregister_bot(self, bot_id: str) -> None:
"""
注销一个 Bot 实例
"""
with self._lock:
if bot_id in self._bots:
del self._bots[bot_id]
self.logger.info(f"Bot 实例已注销: {bot_id}")
def get_bot(self, bot_id: str) -> Optional["Bot"]:
"""
根据 ID 获取 Bot 实例
"""
with self._lock:
return self._bots.get(str(bot_id))
def get_all_bots(self) -> List["Bot"]:
"""
获取所有活跃的 Bot 实例
"""
with self._lock:
return list(self._bots.values())
# 全局单例实例
bot_manager = BotManager()

View File

@@ -255,6 +255,10 @@ class ReverseWSManager:
del self._client_health[client_id] del self._client_health[client_id]
with self._bots_lock: with self._bots_lock:
if client_id in self.bots: if client_id in self.bots:
# 从 BotManager 注销
from .bot_manager import bot_manager
if self.bots[client_id].self_id:
bot_manager.unregister_bot(str(self.bots[client_id].self_id))
del self.bots[client_id] del self.bots[client_id]
# 清理该客户端的防重复数据 # 清理该客户端的防重复数据
@@ -313,6 +317,7 @@ class ReverseWSManager:
# 为事件注入Bot实例 # 为事件注入Bot实例
from ..ws import ReverseWSClient from ..ws import ReverseWSClient
from .bot_manager import bot_manager
# 为每个前端创建独立的Bot实例 # 为每个前端创建独立的Bot实例
with self._bots_lock: with self._bots_lock:
@@ -322,6 +327,10 @@ class ReverseWSManager:
temp_ws.self_id = event.self_id if hasattr(event, 'self_id') else 0 temp_ws.self_id = event.self_id if hasattr(event, 'self_id') else 0
self.bots[client_id] = Bot(temp_ws) self.bots[client_id] = Bot(temp_ws)
# 注册到 BotManager
if event.self_id:
bot_manager.register_bot(self.bots[client_id])
event.bot = self.bots[client_id] event.bot = self.bots[client_id]
# 记录客户端健康状态 # 记录客户端健康状态
@@ -461,7 +470,7 @@ class ReverseWSManager:
clients_to_send.append((cid, self.clients[cid])) clients_to_send.append((cid, self.clients[cid]))
for cid, websocket in clients_to_send: for cid, websocket in clients_to_send:
await websocket.send(orjson.dumps(payload)) await websocket.send(orjson.dumps(payload).decode('utf-8'))
return await asyncio.wait_for(future, timeout=30.0) return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:

217
core/plugin.py Normal file
View File

@@ -0,0 +1,217 @@
import inspect
import functools
from typing import Optional, Union, Any, Callable
from core.managers.command_manager import matcher as command_manager
from core.permission import Permission
from models.events.message import MessageEvent
class Plugin:
"""
插件基类,提供类风格的插件编写方式。
通过继承此类,可以使用装饰器在类方法上注册命令和事件处理器。
"""
def __init__(self):
self._register_handlers()
def _register_handlers(self):
"""
自动注册带有装饰器的方法。
"""
# 遍历实例的所有方法
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
# 检查是否有命令元数据
if hasattr(method, "_command_meta"):
meta = method._command_meta
# 调用 command_manager 的装饰器来注册绑定后的方法
command_manager.command(
*meta['names'],
permission=meta.get('permission'),
override_permission_check=meta.get('override_permission_check', False)
)(method)
# 检查是否有消息处理元数据
if hasattr(method, "_on_message_meta"):
command_manager.on_message()(method)
# 检查是否有通知处理元数据
if hasattr(method, "_on_notice_meta"):
meta = method._on_notice_meta
command_manager.on_notice(notice_type=meta.get('notice_type'))(method)
# 检查是否有请求处理元数据
if hasattr(method, "_on_request_meta"):
meta = method._on_request_meta
command_manager.on_request(request_type=meta.get('request_type'))(method)
async def send(self, event: MessageEvent, message: Union[str, Any]):
"""
发送消息的基础逻辑。
"""
if hasattr(event, 'reply'):
await event.reply(message)
else:
pass
async def reply(self, event: MessageEvent, message: Union[str, Any]):
"""
回复消息。
"""
await self.send(event, message)
class SimplePlugin(Plugin):
"""
面向新手的简化插件基类。
特性:
1. 自动将公共方法不以_开头注册为指令。
2. 指令名默认为方法名。
3. 自动解析参数类型。
4. 支持直接返回字符串来回复消息。
"""
def _register_handlers(self):
# 先处理带装饰器的方法
super()._register_handlers()
# 扫描普通方法并注册为指令
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
if name.startswith("_"):
continue
if hasattr(method, "_command_meta"):
continue # 已经处理过
if hasattr(method, "_on_message_meta"):
continue
if hasattr(method, "_on_notice_meta"):
continue
if hasattr(method, "_on_request_meta"):
continue
if name in dir(Plugin):
continue # 忽略基类方法
self._register_method_as_command(name, method)
def _register_method_as_command(self, name: str, method: Callable):
# 获取方法的签名
sig = inspect.signature(method)
# 包装函数
@functools.wraps(method)
async def wrapper(event: MessageEvent, args: list[str]):
try:
# 准备调用参数
call_args: list[Any] = []
# 跳过 self第一个参数应该是 event
params = list(sig.parameters.values())
if not params:
# 方法没有参数?这不应该发生,至少要有 event
await method()
return
# 绑定 event
call_args.append(event)
# 处理剩余参数
method_params = params[1:] # 除去 event
if not method_params:
# 方法不需要额外参数
pass
elif len(method_params) == 1:
# 只有一个参数,把所有 args 拼起来传给它
param = method_params[0]
if args:
str_val = " ".join(args)
val: Any = str_val
# 类型转换
if param.annotation is int:
val = int(str_val)
elif param.annotation is float:
val = float(str_val)
call_args.append(val)
elif param.default is not inspect.Parameter.empty:
call_args.append(param.default)
else:
await event.reply(f"缺少参数: {param.name}")
return
else:
# 多个参数,尝试一一对应
if len(args) < len([p for p in method_params if p.default is inspect.Parameter.empty]):
# 必填参数不足
usage = " ".join([f"<{p.name}>" for p in method_params])
await event.reply(f"参数不足。用法: /{name} {usage}")
return
for i, param in enumerate(method_params):
if i < len(args):
arg_str = args[i]
arg_val: Any = arg_str
# 简单的类型转换
try:
if param.annotation is int:
arg_val = int(arg_str)
elif param.annotation is float:
arg_val = float(arg_str)
except ValueError:
await event.reply(f"参数 {param.name} 类型错误,应为 {param.annotation.__name__}")
return
call_args.append(arg_val)
else:
call_args.append(param.default)
# 调用方法
result = await method(*call_args)
# 如果有返回值,自动回复
if result is not None:
await event.reply(str(result))
except Exception as e:
await event.reply(f"执行命令时发生错误: {str(e)}")
# 注册命令
command_manager.command(name)(wrapper)
def command(name: str, *aliases: str, permission: Optional[Permission] = None, override_permission_check: bool = False):
"""
装饰器:标记方法为命令处理器。
"""
def decorator(func):
func._command_meta = {
"names": (name,) + aliases,
"permission": permission,
"override_permission_check": override_permission_check
}
return func
return decorator
def on_message():
"""
装饰器:标记方法为通用消息处理器。
"""
def decorator(func):
func._on_message_meta = {}
return func
return decorator
def on_notice(notice_type: Optional[str] = None):
"""
装饰器:标记方法为通知处理器。
"""
def decorator(func):
func._on_notice_meta = {
"notice_type": notice_type
}
return func
return decorator
def on_request(request_type: Optional[str] = None):
"""
装饰器:标记方法为请求处理器。
"""
def decorator(func):
func._on_request_meta = {
"request_type": request_type
}
return func
return decorator

View File

@@ -72,13 +72,14 @@ class LocalFileServer:
url_hash = hashlib.md5(url.encode()).hexdigest()[:16] url_hash = hashlib.md5(url.encode()).hexdigest()[:16]
return f"file_{url_hash}" return f"file_{url_hash}"
async def download_file(self, url: str, timeout: int = 60) -> Optional[str]: async def download_file(self, url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
""" """
下载远程文件到本地 下载远程文件到本地
Args: Args:
url (str): 远程文件 URL url (str): 远程文件 URL
timeout (int): 下载超时时间(秒) timeout (int): 下载超时时间(秒)
headers (Optional[Dict[str, str]]): 请求头
Returns: Returns:
Optional[str]: 本地文件 ID如果失败则返回 None Optional[str]: 本地文件 ID如果失败则返回 None
@@ -96,7 +97,7 @@ class LocalFileServer:
# 使用 aiohttp 下载文件 # 使用 aiohttp 下载文件
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response: async with session.get(url, timeout=timeout, headers=headers) as response:
if response.status != 200: if response.status != 200:
logger.error(f"[LocalFileServer] 下载失败: HTTP {response.status}") logger.error(f"[LocalFileServer] 下载失败: HTTP {response.status}")
return None return None
@@ -195,13 +196,14 @@ async def stop_local_file_server():
_local_file_server = None _local_file_server = None
async def download_to_local(url: str, timeout: int = 60) -> Optional[str]: async def download_to_local(url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
""" """
下载远程文件到本地并返回本地访问 URL 下载远程文件到本地并返回本地访问 URL
Args: Args:
url (str): 远程文件 URL url (str): 远程文件 URL
timeout (int): 下载超时时间(秒) timeout (int): 下载超时时间(秒)
headers (Optional[Dict[str, str]]): 请求头
Returns: Returns:
Optional[str]: 本地访问 URL如果失败则返回 None Optional[str]: 本地访问 URL如果失败则返回 None
@@ -210,7 +212,7 @@ async def download_to_local(url: str, timeout: int = 60) -> Optional[str]:
if not server: if not server:
return None return None
file_id = await server.download_file(url, timeout) file_id = await server.download_file(url, timeout, headers)
if not file_id: if not file_id:
return None return None

View File

@@ -8,6 +8,13 @@ import os
from pathlib import Path from pathlib import Path
from loguru import logger from loguru import logger
# 导入全局配置
try:
from ..config_loader import global_config
USE_CONFIG = True
except ImportError:
USE_CONFIG = False
# 定义日志格式添加进程ID和线程ID作为上下文信息 # 定义日志格式添加进程ID和线程ID作为上下文信息
LOG_FORMAT = ( LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | " "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
@@ -30,14 +37,21 @@ DEBUG_LOG_FORMAT = (
# 移除 loguru 默认的处理器 # 移除 loguru 默认的处理器
logger.remove() logger.remove()
# 获取当前环境 # 获取日志级别配置
ENVIRONMENT = os.getenv("NEOBOT_ENV", "development") if USE_CONFIG:
LOG_LEVEL = global_config.logging.level
FILE_LEVEL = global_config.logging.file_level
CONSOLE_LEVEL = global_config.logging.console_level
else:
LOG_LEVEL = "DEBUG"
FILE_LEVEL = "DEBUG"
CONSOLE_LEVEL = "INFO"
# 添加控制台输出处理器 # 添加控制台输出处理器
logger.add( logger.add(
sys.stderr, sys.stderr,
level="INFO" if ENVIRONMENT == "production" else "DEBUG", level=CONSOLE_LEVEL,
format=LOG_FORMAT if ENVIRONMENT == "production" else DEBUG_LOG_FORMAT, format=LOG_FORMAT,
colorize=True, colorize=True,
enqueue=True # 异步写入 enqueue=True # 异步写入
) )
@@ -50,7 +64,7 @@ log_file_path = log_dir / "{time:YYYY-MM-DD}.log"
# 添加文件输出处理器 # 添加文件输出处理器
logger.add( logger.add(
log_file_path, log_file_path,
level="DEBUG", level=FILE_LEVEL,
format=DEBUG_LOG_FORMAT, format=DEBUG_LOG_FORMAT,
colorize=False, colorize=False,
rotation="00:00", # 每天午夜创建新文件 rotation="00:00", # 每天午夜创建新文件

View File

@@ -232,6 +232,11 @@ class WS:
""" """
self.logger.info("正在关闭 WebSocket 客户端...") self.logger.info("正在关闭 WebSocket 客户端...")
# 从 BotManager 注销
if self.bot and self.self_id:
from .managers.bot_manager import bot_manager
bot_manager.unregister_bot(str(self.self_id))
if self.ws: if self.ws:
await self.ws.close() await self.ws.close()
@@ -286,7 +291,7 @@ class WS:
self._pending_requests[echo_id] = future self._pending_requests[echo_id] = future
try: try:
await self.ws.send(orjson.dumps(payload)) await self.ws.send(orjson.dumps(payload).decode('utf-8'))
return await asyncio.wait_for(future, timeout=30.0) return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
with self._pending_requests_lock: with self._pending_requests_lock:

View File

@@ -65,10 +65,30 @@ python setup_mypyc.py build_ext --inplace
uri = "ws://127.0.0.1:3001" uri = "ws://127.0.0.1:3001"
token = "" token = ""
#当然你也可以配置逆向连接
[reverse_ws]
enabled = true # 是否启用
host = "0.0.0.0" # 监听地址
port = 3002 # 监听端口
token = ""
[redis] [redis]
host = "127.0.0.1" host = "127.0.0.1"
port = 6379 port = 6379
db = 0 db = 0
# MySQL 配置
[mysql]
# MySQL 主机地址
host = "114.66.61.199"
# MySQL 端口
port = 42398
# MySQL 用户名
user = "neobot"
# MySQL 密码
password = "neobot"
# MySQL 数据库名称
db = "neobot"
``` ```
`uri` 改成你自己的 OneBot 地址。 `uri` 改成你自己的 OneBot 地址。
@@ -88,6 +108,3 @@ python -X jit -X gil=0 main.py
如果你看到日志刷出来,最后显示 "连接成功!",恭喜,你成功了! 如果你看到日志刷出来,最后显示 "连接成功!",恭喜,你成功了!
现在,试着给你的机器人发个 `/help`看看会返回什么东西 现在,试着给你的机器人发个 `/help`看看会返回什么东西
**多前端支持**
如果需要同时连接多个 OneBot 实现(如多个 QQ 账号GIL-free 模式可以确保每个连接真正并行处理事件,不会相互阻塞。

View File

@@ -73,6 +73,13 @@ Bot 应该会回复你:“你好,[你的昵称]!”
就这么简单,一个最基础的插件就写完了。 就这么简单,一个最基础的插件就写完了。
## 极简插件开发(推荐新手)
如果你觉得上面的装饰器写法太复杂,或者只是想快速写几个简单的指令,我们提供了一种**极简模式**。
你只需要定义一个类,写几个方法,它们就会自动变成指令!
- [查看极简插件开发指南](./simple-plugin.md)
## 进阶阅读 ## 进阶阅读
- [指令处理](./command-handling.md): 了解如何处理参数、获取用户输入。 - [指令处理](./command-handling.md): 了解如何处理参数、获取用户输入。

View File

@@ -0,0 +1,127 @@
# 极简插件开发指南
如果你是 Python 新手,或者只是想快速写一些简单的指令,那么 `SimplePlugin` 是你的最佳选择。它让你无需理解复杂的装饰器和事件处理机制,只需要写普通的 Python 方法即可。
## 1. 快速开始
`plugins/` 目录下创建一个新文件,例如 `my_simple_plugin.py`
```python
from core.plugin import SimplePlugin
from models.events.message import MessageEvent
class MyPlugin(SimplePlugin):
async def hello(self, event: MessageEvent):
"""
发送 /hello 即可调用
"""
return "你好!这是极简插件。"
async def echo(self, event: MessageEvent, msg: str):
"""
发送 /echo <内容> 即可调用
"""
return f"你说了: {msg}"
# 必须实例化插件以生效
plugin = MyPlugin()
```
就是这么简单!现在你可以发送 `/hello``/echo 测试` 来测试你的插件了。
## 2. 核心特性
### 方法即指令
`SimplePlugin` 的子类中,任何**不以下划线开头**的方法都会自动注册为指令。
指令名称就是方法名。
例如:
- `async def ping(self, ...)` -> 注册为 `/ping`
- `async def help_me(self, ...)` -> 注册为 `/help_me`
### 自动参数解析
框架会根据你定义的参数类型,自动解析用户输入的参数。
#### 字符串参数
```python
async def greet(self, event: MessageEvent, name: str):
return f"你好, {name}"
```
- 发送 `/greet Neo` -> `name` 参数为 `"Neo"`
#### 数字参数 (自动转换类型)
```python
async def add(self, event: MessageEvent, a: int, b: int):
return f"{a} + {b} = {a + b}"
```
- 发送 `/add 10 20` -> `a``10` (int), `b``20` (int)
- 如果用户输入非数字(如 `/add a b`),框架会自动提示参数类型错误。
#### 捕获剩余文本
如果你的方法只有一个参数(除了 `event`),那么该参数会捕获指令后的所有文本。
```python
async def broadcast(self, event: MessageEvent, content: str):
return f"广播内容: {content}"
```
- 发送 `/broadcast 这是一个 很长 的消息` -> `content``"这是一个 很长 的消息"`
### 自动回复
如果你的方法返回了字符串(`str`),框架会自动将其作为回复发送给用户。
如果返回 `None`(即没有 return 语句),则不发送回复。
```python
async def silent(self, event: MessageEvent):
# 执行一些操作,但不回复
print("Silent command executed")
# 也可以手动调用 reply
await event.reply("手动回复")
```
## 3. 进阶用法
### 访问事件对象
所有方法的第一个参数(除了 `self`)必须是 `event`。通过 `event` 对象,你可以获取更多信息:
```python
async def whoami(self, event: MessageEvent):
user_id = event.user_id
nickname = event.sender.nickname
return f"你是 {nickname} ({user_id})"
```
### 混合使用装饰器
虽然 `SimplePlugin` 旨在简化开发,但你仍然可以使用装饰器来处理更复杂的场景,例如权限控制或监听非指令消息。
```python
from core.plugin import SimplePlugin, command, on_message
from core.permission import Permission
class AdvancedPlugin(SimplePlugin):
# 普通指令
async def normal(self, event: MessageEvent):
return "普通指令"
# 使用装饰器添加权限控制
@command("admin_only", permission=Permission.ADMIN)
async def admin_op(self, event: MessageEvent, args: list[str]):
return "只有管理员能看到这个"
# 监听所有消息
@on_message()
async def handle_all(self, event: MessageEvent):
if "敏感词" in event.raw_message:
await event.reply("检测到敏感词!")
```
## 4. 注意事项
1. **方法名**:不要使用以 `_` 开头的方法名作为指令,这些方法会被忽略。
2. **参数类型**:目前支持 `str`, `int`, `float` 的自动转换。
3. **实例化**:不要忘记在文件末尾实例化你的类(`plugin = MyPlugin()`),否则插件不会生效。

29
main.py
View File

@@ -21,24 +21,10 @@ from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config from core.config_loader import global_config as config
from core.services.local_file_server import start_local_file_server, stop_local_file_server from core.services.local_file_server import start_local_file_server, stop_local_file_server
from adapters.discord_adapter import DiscordAdapter
# 尝试使用高性能事件循环
try:
if sys.platform == 'win32':
# winloop 与 Playwright 存在兼容性问题 (不支持 startupinfo),暂时禁用
# import winloop
# asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
# print("已启用 winloop 高性能事件循环")
print("Windows 平台检测到 Playwright已自动禁用 winloop 以确保兼容性")
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
print("已启用 uvloop 高性能事件循环")
except ImportError:
print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环")
# 将项目根目录添加到 sys.path # 将项目根目录添加到 sys.path
ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_DIR) sys.path.insert(0, ROOT_DIR)
@@ -158,6 +144,15 @@ async def main():
asyncio.create_task(start_local_file_server()) asyncio.create_task(start_local_file_server())
logger.success(f"本地文件服务器已启动: http://{config.local_file_server.host}:{config.local_file_server.port}") logger.success(f"本地文件服务器已启动: http://{config.local_file_server.host}:{config.local_file_server.port}")
# 启动 Discord 客户端(如果启用)
discord_client = None
if config.discord.enabled and config.discord.token:
logger.info("正在启动 Discord 客户端...")
discord_client = DiscordAdapter(token=config.discord.token)
asyncio.create_task(discord_client.start_client())
elif config.discord.enabled:
logger.warning("Discord 已启用,但未配置 Token跳过启动。")
# 启动文件监控 # 启动文件监控
# 监控 plugins 目录 # 监控 plugins 目录
plugin_path = os.path.join(os.path.dirname(__file__), "plugins") plugin_path = os.path.join(os.path.dirname(__file__), "plugins")
@@ -202,6 +197,9 @@ async def main():
if websocket_client: if websocket_client:
await websocket_client.close() await websocket_client.close()
if discord_client:
await discord_client.close()
# 关闭反向 WebSocket 服务端 # 关闭反向 WebSocket 服务端
if config.reverse_ws.enabled and reverse_ws_manager.server: if config.reverse_ws.enabled and reverse_ws_manager.server:
await reverse_ws_manager.stop() await reverse_ws_manager.stop()
@@ -233,7 +231,6 @@ if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:
# 捕获 KeyboardInterrupt不做任何操作让 asyncio.run 正常结束 # 捕获 KeyboardInterrupt不做任何操作让 asyncio.run 正常结束
# 这样 main 函数中的 finally 块会被执行
pass pass
except Exception as e: except Exception as e:
main_logger.exception("程序发生未处理的全局异常") main_logger.exception("程序发生未处理的全局异常")

View File

@@ -114,10 +114,21 @@ async def broadcast_subscription_loop():
logger.info(f"[Broadcast] 收到跨机器人广播消息: 来源 {robot_id}") logger.info(f"[Broadcast] 收到跨机器人广播消息: 来源 {robot_id}")
# 获取当前机器人的实例 # 获取所有活跃的 Bot 实例
from core.ws import WS from core.managers.bot_manager import bot_manager
if WS.instance: all_bots = bot_manager.get_all_bots()
await broadcast_message_to_groups(WS.instance, message_data, robot_id)
if not all_bots:
logger.warning("[Broadcast] 没有活跃的 Bot 实例,无法转发广播消息")
continue
# 遍历所有 Bot 进行广播
for bot in all_bots:
# 避免重复广播:如果消息来源就是当前 Bot则跳过
if str(bot.self_id) == str(robot_id):
continue
await broadcast_message_to_groups(bot, message_data, robot_id)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error(f"[Broadcast] 解析广播消息失败: {e}") logger.error(f"[Broadcast] 解析广播消息失败: {e}")
@@ -178,16 +189,26 @@ async def handle_broadcast_content(event: MessageEvent):
await event.reply("捕获到的消息为空,已取消广播。") await event.reply("捕获到的消息为空,已取消广播。")
return True return True
# 获取当前机器人ID使用反向WS的机器人ID # 获取当前机器人ID
from core.ws import WS
robot_id = "unknown" robot_id = "unknown"
if WS.instance and hasattr(WS.instance, 'self_id'): if event.bot and hasattr(event.bot, 'self_id'):
robot_id = str(WS.instance.self_id) robot_id = str(event.bot.self_id)
# --- 执行本地广播 --- # --- 执行本地广播 ---
# 1. 先让接收到指令的这个 Bot 进行广播
await broadcast_message_to_groups(event.bot, message_to_broadcast, robot_id) await broadcast_message_to_groups(event.bot, message_to_broadcast, robot_id)
# --- 通过 Redis 发布消息给其他机器人 --- # 2. 获取其他所有 Bot 并进行广播(针对同一进程内的其他 Bot
from core.managers.bot_manager import bot_manager
all_bots = bot_manager.get_all_bots()
for bot in all_bots:
# 跳过已经广播过的 Bot (即当前接收指令的 Bot)
if str(bot.self_id) == robot_id:
continue
await broadcast_message_to_groups(bot, message_to_broadcast, robot_id)
# --- 通过 Redis 发布消息给其他进程的机器人 ---
try: try:
if redis_manager.redis: if redis_manager.redis:
broadcast_data = { broadcast_data = {

View File

@@ -0,0 +1,38 @@
from core.plugin import Plugin, command, on_message
from models.events.message import MessageEvent
from core.permission import Permission
# 插件元信息
__plugin_meta__ = {
"name": "类风格插件示例",
"description": "演示如何使用类风格编写插件",
"usage": "/hello - 打招呼\n/echo <msg> - 复读消息",
}
class MyPlugin(Plugin):
def __init__(self):
super().__init__()
# 可以在这里初始化一些状态
self.count = 0
@command("hello")
async def hello(self, event: MessageEvent, args: list[str]):
self.count += 1
await self.reply(event, f"Hello from class-based plugin! (Called {self.count} times)")
@command("echo", permission=Permission.USER)
async def echo(self, event: MessageEvent, args: list[str]):
if args:
await self.reply(event, " ".join(args))
else:
await self.reply(event, "请输入要复读的内容。")
@on_message()
async def handle_message(self, event: MessageEvent):
# 这是一个通用的消息处理器,会处理所有消息
# 注意:这可能会与命令冲突,通常需要过滤
if "特定关键词" in event.raw_message:
await self.reply(event, "检测到特定关键词!")
# 实例化插件以注册
plugin = MyPlugin()

703
plugins/cross_platform.py Normal file
View File

@@ -0,0 +1,703 @@
# -*- coding: utf-8 -*-
"""
跨平台消息互通插件
功能:
- Discord 频道与 QQ 群之间的消息互通
- 在消息中自动标注来源平台和子频道/群组 ID
- 支持 OneBot v11 协议和数据结构
- 支持图片、视频等媒体消息
"""
import asyncio
import json
import re
import time
from typing import Dict, List, Optional, Any
from core.managers.command_manager import matcher
from models.events.message import GroupMessageEvent, PrivateMessageEvent, MessageEvent
from models.message import MessageSegment
from core.permission import Permission
from core.utils.logger import logger
from core.managers.redis_manager import redis_manager
# --- 配置 ---
# 跨平台映射配置
# 格式: {discord_channel_id: {"qq_group_id": qq_group_id, "name": "显示名称"}}
CROSS_PLATFORM_MAP: Dict[int, Dict[str, Any]] = {
# 示例配置:
# 123456789012345678: {"qq_group_id": 123456789, "name": "主群"},
# 987654321098765432: {"qq_group_id": 987654321, "name": "测试群"},
}
# Redis 通道名称
CROSS_PLATFORM_CHANNEL = "neobot_cross_platform"
# 是否启用跨平台转发
ENABLE_CROSS_PLATFORM = True
def get_platform_info(platform: str, identifier: Any) -> str:
"""
获取平台信息字符串,用于在消息中标注来源
Args:
platform: 平台名称 ('discord''qq')
identifier: 频道 ID 或群组 ID
Returns:
格式化的平台信息字符串
"""
if platform == "discord":
channel_id = int(identifier)
if channel_id in CROSS_PLATFORM_MAP:
group_info = CROSS_PLATFORM_MAP[channel_id]
group_name = group_info.get("name", f"群组 {group_info['qq_group_id']}")
return f"[Discord {group_name}]"
return f"[Discord]"
elif platform == "qq":
group_id = int(identifier)
return f"[QQ {group_id}]"
return ""
async def format_discord_to_qq_content(
discord_username: str,
discord_discriminator: str,
content: str,
channel_id: int,
attachments: List[str] = None
) -> tuple[str, List[str]]:
"""
将 Discord 消息格式化为 QQ 消息格式
Args:
discord_username: Discord 用户名
discord_discriminator: Discord discriminator (如 #1234)
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
Returns:
格式化后的消息内容和图片列表
"""
platform_info = get_platform_info("discord", channel_id)
# 构建消息头(简化版,只显示名字)
message_header = f"{platform_info}\n {discord_username}:"
# 构建消息体
message_body = content.strip() if content else ""
# 组合完整消息
if message_body:
full_message = f"{message_header}\n{message_body}"
else:
full_message = message_header
return full_message, attachments or []
async def format_qq_to_discord_content(
qq_nickname: str,
qq_user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[str] = None
) -> tuple[str, List[str]]:
"""
将 QQ 消息格式化为 Discord 消息格式
Args:
qq_nickname: QQ 昵称
qq_user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
Returns:
格式化后的消息内容和图片列表
"""
platform_info = get_platform_info("qq", group_id)
# 构建消息头(简化版,只显示名字)
message_header = f"{platform_info} {qq_nickname}:"
# 构建消息体
message_body = content if content else ""
# 组合完整消息(移除分隔符)
if message_body:
full_message = f"{message_header} {message_body}"
else:
full_message = message_header
return full_message, attachments or []
async def send_to_discord(channel_id: int, content: str, attachments: List[str] = None):
"""
发送消息到 Discord 频道
通过 Redis 发布消息,由 Discord 适配器接收并发送
这样可以避免跨模块导入实例的问题
Args:
channel_id: Discord 频道 ID
content: 消息内容
attachments: 附件 URL 列表
"""
try:
publish_data = {
"type": "send_message",
"channel_id": channel_id,
"content": content,
"attachments": attachments or []
}
await redis_manager.redis.publish("neobot_discord_send", json.dumps(publish_data))
logger.info(f"[CrossPlatform] 消息已发布到 Redis 供 Discord 适配器发送: {channel_id}")
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 Discord 失败: {e}")
async def send_to_qq(group_id: int, content: str, attachments: List[str] = None):
"""
发送消息到 QQ 群
Args:
group_id: QQ 群 ID
content: 消息内容
attachments: 附件 URL 列表
"""
try:
from core.managers.bot_manager import bot_manager
from models.message import MessageSegment
# 获取所有 QQ 机器人实例
all_bots = bot_manager.get_all_bots()
if not all_bots:
logger.error(f"[CrossPlatform] 没有可用的 QQ 机器人实例")
return
logger.debug(f"[CrossPlatform] 找到 {len(all_bots)} 个 QQ 机器人实例")
for bot in all_bots:
try:
# 构建消息
message = content
# 发送消息(如果有附件,使用 OneBot 的图片格式)
if attachments:
# 构建完整消息:文本 + 图片
from models.message import MessageSegment
full_message = []
if content:
full_message.append(MessageSegment.text(content))
for attachment in attachments:
full_message.append(MessageSegment.image(attachment, cache=True, proxy=True, timeout=30))
logger.debug(f"[CrossPlatform] 准备发送消息到 QQ 群 {group_id}: {full_message}")
# 一次性发送
await bot.send_group_msg(group_id, full_message)
logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}")
else:
# 只发送文本
await bot.send_group_msg(group_id, message)
logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}")
break # 只需要发送一次
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 QQ 群 {group_id} 失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 QQ 失败: {e}")
async def forward_discord_to_qq(
discord_username: str,
discord_discriminator: str,
content: str,
channel_id: int,
attachments: List[str] = None
):
"""
将 Discord 消息转发到所有映射的 QQ 群
Args:
discord_username: Discord 用户名
discord_discriminator: Discord discriminator
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
"""
if channel_id not in CROSS_PLATFORM_MAP:
logger.warning(f"[CrossPlatform] 未找到 Discord 频道 {channel_id} 的映射配置")
return
group_info = CROSS_PLATFORM_MAP[channel_id]
target_qq_group = group_info["qq_group_id"]
# 格式化消息
formatted_content, image_list = await format_discord_to_qq_content(
discord_username,
discord_discriminator,
content,
channel_id,
attachments
)
# 发送到 QQ
await send_to_qq(target_qq_group, formatted_content, image_list)
logger.success(f"[CrossPlatform] Discord 频道 {channel_id} -> QQ 群 {target_qq_group}")
async def forward_qq_to_discord(
qq_nickname: str,
qq_user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[str] = None
):
"""
将 QQ 消息转发到所有映射的 Discord 频道
Args:
qq_nickname: QQ 昵称
qq_user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
"""
# 查找映射的 Discord 频道
target_channels = []
for discord_channel_id, info in CROSS_PLATFORM_MAP.items():
if info["qq_group_id"] == group_id:
target_channels.append(discord_channel_id)
if not target_channels:
logger.warning(f"[CrossPlatform] 未找到 QQ 群 {group_id} 的映射配置")
return
# 格式化消息
formatted_content, image_list = await format_qq_to_discord_content(
qq_nickname,
qq_user_id,
group_name,
group_id,
content,
attachments
)
# 发送到所有映射的 Discord 频道
for channel_id in target_channels:
await send_to_discord(channel_id, formatted_content, image_list)
logger.success(f"[CrossPlatform] QQ 群 {group_id} -> Discord 频道 {target_channels}")
async def publish_to_redis(platform: str, data: dict):
"""
通过 Redis 发布跨平台消息
Args:
platform: 平台名称
data: 消息数据
"""
try:
if redis_manager.redis:
publish_data = {
"platform": platform,
"data": data,
"timestamp": int(__import__('time').time())
}
await redis_manager.redis.publish(CROSS_PLATFORM_CHANNEL, json.dumps(publish_data))
logger.debug(f"[CrossPlatform] 已通过 Redis 发布消息: platform={platform}")
except Exception as e:
logger.error(f"[CrossPlatform] Redis 发布失败: {e}")
async def handle_discord_message(
username: str,
discriminator: str,
content: str,
channel_id: int,
attachments: List[str] = None
):
"""
处理 Discord 消息并转发
Args:
username: Discord 用户名
discriminator: Discord discriminator
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
"""
if not ENABLE_CROSS_PLATFORM:
return
logger.info(f"[CrossPlatform] 收到 Discord 消息: {username}#{discriminator} in {channel_id}")
# 转发到映射的 QQ 群
await forward_discord_to_qq(username, discriminator, content, channel_id, attachments)
async def handle_qq_message(
nickname: str,
user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[str] = None
):
"""
处理 QQ 消息并转发
Args:
nickname: QQ 昵称
user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
"""
if not ENABLE_CROSS_PLATFORM:
return
logger.info(f"[CrossPlatform] 收到 QQ 消息: {nickname} ({user_id}) in {group_name}({group_id})")
# 转发到映射的 Discord 频道
await forward_qq_to_discord(nickname, user_id, group_name, group_id, content, attachments)
@matcher.on_message()
async def handle_qq_group_message(event: GroupMessageEvent):
"""
处理 QQ 群消息,转发到 Discord
"""
if not ENABLE_CROSS_PLATFORM:
return
# 检查是否是映射的群组
group_id = event.group_id
mapped_channel = None
for discord_channel_id, info in CROSS_PLATFORM_MAP.items():
if info["qq_group_id"] == group_id:
mapped_channel = discord_channel_id
break
if mapped_channel is None:
return
# 提取消息内容
content = ""
attachments = []
if isinstance(event.message, list):
for segment in event.message:
if isinstance(segment, MessageSegment):
if segment.type == "text":
content += segment.data.get("text", "")
elif segment.type == "image":
file_url = segment.data.get("url") or segment.data.get("file")
if file_url:
attachments.append(str(file_url))
elif segment.type == "video":
file_url = segment.data.get("url") or segment.data.get("file")
if file_url:
attachments.append(str(file_url))
elif segment.type == "at":
qq_id = segment.data.get("qq")
if qq_id and qq_id != "all":
content += f"@{qq_id} "
elif qq_id == "all":
content += "@所有人 "
elif isinstance(segment, str):
content += segment
elif isinstance(event.message, str):
content = event.message
# 清理多余空白
content = content.strip()
# 获取群名称
group_name = ""
try:
group_info = await event.bot.get_group_info(event.group_id)
group_name = group_info.get("group_name", "")
except Exception:
group_name = f"{group_id}"
# 处理消息
await handle_qq_message(
nickname=event.sender.nickname or event.sender.card or str(event.user_id),
user_id=event.user_id,
group_name=group_name,
group_id=group_id,
content=content,
attachments=attachments
)
@matcher.on_message()
async def handle_discord_message_event(event: Any):
"""
处理 Discord 消息事件(通过适配器注入)
"""
if not ENABLE_CROSS_PLATFORM:
return
# 检查事件是否包含 Discord 特定信息
if not hasattr(event, '_is_discord_message'):
return
discord_channel_id = getattr(event, 'discord_channel_id', None)
if discord_channel_id is None:
return
# 提取消息内容
content = event.raw_message or ""
attachments = []
# 从 raw_message 中提取附件 URLDiscord 附件已添加到 raw_message
import re
url_pattern = r'https?://[^\s<>"]+|www\.\S+'
raw_message_lines = content.split('\n')
content_lines = []
for line in raw_message_lines:
line = line.strip()
if re.match(url_pattern, line):
# 这是附件 URL
if line not in attachments:
attachments.append(line)
else:
# 这是普通文本内容
if line:
content_lines.append(line)
content = '\n'.join(content_lines).strip()
# 从 message 列表中提取(备用方案)
if hasattr(event, 'message') and isinstance(event.message, list):
for segment in event.message:
if isinstance(segment, MessageSegment):
if segment.type == "text":
pass # 已经在 raw_message 中
elif segment.type == "image":
file_url = segment.data.get("url") or segment.data.get("file")
if file_url and str(file_url) not in attachments:
attachments.append(str(file_url))
elif segment.type == "video":
file_url = segment.data.get("url") or segment.data.get("file")
if file_url and str(file_url) not in attachments:
attachments.append(str(file_url))
# 获取用户信息
discord_username = getattr(event, 'discord_username', 'Unknown')
discord_discriminator = getattr(event, 'discord_discriminator', '')
# 处理消息
await handle_discord_message(
username=discord_username,
discriminator=discord_discriminator,
content=content,
channel_id=discord_channel_id,
attachments=attachments
)
async def cross_platform_subscription_loop():
"""
Redis 跨平台消息订阅循环
"""
if redis_manager.redis is None:
logger.warning("[CrossPlatform] Redis 未初始化,无法启动订阅")
return
try:
pubsub = redis_manager.redis.pubsub()
await pubsub.subscribe(CROSS_PLATFORM_CHANNEL)
logger.success("[CrossPlatform] 已订阅 Redis 跨平台频道")
async for message in pubsub.listen():
if message["type"] == "message":
try:
data = json.loads(message["data"])
platform = data.get("platform", "")
message_data = data.get("data", {})
logger.info(f"[CrossPlatform] 收到跨平台消息: {platform}")
if platform == "discord":
# 从 Discord 转发到 QQ
await forward_discord_to_qq(
discord_username=message_data.get("username", "Unknown"),
discord_discriminator=message_data.get("discriminator", ""),
content=message_data.get("content", ""),
channel_id=message_data.get("channel_id", 0),
attachments=message_data.get("attachments", [])
)
elif platform == "qq":
# 从 QQ 转发到 Discord
await forward_qq_to_discord(
qq_nickname=message_data.get("nickname", "Unknown"),
qq_user_id=message_data.get("user_id", 0),
group_name=message_data.get("group_name", ""),
group_id=message_data.get("group_id", 0),
content=message_data.get("content", ""),
attachments=message_data.get("attachments", [])
)
except json.JSONDecodeError as e:
logger.error(f"[CrossPlatform] 解析消息失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 处理跨平台消息失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 订阅循环异常: {e}")
# 全局订阅任务
_subscription_task = None
async def start_cross_platform_subscription():
"""
启动跨平台消息订阅
"""
global _subscription_task
if _subscription_task is None and ENABLE_CROSS_PLATFORM:
_subscription_task = asyncio.create_task(cross_platform_subscription_loop())
logger.success("[CrossPlatform] 跨平台消息订阅已启动")
async def stop_cross_platform_subscription():
"""
停止跨平台消息订阅
"""
global _subscription_task
if _subscription_task:
_subscription_task.cancel()
try:
await _subscription_task
except asyncio.CancelledError:
pass
_subscription_task = None
logger.info("[CrossPlatform] 跨平台消息订阅已停止")
async def reload_config():
"""
重新加载配置
"""
global CROSS_PLATFORM_MAP, ENABLE_CROSS_PLATFORM
try:
import os
config_path = os.path.join(os.path.dirname(__file__), "..", "config.toml")
if os.path.exists(config_path):
try:
import tomllib
except ImportError:
import tomli as tomllib
with open(config_path, "rb") as f:
config = tomllib.load(f)
cross_platform_config = config.get("cross_platform", {})
ENABLE_CROSS_PLATFORM = cross_platform_config.get("enabled", True)
# 重新加载映射配置(支持两种格式)
mappings = cross_platform_config.get("mappings", {})
CROSS_PLATFORM_MAP = {}
# 格式1: [cross_platform.mappings.123456789012345678] 子表形式
if isinstance(mappings, dict) and mappings:
for key, value in mappings.items():
if isinstance(value, dict) and "qq_group_id" in value:
try:
discord_id = int(key) if str(key).isdigit() else int(str(key).split('.')[-1])
CROSS_PLATFORM_MAP[discord_id] = {
"qq_group_id": int(value.get("qq_group_id", 0)),
"name": value.get("name", "")
}
except (ValueError, AttributeError):
continue
# 格式2: 旧的字典形式(向后兼容)
if not CROSS_PLATFORM_MAP:
for key, value in mappings.items():
if isinstance(key, str) and key.isdigit():
CROSS_PLATFORM_MAP[int(key)] = {
"qq_group_id": int(value.get("qq_group_id", 0)),
"name": value.get("name", "")
}
logger.success(f"[CrossPlatform] 配置已重新加载: {len(CROSS_PLATFORM_MAP)} 个映射")
except Exception as e:
logger.error(f"[CrossPlatform] 重新加载配置失败: {e}")
# 插件加载时自动启动和加载配置
import asyncio
try:
asyncio.create_task(reload_config())
except Exception as e:
logger.error(f"[CrossPlatform] 重新加载配置失败: {e}")
try:
asyncio.create_task(start_cross_platform_subscription())
except Exception as e:
logger.error(f"[CrossPlatform] 启动订阅失败: {e}")
# 命令处理器
@matcher.command("cross_config", "跨平台配置", permission=Permission.ADMIN)
async def cross_config_command(event: MessageEvent):
"""
查看跨平台配置
"""
if not ENABLE_CROSS_PLATFORM:
await event.reply("跨平台功能已禁用")
return
config_lines = ["=== 跨平台映射配置 ==="]
if not CROSS_PLATFORM_MAP:
config_lines.append("当前没有配置任何映射")
else:
for discord_id, info in CROSS_PLATFORM_MAP.items():
discord_channel = f"Discord: {discord_id}"
qq_group = f"QQ: {info['qq_group_id']}"
name = info.get("name", "")
if name:
config_lines.append(f"{discord_channel}{qq_group} ({name})")
else:
config_lines.append(f"{discord_channel}{qq_group}")
await event.reply("\n".join(config_lines))
@matcher.command("cross_reload", "跨平台重载", permission=Permission.ADMIN)
async def cross_reload_command(event: MessageEvent):
"""
重新加载跨平台配置
"""
await reload_config()
await event.reply("跨平台配置已重载")
# 清理函数
def cleanup():
"""清理资源"""
asyncio.create_task(stop_cross_platform_subscription())

View File

@@ -6,7 +6,6 @@ Echo 与交互插件
from core.managers.command_manager import matcher from core.managers.command_manager import matcher
from core.bot import Bot from core.bot import Bot
from models.events.message import MessageEvent from models.events.message import MessageEvent
from core.permission import Permission
__plugin_meta__ = { __plugin_meta__ = {
"name": "echo", "name": "echo",
@@ -14,7 +13,7 @@ __plugin_meta__ = {
"usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞", "usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞",
} }
@matcher.command("echo", permission=Permission.ADMIN) @matcher.command("echo")
async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
""" """
处理 echo 指令,原样回复用户输入的内容 处理 echo 指令,原样回复用户输入的内容

View File

@@ -0,0 +1,41 @@
from core.plugin import SimplePlugin
from models.events.message import MessageEvent
# 插件元信息
__plugin_meta__ = {
"name": "极简插件示例",
"description": "演示面向新手的极简插件写法",
"usage": "/ping - 测试\n/add <a> <b> - 加法\n/greet <name> - 问候",
}
class MySimplePlugin(SimplePlugin):
async def ping(self, event: MessageEvent):
"""
发送 /ping 即可调用
"""
return "Pong! (来自极简插件)"
async def greet(self, event: MessageEvent, name: str):
"""
发送 /greet Neo 即可调用
"""
return f"你好, {name}!"
async def add(self, event: MessageEvent, a: int, b: int):
"""
发送 /add 10 20 即可调用
自动处理类型转换
"""
return f"{a} + {b} = {a + b}"
async def echo_all(self, event: MessageEvent, msg: str):
"""
只有一个参数时,会自动捕获所有剩余文本
发送 /echo_all 这是一个 测试 消息
msg 将会是 "这是一个 测试 消息"
"""
return f"复读: {msg}"
# 实例化插件以生效
plugin = MySimplePlugin()

View File

@@ -1,5 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re import re
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Union from typing import Optional, Dict, Any, List, Union
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
@@ -13,9 +17,25 @@ from bilibili_api.exceptions import ResponseCodeException
from core.config_loader import global_config from core.config_loader import global_config
from core.services.local_file_server import download_to_local from core.services.local_file_server import download_to_local
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用")
# bilibili_api-python 可用性标志 # bilibili_api-python 可用性标志
BILI_API_AVAILABLE = True BILI_API_AVAILABLE = True
# ffmpeg 可用性标志
FFMPEG_AVAILABLE = False
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
FFMPEG_AVAILABLE = True
logger.success("[B站解析器] ffmpeg 已安装,支持合并音视频")
except (subprocess.CalledProcessError, FileNotFoundError):
logger.warning("[B站解析器] ffmpeg 未安装,视频可能没有声音。建议安装 ffmpeg 以获得完整音视频体验")
# 显式指定使用 aiohttp避免与其他库冲突 # 显式指定使用 aiohttp避免与其他库冲突
try: try:
select_client("aiohttp") select_client("aiohttp")
@@ -273,20 +293,51 @@ class BiliParser(BaseParser):
if not cid: if not cid:
return None return None
# 获取下载链接数据 # 获取下载链接数据,使用 html5=True 获取网页格式(通常包含合并的音视频)
download_url_data = await v.get_download_url(cid=cid) download_url_data = await v.get_download_url(cid=cid, html5=True)
# 使用 VideoDownloadURLDataDetecter 解析数据 # 使用 VideoDownloadURLDataDetecter 解析数据
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data) detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
# 尝试获取 MP4 格式的合并流(包含音视频)
streams = detecter.detect_best_streams()
# 如果没有获取到流,尝试其他格式
if not streams:
logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
download_url_data = await v.get_download_url(cid=cid, html5=False)
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams() streams = detecter.detect_best_streams()
if streams: if streams:
# 获取视频直链 # 获取视频直链
video_direct_url = streams[0].url video_direct_url = streams[0].url
# 检查是否是分离的 m4s 流(可能没有声音)
is_m4s_stream = '.m4s' in video_direct_url
if is_m4s_stream:
logger.warning(f"[{self.name}] 检测到分离的 m4s 流B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg合并视频和音频")
logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...") logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")
# B站下载需要 Referer 和 User-Agent
headers = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# 调试:打印 download_url_data 结构
logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
if isinstance(download_url_data, dict):
logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")
# 如果是 m4s 流且 ffmpeg 可用,先保存 download_url_data 供合并使用
if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
else:
# 使用本地文件服务器下载 # 使用本地文件服务器下载
local_url = await download_to_local(video_direct_url, timeout=120) local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)
if local_url: if local_url:
logger.success(f"[{self.name}] 视频已下载到本地: {local_url}") logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
@@ -300,6 +351,199 @@ class BiliParser(BaseParser):
return None return None
async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]:
"""
下载并合并 m4s 视频和音频流
Args:
video_url (str): 视频流 URL
headers (Dict[str, str]): 请求头
bvid (str): BV号
download_url_data (Dict): 下载 URL 数据
Returns:
Optional[str]: 合并后的本地视频 URL如果失败则返回None
"""
if not FFMPEG_AVAILABLE:
logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
return None
if not AIOHTTP_AVAILABLE:
logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频")
return None
try:
logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")
# 创建共享的 ClientSession 用于下载
async with aiohttp.ClientSession() as session:
# 下载视频流
video_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
video_file.close()
async with session.get(video_url, headers=headers, timeout=60) as response:
if response.status != 200:
logger.error(f"[{self.name}] 下载视频流失败: HTTP {response.status}")
return None
with open(video_file.name, 'wb') as f:
while True:
chunk = await response.content.read(8192)
if not chunk:
break
f.write(chunk)
logger.info(f"[{self.name}] 视频流下载完成: {video_file.name}")
# 从 download_url_data 中提取音频 URL
# B站的 dash 格式包含视频和音频流
audio_url = None
if isinstance(download_url_data, dict):
# 尝试 dash 格式(推荐)
if 'dash' in download_url_data and isinstance(download_url_data['dash'], dict):
dash = download_url_data['dash']
if 'audio' in dash and isinstance(dash['audio'], list) and len(dash['audio']) > 0:
# 获取第一个音频流
audio_item = dash['audio'][0]
audio_url = audio_item.get('baseUrl') or audio_item.get('url') or audio_item.get('backupUrl')
logger.debug(f"[{self.name}] 从 dash.audio 提取音频 URL: {audio_url is not None}")
elif 'audio' in dash and isinstance(dash['audio'], dict):
audio_url = dash['audio'].get('baseUrl') or dash['audio'].get('url')
logger.debug(f"[{self.name}] 从 dash.audio (dict) 提取音频 URL: {audio_url is not None}")
# 尝试 durl 格式(非分段流)
elif 'durl' in download_url_data:
if isinstance(download_url_data['durl'], list) and len(download_url_data['durl']) > 0:
main_url = download_url_data['durl'][0].get('url') or download_url_data['durl'][0].get('baseUrl')
if main_url:
video_url = main_url
logger.debug(f"[{self.name}] 使用 durl 主 URL: {video_url}")
if not audio_url and not video_url.startswith('http'):
logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
os.unlink(video_file.name)
return None
# 下载音频流
audio_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
audio_file.close()
async with session.get(audio_url, headers=headers, timeout=60) as response:
if response.status != 200:
logger.error(f"[{self.name}] 下载音频流失败: HTTP {response.status}")
os.unlink(video_file.name)
return None
with open(audio_file.name, 'wb') as f:
while True:
chunk = await response.content.read(8192)
if not chunk:
break
f.write(chunk)
logger.info(f"[{self.name}] 音频流下载完成: {audio_file.name}")
# 使用 ffmpeg 合并视频和音频
merged_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
merged_file.close()
# ffmpeg命令使用ffmpeg -i多次输入然后合并
# 先转换视频流(移除音频),然后添加音频流
ffmpeg_cmd = [
'ffmpeg', '-y', '-i', video_file.name, '-i', audio_file.name,
'-c:v', 'libx264', '-c:a', 'aac',
'-shortest', merged_file.name
]
logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
# 详细记录ffmpeg输出
if result.stdout:
logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
if result.stderr:
logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")
if result.returncode != 0:
logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
os.unlink(video_file.name)
os.unlink(audio_file.name)
return None
# 验证输出文件
merged_size = os.path.getsize(merged_file.name)
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
if merged_size == 0:
logger.error(f"[{self.name}] ffmpeg生成了空文件命令可能有问题")
logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
if result.stderr:
logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
os.unlink(video_file.name)
os.unlink(audio_file.name)
return None
logger.info(f"[{self.name}] 音视频合并成功: {merged_file.name} ({merged_size} bytes)")
# 上传合并后的文件到本地文件服务器
from core.services.local_file_server import get_local_file_server
server = get_local_file_server()
if server:
try:
file_id = server._generate_file_id(f'file://{merged_file.name}')
dest_path = server.download_dir / file_id
# 获取合并文件大小
merged_size = os.path.getsize(merged_file.name)
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
if merged_size == 0:
logger.error(f"[{self.name}] 合并文件为空ffmpeg可能失败了")
merged_url = None
else:
# 复制本地文件到服务器目录
import shutil
shutil.copy2(merged_file.name, dest_path)
server.file_map[file_id] = dest_path
# 验证复制后的文件
if dest_path.exists():
dest_size = dest_path.stat().st_size
logger.debug(f"[{self.name}] 复制后文件大小: {dest_size} bytes")
if dest_size == merged_size:
merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
else:
logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
merged_url = None
else:
logger.error(f"[{self.name}] 文件复制失败: {dest_path} 不存在")
merged_url = None
except Exception as e:
logger.error(f"[{self.name}] 上传合并文件失败: {e}")
merged_url = None
else:
merged_url = None
# 清理临时文件
try:
os.unlink(video_file.name)
os.unlink(audio_file.name)
os.unlink(merged_file.name)
except Exception as e:
logger.warning(f"[{self.name}] 清理临时文件失败: {e}")
if merged_url:
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
return merged_url
except Exception as e:
logger.error(f"[{self.name}] 合并音视频失败: {e}")
return None
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
""" """
格式化B站视频响应消息 格式化B站视频响应消息
@@ -314,8 +558,8 @@ class BiliParser(BaseParser):
# 检查视频时长 # 检查视频时长
video_message: Union[str, MessageSegment] video_message: Union[str, MessageSegment]
direct_url = None direct_url = None
if data['duration'] > 1200: # 20分钟 = 1200秒 if data['duration'] > 7200: # 2小时 = 7200秒
video_message = "视频时长超过20分钟,不进行解析。" video_message = "视频时长超过2小时,不进行解析。"
else: else:
# 构建完整的B站视频URL # 构建完整的B站视频URL
video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}" video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}"

View File

@@ -12,64 +12,83 @@ bs4==0.0.2
cachetools==6.2.4 cachetools==6.2.4
certifi==2026.1.4 certifi==2026.1.4
cffi==2.0.0 cffi==2.0.0
charset-normalizer==3.4.4 chardet==6.0.0.post1
colorama==0.4.6 click==8.3.1
coverage==7.13.1 concurrencytest==0.1.4
cryptography==46.0.3 ConfigParser==7.2.0
dill==0.4.0 contextlib2==21.6.0
docker==7.1.0 curio==1.6
docopt==0.6.2 curl_cffi==0.14.0
frozenlist==1.8.0 Cython==3.2.4
greenlet==3.3.0 cython==3.2.4
h11==0.16.0 defusedxml==0.7.1
httpcore==1.0.9 Django==6.0.2
httpx==0.27.0 dl==0.1.0
idna==3.11 docutils==0.22.4
iniconfig==2.3.0 email_validator==2.3.0
isort==7.0.0 etcd3==0.12.0
Jinja2==3.1.6 eval_type_backport==0.3.1
librt==0.7.7 eventlet==0.40.4
loguru==0.7.3 exceptiongroup==1.3.1
MarkupSafe==3.0.3 fastapi==0.134.0
mccabe==0.7.0 filelock==3.24.3
multidict==6.7.0 flake8==7.3.0
mypy==1.19.1 gunicorn==25.1.0
mypy_extensions==1.1.0 h2==4.3.0
orjson==3.11.5 html5lib==1.1
packaging==25.0 HTMLParser==0.0.2
pathspec==1.0.3 hypothesis==6.151.9
pillow==12.1.0 importlib_resources==6.5.2
pipreqs==0.4.13 ini2toml==0.15
platformdirs==4.5.1 ipykernel==7.2.0
playwright==1.57.0 ipython==9.10.0
pluggy==1.6.0 ipywidgets==8.1.8
propcache==0.4.1 jnius==1.1.0
psutil==5.9.8 js==1.0
pycparser==2.23 keyring==25.7.0
pydantic==2.12.5 lxml_html_clean==0.4.4
pydantic_core==2.41.5 mask==1.0.0
pyee==13.0.0 matplotlib==3.10.8
Pygments==2.19.2 mod==0.3.0
pylint==4.0.4 multiprocess==0.70.19
pytest==9.0.2 nacl==0.0.0
pytest-asyncio==1.3.0 olefile==0.47
pytest-cov==7.0.0 outcome==1.3.0.post0
pytest-mock==3.15.1 ox_profile==0.2.14
redis==7.1.0 paramiko==4.0.0
requests==2.32.5 pexpect==4.9.0
setuptools==80.9.0 pip_api==0.0.34
sniffio==1.3.1 pkg1==0.0.3
soupsieve==2.8.1 pox==0.3.7
toml==0.10.2 protobuf==7.34.0
tomlkit==0.13.3 pudb==2025.1.5
types-cachetools==6.2.0.20251022 pybreaker==1.4.1
types-docker==7.1.0.20251202 pycryptodome_test_vectors==1.0.22
types-paramiko==4.0.0.20250822 pyenchant==3.3.0
types-requests==2.32.4.20260107 PyInstaller==6.19.0
typing-inspection==0.4.2 pymongo==4.16.0
typing_extensions==4.15.0 pyodide==0.0.2
urllib3==2.6.3 PyOpenGL==3.1.10
watchdog==6.0.0 pyOpenSSL==25.3.0
websockets==16.0 PyQt6==6.10.2
yarg==0.1.10 PySide6==6.10.2
yarl==1.22.0 python-dotenv==1.2.1
python_bcrypt==0.3.2
python_socks==2.8.1
pywin32==311
requests==2.32.3
simplejson==3.20.2
socksio==1.0.0
speedups==1.4.0
Sphinx==9.1.0
sympy==1.14.0
trove_classifiers==2026.1.14.14
urllib3_secure_extra==0.1.0
uvloop==0.22.1
websocket_client==1.9.0
Werkzeug==3.1.6
winloop==0.5.0
wmi==1.5.1
xmlrpclib==1.0.1
xx==3.3.2
zope==5.13

41
scripts/add_plugins.py Normal file
View File

@@ -0,0 +1,41 @@
import os
import sys
def create_plugin(plugin_name):
base = os.path.dirname(os.path.abspath(__file__))
plugin_dir = os.path.join(base, "../plugins")
os.makedirs(plugin_dir, exist_ok=True)
file_name = f"{plugin_name.lower()}.py"
file_path = os.path.join(plugin_dir, file_name)
if os.path.exists(file_path):
print("插件已存在")
return
template = f'''from core.managers.command_manager import matcher
from core.bot import Bot
from models.events.message import MessageEvent
from core.permission import Permission
__plugin_meta__ = {{
"name": "{plugin_name.lower()}",
"description": "",
"usage": ""
}}
@matcher.command("{plugin_name.lower()}")
async def _(bot: Bot, event: MessageEvent):
pass
'''
with open(file_path, "w", encoding="utf-8") as f:
f.write(template)
print(f"插件创建成功:{file_path}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("用法python create_plugin.py 插件名")
sys.exit(1)
create_plugin(sys.argv[1])

36
test_image_fix.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""测试图片处理修复"""
import sys
sys.path.insert(0, '.')
print("测试 1: 检查 discord_adapter.py 导入")
try:
from adapters.discord_adapter import DiscordAdapter
print("✓ discord_adapter.py 导入成功")
except Exception as e:
print(f"✗ discord_adapter.py 导入失败: {e}")
print("\n测试 2: 检查 cross_platform.py 导入")
try:
import plugins.cross_platform as cp
print("✓ cross_platform.py 导入成功")
except Exception as e:
print(f"✗ cross_platform.py 导入失败: {e}")
print("\n测试 3: 检查 router.py 导入")
try:
from adapters.router import DiscordToOneBotConverter
print("✓ router.py 导入成功")
except Exception as e:
print(f"✗ router.py 导入失败: {e}")
print("\n测试 4: 检查 MessageSegment 导入")
try:
from models.message import MessageSegment
print("✓ MessageSegment 导入成功")
except Exception as e:
print(f"✗ MessageSegment 导入失败: {e}")
print("\n所有测试完成!")