This commit is contained in:
2025-12-31 22:01:35 +08:00
commit 2cba589b2e
13 changed files with 465 additions and 0 deletions

141
.gitignore vendored Normal file
View File

@@ -0,0 +1,141 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# End of https://www.toptal.com/developers/gitignore/api/python

22
base_plugins/__init__.py Normal file
View File

@@ -0,0 +1,22 @@
import os
import importlib
import pkgutil
def load_all_plugins():
"""扫描并加载当前包下所有的插件(支持文件和文件夹)"""
package_name = __package__
package_path = [os.path.dirname(__file__)]
print(f" 正在从 {package_name} 加载插件...")
for loader, module_name, is_pkg in pkgutil.iter_modules(package_path):
full_module_name = f"{package_name}.{module_name}"
try:
importlib.import_module(full_module_name)
type_str = "" if is_pkg else "文件"
print(f" [{type_str}] 成功加载: {module_name}")
except Exception as e:
print(f" 加载插件 {module_name} 失败: {e}")
load_all_plugins()

7
config.toml Normal file
View File

@@ -0,0 +1,7 @@
[napcat_ws]
uri = "ws://127.0.0.1:30004"
token = "12333"
reconnect_interval = 5
[bot]
command = ["/"]

5
core/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .ws import WS
from .command_manager import matcher
from .config_loader import global_config
__all__ = ["WS", "matcher", "global_config"]

55
core/command_manager.py Normal file
View File

@@ -0,0 +1,55 @@
from typing import Any
import inspect
from .config_loader import global_config
comm = global_config.bot.get("command")
class CommandManager:
def __init__(self, prefixes=(tuple[Any, ...] (comm))):
self.prefixes = prefixes
self.commands = {} # 存储指令函数
def command(self, name: str):
"""装饰器:注册指令"""
def decorator(func):
self.commands[name] = func
return func
return decorator
async def handle_message(self, bot, event):
"""解析并分发指令"""
raw_text = event.raw_message.strip()
# 1. 检查前缀
prefix_found = None
for p in self.prefixes:
if raw_text.startswith(p):
prefix_found = p
break
if not prefix_found:
return # 不是指令,跳过
# 2. 拆分指令和参数
full_cmd = raw_text[len(prefix_found):].split()
if not full_cmd:
return
cmd_name = full_cmd[0]
args = full_cmd[1:]
# 3. 查找并执行
if cmd_name in self.commands:
func = self.commands[cmd_name]
# 自动注入参数 (判断函数是否需要 args)
sig = inspect.signature(func)
if "args" in sig.parameters:
await func(bot, event, args)
else:
await func(bot, event)
# 实例化全局管理器
qianzhui = global_config.bot.get("command")
matcher = CommandManager(prefixes=(tuple[Any, ...] (comm)))

38
core/config_loader.py Normal file
View File

@@ -0,0 +1,38 @@
import tomllib
from pathlib import Path
from typing import Any, Dict
class Config:
def __init__(self, file_path: str = "config.toml"):
self.path = Path(file_path)
self._data: Dict[str, Any] = {}
self.load()
def load(self):
if not self.path.exists():
raise FileNotFoundError(f"配置文件 {self.path} 未找到!")
with open(self.path, "rb") as f:
self._data = tomllib.load(f)
# 通过属性访问配置
@property
def napcat_ws(self) -> dict:
return self._data.get("napcat_ws", {})
@property
def bot(self) -> dict:
return self._data.get("bot", {})
@property
def features(self) -> dict:
return self._data.get("features", {})
# 实例化全局配置对象
global_config = Config()
if __name__ == "__main__":
print(global_config.napcat_ws)
print(global_config.bot.get("command"))
print(type(global_config.bot.get("command")) is list)
print(global_config.features)

112
core/ws.py Normal file
View File

