Merge pull request #61 from Fairy-Oracle-Sanctuary/dev

Dev
This commit is contained in:
baby2016
2026-02-28 20:59:37 +08:00
committed by GitHub
15 changed files with 3545 additions and 59 deletions

211
REVERSE_WS_LOAD_BALANCE.md Normal file
View File

@@ -0,0 +1,211 @@
# 反向 WebSocket 负载均衡配置
## 功能特性
### 1. 负载均衡
当有多个前端NapCat等连接到反向WebSocket服务端时系统会自动进行负载均衡
- **自动选择负载最低的客户端**API调用时会自动选择负载最低的健康客户端
- **健康检查**系统会记录每个客户端的最后活动时间只选择最近30秒内有活动的客户端
- **负载计数**:每个客户端的消息处理次数会被记录,用于负载均衡计算
### 2. 防重复发送
系统实现了多层防重复机制:
- **事件ID检查**通过事件ID`id``post_id``time`)识别重复事件
- **消息锁机制**:使用异步锁防止同一消息被并发处理
- **双重检查**:在锁内再次检查是否重复,防止并发竞争条件
- **自动清理**定期清理过期的事件ID和消息锁默认60秒和300秒
### 3. 工作原理
```
┌─────────────┐
│ Frontend │
│ (NapCat) │
└──────┬──────┘
│ WebSocket
┌──────▼──────┐
│ │
│ ReverseWS │ ←── 负载均衡 + 防重复
│ Manager │
│ │
└──────┬──────┘
│ 处理事件
┌──────▼──────┐
│ Command │
│ Manager │
│ │
└─────────────┘
```
## 配置说明
`config.toml` 中配置:
```toml
[reverse_ws]
enabled = true # 启用反向WebSocket
host = "0.0.0.0" # 监听地址
port = 3002 # 监听端口
token = "" # 访问令牌(可选)
```
## 使用方法
### 启动配置
1.`config.toml` 中设置 `enabled = true`
2. 确保防火墙允许指定端口的连接
3. 启动机器人服务
### 前端配置
在 NapCat 等前端配置中,将 WebSocket 连接地址改为:
```
ws://your-server-ip:3002
```
多个前端可以连接到同一个地址,系统会自动进行负载均衡。
## API 调用
### 使用负载均衡(推荐)
```python
from core.managers import reverse_ws_manager
# 自动选择负载最低的健康客户端
response = await reverse_ws_manager.call_api(
action="send_group_msg",
params={
"group_id": 123456,
"message": "Hello"
},
use_load_balance=True # 默认为 True
)
```
### 指定客户端
```python
# 向特定客户端发送
response = await reverse_ws_manager.call_api(
action="send_group_msg",
params={
"group_id": 123456,
"message": "Hello"
},
client_id="specific-client-id",
use_load_balance=False
)
```
### 获取客户端信息
```python
# 获取所有连接的客户端
clients = reverse_ws_manager.get_connected_clients()
# 获取健康的客户端最近30秒有活动
healthy = reverse_ws_manager.get_healthy_clients()
# 获取负载最低的客户端
least_load = reverse_ws_manager.get_client_with_least_load()
```
## 负载均衡策略
系统采用以下策略选择客户端:
1. **健康检查**只选择最近30秒内有活动的客户端
2. **负载计数**:在健康客户端中选择负载最低的
3. **自动切换**:如果负载最低的客户端不健康,自动选择下一个
## 防重复机制
### 事件ID检查
系统通过以下方式识别事件:
- 优先使用 `id` 字段
- 其次使用 `post_id` 字段
- 最后使用 `time` 字段
### 消息锁
消息处理使用异步锁,防止并发重复处理:
```python
async with self._get_message_lock(message_key):
# 处理消息
await matcher.handle_event(None, event)
```
### 自动清理
系统每10秒清理一次过期数据
- 事件ID保留时间60秒
- 消息锁保留时间300秒
## 监控和调试
### 查看客户端状态
```python
# 查看所有客户端
print("所有客户端:", reverse_ws_manager.get_connected_clients())
# 查看健康客户端
print("健康客户端:", reverse_ws_manager.get_healthy_clients())
# 查看负载情况
print("客户端负载:", reverse_ws_manager._client_load)
# 查看健康时间
print("客户端健康时间:", reverse_ws_manager._client_health)
```
### 日志输出
系统会输出以下日志:
- 客户端连接/断开
- 检测到重复事件
- 负载均衡选择
- API调用结果
## 最佳实践
1. **多前端部署**建议部署2-3个前端实例进行负载均衡
2. **健康检查**:定期检查前端连接状态
3. **监控日志**:关注重复事件日志,排查网络问题
4. **合理设置TTL**根据消息频率调整事件ID保留时间
## 故障排查
### 问题:消息重复处理
**原因**:网络延迟导致前端重复发送
**解决**检查事件ID是否正确设置系统已自动处理
### 问题API调用超时
**原因**:选择的客户端不健康或网络问题
**解决**:系统会自动切换到其他健康客户端
### 问题:所有客户端都不健康
**原因**:前端断开连接或网络问题
**解决**:检查前端连接状态和网络连接

