Files
fakabot/redis_cache.py

289 lines
7.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Redis缓存模块
提供商品信息、配置、用户会话等数据的缓存功能
"""
import redis
import json
import os
from typing import Any, Optional
from functools import wraps
import time
# Redis连接配置
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_DB = int(os.getenv('REDIS_DB', 0))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
# 缓存过期时间配置(秒)
CACHE_TTL = {
'product': 300, # 商品信息:5分钟
'config': 600, # 配置信息:10分钟
'user_session': 3600, # 用户会话:1小时
'rate_limit': 60, # 频率限制:1分钟
'stock': 30, # 库存信息:30秒
}
class RedisCache:
"""Redis缓存管理类"""
def __init__(self):
self.enabled = False
self.client = None
self._connect()
def _connect(self):
"""连接Redis"""
try:
self.client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=True,
socket_connect_timeout=3,
socket_timeout=3,
retry_on_timeout=True,
health_check_interval=30
)
# 测试连接
self.client.ping()
self.enabled = True
print(f"✅ Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
except Exception as e:
self.enabled = False
print(f"⚠️ Redis连接失败,缓存功能已禁用: {e}")
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
if not self.enabled:
return None
try:
value = self.client.get(key)
if value:
return json.loads(value)
return None
except Exception as e:
print(f"❌ Redis GET失败: {key}, {e}")
return None
def set(self, key: str, value: Any, ttl: int = None) -> bool:
"""设置缓存"""
if not self.enabled:
return False
try:
json_value = json.dumps(value, ensure_ascii=False)
if ttl:
self.client.setex(key, ttl, json_value)
else:
self.client.set(key, json_value)
return True
except Exception as e:
print(f"❌ Redis SET失败: {key}, {e}")
return False
def delete(self, key: str) -> bool:
"""删除缓存"""
if not self.enabled:
return False
try:
self.client.delete(key)
return True
except Exception as e:
print(f"❌ Redis DELETE失败: {key}, {e}")
return False
def exists(self, key: str) -> bool:
"""检查key是否存在"""
if not self.enabled:
return False
try:
return self.client.exists(key) > 0
except Exception:
return False
def incr(self, key: str, amount: int = 1) -> Optional[int]:
"""递增计数器"""
if not self.enabled:
return None
try:
return self.client.incrby(key, amount)
except Exception as e:
print(f"❌ Redis INCR失败: {key}, {e}")
return None
def expire(self, key: str, ttl: int) -> bool:
"""设置过期时间"""
if not self.enabled:
return False
try:
return self.client.expire(key, ttl)
except Exception:
return False
def ttl(self, key: str) -> int:
"""获取剩余过期时间"""
if not self.enabled:
return -1
try:
return self.client.ttl(key)
except Exception:
return -1
# 全局缓存实例
cache = RedisCache()
# 缓存装饰器
def cached(key_prefix: str, ttl: int = 300):
"""
缓存装饰器
使用示例:
@cached('product', ttl=300)
def get_product(pid):
return db.query(...)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
cache_key = f"{key_prefix}:{':'.join(map(str, args))}"
# 尝试从缓存获取
cached_value = cache.get(cache_key)
if cached_value is not None:
return cached_value
# 缓存未命中,执行函数
result = func(*args, **kwargs)
# 写入缓存
if result is not None:
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator
# 商品缓存
def get_product_cached(cur, pid: str):
"""获取商品信息(带缓存)"""
cache_key = f"product:{pid}"
# 尝试从缓存获取
cached = cache.get(cache_key)
if cached:
return cached
# 缓存未命中,查询数据库
try:
row = cur.execute(
"SELECT id, name, price, cover_url, full_description, status FROM products WHERE id=?",
(pid,)
).fetchone()
if row:
product = {
'id': row[0],
'name': row[1],
'price': row[2],
'cover_url': row[3],
'full_description': row[4],
'status': row[5]
}
# 写入缓存
cache.set(cache_key, product, CACHE_TTL['product'])
return product
except Exception as e:
print(f"❌ 查询商品失败: {e}")
return None
def invalidate_product_cache(pid: str):
"""清除商品缓存"""
cache.delete(f"product:{pid}")
# 配置缓存
def get_setting_cached(cur, key: str, default: str = "") -> str:
"""获取配置(带缓存)"""
cache_key = f"setting:{key}"
# 尝试从缓存获取
cached = cache.get(cache_key)
if cached is not None:
return cached
# 缓存未命中,查询数据库
try:
row = cur.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
value = row[0] if row else default
# 写入缓存
cache.set(cache_key, value, CACHE_TTL['config'])
return value
except Exception:
return default
def invalidate_setting_cache(key: str):
"""清除配置缓存"""
cache.delete(f"setting:{key}")
# 用户会话缓存
def set_user_session(user_id: int, data: dict, ttl: int = None):
"""设置用户会话数据"""
cache_key = f"session:{user_id}"
cache.set(cache_key, data, ttl or CACHE_TTL['user_session'])
def get_user_session(user_id: int) -> Optional[dict]:
"""获取用户会话数据"""
cache_key = f"session:{user_id}"
return cache.get(cache_key)
def clear_user_session(user_id: int):
"""清除用户会话"""
cache.delete(f"session:{user_id}")
if __name__ == "__main__":
# 测试Redis连接
print("测试Redis连接...")
print(f"Redis状态: {'✅ 已启用' if cache.enabled else '❌ 已禁用'}")
if cache.enabled:
# 测试基本操作
cache.set("test_key", {"hello": "world"}, 10)
value = cache.get("test_key")
print(f"测试读写: {value}")
# 测试计数器
count = cache.incr("test_counter")
print(f"测试计数器: {count}")
# 清理测试数据
cache.delete("test_key")
cache.delete("test_counter")
print("✅ Redis测试通过")