跳到主要内容

Redis 最佳实践与应用模式

Redis 的强大不仅在于它的命令,而在于开发者如何利用它的特性解决分布式系统的常见挑战。本章将介绍 Redis 在实际生产环境中的最佳实践和常见应用模式。

缓存设计模式

缓存是 Redis 最核心的使用场景,以下是几种核心模式。

Cache Aside(旁路缓存)

这是最通用的模式,应用管理缓存和数据库:

读操作流程

  1. 应用先读缓存
  2. 缓存命中:直接返回
  3. 缓存未命中:读数据库,写入缓存,返回

写操作流程

  1. 应用先写数据库
  2. 成功后删除缓存(而非更新)

为什么写操作是"删除"而不是"更新"缓存?

在高并发场景下,更新缓存可能导致脏数据。例如:

  • 线程 A 更新数据库为值 1
  • 线程 B 更新数据库为值 2
  • 线程 B 更新缓存为值 2
  • 线程 A 更新缓存为值 1(此时缓存中的值是错误的)

删除机制能利用缓存未命中的特性懒加载最新数据,避免这种竞态条件。

def get_user(user_id):
# 1. 先读缓存
cache_key = f"user:{user_id}"
cached = redis.get(cache_key)
if cached:
return json.loads(cached)

# 2. 缓存未命中,读数据库
user = db.query_user(user_id)
if user:
# 3. 写入缓存
redis.setex(cache_key, 3600, json.dumps(user))

return user

def update_user(user_id, data):
# 1. 更新数据库
db.update_user(user_id, data)

# 2. 删除缓存
redis.delete(f"user:{user_id}")

Read/Write Through

缓存代理负责读写数据库,应用只与缓存交互:

  • :缓存未命中时,缓存代理从数据库加载数据
  • :缓存代理同步更新缓存和数据库

这种模式需要缓存中间件支持(如 Redis 不原生支持,需要应用层实现)。

Write Behind

写操作只更新缓存,由后台进程异步批量写入数据库:

优点

  • 写入性能极高
  • 可以合并多次更新

缺点

  • 数据可能丢失
  • 实现复杂

缓存常见问题

缓存雪崩

问题:大量 Key 同时过期,请求全部打到数据库

解决方案

import random

# 方案一:过期时间加随机偏移
def set_cache(key, value, base_ttl=3600):
# 添加 0-300 秒的随机偏移
ttl = base_ttl + random.randint(0, 300)
redis.setex(key, ttl, value)

# 方案二:设置逻辑过期,后台刷新
def get_with_refresh(key, ttl=3600):
data = redis.get(key)
if data:
expire_time = redis.get(f"{key}:expire")
if expire_time and time.time() > float(expire_time):
# 触发后台刷新
async_refresh(key)
return data
return None

缓存击穿

问题:热点 Key 在失效瞬间,大量请求打到数据库

解决方案

import threading

# 方案一:互斥锁
def get_with_lock(key):
value = redis.get(key)
if value:
return value

# 获取分布式锁
lock_key = f"lock:{key}"
if redis.set(lock_key, "1", nx=True, ex=10):
try:
# 双重检查
value = redis.get(key)
if value:
return value

# 从数据库加载
value = db.query(key)
redis.setex(key, 3600, value)
return value
finally:
redis.delete(lock_key)
else:
# 等待并重试
time.sleep(0.1)
return get_with_lock(key)

# 方案二:永不过期,逻辑过期
def get_hot_data(key):
data = redis.hgetall(key)
if not data:
return load_and_cache(key)

expire_time = float(data.get('expire_time', 0))
if time.time() > expire_time:
# 异步刷新
async_refresh(key)

return data.get('value')

缓存穿透

问题:查询不存在的数据,每次都穿透到数据库

解决方案

# 方案一:缓存空值
def get_user(user_id):
cache_key = f"user:{user_id}"
cached = redis.get(cache_key)

if cached == "NULL": # 缓存的空值
return None

if cached:
return json.loads(cached)