View File

@@ -9,6 +9,13 @@ token = "KoIAF.mcEHzxrPYF"
# 重连间隔(秒)
reconnect_interval = 5
[reverse_ws]
enabled = true # 是否启用
host = "0.0.0.0" # 监听地址
port = 3002 # 监听端口
token = ""
# Bot 基础配置
[bot]
# 命令前缀列表
@@ -21,13 +28,28 @@ permission_denied_message = "权限不足,需要 {permission_name} 权限"
# Redis 配置
[redis]
# Redis 主机地址
host = "101.36.126.55"
host = "114.66.61.199"
# Redis 端口
port = 6379
port = 37080
# Redis 数据库编号
db = 0
# Redis 密码
password = "redis_5fCmnE"
password = "redis_n7Ke76"
# MySQL 配置
[mysql]
# MySQL 主机地址
host = "114.66.61.199"
# MySQL 端口
port = 42398
# MySQL 用户名
user = "neobot"
# MySQL 密码
password = "neobot"
# MySQL 数据库名称
db = "neobot"
# Docker 配置
[docker]

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import tomllib
from pydantic import ValidationError
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel, ImageManagerModel
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel, ImageManagerModel, MySQLModel, ReverseWSModel
from .utils.logger import ModuleLogger
from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError
@@ -59,9 +59,9 @@ class Config:
error_details.append(error_msg)
validation_error = ConfigValidationError(
message="配置验证失败",
original_error=e
message="配置验证失败"
)
validation_error.original_error = e
self.logger.error("配置验证失败,请检查 `config.toml` 文件中的以下错误:")
for detail in error_details:
@@ -71,17 +71,17 @@ class Config:
raise validation_error
except tomllib.TOMLDecodeError as e:
error = ConfigError(
message=f"TOML解析错误: {str(e)}",
original_error=e
message=f"TOML解析错误: {str(e)}"
)
error.original_error = e
self.logger.error(f"加载配置文件时发生TOML解析错误: {error.message}")
self.logger.log_custom_exception(error)
raise error
except Exception as e:
error = ConfigError(
message=f"加载配置文件时发生未知错误: {str(e)}",
original_error=e
message=f"加载配置文件时发生未知错误: {str(e)}"
)
error.original_error = e
self.logger.exception(f"加载配置文件时发生未知错误: {error.message}")
self.logger.log_custom_exception(error)
raise error
@@ -107,6 +107,13 @@ class Config:
获取 Redis 配置
"""
return self._model.redis
@property
def mysql(self) -> MySQLModel:
"""
获取 MySQL 配置
"""
return self._model.mysql
@property
def docker(self) -> DockerModel:
@@ -122,6 +129,14 @@ class Config:
"""
return self._model.image_manager
@property
def reverse_ws(self) -> ReverseWSModel:
"""
获取反向 WebSocket 配置
"""
return self._model.reverse_ws
# 实例化全局配置对象
global_config = Config()

View File

