mirror of
https://github.com/yanguo888/fakabot.git
synced 2026-06-20 20:40:40 +00:00
205 lines
6.4 KiB
Python
205 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# 授权检查 - 请勿删除此部分,否则程序无法运行
|
|
import _auth_check
|
|
|
|
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
频率限制模块
|
|
防止恶意刷单、暴力攻击等
|
|
"""
|
|
|
|
import time
|
|
from typing import Optional, Tuple
|
|
from redis_cache import cache
|
|
|
|
|
|
class RateLimiter:
|
|
"""频率限制器"""
|
|
|
|
# 限制规则配置
|
|
RULES = {
|
|
# 用户操作限制
|
|
'user_command': {'limit': 20, 'window': 60, 'desc': '命令操作'},
|
|
'user_payment': {'limit': 5, 'window': 300, 'desc': '创建订单'},
|
|
'user_query': {'limit': 10, 'window': 60, 'desc': '查询订单'},
|
|
|
|
# IP限制
|
|
'ip_callback': {'limit': 100, 'window': 60, 'desc': '支付回调'},
|
|
'ip_request': {'limit': 200, 'window': 60, 'desc': 'HTTP请求'},
|
|
|
|
# 全局限制
|
|
'global_order': {'limit': 1000, 'window': 60, 'desc': '全局订单创建'},
|
|
}
|
|
|
|
def __init__(self):
|
|
self.enabled = cache.enabled
|
|
|
|
def check_rate_limit(self, key: str, rule_name: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
检查频率限制
|
|
|
|
Args:
|
|
key: 限制对象标识(如user_id, ip等)
|
|
rule_name: 规则名称
|
|
|
|
Returns:
|
|
(是否允许, 错误消息)
|
|
"""
|
|
if not self.enabled:
|
|
return True, None
|
|
|
|
rule = self.RULES.get(rule_name)
|
|
if not rule:
|
|
return True, None
|
|
|
|
cache_key = f"rate_limit:{rule_name}:{key}"
|
|
|
|
try:
|
|
# 获取当前计数
|
|
current = cache.get(cache_key)
|
|
|
|
if current is None:
|
|
# 首次访问,初始化计数器
|
|
cache.set(cache_key, {'count': 1, 'start_time': int(time.time())}, rule['window'])
|
|
return True, None
|
|
|
|
# 检查时间窗口
|
|
elapsed = int(time.time()) - current['start_time']
|
|
|
|
if elapsed > rule['window']:
|
|
# 时间窗口已过,重置计数器
|
|
cache.set(cache_key, {'count': 1, 'start_time': int(time.time())}, rule['window'])
|
|
return True, None
|
|
|
|
# 在时间窗口内,检查计数
|
|
if current['count'] >= rule['limit']:
|
|
# 超过限制
|
|
remaining = rule['window'] - elapsed
|
|
error_msg = f"⚠️ {rule['desc']}过于频繁,请 {remaining} 秒后再试"
|
|
return False, error_msg
|
|
|
|
# 未超过限制,增加计数
|
|
current['count'] += 1
|
|
cache.set(cache_key, current, rule['window'])
|
|
return True, None
|
|
|
|
except Exception as e:
|
|
print(f"❌ 频率限制检查失败: {e}")
|
|
# 出错时放行,避免影响正常业务
|
|
return True, None
|
|
|
|
def get_remaining_quota(self, key: str, rule_name: str) -> dict:
|
|
"""
|
|
获取剩余配额
|
|
|
|
Returns:
|
|
{'used': 已使用次数, 'limit': 限制次数, 'remaining': 剩余次数, 'reset_in': 重置时间(秒)}
|
|
"""
|
|
if not self.enabled:
|
|
return {'used': 0, 'limit': 999, 'remaining': 999, 'reset_in': 0}
|
|
|
|
rule = self.RULES.get(rule_name)
|
|
if not rule:
|
|
return {'used': 0, 'limit': 999, 'remaining': 999, 'reset_in': 0}
|
|
|
|
cache_key = f"rate_limit:{rule_name}:{key}"
|
|
|
|
try:
|
|
current = cache.get(cache_key)
|
|
|
|
if current is None:
|
|
return {
|
|
'used': 0,
|
|
'limit': rule['limit'],
|
|
'remaining': rule['limit'],
|
|
'reset_in': 0
|
|
}
|
|
|
|
elapsed = int(time.time()) - current['start_time']
|
|
reset_in = max(0, rule['window'] - elapsed)
|
|
|
|
return {
|
|
'used': current['count'],
|
|
'limit': rule['limit'],
|
|
'remaining': max(0, rule['limit'] - current['count']),
|
|
'reset_in': reset_in
|
|
}
|
|
except Exception:
|
|
return {'used': 0, 'limit': 999, 'remaining': 999, 'reset_in': 0}
|
|
|
|
def reset_limit(self, key: str, rule_name: str):
|
|
"""重置限制(管理员功能)"""
|
|
cache_key = f"rate_limit:{rule_name}:{key}"
|
|
cache.delete(cache_key)
|
|
|
|
|
|
# 全局限制器实例
|
|
rate_limiter = RateLimiter()
|
|
|
|
|
|
# 装饰器:用户命令限制
|
|
def rate_limit_user_command(func):
|
|
"""用户命令频率限制装饰器"""
|
|
async def wrapper(update, context, *args, **kwargs):
|
|
user_id = update.effective_user.id
|
|
|
|
allowed, error_msg = rate_limiter.check_rate_limit(str(user_id), 'user_command')
|
|
|
|
if not allowed:
|
|
try:
|
|
await update.message.reply_text(error_msg)
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
return await func(update, context, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
# 装饰器:用户支付限制
|
|
def rate_limit_user_payment(func):
|
|
"""用户支付频率限制装饰器"""
|
|
async def wrapper(update, context, *args, **kwargs):
|
|
user_id = update.effective_user.id
|
|
|
|
allowed, error_msg = rate_limiter.check_rate_limit(str(user_id), 'user_payment')
|
|
|
|
if not allowed:
|
|
try:
|
|
await update.callback_query.answer(error_msg, show_alert=True)
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
return await func(update, context, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
# IP限制检查
|
|
def check_ip_rate_limit(ip: str, rule_name: str = 'ip_request') -> Tuple[bool, Optional[str]]:
|
|
"""检查IP频率限制"""
|
|
return rate_limiter.check_rate_limit(ip, rule_name)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 测试频率限制
|
|
print("测试频率限制...")
|
|
|
|
# 测试用户命令限制
|
|
for i in range(25):
|
|
allowed, msg = rate_limiter.check_rate_limit("test_user_123", "user_command")
|
|
print(f"第{i+1}次请求: {'✅ 允许' if allowed else f'❌ 拒绝 - {msg}'}")
|
|
|
|
if not allowed:
|
|
# 查看剩余配额
|
|
quota = rate_limiter.get_remaining_quota("test_user_123", "user_command")
|
|
print(f"配额信息: {quota}")
|
|
break
|
|
|
|
print("\n✅ 频率限制测试完成")
|
|
|