Files
fakabot/admin_panel.py

2107 lines
103 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import asyncio
import json
import time
import io
from typing import Callable, Any, Dict
from telegram import Update, InlineKeyboardButton
from utils import send_ephemeral
from utils import row_back, row_home_admin, make_markup
from utils import STATUS_ZH
from telegram.ext import ContextTypes, CommandHandler, CallbackQueryHandler, MessageHandler, filters, Application
from utils import render_home
from utils import parse_date as _parse_date, fmt_ts as _fmt_ts, to_base36 as _to_base36, bar as _bar
# 该模块通过依赖注入方式复用主程序的资源,避免循环依赖
# 使用方式:在 bot.py 中调用 register_admin_handlers(application, deps)
# deps 包含:
# - is_admin: Callable[[int], bool]
# - cur, conn: sqlite cursor/connection
# - CFG_PATH: str
# - START_CFG: dict (引用)
# - sync_products_from_config: Callable[[list], None] (可选)
# - _delete_last_and_send_text, _delete_last_and_send_photo: 发送工具
def register_admin_handlers(app: Application, deps: Dict[str, Any]):
is_admin: Callable[[int], bool] = deps["is_admin"]
cur = deps["cur"]
conn = deps["conn"]
CFG_PATH: str = deps["CFG_PATH"]
START_CFG: dict = deps["START_CFG"]
_send_text = deps["_delete_last_and_send_text"]
_send_photo = deps["_delete_last_and_send_photo"]
mark_paid_and_send_invite = deps.get("mark_paid_and_send_invite")
_get_setting = deps.get("_get_setting")
_set_setting = deps.get("_set_setting")
# ---------- helpers ----------
async def _guard_admin(update: Update) -> bool:
uid = update.effective_user.id
if not is_admin(uid):
try:
# 采用一次性提示,并在数秒后自动删除
await send_ephemeral(
update.get_bot(),
update.effective_chat.id,
"✨ 嗨~这里是官方店后台,您不是管理员呢,无法为您展示😯~",
ttl=5,
)
except Exception:
pass
return False
return True
# settings 表读写由主程序注入;此处不再重复创建表,保持轻量。
# ---------- date/misc helpers ----------
# 复用 utils.misc 中的通用实现(已通过别名导入同名变量)
# 确保 products 表具备 sort 列(防止主程序未先运行迁移时,商品管理点开无响应)
def _ensure_product_sort_column():
try:
# 简单探测列是否存在
cur.execute("SELECT sort FROM products LIMIT 1")
_ = cur.fetchone()
return
except Exception:
pass
# 列不存在则尝试添加并回填
try:
cur.execute("ALTER TABLE products ADD COLUMN sort INTEGER")
conn.commit()
except Exception:
pass
try:
cur.execute("UPDATE products SET sort = id WHERE sort IS NULL")
conn.commit()
except Exception:
pass
async def _admin_menu(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _guard_admin(update):
return
kb = make_markup([
[InlineKeyboardButton("📦 商品管理", callback_data="adm:plist:1"), InlineKeyboardButton("🖼️ 主页编辑", callback_data="adm:home")],
[InlineKeyboardButton("📑 订单管理", callback_data="adm:olist:1:all"), InlineKeyboardButton("📊 统计报表", callback_data="adm:ostat")],
[InlineKeyboardButton("💳 支付设置", callback_data="adm:pay"), InlineKeyboardButton("📢 公告设置", callback_data="adm:announcement")],
[InlineKeyboardButton("🆘 客服设置", callback_data="adm:support")],
[InlineKeyboardButton("🧹 优化数据库", callback_data="adm:vacuum")],
])
await _send_text(update.effective_chat.id, "🔧 管理面板:请选择功能", reply_markup=kb)
async def _send_home_menu(chat_id: int):
cur_cols = (_get_setting("home.products_per_row", str(START_CFG.get("products_per_row") or "2")) or "2").strip()
cur_tpl = (_get_setting("home.button_template", (START_CFG.get("button_template") or " {name} | ¥{price}")) or " {name} | ¥{price}")
# 简短描述
def _tpl_desc(t: str) -> str:
t = str(t)
if "{name}" in t and "{price}" in t:
# 判断常见两种
if "|" in t:
return "名称 | 价格"
if "-" in t:
return "价格 - 名称"
return "名称+价格"
if "{name}" in t and "{price}" not in t:
return "仅名称"
if "{price}" in t and "{name}" not in t:
return "仅价格"
return "自定义"
cur_tpl_desc = _tpl_desc(cur_tpl)
kb = make_markup([
[InlineKeyboardButton("✏️ 改标题", callback_data="adm:home_title"), InlineKeyboardButton("📝 改简介", callback_data="adm:home_intro")],
[InlineKeyboardButton("🖼️ 改封面链接", callback_data="adm:home_cover"), InlineKeyboardButton("👀 预览主页", callback_data="adm:home_preview")],
[InlineKeyboardButton(f"🏷️ 按钮文案:{cur_tpl_desc}", callback_data="adm:home_btntpl"), InlineKeyboardButton(f"🧩 每行商品数:{cur_cols}", callback_data="adm:home_cols")],
row_home_admin(),
])
cur_title = (_get_setting("home.title", (START_CFG.get("title") or "")).strip())
cur_intro = (_get_setting("home.intro", (START_CFG.get("intro") or "")).strip())
cur_cover = (_get_setting("home.cover_url", (START_CFG.get("cover_url") or "")).strip())
text = (
f"主页设置\n"
f"标题:{cur_title or '-'}\n"
f"简介:{(cur_intro or '-')[:200]}\n"
f"封面:{cur_cover or '-'}\n"
f"每行商品数:{cur_cols} (1-4)\n"
f"按钮文案:{cur_tpl_desc}"
)
await _send_text(chat_id, text, reply_markup=kb)
async def _send_home_preview(chat_id: int):
# 复用通用首页渲染,并在末尾追加“返回”按钮
await render_home(
chat_id,
cur,
START_CFG,
_get_setting,
_send_photo,
_send_text,
extra_rows=[row_back("adm:home")],
)
async def _send_product_page(chat_id: int, pid: str):
row = cur.execute("SELECT id, name, price, full_description, cover_url, COALESCE(status,'on'), COALESCE(deliver_type,'join_group'), COALESCE(card_fixed,'') FROM products WHERE id=?", (pid,)).fetchone()
if not row:
kb = make_markup([
row_back("adm:plist:1"),
row_home_admin(),
])
await _send_text(chat_id, "⚠️ 未找到该商品", reply_markup=kb)
return
_pid, name, price, desc, cover, status, deliver_type, card_fixed_val = row
# 统计卡池余量
try:
stock_row = cur.execute("SELECT COUNT(*) FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL", (_pid,)).fetchone()
stock_cnt = int(stock_row[0] or 0)
except Exception:
stock_cnt = 0
kb = make_markup([
[InlineKeyboardButton("✏️ 改名称", callback_data=f"adm:edit_name:{_pid}"), InlineKeyboardButton("💰 改价格", callback_data=f"adm:edit_price:{_pid}")],
[InlineKeyboardButton("📝 改详情", callback_data=f"adm:edit_desc:{_pid}"), InlineKeyboardButton("🖼️ 改封面", callback_data=f"adm:edit_cover:{_pid}")],
[InlineKeyboardButton("🚚 发货方式", callback_data=f"adm:edit_deliver:{_pid}"), InlineKeyboardButton("🧷 通用卡密", callback_data=f"adm:edit_card_fixed:{_pid}")],
[InlineKeyboardButton("🔑 卡密库存", callback_data=f"adm:card_pool:{_pid}:1"), InlineKeyboardButton("👥 改群ID", callback_data=f"adm:edit_group:{_pid}")],
[InlineKeyboardButton("⏯ 上/下架", callback_data=f"adm:toggle:{_pid}"), InlineKeyboardButton("🗑️ 删除", callback_data=f"adm:del:{_pid}")],
row_back("adm:plist:1"),
])
# 本地化发货方式
_deliver_label = {"join_group": "自动拉群", "card_fixed": "通用卡密", "card_pool": "卡池"}.get(str(deliver_type or ""), str(deliver_type or "-"))
text = (
f"商品 #{_pid}\n"
f"名称:{name}\n"
f"价格:¥{price}\n"
f"状态:{'上架' if (status or 'on')=='on' else '下架'}\n"
f"封面:{cover or '-'}\n"
f"发货方式:{_deliver_label}\n"
f"卡池余量:{stock_cnt}\n"
f"详情:{(desc or '-')[:300]}"
)
if cover:
try:
await _send_photo(chat_id, cover, caption=text, reply_markup=kb)
return
except Exception:
pass
await _send_text(chat_id, text, reply_markup=kb)
# ---------- direct render helpers ----------
def _build_order_status_row(status_key: str):
# 合并“已支付+已完成”为“已成交(done)”
filters = [
("全部", "all"),
("待支付", "pending"),
("已成交", "done"),
("已取消", "cancelled"),
]
frow = []
for label, key in filters:
prefix = "" if key == status_key else ""
frow.append(InlineKeyboardButton(f"{prefix}{label}", callback_data=f"adm:olist:1:{key}"))
return frow
def _build_order_pagination(page: int, total_pages: int, status_key: str):
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"adm:olist:{page-1}:{status_key}"))
if page < total_pages:
nav.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"adm:olist:{page+1}:{status_key}"))
return nav
def _build_order_toolbar(status_key: str, page: int, qkw: str, start_ts: int | None, end_ts: int | None):
# 同一行放置:时间范围 | 搜索
return [[
InlineKeyboardButton("⏱️ 设置时间范围", callback_data=f"adm:of_setrange:{status_key}:{page}"),
InlineKeyboardButton("🔎 搜索", callback_data=f"adm:of_search:{status_key}:{page}"),
]]
def _build_stat_toolbar():
# 统计快捷范围:仅保留 今日/本月/本年
return [
[InlineKeyboardButton("📅 今日", callback_data="adm:sf_today"), InlineKeyboardButton("📅 本月", callback_data="adm:sf_month"), InlineKeyboardButton("📅 本年", callback_data="adm:sf_year")],
row_home_admin(),
]
async def _send_order_list(chat_id: int, page: int, status_key: str, ctx: ContextTypes.DEFAULT_TYPE):
page_size = 10
ofilter = ctx.user_data.get("adm_ofilter", {})
start_ts = ofilter.get("start_ts")
end_ts = ofilter.get("end_ts")
osearch = ctx.user_data.get("adm_osearch", {})
qkw = (osearch.get("q") or "").strip()
where = []
params = []
# 状态筛选:all 不限制;done = paid 或 completed
if status_key and status_key != "all":
if status_key == "done":
where.append("o.status IN ('paid','completed')")
else:
where.append("o.status=?")
params.append(status_key)
if start_ts:
where.append("o.create_time>=?")
params.append(int(start_ts))
if end_ts:
where.append("o.create_time<=?")
params.append(int(end_ts))
if qkw:
or_clauses = [
"CAST(o.user_id AS TEXT)=?",
"CAST(o.product_id AS TEXT)=?",
"p.name LIKE ?",
"o.out_trade_no LIKE ?",
]
where.append("(" + " OR ".join(or_clauses) + ")")
params.extend([qkw, qkw, f"%{qkw}%", f"%{qkw}%"])
where_sql = ("WHERE " + " AND ".join(where)) if where else ""
# 先计算总数/总页数,再夹紧页码,避免空页
# 注意:当搜索包含商品名条件(p.name)时,统计也需要 JOIN products
total = cur.execute(
f"SELECT COUNT(*) FROM orders o LEFT JOIN products p ON p.id=o.product_id {where_sql}",
(*params,),
).fetchone()[0]
total_pages = max(1, (total + page_size - 1) // page_size)
# 夹紧页码
if page < 1:
page = 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
rows = cur.execute(
f"SELECT o.id, o.user_id, o.product_id, o.amount, o.payment_method, COALESCE(o.status,'pending'), o.create_time, o.out_trade_no, p.name "
f"FROM orders o LEFT JOIN products p ON p.id=o.product_id {where_sql} "
f"ORDER BY o.id DESC LIMIT ? OFFSET ?",
(*params, page_size, offset),
).fetchall()
buttons = []
for oid, uid, pid, amount, pm, st, cts, out_trade_no, pname in rows:
# 显示为商户单号(out_trade_no)最后一段的“纯数字优先”后缀(如 MJ6K3A-89899 => 89899)。
# 若该段无数字,则显示该段原样;再兜底 Base36(id)。
try:
part = (out_trade_no or "").split("-")[-1]
digits = "".join(ch for ch in part if ch.isdigit()) if part else ""
suffix = digits or part or _to_base36(oid)
except Exception:
suffix = _to_base36(oid)
title = f"#{suffix}"
buttons.append([
InlineKeyboardButton(title, callback_data=f"adm:o:{oid}:{status_key}:{page}"),
InlineKeyboardButton("🗑️ 删除", callback_data=f"adm:odelc:{oid}:{status_key}:{page}")
])
# 筛选状态按钮行
frow = _build_order_status_row(status_key)
if frow:
buttons.append(frow)
# 分页按钮行
nav = _build_order_pagination(page, total_pages, status_key)
if nav:
buttons.append(nav)
# 时间范围显示与设置
sr_text = "未设置"
if start_ts or end_ts:
s = time.strftime('%Y-%m-%d', time.localtime(int(start_ts))) if start_ts else "-"
e = time.strftime('%Y-%m-%d', time.localtime(int(end_ts))) if end_ts else "-"
sr_text = f"{s} ~ {e}"
# 搜索与工具区
q_text = qkw if qkw else "(无)"
for row_btns in _build_order_toolbar(status_key, page, qkw, start_ts, end_ts):
buttons.append(row_btns)
buttons.append(row_home_admin())
# 展示中文状态文案
label_map = {"all": "全部", "pending": "待支付", "done": "已成交", "cancelled": "已取消"}
show_status = label_map.get(status_key, status_key)
await _send_text(chat_id, f"📑 订单列表(第 {page}/{total_pages} 页)\n状态:{show_status}\n时间:{sr_text}\n搜索:{q_text}", reply_markup=make_markup(buttons))
return
async def _send_stat_page(chat_id: int, ctx: ContextTypes.DEFAULT_TYPE):
sfilter = ctx.user_data.get("adm_sfilter", {})
start_ts = sfilter.get("start_ts")
end_ts = sfilter.get("end_ts")
where = []
params = []
if start_ts:
where.append("o.create_time>=?")
params.append(int(start_ts))
if end_ts:
where.append("o.create_time<=?")
params.append(int(end_ts))
where_sql = ("WHERE " + " AND ".join(where)) if where else ""
where_paid_sql = where_sql + ((" AND " if where else " WHERE ") + "o.status IN ('paid','completed')")
row_all = cur.execute(f"SELECT COUNT(*) FROM orders o {where_sql}", (*params,)).fetchone()
row_paid = cur.execute(
f"SELECT COUNT(*), COALESCE(SUM(amount),0) FROM orders o {where_paid_sql}",
(*params,)
).fetchone()
o_all = int(row_all[0] or 0)
o_paid = int(row_paid[0] or 0)
amt_paid = float(row_paid[1] or 0.0)
conv_rate = (o_paid / o_all * 100) if o_all > 0 else 0.0
TOPN = 5
base_where = ("WHERE " + " AND ".join(where) + (" AND " if where else "") + "o.status IN ('paid','completed')") if where else "WHERE o.status IN ('paid','completed')"
prod_rows = cur.execute(
"SELECT COALESCE(p.name,'商品') AS name, COUNT(o.id) AS cnt, COALESCE(SUM(o.amount),0) AS amt "
"FROM orders o LEFT JOIN products p ON p.id=o.product_id "
+ base_where +
" GROUP BY o.product_id ORDER BY amt DESC LIMIT ?",
(*params, TOPN)
).fetchall()
max_amt = max([float(r[2] or 0) for r in prod_rows] + [0])
lines = []
for name, cnt, amt in prod_rows:
bar = _bar(float(amt or 0), max_amt, 20)
lines.append(f"{name[:12]:<12} ¥{float(amt or 0):>8.2f} | {bar} ({int(cnt)}单)")
sr_text = "未设置"
if start_ts or end_ts:
s = time.strftime('%Y-%m-%d', time.localtime(int(start_ts))) if start_ts else "-"
e = time.strftime('%Y-%m-%d', time.localtime(int(end_ts))) if end_ts else "-"
sr_text = f"{s} ~ {e}"
text = (
"📊 统计概览\n"
f"时间范围:{sr_text}\n"
f"成交订单:{o_paid} 单,金额 ¥{amt_paid:.2f}\n"
f"下单/成交:{o_all}/{o_paid},转化率:{conv_rate:.1f}%\n"
"\n🏆 Top 商品(按金额)\n" + ("\n".join(lines) if lines else "(暂无数据)")
)
kb = make_markup(_build_stat_toolbar())
await _send_text(chat_id, text, reply_markup=kb)
async def cmd_admin(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
await _admin_menu(update, ctx)
async def adm_router(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _guard_admin(update):
return
q = update.callback_query
# 一些操作(如清除筛选/搜索/统计筛选)会在同一回调中“伪路由”到其它分支,
# 可能导致对同一个 callback_query 重复 answer,从而抛出异常并中断刷新。
# 这里做兼容,忽略重复 answer 的异常,保证后续渲染继续执行。
try:
await q.answer()
except Exception:
pass
data = q.data # adm:...
parts = data.split(":")
action = parts[1] if len(parts) > 1 else ""
# 清理等待态,避免串台(除编辑步骤继续输入场景外)
# 仅在进入新页面时清理
if action in {"plist", "p", "home", "pnew", "menu", "olist", "o", "ostat", "of_setrange", "of_search", "sf_today", "sf_month", "sf_year"}:
ctx.user_data.pop("adm_wait", None)
# 商品列表(分页 + 行内排序 上/下)
if action == "plist":
page = int(parts[2]) if len(parts) > 2 else 1
page_size = 10
offset = (page - 1) * page_size
# 防御性迁移:确保存在 sort 列并已回填
_ensure_product_sort_column()
rows = cur.execute(
"SELECT id, name, price, COALESCE(status,'on'), COALESCE(sort, id) AS s FROM products ORDER BY s DESC, id DESC LIMIT ? OFFSET ?",
(page_size, offset),
).fetchall()
total = cur.execute("SELECT COUNT(*) FROM products").fetchone()[0]
total_pages = max(1, (total + page_size - 1) // page_size)
buttons = []
row_btns = []
for pid, name, price, status, _ in rows:
# 每行两个,不显示价格
row_btns.append(InlineKeyboardButton(f"{pid} {name}", callback_data=f"adm:p:{pid}"))
if len(row_btns) >= 2:
buttons.append(row_btns)
row_btns = []
if row_btns:
buttons.append(row_btns)
# 底部操作区:一行两个(排序本页 | 新增商品)
buttons.append([
InlineKeyboardButton("✏️ 排序本页", callback_data=f"adm:psort:{page}"),
InlineKeyboardButton(" 新增商品", callback_data="adm:pnew"),
])
# 分页导航
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"adm:plist:{page-1}"))
if page < total_pages:
nav.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"adm:plist:{page+1}"))
if nav:
buttons.append(nav)
buttons.append(row_home_admin())
await _send_text(update.effective_chat.id, f"📦 商品列表(第 {page}/{total_pages} 页)\n当前排序:自定义", reply_markup=make_markup(buttons))
return
# 首页:每行商品数设置入口
if action == "home_cols":
kb = make_markup([
[InlineKeyboardButton("1", callback_data="adm:home_cols_set:1"), InlineKeyboardButton("2", callback_data="adm:home_cols_set:2"), InlineKeyboardButton("3", callback_data="adm:home_cols_set:3"), InlineKeyboardButton("4", callback_data="adm:home_cols_set:4")],
row_back("adm:home"),
])
await _send_text(update.effective_chat.id, "请选择每行商品数量(1-4):", reply_markup=kb)
return
# 首页:保存每行商品数
if action == "home_cols_set":
val = parts[2] if len(parts) > 2 else "2"
try:
n = max(1, min(4, int(val)))
except Exception:
n = 2
_set_setting("home.products_per_row", str(n))
await _send_text(update.effective_chat.id, f"已设置每行商品数为:{n}", reply_markup=make_markup([row_back("adm:home")]))
return
# 支付设置主页
if action == "pay":
cur_cols = (_get_setting("ui.payment_cols", str(START_CFG.get("payment_cols") or "3")) or "3").strip()
# 获取支付方式开关状态
def get_payment_status(channel):
return _get_setting(f"payment.{channel}.enabled", "true") == "true"
# 构建支付方式开关按钮
payment_buttons = []
# 获取支付方式排序
def get_payment_order():
order_str = _get_setting("payment.order", "alipay,wxpay,usdt_lemon,usdt_token188")
return order_str.split(",")
def get_payment_name(channel):
names = {
"alipay": "支付宝",
"wxpay": "微信",
"usdt_lemon": "USDT (柠檬)",
"usdt_token188": "USDT(TRC20)"
}
return names.get(channel, channel)
# 按照保存的顺序显示支付方式
payment_order = get_payment_order()
for i, channel in enumerate(payment_order):
if channel not in ["alipay", "wxpay", "usdt_lemon", "usdt_token188"]:
continue
name = get_payment_name(channel)
enabled = get_payment_status(channel)
status_icon = "" if enabled else ""
# 构建按钮行:开关 + 上移 + 下移
row = [
InlineKeyboardButton(
f"{status_icon} {name}",
callback_data=f"adm:pay_toggle:{channel}"
)
]
# 添加上移按钮(不是第一个)
if i > 0:
row.append(InlineKeyboardButton("⬆️", callback_data=f"adm:pay_up:{channel}"))
else:
row.append(InlineKeyboardButton(" ", callback_data="adm:noop")) # 占位
# 添加下移按钮(不是最后一个)
if i < len(payment_order) - 1:
row.append(InlineKeyboardButton("⬇️", callback_data=f"adm:pay_down:{channel}"))
else:
row.append(InlineKeyboardButton(" ", callback_data="adm:noop")) # 占位
payment_buttons.append(row)
kb = make_markup([
[InlineKeyboardButton(f"🧩 每行支付按钮:{cur_cols}", callback_data="adm:pay_cols")],
*payment_buttons,
row_home_admin(),
])
text = (
"💳 支付设置\n"
f"每行按钮数:{cur_cols} (1-4)\n"
"\n📋 支付方式管理:\n"
"• 点击支付方式名称:开启/关闭\n"
"• 点击 ⬆️ ⬇️:调整显示顺序"
)
await _send_text(update.effective_chat.id, text, reply_markup=kb)
return
# 支付设置:选择每行按钮数
if action == "pay_cols":
kb = make_markup([
[InlineKeyboardButton("1", callback_data="adm:pay_cols_set:1"), InlineKeyboardButton("2", callback_data="adm:pay_cols_set:2"), InlineKeyboardButton("3", callback_data="adm:pay_cols_set:3"), InlineKeyboardButton("4", callback_data="adm:pay_cols_set:4")],
row_back("adm:pay"),
])
await _send_text(update.effective_chat.id, "请选择每行支付按钮数量(1-4):", reply_markup=kb)
return
# 支付设置:切换支付方式开关
if action == "pay_toggle":
channel = parts[2] if len(parts) > 2 else ""
if channel:
# 获取当前状态
current_status = _get_setting(f"payment.{channel}.enabled", "true") == "true"
# 切换状态
new_status = "false" if current_status else "true"
_set_setting(f"payment.{channel}.enabled", new_status)
# 获取支付方式名称
try:
with open(CFG_PATH, "r", encoding="utf-8") as f:
cfg_content = _strip_json_comments(f.read())
cfg = json.loads(cfg_content)
payments = cfg.get("PAYMENTS", {})
name = payments.get(channel, {}).get("name", channel)
except Exception:
name = channel
status_text = "开启" if new_status == "true" else "关闭"
await send_ephemeral(
update.get_bot(),
update.effective_chat.id,
f"{name}{status_text}",
ttl=2
)
# 刷新支付设置页面
await adm_router(type("obj", (), {
"callback_query": type("q", (), {"data": "adm:pay"}),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(), ctx)
return
# 支付方式上移
if action == "pay_up":
channel = parts[2] if len(parts) > 2 else ""
if channel:
# 获取当前排序
order_str = _get_setting("payment.order", "alipay,wxpay,usdt_lemon,usdt_token188")
order_list = order_str.split(",")
# 找到当前位置并上移
if channel in order_list:
current_index = order_list.index(channel)
if current_index > 0:
# 交换位置
order_list[current_index], order_list[current_index - 1] = order_list[current_index - 1], order_list[current_index]
# 保存新排序
new_order = ",".join(order_list)
_set_setting("payment.order", new_order)
await send_ephemeral(
update.get_bot(),
update.effective_chat.id,
f"✅ 已上移",
ttl=1
)
# 刷新页面
await adm_router(type("obj", (), {
"callback_query": type("q", (), {"data": "adm:pay"}),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(), ctx)
return
# 支付方式下移
if action == "pay_down":
channel = parts[2] if len(parts) > 2 else ""
if channel:
# 获取当前排序
order_str = _get_setting("payment.order", "alipay,wxpay,usdt_lemon,usdt_token188")
order_list = order_str.split(",")
# 找到当前位置并下移
if channel in order_list:
current_index = order_list.index(channel)
if current_index < len(order_list) - 1:
# 交换位置
order_list[current_index], order_list[current_index + 1] = order_list[current_index + 1], order_list[current_index]
# 保存新排序
new_order = ",".join(order_list)
_set_setting("payment.order", new_order)
await send_ephemeral(
update.get_bot(),
update.effective_chat.id,
f"✅ 已下移",
ttl=1
)
# 刷新页面
await adm_router(type("obj", (), {
"callback_query": type("q", (), {"data": "adm:pay"}),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(), ctx)
return
# 空操作(占位按钮)
if action == "noop":
await query.answer()
return
# 支付设置:保存每行按钮数
if action == "pay_cols_set":
val = parts[2] if len(parts) > 2 else "3"
try:
n = max(1, min(4, int(val)))
except Exception:
n = 3
_set_setting("ui.payment_cols", str(n))
await _send_text(update.effective_chat.id, f"已设置每行支付按钮数为:{n}", reply_markup=make_markup([row_back("adm:pay")]))
return
# 首页:按钮文案模板设置入口
if action == "home_btntpl":
kb = make_markup([
[InlineKeyboardButton("名称 | 价格", callback_data="adm:home_btntpl_set:n_p"), InlineKeyboardButton("价格 - 名称", callback_data="adm:home_btntpl_set:p_n")],
[InlineKeyboardButton("仅名称(隐藏价格)", callback_data="adm:home_btntpl_set:n_only")],
row_back("adm:home"),
])
await _send_text(update.effective_chat.id, "请选择按钮文案模板:", reply_markup=kb)
return
# 首页:保存按钮文案模板
if action == "home_btntpl_set":
key = parts[2] if len(parts) > 2 else "n_p"
mapping = {
"n_p": " {name} | ¥{price}",
"p_n": " ¥{price} - {name}",
"n_only": " {name}",
}
tpl = mapping.get(key, " {name} | ¥{price}")
_set_setting("home.button_template", tpl)
await _send_text(update.effective_chat.id, "已更新按钮文案模板", reply_markup=make_markup([row_back("adm:home")]))
return
# 商品排序(整页):进入文本输入模式
if action == "psort":
# 格式:adm:psort:{page}
page = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else 1
page_size = 10
offset = (page - 1) * page_size
_ensure_product_sort_column()
rows = cur.execute(
"SELECT id, COALESCE(sort, id) AS s FROM products ORDER BY s DESC, id DESC LIMIT ? OFFSET ?",
(page_size, offset),
).fetchall()
ids_line = " ".join(str(r[0]) for r in rows)
ctx.user_data["adm_wait"] = {"type": "psort", "data": {"page": page, "ids": [int(r[0]) for r in rows]}}
kb = make_markup([row_back(f"adm:plist:{page}")])
await _send_text(update.effective_chat.id, f"请输入该页的新顺序(仅数字,空格分隔),例如:{ids_line}\n未写到的将按原顺序排在后面。", reply_markup=kb)
return
# 商品排序:上移一位
if action == "pmoveu":
# 格式:adm:pmoveu:{pid}:{page}
if len(parts) < 3:
return
pid = int(parts[2])
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
_ensure_product_sort_column()
try:
row = cur.execute("SELECT id, COALESCE(sort, id) AS s FROM products WHERE id=?", (pid,)).fetchone()
if row:
cur_id, cur_s = int(row[0]), int(row[1])
# 找到在“当前显示顺序(按 s DESC, id DESC)”下的前一个(更靠上)邻居
nb = cur.execute(
"SELECT id, COALESCE(sort, id) AS s FROM products "
"WHERE (COALESCE(sort, id) > ?) OR (COALESCE(sort, id) = ? AND id > ?) "
"ORDER BY COALESCE(sort, id) ASC, id ASC LIMIT 1",
(cur_s, cur_s, cur_id)
).fetchone()
if nb:
nb_id, nb_s = int(nb[0]), int(nb[1])
# 采用“提升到邻居之上一格”的策略,避免相等值交换无效
new_s = nb_s + 1
cur.execute("UPDATE products SET sort=? WHERE id=?", (new_s, cur_id))
conn.commit()
try:
await update.callback_query.answer("已上移", show_alert=False)
except Exception:
pass
except Exception:
pass
# 刷新当前页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:plist:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 商品排序:下移一位
if action == "pmoved":
# 格式:adm:pmoved:{pid}:{page}
if len(parts) < 3:
return
pid = int(parts[2])
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
_ensure_product_sort_column()
try:
row = cur.execute("SELECT id, COALESCE(sort, id) AS s FROM products WHERE id=?", (pid,)).fetchone()
if row:
cur_id, cur_s = int(row[0]), int(row[1])
# 找到在“当前显示顺序(按 s DESC, id DESC)”下的后一个(更靠下)邻居
nb = cur.execute(
"SELECT id, COALESCE(sort, id) AS s FROM products "
"WHERE (COALESCE(sort, id) < ?) OR (COALESCE(sort, id) = ? AND id < ?) "
"ORDER BY COALESCE(sort, id) DESC, id DESC LIMIT 1",
(cur_s, cur_s, cur_id)
).fetchone()
if nb:
nb_id, nb_s = int(nb[0]), int(nb[1])
new_s = nb_s - 1
cur.execute("UPDATE products SET sort=? WHERE id=?", (new_s, cur_id))
conn.commit()
try:
await update.callback_query.answer("已下移", show_alert=False)
except Exception:
pass
except Exception:
pass
# 刷新当前页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:plist:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 订单列表(分页 + 状态 + 时间范围筛选)
if action == "olist":
page = 1
status_key = "all"
if len(parts) > 2 and parts[2].isdigit():
page = int(parts[2])
if len(parts) > 3:
status_key = parts[3]
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
# 设置订单筛选时间范围(开始)
if action == "of_setrange":
status_key = parts[2] if len(parts) > 2 else "all"
page = parts[3] if len(parts) > 3 else "1"
ctx.user_data["adm_wait"] = {"type": "of_start", "data": {"status_key": status_key, "page": page}}
await _send_text(update.effective_chat.id, "请输入【开始日期】(YYYY-MM-DD),留空表示不限制:", reply_markup=make_markup([row_back(f"adm:olist:{page}:{status_key}")]))
return
# 搜索(启动)
if action == "of_search":
status_key = parts[2] if len(parts) > 2 else "all"
page = parts[3] if len(parts) > 3 else "1"
ctx.user_data["adm_wait"] = {"type": "osearch_q", "data": {"status_key": status_key, "page": page}}
await _send_text(update.effective_chat.id, "请输入搜索关键词:\n- 支持用户ID/商品ID(精确)\n- 支持商品名/商户单号(模糊)", reply_markup=make_markup([row_back(f"adm:olist:{page}:{status_key}")]))
return
# 单个商品菜单
if action == "p":
pid = parts[2]
await _send_product_page(update.effective_chat.id, pid)
return
# 单个订单详情
if action == "o":
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:olist:1:all")]))
return
oid = parts[2]
status_key = parts[3] if len(parts) > 3 else "all"
back_page = parts[4] if len(parts) > 4 else "1"
row = cur.execute(
"SELECT o.id, o.user_id, o.product_id, o.amount, o.payment_method, COALESCE(o.status,'pending'), o.create_time, o.out_trade_no, p.name "
"FROM orders o LEFT JOIN products p ON p.id=o.product_id WHERE o.id=?",
(oid,)
).fetchone()
if not row:
await _send_text(update.effective_chat.id, "未找到该订单", reply_markup=make_markup([row_back(f"adm:olist:{back_page}:{status_key}")]))
return
_oid, uid, pid, amount, pm, st, cts, out_trade_no, pname = row
txt = (
f"订单 #{_oid}\n"
f"用户ID{uid}\n"
f"商品:{pname or pid}\n"
f"金额:¥{amount}\n"
f"支付方式:{pm}\n"
f"状态:{STATUS_ZH.get((st or '').lower(), st)}\n"
f"下单时间:{_fmt_ts(cts)}\n"
f"商户单号:{out_trade_no}"
)
btn_rows = []
# 待支付可追加“标记为已支付”
if (st or "").lower() == "pending":
btn_rows.append([InlineKeyboardButton("✅ 标记为已支付", callback_data=f"adm:opaidc:{_oid}:{status_key}:{back_page}")])
btn_rows.append(row_back(f"adm:olist:{back_page}:{status_key}"))
btn_rows.append(row_home_admin())
kb = make_markup(btn_rows)
await _send_text(update.effective_chat.id, txt, reply_markup=kb)
return
# 数据库优化:VACUUM
if action == "vacuum":
try:
# VACUUM 需要在非事务状态下执行;这里直接使用连接执行
cur.execute("VACUUM")
conn.commit()
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已完成数据库优化 (VACUUM)")
except Exception:
pass
except Exception:
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ VACUUM 执行失败")
except Exception:
pass
# 返回主菜单
await _admin_menu(update, ctx)
return
# 客服设置主页
if action == "support":
cur_val = (_get_setting("support.contact", "")).strip()
show = cur_val if cur_val else "(未设置)"
kb = make_markup([
[InlineKeyboardButton("✏️ 修改客服联系方式", callback_data="adm:support_edit")],
row_home_admin(),
])
text = (
"🆘 客服设置\n"
f"当前值:{show}\n\n"
"支持以下格式:\n"
"- 直接填写链接:https://t.me/username\n"
"- 用户名:@username\n"
"- 纯文本:将作为说明文本展示给用户\n"
)
await _send_text(update.effective_chat.id, text, reply_markup=kb, disable_web_page_preview=True)
return
# 客服设置:进入编辑
if action == "support_edit":
ctx.user_data["adm_wait"] = {"type": "support_contact", "data": {}}
kb = make_markup([row_back("adm:support")])
await _send_text(update.effective_chat.id, "请输入新的【客服联系方式】:", reply_markup=kb)
return
# 公告设置:查看/编辑
if action == "announcement":
# 获取各支付方式的公告开关状态
usdt_enabled = _get_setting("announcement.usdt.enabled", "true") == "true"
usdt_token188_enabled = _get_setting("announcement.usdt_token188.enabled", "true") == "true"
alipay_enabled = _get_setting("announcement.alipay.enabled", "true") == "true"
wxpay_enabled = _get_setting("announcement.wxpay.enabled", "true") == "true"
status_text = (
f"📊 各支付方式公告状态:\n"
f"• USDT(柠檬): {'✅ 已启用' if usdt_enabled else '❌ 已关闭'}\n"
f"• USDT(TOKEN188): {'✅ 已启用' if usdt_token188_enabled else '❌ 已关闭'}\n"
f"• 支付宝: {'✅ 已启用' if alipay_enabled else '❌ 已关闭'}\n"
f"• 微信支付: {'✅ 已启用' if wxpay_enabled else '❌ 已关闭'}\n\n"
)
kb = make_markup([
[InlineKeyboardButton("✏️ USDT公告", callback_data="adm:announcement_edit:usdt")],
[InlineKeyboardButton("✏️ 支付宝/微信公告", callback_data="adm:announcement_edit:alipay_wxpay")],
[InlineKeyboardButton("⚙️ 公告开关设置", callback_data="adm:announcement_switches")],
row_home_admin(),
])
text = (
"📢 支付公告设置\n\n"
f"{status_text}"
"💡 提示:\n"
"• USDT和支付宝/微信使用不同的公告内容\n"
"• 用户选择支付方式时会显示对应公告\n"
"• 点击【我知道了,继续支付】后显示付款链接\n"
"• 后台会并行加载付款链接,减少等待时间"
)
await _send_text(update.effective_chat.id, text, reply_markup=kb)
return
# 公告设置:进入编辑
if action == "announcement_edit":
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误")
return
announcement_type = parts[2] # usdt 或 alipay_wxpay
# 获取当前公告内容
current_text = (_get_setting(f"announcement.{announcement_type}.text", "")).strip()
if announcement_type == "usdt":
title = "USDT支付公告"
default_text = (
"📢 USDT支付重要提醒\n\n\n"
"⚠️ 请注意手续费问题\n\n"
"🏦 交易所转账(火币/欧易/币安)\n"
" 会扣 1U 手续费\n"
" 商品价格 10U → 请转 11U\n"
" 否则到账不足,无法自动拉群\n\n"
"💳 钱包转账(推荐 ✅)\n"
" 支持 Bitpie / TP / imToken 等钱包\n"
" 直接按商品金额转(例:10U 转 10U)\n"
" 钱包自动扣矿工费,到账准确,更省钱!\n\n"
"⚡️ 付款即发货,1-3分钟快速到账\n"
" 机器人自动拉你进会员群 ✅"
)
else:
title = "支付宝/微信支付公告"
default_text = (
"📢 欢迎光临官方商店\n\n\n"
"💳 微信 / 支付宝付款说明\n\n"
"✅ 按提示金额准确付款即可\n"
"✅ 支持微信扫码、支付宝扫码\n"
"✅ 付款后请勿关闭页面\n\n"
"⚡️ 付款即发货,1-3分钟快速到账\n"
" 机器人自动拉你进会员群 ✅"
)
ctx.user_data["adm_wait"] = {"type": "announcement_text", "data": {"announcement_type": announcement_type}}
kb = make_markup([
[InlineKeyboardButton("🔄 使用默认公告", callback_data=f"adm:announcement_use_default:{announcement_type}")],
row_back("adm:announcement")
])
preview_text = current_text if current_text else f"(当前使用默认公告)\n\n{default_text}"
await _send_text(
update.effective_chat.id,
f"请输入新的【{title}】内容:\n\n"
f"当前公告:\n{preview_text}\n\n"
"💡 提示:\n"
"- 支持多行文本\n"
"- 支持Emoji表情\n"
"- 建议简洁明了",
reply_markup=kb
)
return
# 使用默认公告
if action == "announcement_use_default":
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误")
return
announcement_type = parts[2]
_set_setting(f"announcement.{announcement_type}.text", "")
await _send_text(update.effective_chat.id, "✅ 已恢复默认公告")
await asyncio.sleep(1)
# 返回公告设置页
await adm_router(
type("obj", (), {
"callback_query": type("q", (), {"data": "adm:announcement"})(),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(),
ctx
)
return
# 公告设置:恢复默认
if action == "announcement_reset":
try:
_set_setting("announcement.text", "")
await _send_text(update.effective_chat.id, "✅ 已恢复默认公告")
await asyncio.sleep(1)
except Exception:
pass
# 返回公告设置页
await adm_router(
type("obj", (), {
"callback_query": type("q", (), {"data": "adm:announcement"})(),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(),
ctx
)
return
# 公告开关设置页面
if action == "announcement_switches":
usdt_enabled = _get_setting("announcement.usdt.enabled", "true") == "true"
usdt_token188_enabled = _get_setting("announcement.usdt_token188.enabled", "true") == "true"
alipay_enabled = _get_setting("announcement.alipay.enabled", "true") == "true"
wxpay_enabled = _get_setting("announcement.wxpay.enabled", "true") == "true"
kb = make_markup([
[InlineKeyboardButton(
f"{'' if usdt_enabled else ''} USDT(柠檬)",
callback_data="adm:announcement_toggle:usdt"
)],
[InlineKeyboardButton(
f"{'' if usdt_token188_enabled else ''} USDT(TOKEN188)",
callback_data="adm:announcement_toggle:usdt_token188"
)],
[InlineKeyboardButton(
f"{'' if alipay_enabled else ''} 支付宝",
callback_data="adm:announcement_toggle:alipay"
)],
[InlineKeyboardButton(
f"{'' if wxpay_enabled else ''} 微信支付",
callback_data="adm:announcement_toggle:wxpay"
)],
row_back("adm:announcement"),
])
text = (
"⚙️ 公告开关设置\n\n"
"点击按钮切换各支付方式的公告开关:\n\n"
f"• USDT(柠檬): {'✅ 已启用' if usdt_enabled else '❌ 已关闭'}\n"
f"• USDT(TOKEN188): {'✅ 已启用' if usdt_token188_enabled else '❌ 已关闭'}\n"
f"• 支付宝: {'✅ 已启用' if alipay_enabled else '❌ 已关闭'}\n"
f"• 微信支付: {'✅ 已启用' if wxpay_enabled else '❌ 已关闭'}\n\n"
"💡 启用后,用户选择该支付方式时会先显示公告"
)
await _send_text(update.effective_chat.id, text, reply_markup=kb)
return
# 切换公告开关
if action == "announcement_toggle":
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误")
return
channel = parts[2]
current_status = _get_setting(f"announcement.{channel}.enabled", "true")
new_status = "false" if current_status == "true" else "true"
_set_setting(f"announcement.{channel}.enabled", new_status)
# 返回开关设置页
await adm_router(
type("obj", (), {
"callback_query": type("q", (), {"data": "adm:announcement_switches"})(),
"effective_user": update.effective_user,
"effective_chat": update.effective_chat,
"get_bot": update.get_bot
})(),
ctx
)
return
# 订单删除:弹出确认
if action == "odelc":
# 格式:adm:odelc:{oid}:{status_key}:{page}
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:olist:1:all")]))
return
oid = parts[2]
status_key = parts[3] if len(parts) > 3 else "all"
page = parts[4] if len(parts) > 4 else "1"
kb = make_markup([
[
InlineKeyboardButton("✅ 确认删除", callback_data=f"adm:odel:{oid}:{status_key}:{page}"),
InlineKeyboardButton("❌ 取消", callback_data=f"adm:odelx:{status_key}:{page}")
]
])
await _send_text(update.effective_chat.id, f"确认删除订单 #{oid}?此操作不可恢复。", reply_markup=kb)
return
# 订单删除:执行硬删除
if action == "odel":
# 格式:adm:odel:{oid}:{status_key}:{page}
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:olist:1:all")]))
return
oid = parts[2]
status_key = parts[3] if len(parts) > 3 else "all"
page = int(parts[4]) if len(parts) > 4 and parts[4].isdigit() else 1
try:
cur.execute("DELETE FROM orders WHERE id=?", (oid,))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已删除订单,返回列表…", ttl=2)
except Exception:
# 删除失败也返回列表
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 删除失败,已返回列表。", ttl=3)
except Exception:
pass
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
# 订单删除:取消并返回当前列表
if action == "odelx":
# 格式:adm:odelx:{status_key}:{page}
status_key = parts[2] if len(parts) > 2 else "all"
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
# 订单人工回调:确认弹窗
if action == "opaidc":
# 格式:adm:opaidc:{oid}:{status_key}:{page}
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:olist:1:all")]))
return
oid = parts[2]
status_key = parts[3] if len(parts) > 3 else "all"
page = int(parts[4]) if len(parts) > 4 and parts[4].isdigit() else 1
kb = make_markup([
[
InlineKeyboardButton("✅ 确认标记为已支付", callback_data=f"adm:opaid:{oid}:{status_key}:{page}"),
InlineKeyboardButton("❌ 取消", callback_data=f"adm:o:{oid}:{status_key}:{page}")
]
])
await _send_text(update.effective_chat.id, f"确认将订单 #{oid} 标记为已支付并发放邀请链接?", reply_markup=kb)
return
# 订单人工回调:标记为已支付并发邀请
if action == "opaid":
# 格式:adm:opaid:{oid}:{status_key}:{page}
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:olist:1:all")]))
return
oid = parts[2]
status_key = parts[3] if len(parts) > 3 else "all"
page = int(parts[4]) if len(parts) > 4 and parts[4].isdigit() else 1
try:
row = cur.execute("SELECT out_trade_no, COALESCE(status,'pending') FROM orders WHERE id=?", (oid,)).fetchone()
if not row:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 未找到该订单", ttl=2)
# 返回详情页(如果找不到也回列表)
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
out_trade_no, st = row
if (st or "").lower() not in ("pending", "paid"):
await send_ephemeral(update.get_bot(), update.effective_chat.id, "⚠️ 订单状态不可标记为已支付", ttl=2)
# 返回详情
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:o:{oid}:{status_key}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
if not out_trade_no:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 订单缺少商户单号", ttl=2)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:o:{oid}:{status_key}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
if callable(mark_paid_and_send_invite):
try:
mark_paid_and_send_invite(out_trade_no)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已标记为已支付,正在发放自动拉群邀请…", ttl=3)
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 标记失败,请稍后重试", ttl=3)
else:
# 兜底:仅置为 paid
try:
cur.execute("UPDATE orders SET status='paid' WHERE id=?", (oid,))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已标记为已支付", ttl=2)
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 数据库更新失败", ttl=2)
except Exception:
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 处理失败", ttl=2)
except Exception:
pass
# 返回订单详情页以展示最新状态
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:o:{oid}:{status_key}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 统计报表(支持时间范围 + TopN + ASCII图)
if action == "ostat":
# 默认落在“今日”
if not ctx.user_data.get("adm_sfilter"):
now = time.localtime()
day_start = int(time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0, now.tm_wday, now.tm_yday, now.tm_isdst)))
ctx.user_data["adm_sfilter"] = {"start_ts": day_start, "end_ts": int(time.time())}
await _send_stat_page(update.effective_chat.id, ctx)
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 刷新完成", ttl=2)
except Exception:
pass
return
# 统计范围快捷切换:今日/本月/本年(对齐自然区间)
if action in {"sf_today", "sf_month", "sf_year"}:
now = time.localtime()
# 当天 00:00
day_start = int(time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0, now.tm_wday, now.tm_yday, now.tm_isdst)))
if action == "sf_today":
start_ts = day_start
elif action == "sf_month":
# 本月1号 00:00
start_ts = int(time.mktime((now.tm_year, now.tm_mon, 1, 0, 0, 0, 0, 0, now.tm_isdst)))
else: # sf_year
# 当年1月1日 00:00
start_ts = int(time.mktime((now.tm_year, 1, 1, 0, 0, 0, 0, 0, now.tm_isdst)))
end_ts = int(time.time())
ctx.user_data["adm_sfilter"] = {"start_ts": start_ts, "end_ts": end_ts}
label = {"sf_today": "今日", "sf_month": "本月", "sf_year": "本年"}[action]
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"✅ 已切换统计范围:{label},正在刷新…", ttl=2)
await _send_stat_page(update.effective_chat.id, ctx)
return
# 新增商品 - 启动流程
if action == "pnew":
ctx.user_data["adm_wait"] = {"type": "pnew_name", "data": {}}
kb = make_markup([row_home_admin()])
await _send_text(update.effective_chat.id, "请输入新商品【名称】:", reply_markup=kb)
return
# 删除商品
if action == "del":
pid = parts[2]
cur.execute("DELETE FROM products WHERE id=?", (pid,))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已删除,返回列表…", ttl=2)
# 返回列表
await _send_text(
update.effective_chat.id,
"📦 商品列表",
reply_markup=make_markup([
[InlineKeyboardButton("刷新列表", callback_data="adm:plist:1")],
row_home_admin(),
]),
)
return
# 上/下架
if action == "toggle":
pid = parts[2]
row = cur.execute("SELECT COALESCE(status,'on') FROM products WHERE id=?", (pid,)).fetchone()
if not row:
kb = make_markup([
[InlineKeyboardButton("📋 返回列表", callback_data="adm:plist:1")],
row_home_admin(),
])
await _send_text(update.effective_chat.id, "⚠️ 未找到该商品", reply_markup=kb)
return
cur_status = row[0] or 'on'
new_status = 'off' if cur_status == 'on' else 'on'
cur.execute("UPDATE products SET status=? WHERE id=?", (new_status, pid))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"✅ 已{'下架' if new_status=='off' else '上架'},返回商品页…", ttl=2)
# 返回单品页面(直接渲染,避免伪回调)
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "正在刷新…", ttl=2)
except Exception:
pass
await _send_product_page(update.effective_chat.id, pid)
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 刷新完成", ttl=2)
except Exception:
pass
return
# 主页编辑菜单
if action == "home":
await _send_home_menu(update.effective_chat.id)
return
# 主页预览
if action == "home_preview":
await _send_home_preview(update.effective_chat.id)
return
# 主页编辑 - 启动等待态
if action in {"home_title", "home_intro", "home_cover"}:
kind = action.split("_")[1] # title/intro/cover
ctx.user_data["adm_wait"] = {"type": f"home_{kind}", "data": {}}
prompt = {
"title": "请输入新的【主页标题】:",
"intro": "请输入新的【主页简介】:",
"cover": "请发送新的【封面】:可直接发图片(将保存 file_id),或发图片URL",
}[kind]
kb = make_markup([row_back("adm:home")])
await _send_text(update.effective_chat.id, prompt, reply_markup=kb)
return
# 编辑商品发货方式:改为内联按钮选择
if action == "edit_deliver":
# 格式:adm:edit_deliver:{pid}
pid = parts[2]
kb = make_markup([
[
InlineKeyboardButton("👥 自动拉群", callback_data=f"adm:set_deliver:{pid}:join_group"),
InlineKeyboardButton("🧷 通用卡密", callback_data=f"adm:set_deliver:{pid}:card_fixed"),
InlineKeyboardButton("🔑 卡池", callback_data=f"adm:set_deliver:{pid}:card_pool"),
],
row_back(f"adm:p:{pid}")
])
await _send_text(update.effective_chat.id, "请选择【发货方式】:", reply_markup=kb)
return
# 发货方式保存
if action == "set_deliver":
# 格式:adm:set_deliver:{pid}:{method}
if len(parts) < 4:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_back("adm:plist:1")]))
return
pid = parts[2]
method = parts[3]
if method not in {"join_group", "card_fixed", "card_pool"}:
kb = make_markup([row_back(f"adm:p:{pid}")])
await _send_text(update.effective_chat.id, "不支持的发货方式", reply_markup=kb)
return
try:
cur.execute("UPDATE products SET deliver_type=? WHERE id=?", (method, pid))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已保存发货方式,返回商品页…", ttl=2)
except Exception:
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 保存失败", ttl=2)
except Exception:
pass
# 返回商品页
await _send_product_page(update.effective_chat.id, pid)
return
# 新增商品:选择发货方式(内联按钮回调)
if action == "pnew_set_deliver":
# 格式:adm:pnew_set_deliver:{method}
if len(parts) < 3:
await _send_text(update.effective_chat.id, "参数错误", reply_markup=make_markup([row_home_admin()]))
return
method = parts[2]
# 仅在新增商品等待选择阶段有效
state = ctx.user_data.get("adm_wait") or {}
if state.get("type") != "pnew_wait_deliver":
await _send_text(update.effective_chat.id, "未处于新增商品的发货方式选择阶段,请重新开始新增商品流程。", reply_markup=make_markup([row_back("adm:plist:1")]))
return
data = state.get("data") or {}
name = data.get("name")
price = data.get("price")
desc = data.get("desc")
cover = data.get("cover")
if method not in {"join_group", "card_fixed", "card_pool"}:
await _send_text(update.effective_chat.id, "不支持的发货方式", reply_markup=make_markup([row_back("adm:plist:1")]))
return
if method == "join_group":
# 先选择了自动拉群,再去填写群ID
state["data"]["deliver_type"] = method
ctx.user_data["adm_wait"] = {"type": "pnew_group", "data": state["data"]}
kb = make_markup([row_home_admin()])
await _send_text(update.effective_chat.id, "请输入目标群组ID(需为机器人所在群,且机器人为管理员):", reply_markup=kb)
return
# 其它发货方式:立即创建,群ID置为空字符串
try:
cur.execute(
"INSERT INTO products(name, price, full_description, cover_url, tg_group_id, deliver_type) VALUES (?,?,?,?,?,?)",
(name, price, desc, cover, "", method),
)
conn.commit()
pid = cur.execute("SELECT last_insert_rowid()").fetchone()[0]
except Exception:
await _send_text(update.effective_chat.id, "保存失败,请稍后重试。", reply_markup=make_markup([row_back("adm:plist:1")]))
return
ctx.user_data.pop("adm_wait", None)
await _send_text(update.effective_chat.id, "✅ 新商品已创建,返回商品页…", reply_markup=make_markup([row_back(f"adm:p:{pid}")]))
await _send_product_page(update.effective_chat.id, str(pid))
return
# 编辑商品字段 - 启动等待态
if action.startswith("edit_"):
field = action.split(":")[0][5:] # name/price/desc/cover/group/deliver/card_fixed
pid = parts[2]
ctx.user_data["adm_wait"] = {"type": f"edit_{field}", "data": {"pid": pid}}
asks = {
"name": "请输入新的【商品名称】:",
"price": "请输入新的【价格】(数字):",
"desc": "请输入新的【详情描述】:",
"cover": "请发送新的【封面】:可直接发图片(保存 file_id)或发URL",
"group": "请输入新的【群组ID】:例如 -1001234567890",
"deliver": "发货方式已改为按钮选择,请点击上方“发货方式”按钮进行设置。若未看到按钮,请返回商品页重试。",
"card_fixed": "请输入新的【通用卡密】:",
}
kb = make_markup([row_back(f"adm:p:{pid}")])
await _send_text(update.effective_chat.id, asks[field], reply_markup=kb)
return
# 卡池管理页面
if action == "card_pool":
# 格式:adm:card_pool:{pid}:{page}
if len(parts) < 3:
return
pid = parts[2]
try:
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
except Exception:
page = 1
page = max(1, page)
page_size = 10
try:
stock_row = cur.execute("SELECT COUNT(*) FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL", (pid,)).fetchone()
stock_cnt = int(stock_row[0] or 0)
except Exception:
stock_cnt = 0
try:
used_row = cur.execute("SELECT COUNT(*) FROM card_keys WHERE product_id=? AND used_by_order_id IS NOT NULL", (pid,)).fetchone()
used_cnt = int(used_row[0] or 0)
except Exception:
used_cnt = 0
total_pages = (stock_cnt + page_size - 1) // page_size if stock_cnt > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
# 分页预览未使用卡密
rows = []
try:
rows = cur.execute(
"SELECT id, key_text FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL ORDER BY id ASC LIMIT ? OFFSET ?",
(pid, page_size, offset)
).fetchall()
except Exception:
rows = []
preview = "\n".join([f"#{r[0]} {str(r[1])[:60]}" for r in rows]) if rows else "(本页暂无未使用卡密)"
text = (
f"🔑 商品 #{pid} 的卡密库存\n"
f"未使用:{stock_cnt} | 已使用:{used_cnt}\n"
f"页码:{page}/{max(1,total_pages)}\n\n"
f"预览(每页最多{page_size}条,未使用):\n{preview}"
)
# 删除按钮(每行放置最多 5 个)
del_btns = []
row_buf = []
for _id, _ in rows:
row_buf.append(InlineKeyboardButton(f"❌#{_id}", callback_data=f"adm:cp_del:{pid}:{_id}:{page}"))
if len(row_buf) >= 5:
del_btns.append(row_buf)
row_buf = []
if row_buf:
del_btns.append(row_buf)
# 翻页按钮
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ 上一页", callback_data=f"adm:card_pool:{pid}:{page-1}"))
if page < total_pages:
nav.append(InlineKeyboardButton("下一页 ➡️", callback_data=f"adm:card_pool:{pid}:{page+1}"))
kb_rows = [
[InlineKeyboardButton("📥 导入卡密", callback_data=f"adm:cp_import:{pid}"), InlineKeyboardButton("⬇️ 导出未用", callback_data=f"adm:cp_export:{pid}"), InlineKeyboardButton("🧹 清空未用", callback_data=f"adm:cp_clearc:{pid}")],
[InlineKeyboardButton("🧽 去重未用", callback_data=f"adm:cp_dedupc:{pid}:{page}"), InlineKeyboardButton("🗑 删除已用", callback_data=f"adm:cp_clear_usedc:{pid}")],
]
if del_btns:
kb_rows.extend(del_btns)
if nav:
kb_rows.append(nav)
kb_rows.append(row_back(f"adm:p:{pid}"))
kb_rows.append(row_home_admin())
kb = make_markup(kb_rows)
await _send_text(update.effective_chat.id, text, reply_markup=kb)
return
# 卡池导入:进入等待态
if action == "cp_import":
pid = parts[2]
ctx.user_data["adm_wait"] = {"type": "cp_import", "data": {"pid": pid}}
kb = make_markup([row_back(f"adm:card_pool:{pid}:1")])
await _send_text(update.effective_chat.id, "请粘贴要导入的卡密文本:\n- 每行一条\n- 将自动忽略空行\n- 同一商品下的重复行会被跳过", reply_markup=kb)
return
# 卡池清空未用:确认
if action == "cp_clearc":
pid = parts[2]
kb = make_markup([
[InlineKeyboardButton("✅ 确认清空未使用", callback_data=f"adm:cp_clear:{pid}"), InlineKeyboardButton("❌ 取消", callback_data=f"adm:card_pool:{pid}:1")]
])
await _send_text(update.effective_chat.id, f"确认清空商品 #{pid} 的未使用卡密吗?此操作不可恢复。", reply_markup=kb)
return
# 卡池清空未用:执行
if action == "cp_clear":
pid = parts[2]
try:
cur.execute("DELETE FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL", (pid,))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已清空未使用卡密", ttl=2)
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 清空失败", ttl=2)
# 返回卡池页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:1"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 卡池删除已使用:确认
if action == "cp_clear_usedc":
pid = parts[2]
kb = make_markup([
[InlineKeyboardButton("✅ 确认删除已使用", callback_data=f"adm:cp_clear_used:{pid}"), InlineKeyboardButton("❌ 取消", callback_data=f"adm:card_pool:{pid}:1")]
])
await _send_text(update.effective_chat.id, f"确认删除商品 #{pid} 的已使用卡密吗?此操作不可恢复。", reply_markup=kb)
return
# 卡池删除已使用:执行
if action == "cp_clear_used":
pid = parts[2]
try:
cur.execute("DELETE FROM card_keys WHERE product_id=? AND used_by_order_id IS NOT NULL", (pid,))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已删除已使用卡密", ttl=2)
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 删除失败", ttl=2)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:1"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 单条删除未使用卡密
if action == "cp_del":
# 格式:adm:cp_del:{pid}:{key_id}:{page}
if len(parts) < 4:
return
pid = parts[2]
try:
key_id = int(parts[3])
except Exception:
key_id = None
try:
page = int(parts[4]) if len(parts) > 4 and parts[4].isdigit() else 1
except Exception:
page = 1
if key_id is None:
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
try:
# 仅删除未使用的目标 id
cur.execute("DELETE FROM card_keys WHERE id=? AND product_id=? AND used_by_order_id IS NULL", (key_id, pid))
conn.commit()
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"✅ 已删除 #{key_id}", ttl=2)
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"❗ 删除失败 #{key_id}", ttl=2)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 去重未用:确认
if action == "cp_dedupc":
pid = parts[2]
try:
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
except Exception:
page = 1
kb = make_markup([
[InlineKeyboardButton("✅ 确认去重未用", callback_data=f"adm:cp_dedup:{pid}:{page}"), InlineKeyboardButton("❌ 取消", callback_data=f"adm:card_pool:{pid}:{page}")]
])
await _send_text(update.effective_chat.id, f"将删除相同内容的重复未使用卡密,仅保留每组的最早一条。确定继续?", reply_markup=kb)
return
# 去重未用:执行
if action == "cp_dedup":
pid = parts[2]
try:
page = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else 1
except Exception:
page = 1
# 扫描未使用卡密并删除重复(同 key_text, 仅保留最小 id
try:
rows = cur.execute("SELECT id, key_text FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL ORDER BY id ASC", (pid,)).fetchall()
except Exception:
rows = []
seen = set()
to_del = []
for rid, k in rows:
k = str(k)
if k in seen:
to_del.append(int(rid))
else:
seen.add(k)
removed = 0
if to_del:
# 分批删除,避免 SQL 变量过多
chunk = 200
for i in range(0, len(to_del), chunk):
ids = to_del[i:i+chunk]
qmarks = ",".join(["?"] * len(ids))
try:
cur.execute(f"DELETE FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL AND id IN ({qmarks})", (pid, *ids))
conn.commit()
removed += len(ids)
except Exception:
pass
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"✅ 去重完成,删除 {removed}", ttl=3)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
if action == "menu":
await _admin_menu(update, ctx)
return
# 导出未使用卡密为文本
if action == "cp_export":
pid = parts[2]
try:
rows = cur.execute(
"SELECT key_text FROM card_keys WHERE product_id=? AND used_by_order_id IS NULL ORDER BY id ASC",
(pid,)
).fetchall()
except Exception:
rows = []
if not rows:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "暂无未使用卡密可导出", ttl=2)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:1"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
content = "\n".join([str(r[0]) for r in rows])
try:
bio = io.BytesIO(content.encode("utf-8"))
filename = f"product_{pid}_unused_{int(time.time())}.txt"
bio.name = filename
await app.bot.send_document(chat_id=update.effective_chat.id, document=bio, caption=f"商品 #{pid} 未使用卡密导出,共 {len(rows)}")
except Exception:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "导出失败", ttl=2)
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:1"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
async def adm_text_input(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not await _guard_admin(update):
return
state = ctx.user_data.get("adm_wait")
if not state:
return
kind = state.get("type")
# 兼容图片消息
msg = update.message
text = (getattr(msg, "text", None) or "").strip()
async def _check_and_warn_bot_admin(gid_text: str) -> bool:
try:
# 将群ID尽量转为 intTelegram 超级群通常形如 -100xxxxxxxxxx
try:
gid_int = int(gid_text)
except Exception:
gid_int = gid_text
me = await app.bot.get_me()
bot_id = getattr(me, "id", None)
if not bot_id:
return False
cm = await app.bot.get_chat_member(chat_id=gid_int, user_id=bot_id)
status = getattr(cm, "status", "")
if status not in ("administrator", "creator"):
try:
await update.message.reply_text("⚠️ 注意:机器人不是该群的管理员,自动拉群邀请与撤销可能失败。请将机器人设为管理员后再使用。")
except Exception:
pass
return False
return True
except Exception:
# 验证失败不阻断流程,仅忽略
return False
# 设置订单筛选开始日期(订单列表)
if kind == "of_start":
status_key = state["data"].get("status_key", "all")
page = state["data"].get("page", "1")
s = text
start_ts = _parse_date(s)
# 不限制留空
if s == "":
start_ts = None
ctx.user_data.setdefault("adm_ofilter", {})["start_ts"] = start_ts
ctx.user_data["adm_wait"] = {"type": "of_end", "data": {"status_key": status_key, "page": page}}
kb = make_markup([row_back(f"adm:olist:{page}:{status_key}")])
await update.message.reply_text("请输入【结束日期】(YYYY-MM-DD),留空表示不限制:", reply_markup=kb)
return
if kind == "of_end":
status_key = state["data"].get("status_key", "all")
page = int(state["data"].get("page", "1"))
s = text
end_ts = _parse_date(s)
if s == "":
end_ts = None
# 包含当日 23:59:59
if end_ts is not None:
end_ts = end_ts + 86399
ctx.user_data.setdefault("adm_ofilter", {})["end_ts"] = end_ts
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已设置时间范围,返回订单列表…", ttl=2)
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
# 搜索关键词输入
if kind == "osearch_q":
status_key = state["data"].get("status_key", "all")
page = int(state["data"].get("page", "1"))
qkw = text.strip()
if qkw == "":
# 为空等价清除
ctx.user_data.pop("adm_osearch", None)
tip = "✅ 已清除搜索条件,返回订单列表…"
else:
ctx.user_data["adm_osearch"] = {"q": qkw}
tip = "✅ 已设置搜索条件,返回订单列表…"
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, tip, ttl=2)
await _send_order_list(update.effective_chat.id, page, status_key, ctx)
return
# 保存客服联系方式(客服设置)
if kind == "support_contact":
val = text
try:
_set_setting("support.contact", val)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已保存客服联系方式", ttl=2)
except Exception:
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "❗ 保存失败", ttl=2)
except Exception:
pass
ctx.user_data.pop("adm_wait", None)
# 返回客服设置主页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": "adm:support"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 保存公告内容(公告设置)
if kind == "announcement_text":
val = text.strip()
announcement_type = state.get("data", {}).get("announcement_type", "usdt") # 默认为usdt
try:
_set_setting(f"announcement.{announcement_type}.text", val)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已保存公告内容", ttl=2)
except Exception:
try:
await _send_text(update.effective_chat.id, "❌ 保存失败")
except Exception:
pass
ctx.user_data.pop("adm_wait", None)
# 返回公告设置主页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": "adm:announcement"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 卡池导入:处理文本
if kind == "cp_import":
pid = state["data"].get("pid")
# 拆分行,去空白
lines = [ln.strip() for ln in (text or "").splitlines()]
lines = [ln for ln in lines if ln]
if not lines:
kb = make_markup([row_back(f"adm:card_pool:{pid}:1")])
await update.message.reply_text("未检测到有效内容,请重新粘贴。", reply_markup=kb)
return
# 去重(同一商品范围内)
try:
exist_rows = cur.execute("SELECT key_text FROM card_keys WHERE product_id=?", (pid,)).fetchall()
exist_set = set(str(r[0]) for r in exist_rows)
except Exception:
exist_set = set()
to_insert = [(pid, ln, int(time.time())) for ln in lines if ln not in exist_set]
inserted = 0
if to_insert:
try:
cur.executemany("INSERT INTO card_keys(product_id, key_text, create_time) VALUES (?,?,?)", to_insert)
conn.commit()
inserted = len(to_insert)
except Exception:
inserted = 0
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, f"✅ 导入完成,本次新增 {inserted}", ttl=3)
# 返回卡池页面
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:card_pool:{pid}:1"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 统计页设置开始日期
if kind == "sf_start":
s = text
start_ts = _parse_date(s)
if s == "":
start_ts = None
ctx.user_data.setdefault("adm_sfilter", {})["start_ts"] = start_ts
ctx.user_data["adm_wait"] = {"type": "sf_end", "data": {}}
kb = make_markup([row_back("adm:ostat")])
await update.message.reply_text("请输入【结束日期】(YYYY-MM-DD),留空表示不限制:", reply_markup=kb)
return
if kind == "sf_end":
s = text
end_ts = _parse_date(s)
if s == "":
end_ts = None
if end_ts is not None:
end_ts = end_ts + 86399
ctx.user_data.setdefault("adm_sfilter", {})["end_ts"] = end_ts
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已设置统计时间范围,返回统计…", ttl=2)
await _send_stat_page(update.effective_chat.id, ctx)
return
# 商品排序(整页):按输入的 ID 序列重排当前页
if kind == "psort":
try:
page = int(state.get("data", {}).get("page", 1))
except Exception:
page = 1
ids_in_page = state.get("data", {}).get("ids", [])
# 解析输入:支持 "#1 #2 3 4" 形式
import re as _re
toks = [t for t in _re.split(r"[\s,;]+", text) if t]
parsed_ids = []
for t in toks:
t = t.strip()
if t.startswith("#"):
t = t[1:]
if t.isdigit():
try:
parsed_ids.append(int(t))
except Exception:
pass
# 去重并仅保留本页存在的 ID
seen = set()
parsed_ids_unique = []
for _id in parsed_ids:
if _id in seen:
continue
seen.add(_id)
if _id in ids_in_page:
parsed_ids_unique.append(_id)
# 目标顺序 = 用户指定的在前 + 其余未指定的按原顺序在后
rest_ids = [i for i in ids_in_page if i not in set(parsed_ids_unique)]
new_order_ids = parsed_ids_unique + rest_ids
# 读取当前页的 sort 值集合(按当前显示顺序:s DESC, id DESC
page_size = 10
offset = (page - 1) * page_size
_ensure_product_sort_column()
rows = cur.execute(
"SELECT id, COALESCE(sort, id) AS s FROM products ORDER BY s DESC, id DESC LIMIT ? OFFSET ?",
(page_size, offset),
).fetchall()
# 构造 id -> s 映射,并取按当前顺序排列的 s 值列表
id_to_s = {int(r[0]): int(r[1]) for r in rows}
s_vals_current_order = [int(r[1]) for r in rows]
# 仅重排本页这些 ID:用同一组 s 值重新分配,保持与其它页的相对位置
updates = []
for idx, pid in enumerate(new_order_ids):
if pid in id_to_s and idx < len(s_vals_current_order):
new_s = s_vals_current_order[idx]
if id_to_s.get(pid) != new_s:
updates.append((new_s, pid))
if updates:
try:
cur.executemany("UPDATE products SET sort=? WHERE id=?", updates)
conn.commit()
except Exception:
pass
# 结束等待并反馈
ctx.user_data.pop("adm_wait", None)
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已更新排序", ttl=2)
except Exception:
pass
# 刷新当前页
await adm_router(type("obj", (), {"callback_query": type("q", (), {"data": f"adm:plist:{page}"}), "effective_user": update.effective_user, "effective_chat": update.effective_chat, "get_bot": update.get_bot})(), ctx)
return
# 新增商品流程:name -> price -> desc -> cover -> group -> save
if kind == "pnew_name":
state["type"] = "pnew_price"
state["data"]["name"] = text
kb = make_markup([row_home_admin()])
await update.message.reply_text("请输入【价格】(数字):", reply_markup=kb)
return
if kind == "pnew_price":
try:
price = float(text)
except Exception:
kb = make_markup([row_home_admin()])
await update.message.reply_text("格式不正确,请输入数字价格:", reply_markup=kb)
return
state["type"] = "pnew_desc"
state["data"]["price"] = price
kb = make_markup([row_home_admin()])
await update.message.reply_text("请输入【详情描述】:(可换行,尽量简洁)", reply_markup=kb)
return
if kind == "pnew_desc":
state["type"] = "pnew_cover"
state["data"]["desc"] = text
kb = make_markup([row_home_admin()])
await update.message.reply_text("请发送【封面】:可直接发送图片(将保存为 file_id),或发送图片 URL。请务必提供。", reply_markup=kb)
return
if kind == "pnew_cover":
name = state["data"].get("name")
price = state["data"].get("price")
desc = state["data"].get("desc")
# 支持直接发送图片作为封面
cover = None
try:
photos = getattr(msg, "photo", None)
if photos:
cover = photos[-1].file_id
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已收到图片封面,正在保存…", ttl=2)
except Exception:
pass
elif text:
cover = text
except Exception:
cover = text if text else None
state["data"]["cover"] = cover
# 直接创建商品:默认发货方式设为 join_group(自动拉群,可在商品页修改)
try:
cur.execute(
"INSERT INTO products(name, price, full_description, cover_url, tg_group_id, deliver_type) VALUES (?,?,?,?,?,?)",
(name, price, desc, cover, "", "join_group"),
)
conn.commit()
pid = cur.execute("SELECT last_insert_rowid()").fetchone()[0]
except Exception:
kb = make_markup([row_home_admin()])
await update.message.reply_text("保存失败,请稍后重试。", reply_markup=kb)
return
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 新商品已创建(发货方式默认为自动拉群,可在商品页修改)…", ttl=2)
await _send_product_page(update.effective_chat.id, str(pid))
return
if kind == "pnew_group":
gid = text
if not gid:
kb = make_markup([row_home_admin()])
await update.message.reply_text("群组ID 不能为空,请重新输入:", reply_markup=kb)
return
# 简单校验:必须以 -100 开头或为纯数字
ok = gid.startswith("-100") or gid.lstrip("-").isdigit()
if not ok:
kb = make_markup([row_home_admin()])
await update.message.reply_text("格式不正确,请输入正确的群组ID,例如 -1001234567890", reply_markup=kb)
return
# 强校验:机器人必须是该群管理员,否则不允许保存,停留在本步骤重试
ok_admin = await _check_and_warn_bot_admin(gid)
if not ok:
kb = make_markup([row_home_admin()])
await update.message.reply_text("已取消保存。请为机器人授予群管理员后,重新输入群组ID:", reply_markup=kb)
# 继续等待同一步骤输入
ctx.user_data["adm_wait"] = {"type": "pnew_group", "data": state["data"]}
return
# 使用已选择的发货方式创建商品
method = state["data"].get("deliver_type")
if method not in {"join_group", "card_fixed", "card_pool"}:
# 未选择发货方式,退回选择
state["type"] = "pnew_wait_deliver"
ctx.user_data["adm_wait"] = state
kb = make_markup([
[
InlineKeyboardButton("👥 自动拉群", callback_data="adm:pnew_set_deliver:join_group"),
InlineKeyboardButton("🧷 通用卡密", callback_data="adm:pnew_set_deliver:card_fixed"),
],
[InlineKeyboardButton("🔑 卡池", callback_data="adm:pnew_set_deliver:card_pool")],
row_home_admin(),
])
await update.message.reply_text("请先选择【发货方式】:", reply_markup=kb)
return
name = state["data"].get("name")
price = state["data"].get("price")
desc = state["data"].get("desc")
cover = state["data"].get("cover")
try:
cur.execute(
"INSERT INTO products(name, price, full_description, cover_url, tg_group_id, deliver_type) VALUES (?,?,?,?,?,?)",
(name, price, desc, cover, gid, method),
)
conn.commit()
pid = cur.execute("SELECT last_insert_rowid()").fetchone()[0]
except Exception:
await update.message.reply_text("保存失败,请稍后重试。")
return
ctx.user_data.pop("adm_wait", None)
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 新商品已创建,正在打开商品页…", ttl=2)
await _send_product_page(update.effective_chat.id, str(pid))
return
# 编辑商品字段
if kind and kind.startswith("edit_"):
pid = state["data"].get("pid")
field = kind.split("_", 1)[1]
if field == "price":
try:
val = float(text)
except Exception:
pid_back = state["data"].get("pid")
kb = make_markup([row_back(f"adm:p:{pid_back}")])
await update.message.reply_text("格式不正确,请输入数字价格:", reply_markup=kb)
return
else:
# 支持直接发图片作为封面
if field == "cover":
photos = getattr(msg, "photo", None)
if photos:
val = photos[-1].file_id
try:
await send_ephemeral(update.get_bot(), update.effective_chat.id, "✅ 已收到图片封面,正在保存…", ttl=2)
except Exception:
pass
else:
val = text
else:
val = text
col = {
"name": "name",
"price": "price",
"desc": "full_description",
"cover": "cover_url",
"group": "tg_group_id",
"deliver": "deliver_type",
"card_fixed": "card_fixed",
}[field]
# 若修改群ID,先做强校验,不通过则返回商品页不保存
if field == "group":
ok_admin = await _check_and_warn_bot_admin(str(val))
if not ok_admin:
try:
await update.message.reply_text("已取消保存。请先将机器人设为该群管理员。")
except Exception:
pass
# 返回单品页面
await _send_product_page(update.effective_chat.id, pid)
return
cur.execute(f"UPDATE products SET {col}=? WHERE id=?", (val, pid))
conn.commit()
ctx.user_data.pop("adm_wait", None)
await _send_text(update.effective_chat.id, "✅ 已保存发货方式,返回商品页…", reply_markup=make_markup([row_back(f"adm:p:{pid}")]))
await _send_text(update.effective_chat.id, "正在刷新…")
await _send_product_page(update.effective_chat.id, pid)
return
# 主页编辑(DB settings
if kind and kind.startswith("home_"):
key = kind.split("_", 1)[1] # title/intro/cover
if key == "title":
_set_setting("home.title", text)
elif key == "intro":
_set_setting("home.intro", text)
elif key == "cover":
# 支持直接发送图片作为主页封面(保存 file_id),或输入 URL 文本
val = text
try:
photos = getattr(update.message, "photo", None)
if photos:
val = photos[-1].file_id
try:
await update.message.reply_text("✅ 已收到图片封面,正在保存…")
except Exception:
pass
except Exception:
# 回退到文本
pass
_set_setting("home.cover_url", val)
ctx.user_data.pop("adm_wait", None)
m = await update.message.reply_text("✅ 主页设置已更新(已保存到数据库),正在返回主页设置…")
await asyncio.sleep(1)
try:
await update.get_bot().delete_message(update.effective_chat.id, m.message_id)
except Exception:
pass
await _send_home_menu(update.effective_chat.id)
return
# 本模块需要用到的去注释 JSON 解析(复用 bot.py 的工具若未提供)
def _strip_json_comments(s: str) -> str:
import re as _re
# 删除 // 和 /* */ 注释
s = _re.sub(r"/\*.*?\*/", "", s, flags=_re.S)
s = _re.sub(r"//.*", "", s)
return s
# 注册 handlers
app.add_handler(CommandHandler("admin", cmd_admin))
app.add_handler(CallbackQueryHandler(adm_router, pattern=r"^adm:"))
# 管理员文本输入(逐步问答)
# 管理员文本/图片输入(逐步问答)
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, adm_text_input))
app.add_handler(MessageHandler(filters.PHOTO, adm_text_input))
# 可选:用于自动化测试时导出内部函数引用
if deps.get("EXPOSE_TEST_HOOKS"):
return {
"_send_order_list": _send_order_list,
"_build_order_toolbar": _build_order_toolbar,
"_build_order_pagination": _build_order_pagination,
"adm_router": adm_router,
"_send_stat_page": _send_stat_page,
}