241 lines
7.5 KiB
Python
241 lines
7.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List
|
|
|
|
import requests
|
|
|
|
from core.managers.command_manager import matcher
|
|
from core.managers.image_manager import image_manager
|
|
from core.utils.logger import logger
|
|
from models import MessageEvent, MessageSegment
|
|
|
|
# 插件元数据
|
|
__plugin_meta__ = {
|
|
"name": "weather",
|
|
"description": "查询天气信息,支持中国天气网数据。",
|
|
"usage": "/天气 [城市代码] - 查询指定城市的天气信息\n例如:/天气 101190207 (南京)",
|
|
}
|
|
|
|
# 城市代码映射(可以扩展)
|
|
CITY_CODES = {
|
|
"北京": "101010100",
|
|
"上海": "101020100",
|
|
"广州": "101280101",
|
|
"深圳": "101280601",
|
|
"南京": "101190101",
|
|
"苏州": "101190401",
|
|
"杭州": "101210101",
|
|
"武汉": "101200101",
|
|
"成都": "101270101",
|
|
"重庆": "101040100",
|
|
"西安": "101110101",
|
|
"天津": "101030100",
|
|
"沈阳": "101070101",
|
|
"大连": "101070201",
|
|
"青岛": "101120201",
|
|
"济南": "101120101",
|
|
"郑州": "101180101",
|
|
"长沙": "101250101",
|
|
"南昌": "101240101",
|
|
"合肥": "101220101",
|
|
"福州": "101230101",
|
|
"厦门": "101230201",
|
|
"南宁": "101300101",
|
|
"海口": "101310101",
|
|
"昆明": "101290101",
|
|
"贵阳": "101260101",
|
|
"拉萨": "101140101",
|
|
"兰州": "101160101",
|
|
"西宁": "101150101",
|
|
"银川": "101170101",
|
|
"乌鲁木齐": "101130101",
|
|
"哈尔滨": "101050101",
|
|
"长春": "101060101",
|
|
"呼和浩特": "101080101",
|
|
"太原": "101100101",
|
|
"石家庄": "101090101",
|
|
}
|
|
|
|
HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
}
|
|
|
|
|
|
def get_weather_data(city_code: str) -> Dict[str, Any]:
|
|
"""
|
|
获取天气数据
|
|
|
|
Args:
|
|
city_code (str): 城市代码
|
|
|
|
Returns:
|
|
Dict[str, Any]: 包含城市信息和天气数据的字典
|
|
"""
|
|
try:
|
|
url = f"https://www.weather.com.cn/weather/{city_code}.shtml"
|
|
response = requests.get(url, headers=HEADERS, timeout=10)
|
|
response.encoding = "utf-8"
|
|
html_content = response.text
|
|
|
|
# 提取城市信息
|
|
city_info = (
|
|
html_content.split('<a href="http://www.weather.com.cn/forecast/" ')[-1]
|
|
.split("</div>")[0]
|
|
.strip()
|
|
)
|
|
|
|
city_parts = []
|
|
city_parts.append(city_info.split("<a>")[-1].split("</a>")[0])
|
|
|
|
if city_info.count("_blank") == 1:
|
|
city_parts.append(
|
|
city_info.split("<span>></span>")[-1]
|
|
.replace("</span>", "")
|
|
.replace("<span>", "")
|
|
.strip()
|
|
)
|
|
else:
|
|
additional_parts = (
|
|
city_info.split('target="_blank">')[-1]
|
|
.replace("<span>></span> <span>", "")
|
|
.replace("</span>", "")
|
|
.split("</a>")
|
|
)
|
|
city_parts.extend(additional_parts)
|
|
|
|
city_name = " ".join([part for part in city_parts if part.strip()])
|
|
|
|
# 提取天气信息
|
|
weather_data = []
|
|
for i in range(7):
|
|
try:
|
|
weather_info = (
|
|
html_content.split('<ul class="t clearfix">')[-1]
|
|
.split('on">')[1]
|
|
.split("</li>")[i]
|
|
)
|
|
|
|
day = weather_info.split("<h1>")[-1].split("</h1>")[0].strip()
|
|
weather = (
|
|
weather_info.split('<p title="')[-1]
|
|
.split('" class="wea">')[0]
|
|
.strip()
|
|
)
|
|
|
|
tem = (
|
|
weather_info.split("<span>")[-1]
|
|
.split("</i>")[0]
|
|
.replace("</span>/<i>", " / ")
|
|
.strip()
|
|
)
|
|
if len(tem) > 10:
|
|
tem = weather_info.split("<i>")[1].split("</i>")[0].strip()
|
|
|
|
wind = weather_info.split('<span title="')
|
|
wind_direction = []
|
|
if len(wind) > 2:
|
|
for j in range(2):
|
|
wind_direction.append(
|
|
wind[j + 1]
|
|
.split('"></span>')[0]
|
|
.replace('" class="', " / ")
|
|
)
|
|
else:
|
|
wind_direction.append(
|
|
wind[1].split('"></span>')[0].replace('" class="', " / ")
|
|
)
|
|
wind_power = weather_info.split("<i>")[-1].split("</i>")[0].strip()
|
|
|
|
wind_direction_str = (
|
|
" / ".join(wind_direction) if wind_direction else "未知"
|
|
)
|
|
|
|
weather_data.append(
|
|
{
|
|
"day": day,
|
|
"weather": weather,
|
|
"temperature": tem,
|
|
"wind_power": wind_power,
|
|
"wind_direction": wind_direction_str,
|
|
}
|
|
)
|
|
|
|
except (IndexError, ValueError) as e:
|
|
logger.warning(f"解析第{i + 1}天天气数据失败: {e}")
|
|
continue
|
|
|
|
return {
|
|
"city_name": city_name,
|
|
"weather_data": weather_data,
|
|
"query_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"timestamp": datetime.now().strftime("%Y年%m月%d日 %H:%M"),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"获取天气数据失败: {e}")
|
|
return None
|
|
|
|
|
|
@matcher.command("天气")
|
|
async def handle_weather(bot, event: MessageEvent, args: List[str]):
|
|
"""
|
|
处理天气查询指令
|
|
|
|
Args:
|
|
bot: Bot实例
|
|
event: 消息事件
|
|
args: 指令参数
|
|
"""
|
|
if not args:
|
|
# 显示支持的城市列表
|
|
city_list = "\n".join(
|
|
[f"{name}: {code}" for name, code in list(CITY_CODES.items())[:10]]
|
|
)
|
|
reply_msg = f"请指定城市名称或城市代码,例如:\n/天气 北京\n/天气 101010100\n\n支持的城市:\n{city_list}\n..."
|
|
await event.reply(reply_msg)
|
|
return
|
|
|
|
city_input = args[0].strip()
|
|
|
|
# 尝试匹配城市名称或直接使用城市代码
|
|
city_code = None
|
|
if city_input in CITY_CODES:
|
|
city_code = CITY_CODES[city_input]
|
|
elif re.match(r"^\d{9}$", city_input):
|
|
city_code = city_input
|
|
else:
|
|
# 尝试模糊匹配城市名称
|
|
for name, code in CITY_CODES.items():
|
|
if city_input in name:
|
|
city_code = code
|
|
break
|
|
|
|
if not city_code:
|
|
await event.reply(f"未找到城市 '{city_input}',请检查城市名称或使用城市代码。")
|
|
return
|
|
|
|
# 获取天气数据
|
|
await event.reply("正在查询天气信息,请稍候...")
|
|
weather_info = get_weather_data(city_code)
|
|
|
|
if not weather_info or not weather_info.get("weather_data"):
|
|
await event.reply("获取天气信息失败,请稍后重试。")
|
|
return
|
|
|
|
try:
|
|
# 渲染HTML模板为图片
|
|
base64_image = await image_manager.render_template_to_base64(
|
|
"weather.html", weather_info, output_name="weather.png"
|
|
)
|
|
|
|
if base64_image:
|
|
# 发送图片消息
|
|
await event.reply(MessageSegment.image(base64_image))
|
|
else:
|
|
await event.reply("生成天气图片失败,请稍后重试。")
|
|
|
|
except Exception as e:
|
|
logger.error(f"渲染天气图片失败: {e}")
|
|
await event.reply("生成天气图片时发生错误,请稍后重试。")
|