Files
NeoBot/plugins/weather.py
2026-03-01 11:25:36 +08:00

201 lines
6.6 KiB
Python

# -*- coding: utf-8 -*-
import asyncio
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests

from core.managers.command_manager import matcher
from core.managers.image_manager import image_manager
from core.utils.logger import logger
from models import MessageEvent, MessageSegment

from .resource.city_code import CITY_CODES
# Plugin metadata: the plugin's name, a short description and a usage hint.
__plugin_meta__ = {
    "name": "weather",
    "description": "查询天气信息,支持中国天气网数据。",
    "usage": "/天气 [城市代码] - 查询指定城市的天气信息\n例如:/天气 101190207 (南京)",
}

# Request headers sent with every weather page fetch (browser-like User-Agent).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
}
# Number of forecast entries read from the page's day list.
_FORECAST_DAYS = 7


def _parse_city_name(html_content: str) -> str:
    """Extract the city name from the links that follow the forecast anchor.

    The non-blank fragments found in that section are joined with single
    spaces. The string splitting mirrors the page markup and is therefore
    sensitive to layout changes on the site.
    """
    city_info = (
        html_content.split('<a href="http://www.weather.com.cn/forecast/" ')[-1]
        .split("</div>")[0]
        .strip()
    )
    city_parts = [city_info.split("<a>")[-1].split("</a>")[0]]
    if city_info.count("_blank") == 1:
        # Exactly one linked level: the remaining level is plain <span> text.
        city_parts.append(
            city_info.split("<span>></span>")[-1]
            .replace("</span>", "")
            .replace("<span>", "")
            .strip()
        )
    else:
        # Otherwise take everything after the last target="_blank" link.
        city_parts.extend(
            city_info.split('target="_blank">')[-1]
            .replace("<span>></span> <span>", "")
            .replace("</span>", "")
            .split("</a>")
        )
    return " ".join(part for part in city_parts if part.strip())


def _parse_day_forecast(day_html: str) -> Dict[str, str]:
    """Parse one ``<li>`` chunk of the forecast list into a dict.

    Raises:
        IndexError: when the chunk lacks the expected markup (for example no
            wind icon), so the caller can skip that entry.
    """
    day = day_html.split("<h1>")[-1].split("</h1>")[0].strip()
    weather = (
        day_html.split('<p title="')[-1].split('" class="wea">')[0].strip()
    )

    temperature = (
        day_html.split("<span>")[-1]
        .split("</i>")[0]
        .replace("</span>/<i>", " / ")
        .strip()
    )
    if len(temperature) > 10:
        # Too long to be a "high / low" pair: the <span> split did not land
        # on the temperature, so fall back to the first <i>…</i> value.
        temperature = day_html.split("<i>")[1].split("</i>")[0].strip()

    # Element 0 is the text before the first wind icon; at most two icons
    # are read from each entry.
    wind_spans = day_html.split('<span title="')
    wind_directions = [
        span.split('"></span>')[0].replace('" class="', " / ")
        for span in wind_spans[1:3]
    ]
    if not wind_directions:
        # Same outcome as the former wind[1] lookup: entry is rejected.
        raise IndexError("no wind direction markup found")

    wind_power = day_html.split("<i>")[-1].split("</i>")[0].strip()

    return {
        "day": day,
        "weather": weather,
        "temperature": temperature,
        "wind_power": wind_power,
        "wind_direction": " / ".join(wind_directions),
    }


def _parse_forecast(html_content: str) -> List[Dict[str, str]]:
    """Parse up to ``_FORECAST_DAYS`` entries from the forecast list.

    Entries that cannot be parsed are logged and skipped, so the result may
    contain fewer items (or be empty when the list itself is missing).
    """
    # Split the list once instead of re-splitting the whole page per day.
    try:
        day_chunks = (
            html_content.split('<ul class="t clearfix">')[-1]
            .split('on">')[1]
            .split("</li>")
        )
    except IndexError as e:
        logger.warning(f"未找到天气预报列表,页面结构可能已变化: {e}")
        return []

    weather_data = []
    for day_index in range(_FORECAST_DAYS):
        try:
            weather_data.append(_parse_day_forecast(day_chunks[day_index]))
        except (IndexError, ValueError) as e:
            logger.warning(f"解析第{day_index + 1}天天气数据失败: {e}")
    return weather_data