@@ -25,6 +25,15 @@ class BotModel(BaseModel):
ignore_self_message: bool = True
permission_denied_message: str = "权限不足,需要 {permission_name} 权限"
class ReverseWSModel(BaseModel):
"""
对应 `config.toml` 中的 `[reverse_ws]` 配置块。
"""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 3002
token: Optional[str] = None
class RedisModel(BaseModel):
"""
@@ -36,6 +45,19 @@ class RedisModel(BaseModel):
password: str
class MySQLModel(BaseModel):
"""
对应 `config.toml` 中的 `[mysql]` 配置块。
"""
host: str
port: int
user: str
password: str
db: str
charset: str = "utf8mb4"
class DockerModel(BaseModel):
"""
对应 `config.toml` 中的 `[docker]` 配置块。
@@ -57,6 +79,16 @@ class ImageManagerModel(BaseModel):
image_width: int = 1080
class ReverseWSModel(BaseModel):
"""
对应 `config.toml` 中的 `[reverse_ws]` 配置块。
"""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 3002
token: Optional[str] = None
class ConfigModel(BaseModel):
"""
顶层配置模型,整合了所有子配置块。
@@ -64,7 +96,9 @@ class ConfigModel(BaseModel):
napcat_ws: NapCatWSModel
bot: BotModel
redis: RedisModel
mysql: MySQLModel
docker: DockerModel
image_manager: ImageManagerModel
reverse_ws: ReverseWSModel

View File

@@ -8,8 +8,10 @@ from .command_manager import matcher as command_manager
from .permission_manager import PermissionManager
from .plugin_manager import PluginManager
from .redis_manager import RedisManager
from .mysql_manager import MySQLManager
from .browser_manager import BrowserManager
from .image_manager import ImageManager
from .reverse_ws_manager import ReverseWSManager
# --- 实例化所有单例管理器 ---
@@ -26,18 +28,26 @@ plugin_manager = PluginManager(command_manager)
# Redis 管理器
redis_manager = RedisManager()
# MySQL 管理器
mysql_manager = MySQLManager()
# 浏览器管理器
browser_manager = BrowserManager()
# 图片管理器
image_manager = ImageManager()
# 反向 WebSocket 管理器
reverse_ws_manager = ReverseWSManager()
__all__ = [
"permission_manager",
"command_manager",
"matcher",
"plugin_manager",
"redis_manager",
"mysql_manager",
"browser_manager",
"image_manager",
"reverse_ws_manager",
]

View File

@@ -35,7 +35,7 @@ class ImageManager(Singleton):
# 模板缓存
self._template_cache: Dict[str, Template] = {}
async def render_template(self, template_name: str, data: Dict[str, Any], output_name: str = "output.png", quality: int = 80, image_type: str = "png") -> Optional[str]:
async def render_template(self, template_name: str, data: Dict[str, Any], output_name: str = "output.png", quality: int = 80, image_type: str = "png", width: int = 1920, height: int = 1080) -> Optional[str]:
"""
使用 Playwright 渲染 Jinja2 模板并保存为图片文件
@@ -45,6 +45,8 @@ class ImageManager(Singleton):
output_name (str, optional): 输出文件名. Defaults to "output.png".
quality (int, optional): JPEG 质量 (0-100). 仅在 image_type 为 jpeg 时有效. Defaults to 80.
image_type (str, optional): 图片类型 ('png' or 'jpeg'). Defaults to "png".
width (int, optional): 图片宽度. Defaults to 1920.
height (int, optional): 图片高度. Defaults to 1080.
Returns:
Optional[str]: 生成图片的绝对路径,如果失败则返回 None
@@ -74,8 +76,8 @@ class ImageManager(Singleton):
return None
try:
width = global_config.image_manager.image_width
height = global_config.image_manager.image_height
width = data.get("width", width)
height = data.get("height", height)
await page.set_viewport_size({"width": width, "height": height})
# 加载内容

View File