user = db.query_user(user_id)
if user:
redis.setex(cache_key, 3600, json.dumps(user))
else:
# 缓存空值,设置较短过期时间
redis.setex(cache_key, 60, "NULL")

return user

# 方案二:布隆过滤器
from pybloom_live import ScalableBloomFilter

# 初始化布隆过滤器
bf = ScalableBloomFilter(initial_capacity=1000000, error_rate=0.001)

def init_bloom_filter():
# 预热:加载所有存在的 ID
for user_id in db.get_all_user_ids():
bf.add(user_id)

def get_user_safe(user_id):
# 先检查布隆过滤器
if user_id not in bf:
return None # 一定不存在

return get_user(user_id) # 可能存在,正常查询

分布式锁

基本实现

# 获取锁(设置 10 秒超时)
SET lock:order:123 "uuid_value" NX PX 10000

# 释放锁(使用 Lua 脚本核对 UUID,避免误删他人锁)
EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 lock:order:123 "uuid_value"

Python 实现

import uuid
import redis

class DistributedLock:
def __init__(self, redis_client, lock_name, timeout=10):
self.redis = redis_client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())

def acquire(self):
"""获取锁"""
return self.redis.set(
self.lock_name,
self.identifier,
nx=True,
px=self.timeout * 1000
)

def release(self):
"""释放锁"""
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
return self.redis.eval(script, 1, self.lock_name, self.identifier)

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.release()

# 使用
r = redis.Redis()
with DistributedLock(r, "order:123", timeout=10) as lock:
# 执行业务逻辑
process_order()

Redlock 算法

对于更高可靠性要求的场景,可以使用 Redlock 算法(多个独立的 Redis 实例):

from redlock import RedLock

# 连接多个独立的 Redis 实例
lock = RedLock("distributed_lock",
hosts=[
{"host": "redis1.example.com", "port": 6379},
{"host": "redis2.example.com", "port": 6379},
{"host": "redis3.example.com", "port": 6379},
])

if lock.acquire():
try:
# 执行业务逻辑
pass
finally:
lock.release()

分布式限流

简单计数器限流

def rate_limit(key, limit, window):
"""简单计数器限流"""
current = redis.incr(key)
if current == 1:
redis.expire(key, window)

if current > limit:
return False # 限流
return True # 允许

滑动窗口限流

def sliding_window_rate_limit(key, limit, window):
"""滑动窗口限流"""
now = time.time()
window_start = now - window

pipe = redis.pipeline()
# 移除过期的请求
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(uuid.uuid4()): now})
# 设置过期时间
pipe.expire(key, window)

results = pipe.execute()
count = results[1]

if count >= limit:
return False # 限流
return True # 允许

令牌桶限流

-- token_bucket.lua
-- KEYS[1]: 限流键
-- ARGV[1]: 桶容量
-- ARGV[2]: 令牌产生速率(个/秒)
-- ARGV[3]: 当前时间戳
-- ARGV[4]: 请求数量
-- 返回: 1 允许, 0 拒绝

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 获取当前桶状态
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now

-- 计算新令牌数
local elapsed = now - last_time
local new_tokens = elapsed * rate
tokens = math.min(capacity, tokens + new_tokens)

-- 判断是否允许
if tokens >= requested then
tokens = tokens - requested
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
return 1
else
return 0
end

排行榜系统

基本实现

# 提交分数
ZADD games:score 1500 user:1
ZADD games:score 2000 user:2

# 获取 Top 10
ZREVRANGE games:score 0 9 WITHSCORES

# 获取用户排名
ZREVRANK games:score user:1

# 获取用户分数
ZSCORE games:score user:1

# 分数范围查询
ZRANGEBYSCORE games:score 1000 2000 WITHSCORES

分页排行榜

def get_leaderboard(game_id, page, page_size=10):
"""分页获取排行榜"""
key = f"games:{game_id}:score"
start = (page - 1) * page_size
end = start + page_size - 1

# 获取排名和分数
results = redis.zrevrange(key, start, end, withscores=True)