def get_weather_data(city_code: str) -> Optional[Dict[str, Any]]:
    """Fetch and parse the forecast page for a city from weather.com.cn.

    Args:
        city_code: City code used to build the page URL.

    Returns:
        A dict with keys ``city_name``, ``weather_data`` (list of per-day
        dicts), ``query_time`` and ``timestamp``; or ``None`` when the
        request or the parsing fails (the error is logged).

    Note:
        This function performs a blocking HTTP request (10 s timeout).
    """
    try:
        url = f"https://www.weather.com.cn/weather/{city_code}.shtml"
        response = requests.get(url, headers=HEADERS, timeout=10)
        # Do not parse error pages (404/5xx) as if they were forecasts.
        response.raise_for_status()
        response.encoding = "utf-8"
        html_content = response.text

        city_name = _parse_city_name(html_content)
        weather_data = _parse_forecast(html_content)

        # Take a single reading so both time fields describe the same instant.
        now = datetime.now()
        return {
            "city_name": city_name,
            "weather_data": weather_data,
            "query_time": now.strftime("%Y-%m-%d %H:%M:%S"),
            # The previous format ran the day straight into the hour
            # ("…月0111:25"). The CJK separators are kept outside the
            # format specs so only ASCII directives reach strftime.
            "timestamp": f"{now:%Y}年{now:%m}月{now:%d}日 {now:%H:%M}",
        }
    except Exception as e:
        # Boundary handler: callers rely on None rather than an exception.
        logger.error(f"获取天气数据失败: {e}")
        return None
def _resolve_city_code(city_input: str) -> Optional[str]:
    """Map user input to a city code, or return None when nothing matches.

    Resolution order: an exact key in ``CITY_CODES``, then a literal 9-digit
    code, then the first ``CITY_CODES`` name that contains the input.
    """
    if city_input in CITY_CODES:
        return CITY_CODES[city_input]
    # ASCII digits only: the code is interpolated into the request URL.
    if re.fullmatch(r"[0-9]{9}", city_input):
        return city_input
    return next(
        (code for name, code in CITY_CODES.items() if city_input in name),
        None,
    )


@matcher.command("天气")
async def handle_weather(bot, event: MessageEvent, args: List[str]):
    """Handle the weather query command.

    Replies with a usage hint when no city is given, with an error message
    when the city cannot be resolved or the data cannot be fetched, and
    otherwise with an image rendered from the ``weather.html`` template.

    Args:
        bot: Bot instance (not used by this handler).
        event: Incoming message event; replies are sent through it.
        args: Command arguments; the first one is the city name or code.
    """
    city_input = args[0].strip() if args else ""
    if not city_input:
        # A blank argument is treated like a missing one; otherwise the
        # substring match below would accept "" and pick the first city.
        city_list = "\n".join(
            f"{name}: {code}" for name, code in list(CITY_CODES.items())[:10]
        )
        reply_msg = f"请指定城市名称或城市代码,例如:\n/天气 北京\n/天气 101010100\n\n支持的城市:\n{city_list}\n..."
        await event.reply(reply_msg)
        return

    city_code = _resolve_city_code(city_input)
    if not city_code:
        await event.reply(f"未找到城市 '{city_input}',请检查城市名称或使用城市代码。")
        return

    await event.reply("正在查询天气信息,请稍候...")
    # get_weather_data does a blocking HTTP request; run it in the default
    # executor so the event loop stays responsive while it waits.
    loop = asyncio.get_running_loop()
    weather_info = await loop.run_in_executor(None, get_weather_data, city_code)
    if not weather_info or not weather_info.get("weather_data"):
        await event.reply("获取天气信息失败,请稍后重试。")
        return

    try:
        # Render the HTML template to a base64-encoded image.
        base64_image = await image_manager.render_template_to_base64(
            "weather.html",
            weather_info,
            output_name="weather.png",
            width=400,
            height=500,
        )
        if base64_image:
            await event.reply(MessageSegment.image(base64_image))
        else:
            await event.reply("生成天气图片失败,请稍后重试。")
    except Exception as e:
        # Handler boundary: report the failure to the user instead of raising.
        logger.error(f"渲染天气图片失败: {e}")
        await event.reply("生成天气图片时发生错误,请稍后重试。")