# 安全最佳实践 本文档介绍了 NeoBot 框架的安全最佳实践,包括配置安全、输入验证、异常处理等方面。 ## 目录 1. [配置安全](#配置安全) 2. [输入验证](#输入验证) 3. [异常处理](#异常处理) 4. [代码执行安全](#代码执行安全) 5. [网络通信安全](#网络通信安全) 6. [文件操作安全](#文件操作安全) ## 配置安全 ### 环境变量配置 NeoBot 支持使用环境变量管理敏感配置,避免将密码、令牌等敏感信息硬编码在配置文件中。 #### 使用方法 1. 复制 `.env.example` 为 `.env`: ```bash cp .env.example .env ``` 2. 编辑 `.env` 文件,填写实际值: ```env # 数据库配置 MYSQL_HOST=localhost MYSQL_PORT=3306 MYSQL_USER=root MYSQL_PASSWORD=your_secure_password # Redis 配置 REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD=your_redis_password # Discord 配置 DISCORD_TOKEN=your_discord_bot_token # Bilibili 配置 BILIBILI_SESSDATA=your_bilibili_sessdata BILIBILI_BILI_JCT=your_bilibili_jct ``` 3. 确保 `.env` 文件权限安全: ```bash # Linux/Mac chmod 600 .env # Windows icacls .env /inheritance:r /grant:r "%USERNAME%:R" ``` #### 配置优先级 环境变量的优先级高于配置文件: 1. 环境变量(最高优先级) 2. `config.toml` 文件 3. 默认值(最低优先级) #### 代码中使用 ```python from neobot.core.utils.env_loader import env_loader # 加载环境变量 env_loader.load() # 获取配置值 mysql_host = env_loader.get("MYSQL_HOST", "localhost") mysql_port = env_loader.get_int("MYSQL_PORT", 3306) discord_token = env_loader.get("DISCORD_TOKEN") # 获取掩码的敏感值(用于日志) masked_password = env_loader.get_masked("MYSQL_PASSWORD") # 输出: pa***rd(仅显示前2个和后2个字符) ``` ### 配置文件权限检查 框架会自动检查配置文件的权限,如果发现不安全权限会输出警告: ``` [WARNING] 配置文件 config.toml 其他用户可读,存在安全风险 [INFO] 建议使用命令: chmod 600 config.toml ``` ## 输入验证 ### 输入验证器 NeoBot 提供了全面的输入验证工具,防止常见的安全攻击。 #### 基本使用 ```python from neobot.core.utils.input_validator import input_validator # 验证 SQL 输入 if not input_validator.validate_sql_input(user_input): await event.reply("输入包含不安全字符") # 验证 XSS 攻击 if not input_validator.validate_xss_input(user_input): await event.reply("输入包含不安全内容") # 验证命令注入 if not input_validator.validate_command_input(user_input): await event.reply("输入包含危险命令") # 验证路径遍历 if not input_validator.validate_path_input(file_path): await event.reply("文件路径不安全") ``` #### 综合验证 ```python # 执行所有默认验证 results = input_validator.validate_all(user_input) # results = {'sql': True, 'xss': True, 'path': True, 'command': True} # 自定义验证类型 results = input_validator.validate_all( user_input, validation_types=['sql', 'xss', 'email', 'url'] ) ``` #### 数据清理 ```python # 清理 HTML,防止 XSS safe_html = input_validator.sanitize_html(user_html_input) # 清理 SQL,防止注入 safe_sql = input_validator.sanitize_sql(user_sql_input) ``` ### 插件中的输入验证 #### 天气插件示例 ```python @matcher.command("天气") async def handle_weather(bot, event: MessageEvent, args: List[str]): city_input = args[0].strip() # 输入验证 if not input_validator.validate_sql_input(city_input): await event.reply("输入包含不安全字符,请重新输入。") return if not input_validator.validate_xss_input(city_input): await event.reply("输入包含不安全内容,请重新输入。") return # 继续处理... ``` #### 代码执行插件示例 ```python def validate_code_security(code: str) -> bool: """验证代码安全性""" # 检查命令注入 if not input_validator.validate_command_input(code): return False # 检查路径遍历 if not input_validator.validate_path_input(code): return False # 检查危险的系统调用 dangerous_patterns = [ r"import\s+(os|sys|subprocess|shutil|platform|ctypes)", r"__import__\s*\(", r"eval\s*\(", r"exec\s*\(", ] for pattern in dangerous_patterns: if re.search(pattern, code, re.IGNORECASE): return False return True ``` ## 异常处理 ### 最佳实践 1. **避免裸异常捕获**: ```python # 错误做法 try: # 一些操作 except Exception: pass # 正确做法 try: # 一些操作 except (ValueError, TypeError) as e: logger.error(f"处理数据时出错: {e}") except ConnectionError as e: logger.error(f"网络连接失败: {e}") ``` 2. **提供有意义的错误信息**: ```python try: result = await some_async_operation() except asyncio.TimeoutError: await event.reply("操作超时,请稍后重试") except aiohttp.ClientError as e: logger.error(f"网络请求失败: {e}") await event.reply("网络请求失败,请检查网络连接") ``` 3. **记录异常堆栈**: ```python try: # 一些操作 except Exception as e: logger.exception(f"处理消息时发生未预期错误: {e}") # 不要向用户暴露堆栈信息 await event.reply("处理消息时发生错误,请稍后重试") ``` ### 框架提供的异常类 ```python from neobot.core.utils.exceptions import ( ConfigError, ConfigNotFoundError, ConfigValidationError, PluginError, PermissionDeniedError, ) try: config = Config("config.toml") except ConfigNotFoundError as e: logger.error(f"配置文件不存在: {e}") except ConfigValidationError as e: logger.error(f"配置验证失败: {e}") for detail in e.error_details: logger.error(f" - {detail}") ``` ## 代码执行安全 ### 沙箱环境 代码执行插件在 Docker 沙箱中运行用户代码,提供隔离的执行环境。 #### 安全特性 1. **资源限制**: - CPU 使用限制 - 内存使用限制 - 执行时间限制 - 网络访问限制 2. **代码验证**: ```python def validate_code_security(code: str) -> bool: """验证代码安全性""" # 检查危险模块导入 dangerous_imports = [ "import os", "import sys", "import subprocess", "__import__", "eval", "exec", "compile" ] for dangerous in dangerous_imports: if dangerous in code.lower(): return False return True ``` 3. **输入输出限制**: - 最大输入长度限制 - 最大输出长度限制 - 禁止文件系统访问 ### 异步操作 所有可能阻塞的操作都应使用异步版本: ```python # 错误做法(同步阻塞) import requests response = requests.get(url) # 会阻塞事件循环 # 正确做法(异步非阻塞) import aiohttp async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.text() ``` ## 网络通信安全 ### HTTPS 强制 所有外部请求都应使用 HTTPS: ```python # 确保使用 HTTPS url = "https://api.example.com/data" # 验证 URL 安全性 if not input_validator.validate_url(url, allowed_schemes=["https"]): raise ValueError("不安全的 URL 协议") ``` ### 请求超时设置 避免请求无限期等待: ```python import aiohttp timeout = aiohttp.ClientTimeout( total=30, # 总超时时间 connect=10, # 连接超时 sock_read=15 # 读取超时 ) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as response: data = await response.json() ``` ### 请求重试机制 ```python import asyncio from typing import Optional async def safe_request( url: str, max_retries: int = 3, base_delay: float = 1.0 ) -> Optional[str]: """安全的网络请求,带重试机制""" for attempt in range(max_retries): try: timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as response: response.raise_for_status() return await response.text() except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt == max_retries - 1: logger.error(f"请求失败,已达最大重试次数: {e}") return None delay = base_delay * (2 ** attempt) # 指数退避 logger.warning(f"请求失败,{delay}秒后重试: {e}") await asyncio.sleep(delay) return None ``` ## 文件操作安全 ### 路径验证 所有文件操作前都应验证路径安全性: ```python from pathlib import Path def safe_file_operation(file_path: str) -> bool: """安全的文件操作""" # 验证路径安全性 if not input_validator.validate_path_input(file_path): logger.error(f"不安全的文件路径: {file_path}") return False # 解析路径 path = Path(file_path).resolve() # 检查是否在允许的目录内 allowed_base = Path("/var/data").resolve() if not str(path).startswith(str(allowed_base)): logger.error(f"文件路径不在允许的目录内: {file_path}") return False # 检查文件大小限制 if path.exists() and path.stat().st_size > 10 * 1024 * 1024: # 10MB logger.error(f"文件过大: {file_path}") return False return True ``` ### 临时文件安全 ```python import tempfile import os def create_temp_file(content: bytes) -> str: """创建安全的临时文件""" # 创建临时文件 with tempfile.NamedTemporaryFile( mode='wb', delete=False, suffix='.tmp', dir='/tmp' # 指定临时目录 ) as f: f.write(content) temp_path = f.name # 设置安全权限 os.chmod(temp_path, 0o600) return temp_path def cleanup_temp_file(file_path: str): """清理临时文件""" try: if os.path.exists(file_path): os.unlink(file_path) except Exception as e: logger.warning(f"清理临时文件失败: {e}") ``` ## 日志安全 ### 敏感信息掩码 框架自动掩码敏感信息: ```python from neobot.core.utils.env_loader import env_loader # 敏感值会自动掩码 password = env_loader.get_masked("MYSQL_PASSWORD") # 输出: pa***rd(不会显示完整密码) token = env_loader.get_masked("DISCORD_TOKEN") # 输出: di***en(不会显示完整令牌) ``` ### 安全日志记录 ```python from neobot.core.utils.logger import logger # 安全记录用户输入(截断长内容) def safe_log_user_input(user_input: str, max_length: int = 100): """安全记录用户输入""" if len(user_input) > max_length: logged_input = user_input[:max_length] + "..." else: logged_input = user_input # 移除敏感信息 logged_input = logged_input.replace("\n", "\\n") logged_input = logged_input.replace("\r", "\\r") logger.info(f"用户输入: {logged_input}") # 记录操作时避免敏感信息 def log_operation(user_id: int, operation: str, details: str = ""): """记录用户操作""" logger.info(f"用户 {user_id} 执行操作: {operation}") if details: # 确保 details 不包含敏感信息 safe_details = input_validator.sanitize_html(details) logger.debug(f"操作详情: {safe_details}") ``` ## 总结 遵循这些安全最佳实践可以显著提高 NeoBot 应用的安全性: 1. **使用环境变量管理敏感配置** 2. **对所有用户输入进行验证** 3. **使用具体的异常类型进行错误处理** 4. **在沙箱中执行不可信代码** 5. **使用异步操作避免阻塞** 6. **验证所有文件路径和网络请求** 7. **在日志中掩码敏感信息** 定期审查代码,确保遵循这些安全实践,可以保护你的应用免受常见的安全威胁。