@@ -0,0 +1,148 @@
import aiomysql
from ..config_loader import global_config as config
from ..utils.logger import logger
from ..utils.singleton import Singleton
class MySQLManager(Singleton):
"""
MySQL 数据库连接管理器(异步单例)
"""
_pool = None
def __init__(self):
"""
初始化 MySQL 管理器
"""
super().__init__()
async def initialize(self):
"""
异步初始化 MySQL 连接池并进行健康检查
"""
if self._pool is None:
try:
mysql_config = config.mysql
host = mysql_config.host
port = mysql_config.port
user = mysql_config.user
password = mysql_config.password
db = mysql_config.db
charset = mysql_config.charset
logger.info(f"正在尝试连接 MySQL: {host}:{port}, DB: {db}")
self._pool = await aiomysql.create_pool(
host=host,
port=port,
user=user,
password=password,
db=db,
charset=charset,
autocommit=False,
maxsize=10,
minsize=1
)
async with self._pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
result = await cur.fetchone()
if result and result[0] == 1:
logger.success("MySQL 连接成功!")
else:
logger.error("MySQL 连接失败: 健康检查失败")
except Exception as e:
logger.exception(f"MySQL 初始化时发生未知错误: {e}")
self._pool = None
@property
def pool(self):
"""
获取 MySQL 连接池实例
"""
if self._pool is None:
raise ConnectionError("MySQL 未初始化或连接失败,请先调用 initialize()")
return self._pool
async def execute(self, sql: str, args: tuple = None):
"""
执行 SQL 语句(用于 INSERT、UPDATE、DELETE
Args:
sql: SQL 语句
args: 参数元组
Returns:
影响的行数
"""
async with self._pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.rowcount
async def fetchone(self, sql: str, args: tuple = None):
"""
查询单条记录
Args:
sql: SQL 语句
args: 参数元组
Returns:
单条记录字典
"""
async with self._pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
async def fetchall(self, sql: str, args: tuple = None):
"""
查询多条记录
Args:
sql: SQL 语句
args: 参数元组
Returns:
记录列表
"""
async with self._pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
async def begin_transaction(self):
"""
开始事务
Returns:
事务连接对象
"""
conn = await self._pool.acquire()
return conn
async def commit_transaction(self, conn):
"""
提交事务
Args:
conn: 事务连接对象
"""
await conn.commit()
await self._pool.release(conn)
async def rollback_transaction(self, conn):
"""
回滚事务
Args:
conn: 事务连接对象
"""
await conn.rollback()
await self._pool.release(conn)
mysql_manager = MySQLManager()

View File

