# API 使用示例 本文档提供了 NeoBot 框架核心 API 的使用示例,帮助开发者快速上手。 ## 目录 1. [插件开发基础](#插件开发基础) 2. [消息处理](#消息处理) 3. [配置管理](#配置管理) 4. [日志记录](#日志记录) 5. [输入验证](#输入验证) 6. [环境变量管理](#环境变量管理) 7. [数据库操作](#数据库操作) 8. [网络请求](#网络请求) ## 插件开发基础 ### 基本插件结构 ```python # -*- coding: utf-8 -*- from typing import List, Optional from neobot.core.managers.command_manager import matcher from neobot.core.utils.logger import logger from models import MessageEvent # 插件元数据 __plugin_meta__ = { "name": "example_plugin", "description": "示例插件", "usage": "/示例命令 [参数] - 示例命令说明", } @matcher.command("示例命令") async def handle_example_command(bot, event: MessageEvent, args: List[str]): """ 处理示例命令 Args: bot: 机器人实例 event: 消息事件 args: 命令参数列表 """ try: if not args: await event.reply("请输入参数,例如:/示例命令 参数") return # 处理逻辑 result = await process_args(args[0]) # 回复结果 await event.reply(f"处理结果: {result}") except Exception as e: logger.error(f"处理命令时出错: {e}") await event.reply("处理命令时发生错误,请稍后重试。") ``` ### 带权限检查的插件 ```python from neobot.core.managers.permission_manager import permission_manager @matcher.command("管理命令", permission="admin") async def handle_admin_command(bot, event: MessageEvent, args: List[str]): """ 处理管理命令(需要管理员权限) """ # 检查权限 user_id = event.user_id if not permission_manager.check_permission(user_id, "admin"): await event.reply("您没有执行此命令的权限。") return # 执行管理操作 await event.reply("管理命令执行成功。") ``` ## 消息处理 ### 发送消息 ```python from models import MessageSegment async def send_messages(event: MessageEvent): """发送各种类型的消息""" # 发送纯文本 await event.reply("这是一条文本消息") # 发送带格式的文本 await event.reply("**粗体** *斜体* `代码`") # 发送图片 image_segment = MessageSegment.image("https://example.com/image.jpg") await event.reply([image_segment, "这是一张图片"]) # 发送文件 file_segment = MessageSegment.file("/path/to/file.txt") await event.reply(file_segment) # 发送语音 voice_segment = MessageSegment.voice("/path/to/voice.mp3") await event.reply(voice_segment) ``` ### 处理消息事件 ```python from typing import Dict, Any @matcher.on_message() async def handle_all_messages(bot, event: MessageEvent): """ 处理所有消息 """ # 获取消息内容 message = event.message user_id = event.user_id group_id = event.group_id # 记录消息 logger.info(f"收到消息: 用户={user_id}, 群组={group_id}, 内容={message}") # 简单的自动回复 if "你好" in message: await event.reply("你好!我是机器人。") # 处理特定关键词 if "帮助" in message: await event.reply("输入 /帮助 查看可用命令。") ``` ### 消息模板 ```python from string import Template class MessageTemplate: """消息模板""" @staticmethod def welcome_message(user_name: str) -> str: """欢迎消息模板""" template = Template("欢迎 $user_name 加入!") return template.substitute(user_name=user_name) @staticmethod def weather_report(city: str, temperature: float, condition: str) -> str: """天气报告模板""" return f""" {city} 天气报告: 🌡️ 温度: {temperature}°C 🌤️ 天气: {condition} """.strip() @staticmethod def error_message(error_type: str, suggestion: str = "") -> str: """错误消息模板""" base = f"发生错误: {error_type}" if suggestion: base += f"\n建议: {suggestion}" return base # 使用示例 async def send_welcome(event: MessageEvent, user_name: str): message = MessageTemplate.welcome_message(user_name) await event.reply(message) ``` ## 配置管理 ### 基本配置使用 ```python from neobot.core.config_loader import Config # 加载配置 config = Config("config.toml") # 获取配置值 bot_name = config.get("bot.name", "NeoBot") admin_users = config.get_list("bot.admin_users", []) # 获取嵌套配置 database_config = config.get_section("database") if database_config: db_host = database_config.get("host", "localhost") db_port = database_config.get_int("port", 3306) # 检查配置是否存在 if config.has("api.keys.openai"): openai_key = config.get("api.keys.openai") ``` ### 配置验证 ```python from typing import Dict, Any from pydantic import BaseModel, Field, validator class DatabaseConfig(BaseModel): """数据库配置模型""" host: str = Field(default="localhost") port: int = Field(default=3306, ge=1, le=65535) user: str password: str database: str @validator('password') def password_not_empty(cls, v): if not v or len(v.strip()) == 0: raise ValueError('密码不能为空') return v class BotConfig(BaseModel): """机器人配置模型""" name: str = Field(default="NeoBot") admin_users: list = Field(default_factory=list) database: DatabaseConfig # 使用配置模型 def load_and_validate_config(config_path: str) -> BotConfig: """加载并验证配置""" config = Config(config_path) # 转换为字典 config_dict = config.to_dict() # 验证配置 try: bot_config = BotConfig(**config_dict) return bot_config except Exception as e: logger.error(f"配置验证失败: {e}") raise ``` ## 日志记录 ### 基本日志使用 ```python from neobot.core.utils.logger import logger # 不同级别的日志 logger.debug("调试信息") logger.info("普通信息") logger.warning("警告信息") logger.error("错误信息") logger.critical("严重错误") # 带上下文的日志 logger.info("用户操作", extra={ "user_id": 123456, "action": "login", "ip": "192.168.1.1" }) # 异常日志 try: # 一些可能失败的操作 result = risky_operation() except Exception as e: logger.exception(f"操作失败: {e}") ``` ### 自定义日志格式 ```python import logging from neobot.core.utils.logger import setup_logging # 自定义日志配置 log_config = { "version": 1, "formatters": { "detailed": { "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" }, "simple": { "format": "%(levelname)s: %(message)s" } }, "handlers": { "console": { "class": "logging.StreamHandler", "level": "INFO", "formatter": "simple" }, "file": { "class": "logging.handlers.RotatingFileHandler", "filename": "logs/neobot.log", "maxBytes": 10485760, # 10MB "backupCount": 5, "formatter": "detailed" } }, "loggers": { "neobot": { "level": "DEBUG", "handlers": ["console", "file"] } } } # 设置日志 setup_logging(log_config) ``` ## 输入验证 ### 基本验证 ```python from neobot.core.utils.input_validator import input_validator def validate_user_input(user_input: str) -> tuple[bool, str]: """ 验证用户输入 Returns: (是否有效, 错误消息) """ # 检查空输入 if not user_input or not user_input.strip(): return False, "输入不能为空" # 检查长度 if len(user_input) > 1000: return False, "输入过长(最大1000字符)" # 安全检查 if not input_validator.validate_sql_input(user_input): return False, "输入包含不安全字符" if not input_validator.validate_xss_input(user_input): return False, "输入包含不安全内容" return True, "" # 在插件中使用 @matcher.command("安全命令") async def handle_safe_command(bot, event: MessageEvent, args: List[str]): if not args: await event.reply("请输入参数") return user_input = args[0] is_valid, error_msg = validate_user_input(user_input) if not is_valid: await event.reply(f"输入无效: {error_msg}") return # 处理有效输入 await event.reply(f"输入有效: {user_input}") ``` ### 高级验证 ```python from typing import Dict, Any from datetime import datetime class AdvancedValidator: """高级验证器""" @staticmethod def validate_email_domain(email: str, allowed_domains: list) -> bool: """验证邮箱域名""" if not input_validator.validate_email(email): return False domain = email.split('@')[1] return domain in allowed_domains @staticmethod def validate_date_range(date_str: str, start_date: str, end_date: str) -> bool: """验证日期范围""" try: date = datetime.strptime(date_str, "%Y-%m-%d") start = datetime.strptime(start_date, "%Y-%m-%d") end = datetime.strptime(end_date, "%Y-%m-%d") return start <= date <= end except ValueError: return False @staticmethod def validate_json_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool: """验证JSON数据格式""" try: # 这里可以使用 jsonschema 库 # import jsonschema # jsonschema.validate(data, schema) # 简化版本:检查必需字段 required_fields = schema.get("required", []) for field in required_fields: if field not in data: return False # 检查字段类型 properties = schema.get("properties", {}) for field, field_schema in properties.items(): if field in data: field_type = field_schema.get("type") if field_type == "string" and not isinstance(data[field], str): return False elif field_type == "number" and not isinstance(data[field], (int, float)): return False elif field_type == "integer" and not isinstance(data[field], int): return False elif field_type == "boolean" and not isinstance(data[field], bool): return False return True except Exception: return False ``` ## 环境变量管理 ### 基本使用 ```python from neobot.core.utils.env_loader import env_loader # 加载环境变量 env_loader.load() # 获取环境变量 database_url = env_loader.get("DATABASE_URL") api_key = env_loader.get("API_KEY") # 获取带默认值的环境变量 port = env_loader.get_int("PORT", 8080) debug = env_loader.get_bool("DEBUG", False) # 获取掩码的敏感值(用于日志) masked_api_key = env_loader.get_masked("API_KEY") logger.info(f"API Key: {masked_api_key}") # 输出: AP***EY # 检查环境变量是否设置 if env_loader.is_set("REQUIRED_VAR"): value = env_loader.get("REQUIRED_VAR") else: logger.error("REQUIRED_VAR 环境变量未设置") ``` ### 环境变量验证 ```python from typing import List, Optional class EnvironmentValidator: """环境变量验证器""" @staticmethod def validate_required(variables: List[str]) -> List[str]: """验证必需的环境变量""" missing = [] for var in variables: if not env_loader.is_set(var): missing.append(var) return missing @staticmethod def validate_database_config() -> bool: """验证数据库配置""" required = ["DB_HOST", "DB_PORT", "DB_USER", "DB_PASSWORD", "DB_NAME"] missing = EnvironmentValidator.validate_required(required) if missing: logger.error(f"缺少数据库配置: {missing}") return False # 验证端口范围 port = env_loader.get_int("DB_PORT") if port < 1 or port > 65535: logger.error(f"数据库端口无效: {port}") return False return True @staticmethod def validate_api_keys() -> bool: """验证API密钥""" # 检查是否有至少一个API密钥 api_keys = [ "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY" ] has_key = any(env_loader.is_set(key) for key in api_keys) if not has_key: logger.warning("未设置任何API密钥,某些功能可能不可用") return True ``` ## 数据库操作 ### 异步数据库操作 ```python import aiomysql from typing import List, Dict, Any class DatabaseManager: """数据库管理器""" def __init__(self, config: Dict[str, Any]): self.config = config self.pool = None async def connect(self): """连接数据库""" self.pool = await aiomysql.create_pool( host=self.config['host'], port=self.config['port'], user=self.config['user'], password=self.config['password'], db=self.config['database'], minsize=5, maxsize=20 ) async def execute_query(self, query: str, *args) -> List[Dict[str, Any]]: """执行查询""" async with self.pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute(query, args) return await cursor.fetchall() async def execute_update(self, query: str, *args) -> int: """执行更新""" async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(query, args) await conn.commit() return cursor.rowcount async def close(self): """关闭连接""" if self.pool: self.pool.close() await self.pool.wait_closed() ``` ### 数据库模型 ```python from typing import Optional from datetime import datetime class UserModel: """用户模型""" def __init__(self, db_manager: DatabaseManager): self.db = db_manager async def get_user(self, user_id: int) -> Optional[Dict[str, Any]]: """获取用户""" query = "SELECT * FROM users WHERE id = %s" results = await self.db.execute_query(query, user_id) if results: return results[0] return None async def create_user(self, username: str, email: str) -> int: """创建用户""" query = """ INSERT INTO users (username, email, created_at) VALUES (%s, %s, %s) """ now = datetime.now() await self.db.execute_update(query, username, email, now) # 获取新用户的ID query = "SELECT LAST_INSERT_ID() as id" results = await self.db.execute_query(query) return results[0]['id'] async def update_user(self, user_id: int, **kwargs) -> bool: """更新用户""" if not kwargs: return False set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()]) query = f"UPDATE users SET {set_clause} WHERE id = %s" values = list(kwargs.values()) values.append(user_id) affected = await self.db.execute_update(query, *values) return affected > 0 ``` ## 网络请求 ### 异步HTTP请求 ```python import aiohttp from typing import Dict, Any, Optional class HttpClient: """HTTP客户端""" def __init__(self, base_url: str = "", timeout: int = 30): self.base_url = base_url.rstrip("/") self.timeout = aiohttp.ClientTimeout(total=timeout) async def get(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]: """发送GET请求""" url = f"{self.base_url}/{endpoint.lstrip('/')}" async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.get(url, **kwargs) as response: response.raise_for_status() return await response.json() async def post(self, endpoint: str, data: Dict[str, Any], **kwargs) -> Optional[Dict[str, Any]]: """发送POST请求""" url = f"{self.base_url}/{endpoint.lstrip('/')}" async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.post(url, json=data, **kwargs) as response: response.raise_for_status() return await response.json() async def download_file(self, url: str, save_path: str) -> bool: """下载文件""" try: async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.get(url) as response: response.raise_for_status() # 异步写入文件 import aiofiles async with aiofiles.open(save_path, 'wb') as f: async for chunk in response.content.iter_chunked(8192): await f.write(chunk) return True except Exception as e: logger.error(f"下载文件失败: {e}") return False ``` ### API客户端示例 ```python from typing import List, Optional class WeatherAPI: """天气API客户端""" def __init__(self, api_key: str): self.api_key = api_key self.base_url = "https://api.weather.com" self.client = HttpClient(self.base_url) async def get_current_weather(self, city: str) -> Optional[Dict[str, Any]]: """获取当前天气""" endpoint = "/v1/current" params = { "city": city, "api_key": self.api_key, "units": "metric" } try: return await self.client.get(endpoint, params=params) except aiohttp.ClientError as e: logger.error(f"获取天气失败: {e}") return None async def get_forecast(self, city: str, days: int = 3) -> Optional[List[Dict[str, Any]]]: """获取天气预报""" endpoint = "/v1/forecast" params = { "city": city, "days": days, "api_key": self.api_key } try: data = await self.client.get(endpoint, params=params) return data.get("forecast", []) except aiohttp.ClientError as e: logger.error(f"获取天气预报失败: {e}") return None ``` ## 总结 这些示例展示了 NeoBot 框架核心功能的使用方法。通过组合这些基础组件,可以构建出功能强大、安全可靠的机器人插件。 关键要点: 1. **遵循异步编程模式**:所有可能阻塞的操作都应使用异步版本 2. **验证所有用户输入**:防止安全漏洞 3. **使用配置管理**:将敏感信息存储在环境变量中 4. **记录详细的日志**:便于调试和监控 5. **处理所有异常**:提供友好的错误消息 更多高级功能和最佳实践,请参考框架的其他文档。