Merge pull request #59 from Fairy-Oracle-Sanctuary/dev

Dev
This commit is contained in:
镀铬酸钾
2026-02-27 22:49:45 +08:00
committed by GitHub
8 changed files with 596 additions and 257 deletions

View File

@@ -73,7 +73,7 @@ def get_jrcd(user_id: int) -> int:
@matcher.command("jrcd")
async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
if event.id == 831797331:
if event.group_id == 831797331:
return None
"""
处理 jrcd 指令,回复用户的“今日长度”。

221
plugins/mirror_avatar.py Normal file
View File

@@ -0,0 +1,221 @@
"""
镜像头像插件
提供 /镜像 指令,将@的用户头像或用户发送的图片处理成轴对称图形。
"""
from core.managers.command_manager import matcher
from core.bot import Bot
from models.events.message import MessageEvent
from core.permission import Permission
from PIL import Image
import io
import aiohttp
import base64
import asyncio
__plugin_meta__ = {
"name": "mirror_avatar",
"description": "将用户头像或图片处理成轴对称图形",
"usage": "/镜像 @人 - 将@的用户头像处理成轴对称图形\n/镜像 - 等待用户发送图片进行镜像处理",
}
# 存储等待图片的用户信息
waiting_for_image = {}
async def get_avatar(user_id: int) -> bytes:
"""
获取用户头像
:param user_id: 用户QQ号
:return: 头像图片字节
"""
# 构建QQ头像URL
url = f"https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.read()
else:
raise Exception(f"获取头像失败: {response.status}")
async def get_image_from_url(url: str) -> bytes:
"""
从URL获取图片
:param url: 图片URL
:return: 图片字节
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.read()
else:
raise Exception(f"获取图片失败: {response.status}")
def process_avatar(image_bytes: bytes) -> bytes:
"""
处理头像为轴对称图形
:param image_bytes: 原始头像字节
:return: 处理后的头像字节
"""
# 打开图片
img = Image.open(io.BytesIO(image_bytes))
# 获取图片尺寸
width, height = img.size
# 计算对称轴位置(中间)
mid_x = width // 2
# 分割图片为左右两部分
left_half = img.crop((0, 0, mid_x, height))
right_half = img.crop((mid_x, 0, width, height))
# 翻转左侧部分到右侧
left_half_flipped = left_half.transpose(Image.FLIP_LEFT_RIGHT)
# 创建新图片
new_img = Image.new('RGB', (width, height))
# 粘贴左侧原始部分和右侧翻转部分
new_img.paste(left_half, (0, 0))
new_img.paste(left_half_flipped, (mid_x, 0))
# 保存处理后的图片
output = io.BytesIO()
new_img.save(output, format='JPEG')
output.seek(0)
return output.read()
async def wait_for_image(bot: Bot, event: MessageEvent):
"""
等待用户发送图片
:param bot: Bot实例
:param event: 消息事件对象
"""
user_id = event.user_id
chat_id = event.group_id if hasattr(event, 'group_id') else event.user_id
is_group = hasattr(event, 'group_id')
# 设置超时时间
timeout = 30
# 提示用户发送图片
await event.reply(f"请在{timeout}秒内发送要处理的图片")
# 记录等待状态
waiting_for_image[user_id] = True
try:
# 等待超时
await asyncio.sleep(timeout)
# 检查是否仍然在等待
if user_id in waiting_for_image:
del waiting_for_image[user_id]
await event.reply("等待超时,请重新发送指令")
except asyncio.CancelledError:
# 图片已收到,任务被取消
pass
@matcher.on_message()
async def handle_image_message(bot: Bot, event: MessageEvent):
"""
处理用户发送的图片消息
:param bot: Bot实例
:param event: 消息事件对象
"""
user_id = event.user_id
# 检查用户是否在等待图片
if user_id not in waiting_for_image:
return
# 查找消息中的图片
images = []
for segment in event.message:
if segment.type == "image" and segment.data.get("url"):
images.append(segment.data["url"])
if not images:
return
# 取消等待任务
del waiting_for_image[user_id]
try:
# 获取第一张图片
image_url = images[0]
# 下载图片
image_bytes = await get_image_from_url(image_url)
# 处理图片
processed_image = process_avatar(image_bytes)
# 检查是否可以发送图片
can_send = await bot.can_send_image()
if not can_send.get("yes"):
await event.reply("当前环境不支持发送图片")
return
# 发送处理后的图片
from models.message import MessageSegment
# 将字节数据转换为 Base64 编码
processed_image_base64 = base64.b64encode(processed_image).decode('utf-8')
# 使用 Base64 编码的字符串
await event.reply(MessageSegment.image(f"base64://{processed_image_base64}"))
except Exception as e:
await event.reply(f"处理图片失败: {str(e)}")
@matcher.command("镜像")
async def handle_mirror(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理镜像指令,将@的用户头像或用户发送的图片处理成轴对称图形
:param bot: Bot实例
:param event: 消息事件对象
:param args: 指令参数列表
"""
# 检查消息中是否有@的用户
at_users = []
for segment in event.message:
if segment.type == "at" and segment.data.get("qq"):
at_users.append(int(segment.data["qq"]))
if at_users:
# 获取第一个@的用户
user_id = at_users[0]
try:
# 获取用户头像
avatar_bytes = await get_avatar(user_id)
# 处理头像
processed_avatar = process_avatar(avatar_bytes)
# 检查是否可以发送图片
can_send = await bot.can_send_image()
if not can_send.get("yes"):
await event.reply("当前环境不支持发送图片")
return
# 发送处理后的头像
from models.message import MessageSegment
# 将字节数据转换为 Base64 编码
processed_avatar_base64 = base64.b64encode(processed_avatar).decode('utf-8')
# 使用 Base64 编码的字符串
await event.reply(MessageSegment.image(f"base64://{processed_avatar_base64}"))
except Exception as e:
await event.reply(f"处理头像失败: {str(e)}")
else:
# 没有@用户,等待用户发送图片
# 启动等待任务
asyncio.create_task(wait_for_image(bot, event))