Files
fakabot/utils.py

482 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Consolidated utilities module: merged from utils/*.py
# Sections:
# - constants: STATUS_ZH, MSG
# - home: render_home
# - keyboards: build_payment_rows, row_back, row_home_admin, make_markup
# - misc: parse_date, fmt_ts, to_base36, bar
# - notify: notify_admin
# - sender: send_ephemeral
# - settings: ensure_settings_table, get_setting, set_setting
from __future__ import annotations
import asyncio
import datetime
import time
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
try:
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup
except Exception: # 测试环境兜底桩:不影响真实运行
class InlineKeyboardButton: # type: ignore
def __init__(self, text: str, callback_data: Optional[str] = None):
self.text = text
self.callback_data = callback_data
class InlineKeyboardMarkup: # type: ignore
def __init__(self, inline_keyboard):
self.inline_keyboard = inline_keyboard
class Bot: # type: ignore
async def send_message(self, chat_id: int, text: str, **kwargs):
# 返回与 python-telegram-bot 类似的对象属性
return SimpleNamespace(message_id=1, chat_id=chat_id, text=text)
async def delete_message(self, chat_id: int, message_id: int):
return None
__all__ = [
# constants
"STATUS_ZH",
"MSG",
# home
"render_home",
# keyboards
"build_payment_rows",
"row_back",
"row_home_admin",
"make_markup",
# misc
"parse_date",
"fmt_ts",
"to_base36",
"bar",
# notify
"notify_admin",
# sender
"send_ephemeral",
# settings
"ensure_settings_table",
"get_setting",
"set_setting",
]
# ---------------- constants.py ----------------
# 统一的状态/文案常量
STATUS_ZH: Dict[str, str] = {
"pending": "待支付",
"paid": "已支付",
"processing": "处理中",
"completed": "已完成",
"cancelled": "已取消",
"expired": "已超时",
"refunded": "已退款",
"failed": "支付失败",
}
# 常用短句(可逐步接入以实现统一文案/i18n)
MSG: Dict[str, str] = {
"saved_and_back": "✅ 已保存变更,返回商品页…",
"created_and_back": "✅ 新商品已创建,返回列表…",
"refreshing": "正在刷新…",
"refreshed": "✅ 刷新完成",
}
# ---------------- home.py ----------------
# 类型注释仅作参考,不强制
_GetSetting = Callable[[str, Optional[str]], Optional[str]]
async def render_home(
chat_id: int,
cur,
START_CFG,
_get_setting: _GetSetting,
_delete_last_and_send_photo: Callable[..., Any],
_delete_last_and_send_text: Callable[..., Any],
*,
extra_rows: Optional[list[list[InlineKeyboardButton]]] = None,
):
"""渲染首页(封面 + 标题/简介 + 商品按钮)。
所有依赖通过参数传入,方便在不同模块中复用。
"""
try:
title = (_get_setting("home.title", (START_CFG.get("title") or "欢迎选购")) or "欢迎选购").strip()
except Exception:
title = "欢迎选购"
try:
intro = (_get_setting("home.intro", (START_CFG.get("intro") or "请选择下方商品进行购买")) or "请选择下方商品进行购买").strip()
except Exception:
intro = "请选择下方商品进行购买"
try:
cover = _get_setting("home.cover_url", START_CFG.get("cover_url") or None)
except Exception:
cover = None
try:
rows: List[Tuple[int, str, float]] = cur.execute(
"SELECT id, name, price FROM products WHERE status='on'"
).fetchall()
except Exception:
rows = []
# 每行商品数:从 settings 读取,可选 1-4,默认 2
try:
cols_raw = _get_setting("home.products_per_row", (START_CFG.get("products_per_row") or 2))
cols = int(cols_raw or 2)
except Exception:
cols = 2
cols = max(1, min(4, cols))
# 读取按钮文案模板。支持占位符:{name}、{price}
try:
btn_tpl = _get_setting("home.button_template", (START_CFG.get("button_template") or " {name} | ¥{price}")) or " {name} | ¥{price}"
except Exception:
btn_tpl = " {name} | ¥{price}"
buttons: List[List[InlineKeyboardButton]] = []
row_btn: List[InlineKeyboardButton] = []
for pid, name, price in rows:
try:
label = str(btn_tpl).replace("{name}", str(name)).replace("{price}", str(price))
except Exception:
label = f" {name} | ¥{price}"
row_btn.append(InlineKeyboardButton(label, callback_data=f"detail:{pid}"))
if len(row_btn) >= cols:
buttons.append(row_btn)
row_btn = []
if row_btn:
buttons.append(row_btn)
# 追加额外按钮行(例如:返回)
if extra_rows:
for r in extra_rows:
if isinstance(r, list) and r:
buttons.append(r)
# 客服入口改为独立命令 /support,此处不再在首页展示按钮
caption = f"{title}\n\n{intro}\n\n请选择商品:"
if cover:
try:
await _delete_last_and_send_photo(
chat_id,
cover,
caption=caption,
reply_markup=InlineKeyboardMarkup(buttons) if buttons else None,
)
return
except Exception:
pass
await _delete_last_and_send_text(
chat_id,
caption,
reply_markup=InlineKeyboardMarkup(buttons) if buttons else None,
)
# ---------------- keyboards.py ----------------
def build_payment_rows(
paycfg: Dict[str, dict],
*,
enabled_key: str = "enabled",
priority_key: str = "priority",
name_key: str = "name",
callback_fmt: str = "pay:{channel}:{pid}",
pid: Optional[str] = None,
max_cols: int = 2,
get_setting_func: Optional[Callable[[str, str], str]] = None,
skip_single: bool = False,
) -> List[List[InlineKeyboardButton]]:
"""
根据支付方式配置生成按钮行:
- 过滤掉未启用项(enabled=False 或数据库设置为关闭)
- 按 priority 从小到大排序(默认 100)
- 每行最多 max_cols 个
paycfg 示例:{
"alipay": {"name": "支付宝", "enabled": true, "priority": 10},
"wxpay": {"name": "微信", "enabled": false, "priority": 20},
}
"""
# 如果有get_setting_func,使用管理员设置的排序
if get_setting_func:
order_str = get_setting_func("payment.order", "alipay,wxpay,usdt_lemon,usdt_token188")
payment_order = order_str.split(",")
items: List[Tuple[int, str, str]] = []
for i, ch in enumerate(payment_order):
if ch not in paycfg:
continue
cfg = paycfg[ch]
# 检查配置文件中的enabled
if not cfg.get(enabled_key, True):
continue
# 检查数据库中的开关设置(管理员可控制)
db_enabled = get_setting_func(f"payment.{ch}.enabled", "true") == "true"
if not db_enabled:
continue
label = str(cfg.get(name_key) or ch)
items.append((i, ch, label)) # 使用顺序索引而不是priority
else:
# 回退到原来的priority排序
items: List[Tuple[int, str, str]] = []
for ch, cfg in paycfg.items():
# 检查配置文件中的enabled
if not cfg.get(enabled_key, True):
continue
pri = int(cfg.get(priority_key, 100) or 100)
label = str(cfg.get(name_key) or ch)
items.append((pri, ch, label))
items.sort(key=lambda x: x[0])
# 如果启用skip_single且只有一个支付方式,返回空列表
if skip_single and len(items) == 1:
return []
rows_kb: List[List[InlineKeyboardButton]] = []
row: List[InlineKeyboardButton] = []
for _, channel, label in items:
cb = callback_fmt.format(channel=channel, pid=pid or "")
row.append(InlineKeyboardButton(label, callback_data=cb))
if len(row) >= max_cols:
rows_kb.append(row)
row = []
if row:
rows_kb.append(row)
return rows_kb
def get_first_enabled_payment(
paycfg: Dict[str, dict],
*,
enabled_key: str = "enabled",
get_setting_func: Optional[Callable[[str, str], str]] = None,
) -> Optional[str]:
"""
获取第一个启用的支付方式
"""
# 如果有get_setting_func,使用管理员设置的排序
if get_setting_func:
order_str = get_setting_func("payment.order", "alipay,wxpay,usdt_lemon,usdt_token188")
payment_order = order_str.split(",")
for ch in payment_order:
if ch not in paycfg:
continue
cfg = paycfg[ch]
# 检查配置文件中的enabled
if not cfg.get(enabled_key, True):
continue
# 检查数据库中的开关设置(管理员可控制)
db_enabled = get_setting_func(f"payment.{ch}.enabled", "true") == "true"
if not db_enabled:
continue
return ch
else:
# 回退到原来的priority排序
items = []
for ch, cfg in paycfg.items():
# 检查配置文件中的enabled
if not cfg.get(enabled_key, True):
continue
pri = int(cfg.get("priority", 100) or 100)
items.append((pri, ch))
items.sort(key=lambda x: x[0])
if items:
return items[0][1]
return None
def row_back(callback_data: str, label: str = "⬅️ 返回") -> List[InlineKeyboardButton]:
return [InlineKeyboardButton(label, callback_data=callback_data)]
def row_home_admin(label: str = "🏠 返回面板") -> List[InlineKeyboardButton]:
return [InlineKeyboardButton(label, callback_data="adm:menu")]
def make_markup(rows: Sequence[Sequence[InlineKeyboardButton]] | None) -> Optional[InlineKeyboardMarkup]:
if not rows:
return None
return InlineKeyboardMarkup(list(rows))
# 统一的付款台控制行:用于“重新检查/取消付款”等
def rows_pay_console(otn: str) -> List[List[InlineKeyboardButton]]:
return [[
InlineKeyboardButton("🔄 我已支付,重新检查", callback_data=f"recheck:{otn}"),
InlineKeyboardButton("❌ 取消本次付款", callback_data=f"ask:cancel:{otn}"),
]]
# 通用确认对话行:yes/no 两个按钮在同一行
def build_confirm_rows(yes_cb: str, no_cb: str, yes_label: str = "✅ 确定", no_label: str = "↩️ 返回") -> List[List[InlineKeyboardButton]]:
return [[
InlineKeyboardButton(yes_label, callback_data=yes_cb),
InlineKeyboardButton(no_label, callback_data=no_cb),
]]
# ---------------- misc.py ----------------
def parse_date(s: str):
"""Parse YYYY-MM-DD to unix timestamp (seconds). Return None on failure/empty."""
try:
s = (s or "").strip()
if not s:
return None
y, m, d = s.split("-")
tm = time.strptime(f"{int(y):04d}-{int(m):02d}-{int(d):02d}", "%Y-%m-%d")
return int(time.mktime(tm))
except Exception:
return None
def fmt_ts(ts: int) -> str:
try:
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(ts or 0)))
except Exception:
return "-"
def to_base36(n: int) -> str:
"""Encode non-negative int to uppercase base36 string."""
try:
x = int(n)
if x < 0:
x = -x
if x == 0:
return "0"
chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
s: List[str] = []
while x > 0:
x, r = divmod(x, 36)
s.append(chars[r])
return "".join(reversed(s))
except Exception:
return str(n)
def bar(val: float, maxv: float, width: int = 20) -> str:
if maxv <= 0:
return ""
n = int(round((float(val) / float(maxv)) * width))
n = max(0, min(width, n))
return "" * n + "·" * (width - n)
# ---------------- notify.py ----------------
async def notify_admin(
bot: Bot,
text: str,
admin_id: int,
*,
prefix: str = "[通知]",
attach_time: bool = True,
context: Optional[str] = None,
) -> None:
"""
统一的管理员通知工具。
参数:
- bot: Telegram Bot 实例
- text: 主体文本
- admin_id: 管理员聊天ID(从配置读取并传入)
- prefix: 前缀标签,如 "[错误]""[告警]""[通知]"
- attach_time: 是否追加时间戳
- context: 可选上下文信息,追加到消息末尾
"""
try:
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") if attach_time else None
parts: List[str] = []
if prefix:
parts.append(prefix)
parts.append(text.strip())
if context:
parts.append(str(context).strip())
if ts:
parts.append(f"@{ts}")
msg = " ".join(part for part in parts if part)
await bot.send_message(admin_id, text=msg)
except Exception:
# 通知失败不影响主流程
pass
# ---------------- sender.py ----------------
async def send_ephemeral(bot: Bot, chat_id: int, text: str, ttl: int = 5) -> Optional[int]:
"""
发送一条会在 ttl 秒后自动删除的临时文本消息。
:param bot: telegram.Bot 实例
:param chat_id: 目标聊天 ID
:param text: 文本内容
:param ttl: 存活时间(秒),默认 5
:return: 已发送消息的 message_id(若发送失败则返回 None)
"""
msg = None
try:
msg = await bot.send_message(chat_id=chat_id, text=text)
except Exception:
return None
async def _del_later(c_id: int, m_id: int, delay: int):
try:
await asyncio.sleep(max(1, int(delay)))
await bot.delete_message(chat_id=c_id, message_id=m_id)
except Exception:
pass
try:
asyncio.create_task(_del_later(msg.chat_id, msg.message_id, ttl))
except Exception:
pass
return getattr(msg, "message_id", None)
# ---------------- settings.py ----------------
def ensure_settings_table(cur, conn) -> None:
try:
cur.execute(
"CREATE TABLE IF NOT EXISTS settings(\n"
" key TEXT PRIMARY KEY,\n"
" value TEXT\n"
")"
)
conn.commit()
except Exception:
pass
def get_setting(cur, key: str, default: Optional[str] = "") -> Optional[str]:
try:
row = cur.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
if row and row[0] is not None:
return str(row[0])
except Exception:
pass
return default
def set_setting(cur, conn, key: str, value: str) -> None:
try:
cur.execute(
"INSERT INTO settings(key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value",
(key, value),
)
conn.commit()
except Exception:
pass