@@ -0,0 +1,112 @@
import asyncio
import json
import uuid
import websockets
import traceback
from .command_manager import matcher
from .config_loader import global_config
from models import Event
class WS:
def __init__(self):
# 读取参数
cfg = global_config.napcat_ws
self.url = cfg.get("uri")
self.token = cfg.get("token")
self.reconnect_interval = cfg.get("reconnect_interval", 5)
self.ws = None # 存储当前的活跃连接
self._pending_requests = {} # 存储等待 API 返回的 Future 对象
async def connect(self):
"""主连接循环:负责建立连接并处理断线重连"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
while True:
try:
print(f" 正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(self.url, additional_headers=headers) as websocket:
self.ws = websocket
print(" 连接成功!")
# 进入阻塞式的监听循环
await self._listen_loop(websocket)
except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError) as e:
print(f" 连接断开或服务器拒绝访问: {e}")
except Exception as e:
print(f" 运行异常: {e}")
traceback.print_exc()
print(f" {self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket):
"""核心监听循环:负责从 WebSocket 读取原始数据并分类分发"""
async for message in websocket:
try:
data = json.loads(message)
# 1. 优先处理 API 响应 (带有 echo 字段)
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data) # 唤醒对应的 call_api 函数
continue # 处理完 API 响应后跳过本次循环
# 2. 处理上报的事件 (含有 post_type 字段)
if "post_type" in data:
# 使用 create_task 异步执行,确保复杂的业务逻辑不阻塞消息接收
asyncio.create_task(self.on_event(data))
except Exception as e:
print(f" 解析消息异常: {e}")
async def on_event(self, raw_data: dict):
"""事件分发层:将原始字典转换为 Event 对象并交给 matcher"""
# 仅处理消息事件 (message),忽略元事件 (meta_event) 或请求事件 (request)
if raw_data.get("post_type") != "message":
return
try:
# 将字典解析为强类型的 Event 对象
event = Event.from_dict(raw_data)
# 调试日志:可以看到收到的每条指令内容
print(f" 收到消息: [{event.user_id}] -> {event.raw_message}")
# 调用插件系统的入口函数
await matcher.handle_message(self, event)
except Exception as e:
print(f" 事件分发失败: {e}")
async def call_api(self, action: str, params: dict = None):
"""公有 API供插件调用发送指令并异步等待结果"""
if not self.ws or self.ws.closed:
return {"status": "failed", "msg": "websocket not connected"}
# 创建唯一的 echo ID
echo_id = str(uuid.uuid4())
payload = {
"action": action,
"params": params or {},
"echo": echo_id
}
# 创建一个 Future 对象用于等待返回结果
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
# 通过 WebSocket 发送请求
await self.ws.send(json.dumps(payload))
try:
# 设置 100 秒超时,防止 API 请求永久挂起
return await asyncio.wait_for(future, timeout=100.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
return {"status": "failed", "retcode": -1, "msg": "api timeout"}

10
main.py Normal file
View File

@@ -0,0 +1,10 @@
# main.py
import asyncio
from core import WS
async def main():
bot = WS()
await bot.connect()
if __name__ == "__main__":
asyncio.run(main())

4
models/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .event import Event, MessageSegment
from .sender import Sender
__all__ = ["Event", "MessageSegment", "Sender"]

0
models/base.py Normal file
View File

63
models/event.py Normal file
View File

@@ -0,0 +1,63 @@
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from .sender import Sender # 导入上面的 Sender
@dataclass
class MessageSegment:
type: str
data: Dict[str, Any]
@property
def text(self) -> str:
"""如果是文本段,返回文本内容,否则返回空字符串"""
return self.data.get("text", "") if self.type == "text" else ""
@property
def image_url(self) -> str:
"""如果是图片段,返回图片 URL"""
return self.data.get("url", "") if self.type == "image" else ""
def is_at(self, user_id: int = None) -> bool:
"""判断是否是 @某人"""
if self.type != "at":
return False
if user_id is None:
return True
return str(self.data.get("qq")) == str(user_id)
def __repr__(self):
return f"[MS:{self.type}:{self.data}]"
@dataclass
class Event:
post_type: str
message_type: str # group 或 private
user_id: int
self_id: int
raw_message: str
message: List[MessageSegment]
sender: Sender
time: int
group_id: Optional[int] = None
target_id: Optional[int] = None
@classmethod
def from_dict(cls, data: dict):
raw_msg_array = data.get("message", [])
segments = [
MessageSegment(type=seg["type"], data=seg["data"])
for seg in raw_msg_array
]
data_copy = data.copy()
data_copy["message"] = segments
sender_data = data.get("sender", {})
sender_obj = Sender(**{k: v for k, v in sender_data.items() if k in Sender.__annotations__})
data_copy = data.copy()
data_copy["message"] = segments
data_copy["sender"] = sender_obj # 关键点:把对象塞进去
valid_data = {k: v for k, v in data_copy.items() if k in cls.__annotations__}
return cls(**valid_data)

8
models/sender.py Normal file
View File

@@ -0,0 +1,8 @@
from dataclasses import dataclass
@dataclass
class Sender:
user_id: int
nickname: str
card: str = ""
role: str = "" # admin, owner, member

BIN
requirements.txt Normal file

Binary file not shown.