# 获取总数
total = redis.zcard(key)

return {
'list': [{'user': r[0], 'score': r[1]} for r in results],
'total': total,
'page': page,
'page_size': page_size
}

消息队列

List 实现简单队列

# 生产者
def produce(queue_name, message):
redis.lpush(queue_name, json.dumps(message))

# 消费者(阻塞)
def consume(queue_name, timeout=0):
result = redis.brpop(queue_name, timeout)
if result:
return json.loads(result[1])
return None

Stream 实现可靠队列(Redis 5.0+)

# 生产者
def produce(stream_name, message):
return redis.xadd(stream_name, message)

# 消费者组
def create_consumer_group(stream_name, group_name):
try:
redis.xgroup_create(stream_name, group_name, id='0')
except redis.ResponseError:
pass # 组已存在

# 消费消息
def consume(stream_name, group_name, consumer_name, count=1):
messages = redis.xreadgroup(
group_name,
consumer_name,
{stream_name: '>'},
count=count
)
return messages

# 确认消息
def ack(stream_name, group_name, message_id):
redis.xack(stream_name, group_name, message_id)

延迟队列

使用 Sorted Set 实现延迟任务:

import time

# 添加延迟任务
def add_delay_task(queue_name, task, delay_seconds):
execute_time = time.time() + delay_seconds
redis.zadd(queue_name, {json.dumps(task): execute_time})

# 消费延迟任务
def consume_delay_task(queue_name):
now = time.time()

# 获取到期的任务
tasks = redis.zrangebyscore(queue_name, 0, now)

for task_json in tasks:
# 尝试获取任务(避免竞争)
if redis.zrem(queue_name, task_json):
task = json.loads(task_json)
# 执行任务
execute_task(task)

高性能开发建议

1. 使用 Pipeline

# 不好的做法:多次往返
for i in range(1000):
redis.set(f"key:{i}", f"value:{i}")

# 好的做法:使用 Pipeline
pipe = redis.pipeline()
for i in range(1000):
pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()

2. 避免 Big Key

# 单个 Key 成员数量建议控制在 5000 以内
# 使用 SCAN 替代 KEYS
SCAN 0 MATCH user:* COUNT 100

3. 合理使用连接池

import redis
from redis.connection import ConnectionPool

# 创建连接池
pool = ConnectionPool(
host='localhost',
port=6379,
max_connections=100,
decode_responses=True
)

# 使用连接池
redis_client = redis.Redis(connection_pool=pool)

4. 选择合适的数据结构

场景推荐数据结构
简单缓存String
对象存储Hash
消息队列List / Stream
标签/去重Set
排行榜Sorted Set
计数器String (INCR)
时间线List / Sorted Set

安全红线

绝对禁止的操作

  1. 禁止 KEYS *:生产环境使用 SCAN 代替
  2. 设置 maxmemory:防止 Redis 无限制吞掉服务器内存
  3. 绑定内网 IP:绝不允许 Redis 暴露在公网

必要的安全配置

# redis.conf
bind 127.0.0.1 192.168.1.100
requirepass your_strong_password
maxmemory 4gb
maxmemory-policy allkeys-lru

# 禁用危险命令
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command KEYS ""

小结

本章我们学习了:

  1. 缓存设计模式:Cache Aside、缓存雪崩/击穿/穿透的解决方案
  2. 分布式锁:基本实现和 Redlock 算法
  3. 分布式限流:计数器、滑动窗口、令牌桶
  4. 排行榜系统:使用 Sorted Set 实现
  5. 消息队列:List 和 Stream 实现
  6. 延迟队列:Sorted Set 实现
  7. 高性能建议:Pipeline、避免 Big Key、连接池
  8. 安全红线:禁止的操作和必要配置

练习

  1. 实现一个完整的缓存系统,包含雪崩、击穿、穿透的保护机制
  2. 使用 Redis 实现分布式锁,并测试其正确性
  3. 实现一个滑动窗口限流器
  4. 使用 Sorted Set 实现一个延迟任务队列

参考资料