221 lines
6.6 KiB
Python
221 lines
6.6 KiB
Python
"""
|
|
镜像头像插件
|
|
|
|
提供 /镜像 指令,将@的用户头像或用户发送的图片处理成轴对称图形。
|
|
"""
|
|
from core.managers.command_manager import matcher
|
|
from core.bot import Bot
|
|
from models.events.message import MessageEvent
|
|
from core.permission import Permission
|
|
from PIL import Image
|
|
import io
|
|
import aiohttp
|
|
import base64
|
|
import asyncio
|
|
|
|
__plugin_meta__ = {
|
|
"name": "mirror_avatar",
|
|
"description": "将用户头像或图片处理成轴对称图形",
|
|
"usage": "/镜像 @人 - 将@的用户头像处理成轴对称图形\n/镜像 - 等待用户发送图片进行镜像处理",
|
|
}
|
|
|
|
# 存储等待图片的用户信息
|
|
waiting_for_image = {}
|
|
|
|
async def get_avatar(user_id: int) -> bytes:
|
|
"""
|
|
获取用户头像
|
|
|
|
:param user_id: 用户QQ号
|
|
:return: 头像图片字节
|
|
"""
|
|
# 构建QQ头像URL
|
|
url = f"https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
return await response.read()
|
|
else:
|
|
raise Exception(f"获取头像失败: {response.status}")
|
|
|
|
async def get_image_from_url(url: str) -> bytes:
|
|
"""
|
|
从URL获取图片
|
|
|
|
:param url: 图片URL
|
|
:return: 图片字节
|
|
"""
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
return await response.read()
|
|
else:
|
|
raise Exception(f"获取图片失败: {response.status}")
|
|
|
|
def process_avatar(image_bytes: bytes) -> bytes:
|
|
"""
|
|
处理头像为轴对称图形
|
|
|
|
:param image_bytes: 原始头像字节
|
|
:return: 处理后的头像字节
|
|
"""
|
|
# 打开图片
|
|
img = Image.open(io.BytesIO(image_bytes))
|
|
|
|
# 获取图片尺寸
|
|
width, height = img.size
|
|
|
|
# 计算对称轴位置(中间)
|
|
mid_x = width // 2
|
|
|
|
# 分割图片为左右两部分
|
|
left_half = img.crop((0, 0, mid_x, height))
|
|
right_half = img.crop((mid_x, 0, width, height))
|
|
|
|
# 翻转左侧部分到右侧
|
|
left_half_flipped = left_half.transpose(Image.FLIP_LEFT_RIGHT)
|
|
|
|
# 创建新图片
|
|
new_img = Image.new('RGB', (width, height))
|
|
|
|
# 粘贴左侧原始部分和右侧翻转部分
|
|
new_img.paste(left_half, (0, 0))
|
|
new_img.paste(left_half_flipped, (mid_x, 0))
|
|
|
|
# 保存处理后的图片
|
|
output = io.BytesIO()
|
|
new_img.save(output, format='JPEG')
|
|
output.seek(0)
|
|
|
|
return output.read()
|
|
|
|
async def wait_for_image(bot: Bot, event: MessageEvent):
|
|
"""
|
|
等待用户发送图片
|
|
|
|
:param bot: Bot实例
|
|
:param event: 消息事件对象
|
|
"""
|
|
user_id = event.user_id
|
|
chat_id = event.group_id if hasattr(event, 'group_id') else event.user_id
|
|
is_group = hasattr(event, 'group_id')
|
|
|
|
# 设置超时时间
|
|
timeout = 30
|
|
|
|
# 提示用户发送图片
|
|
await event.reply(f"请在{timeout}秒内发送要处理的图片")
|
|
|
|
# 记录等待状态
|
|
waiting_for_image[user_id] = True
|
|
|
|
try:
|
|
# 等待超时
|
|
await asyncio.sleep(timeout)
|
|
|
|
# 检查是否仍然在等待
|
|
if user_id in waiting_for_image:
|
|
del waiting_for_image[user_id]
|
|
await event.reply("等待超时,请重新发送指令")
|
|
except asyncio.CancelledError:
|
|
# 图片已收到,任务被取消
|
|
pass
|
|
|
|
@matcher.on_message()
|
|
async def handle_image_message(bot: Bot, event: MessageEvent):
|
|
"""
|
|
处理用户发送的图片消息
|
|
|
|
:param bot: Bot实例
|
|
:param event: 消息事件对象
|
|
"""
|
|
user_id = event.user_id
|
|
|
|
# 检查用户是否在等待图片
|
|
if user_id not in waiting_for_image:
|
|
return
|
|
|
|
# 查找消息中的图片
|
|
images = []
|
|
for segment in event.message:
|
|
if segment.type == "image" and segment.data.get("url"):
|
|
images.append(segment.data["url"])
|
|
|
|
if not images:
|
|
return
|
|
|
|
# 取消等待任务
|
|
del waiting_for_image[user_id]
|
|
|
|
try:
|
|
# 获取第一张图片
|
|
image_url = images[0]
|
|
|
|
# 下载图片
|
|
image_bytes = await get_image_from_url(image_url)
|
|
|
|
# 处理图片
|
|
processed_image = process_avatar(image_bytes)
|
|
|
|
# 检查是否可以发送图片
|
|
can_send = await bot.can_send_image()
|
|
if not can_send.get("yes"):
|
|
await event.reply("当前环境不支持发送图片")
|
|
return
|
|
|
|
# 发送处理后的图片
|
|
from models.message import MessageSegment
|
|
# 将字节数据转换为 Base64 编码
|
|
processed_image_base64 = base64.b64encode(processed_image).decode('utf-8')
|
|
# 使用 Base64 编码的字符串
|
|
await event.reply(MessageSegment.image(f"base64://{processed_image_base64}"))
|
|
|
|
except Exception as e:
|
|
await event.reply(f"处理图片失败: {str(e)}")
|
|
|
|
@matcher.command("镜像")
|
|
async def handle_mirror(bot: Bot, event: MessageEvent, args: list[str]):
|
|
"""
|
|
处理镜像指令,将@的用户头像或用户发送的图片处理成轴对称图形
|
|
|
|
:param bot: Bot实例
|
|
:param event: 消息事件对象
|
|
:param args: 指令参数列表
|
|
"""
|
|
# 检查消息中是否有@的用户
|
|
at_users = []
|
|
for segment in event.message:
|
|
if segment.type == "at" and segment.data.get("qq"):
|
|
at_users.append(int(segment.data["qq"]))
|
|
|
|
if at_users:
|
|
# 获取第一个@的用户
|
|
user_id = at_users[0]
|
|
|
|
try:
|
|
# 获取用户头像
|
|
avatar_bytes = await get_avatar(user_id)
|
|
|
|
# 处理头像
|
|
processed_avatar = process_avatar(avatar_bytes)
|
|
|
|
# 检查是否可以发送图片
|
|
can_send = await bot.can_send_image()
|
|
if not can_send.get("yes"):
|
|
await event.reply("当前环境不支持发送图片")
|
|
return
|
|
|
|
# 发送处理后的头像
|
|
from models.message import MessageSegment
|
|
# 将字节数据转换为 Base64 编码
|
|
processed_avatar_base64 = base64.b64encode(processed_avatar).decode('utf-8')
|
|
# 使用 Base64 编码的字符串
|
|
await event.reply(MessageSegment.image(f"base64://{processed_avatar_base64}"))
|
|
|
|
except Exception as e:
|
|
await event.reply(f"处理头像失败: {str(e)}")
|
|
else:
|
|
# 没有@用户,等待用户发送图片
|
|
# 启动等待任务
|
|
asyncio.create_task(wait_for_image(bot, event)) |