@@ -0,0 +1,438 @@
"""
反向 WebSocket 管理器模块
该模块提供了反向 WebSocket 服务端功能,允许 OneBot 实现(如 NapCat
主动连接到机器人服务器,而不是由机器人主动连接到 OneBot 实现。
"""
import asyncio
import orjson
import websockets
from websockets.server import WebSocketServerProtocol
from typing import Dict, Any, Optional, Set
from datetime import datetime
import uuid
import random
from ..config_loader import global_config
from ..utils.logger import ModuleLogger
from ..utils.exceptions import WebSocketError, WebSocketConnectionError
from ..utils.error_codes import ErrorCode, create_error_response
from .command_manager import matcher
from models.events.factory import EventFactory
from .redis_manager import redis_manager
class ReverseWSManager:
"""
反向 WebSocket 管理器,作为服务端接收 OneBot 实现的连接。
支持多前端负载均衡和防重复发送机制。
"""
def __init__(self):
"""
初始化反向 WebSocket 管理器。
"""
self.server = None
self.clients: Dict[str, WebSocketServerProtocol] = {}
self.client_self_ids: Dict[str, int] = {}
self._pending_requests: Dict[str, asyncio.Future] = {}
self._running = False
self.logger = ModuleLogger("ReverseWSManager")
# 负载均衡相关
self._active_client_id: Optional[str] = None # 当前活跃的客户端(用于消息发送)
self._client_load: Dict[str, int] = {} # 客户端负载计数
self._client_health: Dict[str, datetime] = {} # 客户端健康检查时间
# 防重复发送相关
self._processed_events: Dict[str, datetime] = {} # 已处理的事件ID和时间
self._event_ttl = 60 # 事件ID保留时间
self._message_locks: Dict[str, asyncio.Lock] = {} # 消息处理锁
self._lock_ttl = 300 # 锁保留时间(秒)
# 启动清理任务
self._cleanup_task = None
async def start(self, host: str = "0.0.0.0", port: int = 3002) -> None:
"""
启动反向 WebSocket 服务端。
Args:
host: 监听地址,默认为 0.0.0.0
port: 监听端口,默认为 3002
"""
self._running = True
self.server = await websockets.serve(
self._handle_client,
host,
port,
ping_interval=20,
ping_timeout=20
)
self.logger.success(f"反向 WebSocket 服务端已启动: ws://{host}:{port}")
# 启动清理任务
self._cleanup_task = asyncio.create_task(self._cleanup_expired_data())
async def stop(self) -> None:
"""
停止反向 WebSocket 服务端。
"""
self._running = False
# 停止清理任务
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
if self.server:
self.server.close()
await self.server.wait_closed()
for client_id in list(self.clients.keys()):
await self._disconnect_client(client_id)
self.logger.success("反向 WebSocket 服务端已停止")
async def _handle_client(
self,
websocket: WebSocketServerProtocol,
path: str = None
) -> None:
"""
处理客户端连接。
Args:
websocket: WebSocket 连接对象
path: 连接路径
"""
client_id = str(uuid.uuid4())
self.clients[client_id] = websocket
self.logger.info(f"新客户端连接: {client_id}")
try:
async for message in websocket:
try:
data = orjson.loads(message)
# 处理 API 响应
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data)
continue
# 处理上报事件
if "post_type" in data:
asyncio.create_task(self._on_event(client_id, data))
except orjson.JSONDecodeError as e:
self.logger.error(f"JSON 解析失败: {str(e)}")
except Exception as e:
self.logger.exception(f"处理消息异常: {str(e)}")
except websockets.exceptions.ConnectionClosed as e:
self.logger.info(f"客户端断开连接: {client_id} - {str(e)}")
except Exception as e:
self.logger.exception(f"客户端异常: {str(e)}")
finally:
await self._disconnect_client(client_id)
async def _cleanup_expired_data(self) -> None:
"""
清理过期的事件ID和消息锁
"""
while self._running:
try:
await asyncio.sleep(10) # 每10秒清理一次
current_time = datetime.now()
# 清理过期的事件ID
expired_events = [
event_id for event_id, timestamp in self._processed_events.items()
if (current_time - timestamp).total_seconds() > self._event_ttl
]
for event_id in expired_events:
del self._processed_events[event_id]
# 清理过期的消息锁
expired_locks = [
lock_key for lock_key, timestamp in self._message_locks.items()
if (current_time - timestamp).total_seconds() > self._lock_ttl
]
for lock_key in expired_locks:
del self._message_locks[lock_key]
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"清理过期数据失败: {str(e)}")
async def _disconnect_client(self, client_id: str) -> None:
"""
断开客户端连接。
Args:
client_id: 客户端 ID
"""
if client_id in self.clients:
del self.clients[client_id]
if client_id in self.client_self_ids:
del self.client_self_ids[client_id]
async def _on_event(self, client_id: str, event_data: Dict[str, Any]) -> None:
"""
处理事件,包含防重复发送和负载均衡逻辑。
Args:
client_id: 客户端 ID
event_data: 事件数据
"""
try:
event = EventFactory.create_event(event_data)
if hasattr(event, 'self_id'):
self.client_self_ids[client_id] = event.self_id
event.bot = None
# 记录客户端健康状态
self._client_health[client_id] = datetime.now()
# 检查是否为重复事件
if self._is_duplicate_event(event_data):
self.logger.debug(f"检测到重复事件,已忽略: {event_data.get('id')}")
return
# 标记事件已处理
self._mark_event_processed(event_data)
# 处理消息事件
if event.post_type == "message":
sender_name = event.sender.nickname if hasattr(event, "sender") and event.sender else "Unknown"
message_type = getattr(event, "message_type", "Unknown")
user_id = getattr(event, "user_id", "Unknown")
raw_message = getattr(event, "raw_message", "")
self.logger.info(f"[消息] {message_type} | {user_id}({sender_name}): {raw_message}")
# 使用锁防止同一消息被多次处理
message_key = self._get_message_key(event_data)
async with self._get_message_lock(message_key):
# 再次检查是否重复(防止并发问题)
if self._is_duplicate_event(event_data):
self.logger.debug(f"并发检测到重复消息,已忽略: {message_key}")
return
self._mark_event_processed(event_data)
# 更新客户端负载
self._update_client_load(client_id)
await matcher.handle_event(None, event)
elif event.post_type == "notice":
notice_type = getattr(event, "notice_type", "Unknown")
self.logger.info(f"[通知] {notice_type}")
await matcher.handle_event(None, event)
elif event.post_type == "request":
request_type = getattr(event, "request_type", "Unknown")
self.logger.info(f"[请求] {request_type}")
await matcher.handle_event(None, event)
elif event.post_type == "meta_event":
meta_event_type = getattr(event, "meta_event_type", "Unknown")
self.logger.debug(f"[元事件] {meta_event_type}")
await matcher.handle_event(None, event)
except Exception as e:
self.logger.exception(f"事件处理异常: {str(e)}")
async def call_api(
self,
action: str,
params: Optional[Dict[Any, Any]] = None,
client_id: Optional[str] = None,
use_load_balance: bool = True
) -> Dict[Any, Any]:
"""
向客户端发送 API 请求。
Args:
action: API 动作名称
params: API 参数
client_id: 客户端 ID如果为 None 则根据负载均衡策略选择
use_load_balance: 是否使用负载均衡,默认为 True
Returns:
API 响应数据
"""
if not self.clients:
self.logger.error("调用 API 失败: 没有可用的客户端连接")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="没有可用的客户端连接",
data={"action": action, "params": params}
)
# 如果没有指定客户端,使用负载均衡
if client_id is None and use_load_balance:
# 优先选择健康的客户端
healthy_clients = self.get_healthy_clients()
if healthy_clients:
# 选择负载最低的客户端
client_id = self.get_client_with_least_load()
if client_id is None and healthy_clients:
client_id = list(healthy_clients.keys())[0]
else:
# 如果没有健康客户端,使用所有客户端中的一个
client_id = list(self.clients.keys())[0]
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
try:
targets = [client_id] if client_id else list(self.clients.keys())
for cid in targets:
if cid in self.clients:
await self.clients[cid].send(orjson.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)
def get_connected_clients(self) -> Dict[str, int]:
"""
获取已连接的客户端列表。
Returns:
客户端 ID 和 self_id 的映射字典
"""
return self.client_self_ids.copy()
def _is_duplicate_event(self, event_data: Dict[str, Any]) -> bool:
"""
检查是否为重复事件。
Args:
event_data: 事件数据
Returns:
是否为重复事件
"""
event_id = event_data.get('id') or event_data.get('post_id') or event_data.get('time')
if not event_id:
return False
event_key = f"{event_data.get('post_type')}:{event_id}"
return event_key in self._processed_events
def _mark_event_processed(self, event_data: Dict[str, Any]) -> None:
"""
标记事件已处理。
Args:
event_data: 事件数据
"""
event_id = event_data.get('id') or event_data.get('post_id') or event_data.get('time')
if not event_id:
return
event_key = f"{event_data.get('post_type')}:{event_id}"
self._processed_events[event_key] = datetime.now()
def _get_message_key(self, event_data: Dict[str, Any]) -> str:
"""
获取消息唯一标识。
Args:
event_data: 事件数据
Returns:
消息唯一标识
"""
if event_data.get('post_type') == 'message':
message_id = event_data.get('message_id') or event_data.get('id')
user_id = event_data.get('user_id')
return f"msg:{message_id}:{user_id}"
return str(uuid.uuid4())
def _get_message_lock(self, key: str) -> asyncio.Lock:
"""
获取消息处理锁。
Args:
key: 消息唯一标识
Returns:
asyncio.Lock 实例
"""
if key not in self._message_locks:
self._message_locks[key] = asyncio.Lock()
return self._message_locks[key]
def _update_client_load(self, client_id: str) -> None:
"""
更新客户端负载。
Args:
client_id: 客户端 ID
"""
if client_id not in self._client_load:
self._client_load[client_id] = 0
self._client_load[client_id] += 1
def get_client_with_least_load(self) -> Optional[str]:
"""
获取负载最低的客户端。
Returns:
客户端 ID如果没有客户端则返回 None
"""
if not self._client_load:
return None
return min(self._client_load.keys(), key=lambda k: self._client_load[k])
def get_healthy_clients(self) -> Dict[str, int]:
"""
获取健康的客户端列表最近30秒内有活动
Returns:
健康的客户端 ID 和 self_id 的映射字典
"""
current_time = datetime.now()
healthy = {}
for client_id, last_health in self._client_health.items():
if (current_time - last_health).total_seconds() < 30:
if client_id in self.client_self_ids:
healthy[client_id] = self.client_self_ids[client_id]
return healthy
reverse_ws_manager = ReverseWSManager()

