Redis 最佳实践与应用模式
Redis 的强大不仅在于它的命令,而在于开发者如何利用它的特性解决分布式系统的常见挑战。本章将介绍 Redis 在实际生产环境中的最佳实践和常见应用模式。
缓存设计模式
缓存是 Redis 最核心的使用场景,以下是几种核心模式。
Cache Aside(旁路缓存)
这是最通用的模式,应用管理缓存和数据库:
读操作流程:
- 应用先读缓存
- 缓存命中:直接返回
- 缓存未命中:读数据库,写入缓存,返回
写操作流程:
- 应用先写数据库
- 成功后删除缓存(而非更新)
为什么写操作是"删除"而不是"更新"缓存?
在高并发场景下,更新缓存可能导致脏数据。例如:
- 线程 A 更新数据库为值 1
- 线程 B 更新数据库为值 2
- 线程 B 更新缓存为值 2
- 线程 A 更新缓存为值 1(此时缓存中的值是错误的)
删除机制能利用缓存未命中的特性懒加载最新数据,避免这种竞态条件。
def get_user(user_id):
# 1. 先读缓存
cache_key = f"user:{user_id}"
cached = redis.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,读数据库
user = db.query_user(user_id)
if user:
# 3. 写入缓存
redis.setex(cache_key, 3600, json.dumps(user))
return user
def update_user(user_id, data):
# 1. 更新数据库
db.update_user(user_id, data)
# 2. 删除缓存
redis.delete(f"user:{user_id}")
Read/Write Through
缓存代理负责读写数据库,应用只与缓存交互:
- 读:缓存未命中时,缓存代理从数据库加载数据
- 写:缓存代理同步更新缓存和数据库
这种模式需要缓存中间件支持(如 Redis 不原生支持,需要应用层实现)。
Write Behind
写操作只更新缓存,由后台进程异步批量写入数据库:
优点:
- 写入性能极高
- 可以合并多次更新
缺点:
- 数据可能丢失
- 实现复杂
缓存常见问题
缓存雪崩
问题:大量 Key 同时过期,请求全部打到数据库
解决方案:
import random
# 方案一:过期时间加随机偏移
def set_cache(key, value, base_ttl=3600):
# 添加 0-300 秒的随机偏移
ttl = base_ttl + random.randint(0, 300)
redis.setex(key, ttl, value)
# 方案二:设置逻辑过期,后台刷新
def get_with_refresh(key, ttl=3600):
data = redis.get(key)
if data:
expire_time = redis.get(f"{key}:expire")
if expire_time and time.time() > float(expire_time):
# 触发后台刷新
async_refresh(key)
return data
return None
缓存击穿
问题:热点 Key 在失效瞬间,大量请求打到数据库
解决方案:
import threading
# 方案一:互斥锁
def get_with_lock(key):
value = redis.get(key)
if value:
return value
# 获取分布式锁
lock_key = f"lock:{key}"
if redis.set(lock_key, "1", nx=True, ex=10):
try:
# 双重检查
value = redis.get(key)
if value:
return value
# 从数据库加载
value = db.query(key)
redis.setex(key, 3600, value)
return value
finally:
redis.delete(lock_key)
else:
# 等待并重试
time.sleep(0.1)
return get_with_lock(key)
# 方案二:永不过期,逻辑过期
def get_hot_data(key):
data = redis.hgetall(key)
if not data:
return load_and_cache(key)
expire_time = float(data.get('expire_time', 0))
if time.time() > expire_time:
# 异步刷新
async_refresh(key)
return data.get('value')
缓存穿透
问题:查询不存在的数据,每次都穿透到数据库
解决方案:
# 方案一:缓存空值
def get_user(user_id):
cache_key = f"user:{user_id}"
cached = redis.get(cache_key)
if cached == "NULL": # 缓存的空值
return None
if cached:
return json.loads(cached)
user = db.query_user(user_id)
if user:
redis.setex(cache_key, 3600, json.dumps(user))
else:
# 缓存空值,设置较短过期时间
redis.setex(cache_key, 60, "NULL")
return user
# 方案二:布隆过滤器
from pybloom_live import ScalableBloomFilter
# 初始化布隆过滤器
bf = ScalableBloomFilter(initial_capacity=1000000, error_rate=0.001)
def init_bloom_filter():
# 预热:加载所有存在的 ID
for user_id in db.get_all_user_ids():
bf.add(user_id)
def get_user_safe(user_id):
# 先检查布隆过滤器
if user_id not in bf:
return None # 一定不存在
return get_user(user_id) # 可能存在,正常查询
分布式锁
基本实现
# 获取锁(设置 10 秒超时)
SET lock:order:123 "uuid_value" NX PX 10000
# 释放锁(使用 Lua 脚本核对 UUID,避免误删他人锁)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 lock:order:123 "uuid_value"
Python 实现
import uuid
import redis
class DistributedLock:
def __init__(self, redis_client, lock_name, timeout=10):
self.redis = redis_client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire(self):
"""获取锁"""
return self.redis.set(
self.lock_name,
self.identifier,
nx=True,
px=self.timeout * 1000
)
def release(self):
"""释放锁"""
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
return self.redis.eval(script, 1, self.lock_name, self.identifier)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用
r = redis.Redis()
with DistributedLock(r, "order:123", timeout=10) as lock:
# 执行业务逻辑
process_order()
Redlock 算法
对于更高可靠性要求的场景,可以使用 Redlock 算法(多个独立的 Redis 实例):
from redlock import RedLock
# 连接多个独立的 Redis 实例
lock = RedLock("distributed_lock",
hosts=[
{"host": "redis1.example.com", "port": 6379},
{"host": "redis2.example.com", "port": 6379},
{"host": "redis3.example.com", "port": 6379},
])
if lock.acquire():
try:
# 执行业务逻辑
pass
finally:
lock.release()
分布式限流
简单计数器限流
def rate_limit(key, limit, window):
"""简单计数器限流"""
current = redis.incr(key)
if current == 1:
redis.expire(key, window)
if current > limit:
return False # 限流
return True # 允许
滑动窗口限流
def sliding_window_rate_limit(key, limit, window):
"""滑动窗口限流"""
now = time.time()
window_start = now - window
pipe = redis.pipeline()
# 移除过期的请求
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(uuid.uuid4()): now})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
count = results[1]
if count >= limit:
return False # 限流
return True # 允许
令牌桶限流
-- token_bucket.lua
-- KEYS[1]: 限流键
-- ARGV[1]: 桶容量
-- ARGV[2]: 令牌产生速率(个/秒)
-- ARGV[3]: 当前时间戳
-- ARGV[4]: 请求数量
-- 返回: 1 允许, 0 拒绝
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 获取当前桶状态
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now
-- 计算新令牌数
local elapsed = now - last_time
local new_tokens = elapsed * rate
tokens = math.min(capacity, tokens + new_tokens)
-- 判断是否允许
if tokens >= requested then
tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
return 1
else
return 0
end
排行榜系统
基本实现
# 提交分数
ZADD games:score 1500 user:1
ZADD games:score 2000 user:2
# 获取 Top 10
ZREVRANGE games:score 0 9 WITHSCORES
# 获取用户排名
ZREVRANK games:score user:1
# 获取用户分数
ZSCORE games:score user:1
# 分数范围查询
ZRANGEBYSCORE games:score 1000 2000 WITHSCORES
分页排行榜
def get_leaderboard(game_id, page, page_size=10):
"""分页获取排行榜"""
key = f"games:{game_id}:score"
start = (page - 1) * page_size
end = start + page_size - 1
# 获取排名和分数
results = redis.zrevrange(key, start, end, withscores=True)
# 获取总数
total = redis.zcard(key)
return {
'list': [{'user': r[0], 'score': r[1]} for r in results],
'total': total,
'page': page,
'page_size': page_size
}
消息队列
List 实现简单队列
# 生产者
def produce(queue_name, message):
redis.lpush(queue_name, json.dumps(message))
# 消费者(阻塞)
def consume(queue_name, timeout=0):
result = redis.brpop(queue_name, timeout)
if result:
return json.loads(result[1])
return None
Stream 实现可靠队列(Redis 5.0+)
# 生产者
def produce(stream_name, message):
return redis.xadd(stream_name, message)
# 消费者组
def create_consumer_group(stream_name, group_name):
try:
redis.xgroup_create(stream_name, group_name, id='0')
except redis.ResponseError:
pass # 组已存在
# 消费消息
def consume(stream_name, group_name, consumer_name, count=1):
messages = redis.xreadgroup(
group_name,
consumer_name,
{stream_name: '>'},
count=count
)
return messages
# 确认消息
def ack(stream_name, group_name, message_id):
redis.xack(stream_name, group_name, message_id)
延迟队列
使用 Sorted Set 实现延迟任务:
import time
# 添加延迟任务
def add_delay_task(queue_name, task, delay_seconds):
execute_time = time.time() + delay_seconds
redis.zadd(queue_name, {json.dumps(task): execute_time})
# 消费延迟任务
def consume_delay_task(queue_name):
now = time.time()
# 获取到期的任务
tasks = redis.zrangebyscore(queue_name, 0, now)
for task_json in tasks:
# 尝试获取任务(避免竞争)
if redis.zrem(queue_name, task_json):
task = json.loads(task_json)
# 执行任务
execute_task(task)
高性能开发建议
1. 使用 Pipeline
# 不好的做法:多次往返
for i in range(1000):
redis.set(f"key:{i}", f"value:{i}")
# 好的做法:使用 Pipeline
pipe = redis.pipeline()
for i in range(1000):
pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()
2. 避免 Big Key
# 单个 Key 成员数量建议控制在 5000 以内
# 使用 SCAN 替代 KEYS
SCAN 0 MATCH user:* COUNT 100
3. 合理使用连接池
import redis
from redis.connection import ConnectionPool
# 创建连接池
pool = ConnectionPool(
host='localhost',
port=6379,
max_connections=100,
decode_responses=True
)
# 使用连接池
redis_client = redis.Redis(connection_pool=pool)
4. 选择合适的数据结构
| 场景 | 推荐数据结构 |
|---|---|
| 简单缓存 | String |
| 对象存储 | Hash |
| 消息队列 | List / Stream |
| 标签/去重 | Set |
| 排行榜 | Sorted Set |
| 计数器 | String (INCR) |
| 时间线 | List / Sorted Set |
安全红线
绝对禁止的操作
- 禁止
KEYS *:生产环境使用SCAN代替 - 设置 maxmemory:防止 Redis 无限制吞掉服务器内存
- 绑定内网 IP:绝不允许 Redis 暴露在公网
必要的安全配置
# redis.conf
bind 127.0.0.1 192.168.1.100
requirepass your_strong_password
maxmemory 4gb
maxmemory-policy allkeys-lru
# 禁用危险命令
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command KEYS ""
小结
本章我们学习了:
- 缓存设计模式:Cache Aside、缓存雪崩/击穿/穿透的解决方案
- 分布式锁:基本实现和 Redlock 算法
- 分布式限流:计数器、滑动窗口、令牌桶
- 排行榜系统:使用 Sorted Set 实现
- 消息队列:List 和 Stream 实现
- 延迟队列:Sorted Set 实现
- 高性能建议:Pipeline、避免 Big Key、连接池
- 安全红线:禁止的操作和必要配置
练习
- 实现一个完整的缓存系统,包含雪崩、击穿、穿透的保护机制
- 使用 Redis 实现分布式锁,并测试其正确性
- 实现一个滑动窗口限流器
- 使用 Sorted Set 实现一个延迟任务队列