View File

@@ -83,14 +83,15 @@ class ConfigError(Exception):
配置相关错误的基类。
Args:
section: 配置部分名称
key: 配置项名称
message: 错误消息
section: 配置部分名称(可选)
key: 配置项名称(可选)
message: 错误消息(可选)
"""
def __init__(self, section=None, key=None, message=None):
self.section = section
self.key = key
self.message = message
self.original_error = None
if section and key and message:
super().__init__(f"配置错误 [{section}.{key}]: {message}")

View File

@@ -0,0 +1,58 @@
"""
反向 WebSocket 使用示例
该文件展示了如何使用反向 WebSocket 功能。
"""
from core.managers import reverse_ws_manager
async def example_usage():
"""
使用示例
"""
# 1. 启动反向 WebSocket 服务端
await reverse_ws_manager.start(host="0.0.0.0", port=3002)
# 2. 等待客户端连接
# 此时 OneBot 实现(如 NapCat应该连接到 ws://your-server-ip:3002
# 3. 查看已连接的客户端
connected_clients = reverse_ws_manager.get_connected_clients()
print(f"已连接的客户端: {connected_clients}")
# 4. 查看健康的客户端
healthy_clients = reverse_ws_manager.get_healthy_clients()
print(f"健康的客户端: {healthy_clients}")
# 5. 调用 API使用负载均衡
response = await reverse_ws_manager.call_api(
action="get_login_info",
params={},
use_load_balance=True # 启用负载均衡
)
print(f"API 响应: {response}")
# 6. 调用 API向特定客户端发送
if connected_clients:
client_id = list(connected_clients.keys())[0]
response = await reverse_ws_manager.call_api(
action="get_login_info",
params={},
client_id=client_id,
use_load_balance=False # 不使用负载均衡
)
print(f"特定客户端 API 响应: {response}")
# 7. 获取负载最低的客户端
least_load_client = reverse_ws_manager.get_client_with_least_load()
if least_load_client:
print(f"负载最低的客户端: {least_load_client}")
# 8. 停止服务端
await reverse_ws_manager.stop()
if __name__ == "__main__":
import asyncio
asyncio.run(example_usage())

15
main.py
View File

@@ -15,7 +15,7 @@ from core.utils.logger import logger
# 核心模块导入
from core.ws import WS
from core.managers import plugin_manager, matcher, permission_manager
from core.managers import plugin_manager, matcher, permission_manager, reverse_ws_manager
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
@@ -142,6 +142,15 @@ async def main():
# 初始化浏览器管理器 (使用页面池)
await browser_manager.init_pool(size=3)
# 启动反向 WebSocket 服务端(如果启用)
if config.reverse_ws.enabled:
logger.info("正在启动反向 WebSocket 服务端...")
asyncio.create_task(reverse_ws_manager.start(
host=config.reverse_ws.host,
port=config.reverse_ws.port
))
logger.success(f"反向 WebSocket 服务端已启动: ws://{config.reverse_ws.host}:{config.reverse_ws.port}")
# 启动文件监控
# 监控 plugins 目录
plugin_path = os.path.join(os.path.dirname(__file__), "plugins")
@@ -186,6 +195,10 @@ async def main():
if websocket_client:
await websocket_client.close()
# 关闭反向 WebSocket 服务端
if config.reverse_ws.enabled and reverse_ws_manager.server:
await reverse_ws_manager.stop()
# 关闭浏览器管理器
await browser_manager.shutdown()

View File

@@ -12,7 +12,7 @@ from models.message import MessageSegment
__plugin_meta__ = {
"name": "furry",
"description": "处理 /furry 指令发送furry出毛图片",
"usage": "/furry - 发送一条furry图",
"usage": "/furry - 发送一条furry图1-10",
}
@matcher.command("furry")

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,7 @@ from core.managers.command_manager import matcher
from core.managers.image_manager import image_manager
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from .resource.city_code import CITY_CODES
# 插件元数据
__plugin_meta__ = {
"name": "weather",
@@ -17,46 +17,6 @@ __plugin_meta__ = {
"usage": "/天气 [城市代码] - 查询指定城市的天气信息\n例如:/天气 101190207 (南京)",
}
# 城市代码映射(可以扩展)
CITY_CODES = {
"北京": "101010100",
"上海": "101020100",
"广州": "101280101",
"深圳": "101280601",
"南京": "101190101",
"苏州": "101190401",
"杭州": "101210101",
"武汉": "101200101",
"成都": "101270101",
"重庆": "101040100",
"西安": "101110101",
"天津": "101030100",
"沈阳": "101070101",
"大连": "101070201",
"青岛": "101120201",
"济南": "101120101",
"郑州": "101180101",
"长沙": "101250101",
"南昌": "101240101",
"合肥": "101220101",
"福州": "101230101",
"厦门": "101230201",
"南宁": "101300101",
"海口": "101310101",
"昆明": "101290101",
"贵阳": "101260101",
"拉萨": "101140101",
"兰州": "101160101",
"西宁": "101150101",
"银川": "101170101",
"乌鲁木齐": "101130101",
"哈尔滨": "101050101",
"长春": "101060101",
"呼和浩特": "101080101",
"太原": "101100101",
"石家庄": "101090101",
}
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

View File

@@ -1,5 +1,6 @@
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiomysql==0.2.0
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.12.1