Redis Streams
Redis Stream 是 Redis 5.0 引入的数据结构,它类似于一个只能追加的日志文件,但提供了更强大的功能:支持 O(1) 时间复杂度的随机访问、消费者组、消息确认等特性。Stream 特别适合用于消息队列、事件溯源、日志收集等场景。
概述
什么是 Stream?
Stream 是一个有序的、只能追加的数据结构,每条消息都有唯一的时间戳 ID。与 Pub/Sub 不同,Stream 中的消息会持久化存储,支持多消费者、消息确认和历史消息回溯。
Stream 结构示意:
Stream (race:france)
┌──────────────────────────────────────────────────────┐
│ ID: 1692632086370-0 │ rider: Castilla, speed: 30.2 │
│ ID: 1692632086370-1 │ rider: Miller, speed: 28.5 │
│ ID: 1692632147973-0 │ rider: Zhang, speed: 29.9 │
│ ... │ ... │
└──────────────────────────────────────────────────────┘
↑
时间有序的 ID
Stream vs Pub/Sub vs List
| 特性 | Stream | Pub/Sub | List |
|---|---|---|---|
| 消息持久化 | 支持 | 不支持 | 支持 |
| 多消费者 | 支持(消费者组) | 支持 | 需手动实现 |
| 消息确认 | 支持 | 不支持 | 不支持 |
| 历史消息 | 支持 | 不支持 | 支持 |
| 阻塞读取 | 支持 | 支持 | 支持 |
| 消费进度 | 支持 | 不支持 | 需手动记录 |
| 适用场景 | 消息队列、事件溯源 | 实时通知 | 简单队列 |
典型应用场景
- 消息队列:可靠的消息传递,支持消费者组
- 事件溯源:记录系统状态变更事件
- 日志收集:收集和处理日志数据
- 传感器数据:收集物联网设备的时序数据
- 通知系统:存储用户通知记录
基本命令
XADD 添加消息
向 Stream 添加一条新消息:
# 语法
XADD key [NOMKSTREAM] [<MAXLEN|MINID threshold [LIMIT length]> | LIMIT threshold length] *|ID field value [field value ...]
# 添加消息(自动生成 ID)
127.0.0.1:6379> XADD mystream * name "张三" age 25
"1692632086370-0" # 返回生成的消息 ID
# 添加消息并指定最大长度
127.0.0.1:6379> XADD mystream MAXLEN 1000 * name "李四"
"1692632086370-1"
# 使用 MINID 修剪(保留 ID 大于指定值的消息)
127.0.0.1:6379> XADD mystream MINID 1692632000000 * name "王五"
"1692632147973-0"
# 添加消息但不存在时不创建 Stream(NOMKSTREAM)
127.0.0.1:6379> XADD nonexistent NOMKSTREAM * field value
(nil)
ID 格式说明:
格式:<毫秒时间戳>-<序列号>
示例:1692632086370-0
└──────┬──────┘ └┬┘
时间戳 序列号
特点:
- 时间戳:Redis 服务器本地时间(毫秒级)
- 序列号:同一毫秒内的序号(64位,足够大)
- 保证单调递增
MAXLEN 与 MINID:
| 选项 | 说明 | 使用场景 |
|---|---|---|
| MAXLEN | 限制消息数量 | 固定大小的流 |
| MINID | 保留 ID 大于指定值的消息 | 基于时间的清理 |
| LIMIT | 配合 MAXLEN/MINID 使用,限制每次修剪数量 | 避免一次性修剪过多 |
# 限制最多 1000 条消息
XADD mystream MAXLEN 1000 * field value
# 大约限制 1000 条(性能更好,实际数量可能略多)
XADD mystream MAXLEN ~ 1000 * field value
# 保留最近 1 小时的消息
XADD mystream MINID 1692632000000 * field value
XLEN 获取长度
获取 Stream 中的消息数量:
127.0.0.1:6379> XLEN mystream
(integer) 5
XRANGE 获取范围消息
获取指定 ID 范围内的消息:
# 语法
XRANGE key start end [COUNT count]
# 获取所有消息
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
3) "age"
4) "25"
2) 1) "1692632086370-1"
2) 1) "name"
2) "李四"
# 获取指定范围的消息
127.0.0.1:6379> XRANGE mystream 1692632086370-0 1692632086370-1
# 获取前 2 条消息
127.0.0.1:6379> XRANGE mystream - + COUNT 2
# 从指定 ID 开始获取 2 条
127.0.0.1:6379> XRANGE mystream 1692632086370-0 + COUNT 2
ID 范围符号说明:
| 符号 | 说明 |
|---|---|
- | 最小 ID(开头) |
+ | 最大 ID(结尾) |
( | 不包含指定 ID(开区间) |
# 不包含结束 ID
XRANGE mystream - (1692632086370-1
XREVRANGE 反向获取消息
从后向前获取消息:
# 语法
XREVRANGE key end start [COUNT count]
# 获取最后 2 条消息
127.0.0.1:6379> XREVRANGE mystream + - COUNT 2
1) 1) "1692632147973-0"
2) 1) "name"
2) "王五"
2) 1) "1692632086370-1"
2) 1) "name"
2) "李四"
XREAD 读取消息
读取一条或多条消息(支持阻塞):
# 语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 从开头读取 2 条消息
127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
# 读取新消息($ 表示最新消息 ID)
127.0.0.1:6379> XREAD STREAMS mystream $
(nil) # 没有新消息
# 阻塞等待新消息(最多等待 5000 毫秒)
127.0.0.1:6379> XREAD BLOCK 5000 STREAMS mystream $
# 如果 5 秒内有新消息,返回消息
# 否则返回 (nil)
ID 参数说明:
| ID | 说明 |
|---|---|
0 | 从开头读取 |
$ | 只读取新消息(当前最新 ID 之后) |
具体ID | 从指定 ID 之后开始读取 |
XDEL 删除消息
删除指定消息:
127.0.0.1:6379> XDEL mystream 1692632086370-0
(integer) 1 # 返回删除的消息数量
注意:XDEL 只是标记删除,内存不会立即释放。只有在 Stream 被修剪(XTRIM)时才会真正释放。
XTRIM 修剪 Stream
修剪 Stream 到指定大小:
# 语法
XTRIM key <MAXLEN|MINID> threshold [LIMIT length]
# 限制最多 1000 条
127.0.0.1:6379> XTRIM mystream MAXLEN 1000
(integer) 50 # 返回删除的消息数量
# 近似修剪(性能更好)
127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000
(integer) 45
# 保留 ID 大于指定值的消息
127.0.0.1:6379> XTRIM mystream MINID 1692632000000
(integer) 30
消费者组
消费者组是 Stream 最强大的特性之一,它允许多个消费者协作处理同一个 Stream 中的消息。
概念说明
消费者组工作原理:
Stream: mystream
┌────────────────────────────────────────────────────────┐
│ msg1 │ msg2 │ msg3 │ msg4 │ msg5 │ msg6 │ msg7 │ msg8 │
└────────────────────────────────────────────────────────┘
│ │ │ │ │ │ │ │
└──────┴──────┼──────┴──────┼──────┴──────┘
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ Consumer1 │ │ Consumer2 │ │ Consumer3 │
│ [msg1,msg2]│ │ [msg3,msg4]│ │ [msg5,msg6]│
└───────────┘ └───────────┘ └───────────┘
消费者组保证:
1. 每条消息只被一个消费者处理
2. 消费者可以独立处理消息
3. 支持消息确认和重新投递
XGROUP 创建消费者组
# 语法
XGROUP CREATE key groupname ID|$ [MKSTREAM]
# 从头开始消费
127.0.0.1:6379> XGROUP CREATE mystream mygroup 0
OK
# 只消费新消息
127.0.0.1:6379> XGROUP CREATE mystream newgroup $
OK
# 如果 Stream 不存在则创建(MKSTREAM)
127.0.0.1:6379> XGROUP CREATE newstream mygroup 0 MKSTREAM
OK
XGROUP 其他操作
# 创建消费者(通常不需要手动创建,XREADGROUP 会自动创建)
127.0.0.1:6379> XGROUP CREATECONSUMER mystream mygroup consumer1
(integer) 1
# 删除消费者
127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup consumer1
(integer) 0 # 返回该消费者待处理消息数量
# 删除消费者组
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
# 设置消费者组的起始 ID
127.0.0.1:6379> XGROUP SETID mystream mygroup $
OK
XREADGROUP 消费消息
以消费者组模式读取消息:
# 语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
# 读取待处理消息(> 表示所有未处理的消息)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
# 读取指定数量的消息
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >
# 阻塞等待新消息
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 BLOCK 5000 STREAMS mystream >
# 读取自己的待处理消息(历史消息,不再投递给其他消费者)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0
# 读取消息但不加入待处理队列(NOACK)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 NOACK STREAMS mystream >
ID 参数说明:
| ID | 说明 |
|---|---|
> | 读取新消息(未被任何消费者处理过的) |
0 | 读取自己待处理的消息(已投递但未确认的) |
具体ID | 读取指定 ID 之后的消息 |
XACK 确认消息
确认消息已成功处理:
# 语法
XACK key group ID [ID ...]
# 确认单条消息
127.0.0.1:6379> XACK mystream mygroup 1692632086370-0
(integer) 1 # 返回确认的消息数量
# 确认多条消息
127.0.0.1:6379> XACK mystream mygroup 1692632086370-0 1692632086370-1
(integer) 2
XPENDING 查看待处理消息
查看消费者组中的待处理消息:
# 查看待处理消息概览
127.0.0.1:6379> XPENDING mystream mygroup
1) (integer) 3 # 待处理消息总数
2) "1692632086370-0" # 最小 ID
3) "1692632147973-0" # 最大 ID
4) 1) 1) "consumer1"
2) "2" # consumer1 有 2 条待处理消息
2) 1) "consumer2"
2) "1" # consumer2 有 1 条待处理消息
# 查看详细待处理消息列表
127.0.0.1:6379> XPENDING mystream mygroup - + 10
1) 1) "1692632086370-0" # 消息 ID
2) "consumer1" # 消费者名称
3) (integer) 123456 # 空闲时间(毫秒)
4) (integer) 1 # 投递次数
# 查看特定消费者的待处理消息
127.0.0.1:6379> XPENDING mystream mygroup - + 10 consumer1
XCLAIM 转移消息
将消息从一个消费者转移到另一个消费者:
# 语法
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
# 将空闲超过 60000 毫秒的消息转移给 consumer2
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 60000 1692632086370-0
1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
# 只返回 ID,不返回消息内容
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 60000 1692632086370-0 JUSTID
1) "1692632086370-0"
# 强制转移(不考虑空闲时间)
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 0 1692632086370-0 FORCE
XAUTOCLAIM 自动转移消息
自动转移长时间未确认的消息(Redis 6.2+):
# 语法
XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]
# 自动转移空闲超过 60000 毫秒的消息
127.0.0.1:6379> XAUTOCLAIM mystream mygroup consumer2 60000 0 COUNT 10
1) "0-0" # 下一个扫描起始 ID
2) 1) "1692632086370-0" # 被转移的消息
2) 1) "name"
2) "张三"
3) (empty array) # 被删除的消息 ID
XINFO 查看信息
查看 Stream 和消费者组的详细信息:
# 查看 Stream 信息
127.0.0.1:6379> XINFO STREAM mystream
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1692632147973-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 5
13) "recorded-first-entry-id"
14) "1692632086370-0"
15) "groups"
16) (integer) 1
17) "first-entry"
18) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
19) "last-entry"
20) 1) "1692632147973-0"
2) 1) "name"
2) "王五"
# 查看消费者组信息
127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1692632147973-0"
9) "entries-read"
10) (integer) 5
11) "lag"
12) (integer) 0
# 查看消费者信息
127.0.0.1:6379> XINFO CONSUMERS mystream mygroup
1) 1) "name"
2) "consumer1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 12345
实际应用示例
场景一:消息队列
import redis
import json
import threading
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 初始化消费者组
try:
r.xgroup_create('task_queue', 'workers', id='0', mkstream=True)
except redis.ResponseError:
pass # 组已存在
def producer():
"""生产者:添加任务"""
tasks = [
{'type': 'email', 'to': '[email protected]', 'subject': 'Welcome'},
{'type': 'sms', 'phone': '13800138000', 'message': 'Hello'},
{'type': 'push', 'user_id': '123', 'title': 'Notification'},
]
for task in tasks:
msg_id = r.xadd('task_queue', {'data': json.dumps(task)})
print(f"Produced: {msg_id}")
time.sleep(0.1)
def consumer(name):
"""消费者:处理任务"""
while True:
# 读取新消息
messages = r.xreadgroup(
groupname='workers',
consumername=name,
streams={'task_queue': '>'},
count=1,
block=5000
)
if not messages:
continue
for stream, msgs in messages:
for msg_id, data in msgs:
try:
task = json.loads(data['data'])
print(f"[{name}] Processing: {task}")
# 模拟处理
time.sleep(0.5)
# 确认消息
r.xack('task_queue', 'workers', msg_id)
print(f"[{name}] Completed: {msg_id}")
except Exception as e:
print(f"[{name}] Error: {e}")
# 启动多个消费者
threads = []
for i in range(3):
t = threading.Thread(target=consumer, args=(f'worker{i+1}',), daemon=True)
t.start()
threads.append(t)
# 生产消息
producer()
场景二:事件溯源
import redis
import json
from datetime import datetime
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def append_event(aggregate_id, event_type, data):
"""追加事件"""
event = {
'type': event_type,
'data': json.dumps(data),
'timestamp': datetime.now().isoformat()
}
stream_key = f'events:{aggregate_id}'
return r.xadd(stream_key, event)
def get_events(aggregate_id, start='-', end='+'):
"""获取事件列表"""
stream_key = f'events:{aggregate_id}'
events = r.xrange(stream_key, start, end)
return [(e[0], json.loads(e[1]['data'])) for e in events]
# 订单事件溯源示例
order_id = 'order_001'
# 追加事件
append_event(order_id, 'OrderCreated', {'user_id': 'user_123', 'items': ['item1', 'item2']})
append_event(order_id, 'PaymentReceived', {'amount': 100, 'method': 'credit_card'})
append_event(order_id, 'OrderShipped', {'tracking': 'SF123456'})
append_event(order_id, 'OrderDelivered', {'signed_by': '张三'})
# 获取所有事件重建订单状态
events = get_events(order_id)
for event_id, data in events:
print(f"{event_id}: {data}")
场景三:日志收集
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def log(level, service, message, **kwargs):
"""写入日志"""
entry = {
'level': level,
'service': service,
'message': message,
'timestamp': time.time(),
**kwargs
}
# 使用 MAXLEN 限制日志大小
r.xadd('logs:app', entry, maxlen=10000)
def tail_logs(follow=False, count=10):
"""读取日志"""
if follow:
# 实时跟踪日志
last_id = '$'
while True:
messages = r.xread({'logs:app': last_id}, block=1000, count=10)
if messages:
for stream, msgs in messages:
for msg_id, data in msgs:
print(f"[{data['level']}] {data['service']}: {data['message']}")
last_id = msg_id
else:
# 读取最近的日志
messages = r.xrevrange('logs:app', count=count)
for msg_id, data in reversed(messages):
print(f"[{data['level']}] {data['service']}: {data['message']}")
# 写入日志
log('INFO', 'api', 'Server started', port=8080)
log('WARN', 'api', 'High memory usage', usage='85%')
log('ERROR', 'db', 'Connection failed', host='localhost', port=5432)
# 读取日志
tail_logs(count=5)
场景四:聊天室
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
class ChatRoom:
def __init__(self, room_id):
self.room_id = room_id
self.stream_key = f'chat:{room_id}'
def send_message(self, user, content):
"""发送消息"""
return r.xadd(self.stream_key, {
'user': user,
'content': content,
'timestamp': str(time.time())
}, maxlen=1000) # 保留最近 1000 条消息
def get_messages(self, count=50):
"""获取历史消息"""
messages = r.xrevrange(self.stream_key, count=count)
return [{'id': m[0], **m[1]} for m in reversed(messages)]
def listen(self, callback):
"""监听新消息"""
last_id = '$'
while True:
messages = r.xread({self.stream_key: last_id}, block=5000, count=10)
if messages:
for stream, msgs in messages:
for msg_id, data in msgs:
callback({'id': msg_id, **data})
last_id = msg_id
# 使用示例
room = ChatRoom('general')
# 发送消息
room.send_message('张三', '大家好!')
room.send_message('李四', '欢迎新朋友!')
# 监听新消息(在另一个线程)
def on_message(msg):
print(f"[{msg['user']}]: {msg['content']}")
listener = threading.Thread(target=room.listen, args=(on_message,), daemon=True)
listener.start()
消费者组最佳实践
1. 消息处理模式
def process_messages(stream_key, group_name, consumer_name, handler):
"""标准消息处理模式"""
while True:
try:
# 1. 读取新消息
messages = r.xreadgroup(
groupname=group_name,
consumername=consumer_name,
streams={stream_key: '>'},
count=10,
block=5000
)
if not messages:
continue
for stream, msgs in messages:
for msg_id, data in msgs:
try:
# 2. 处理消息
handler(data)
# 3. 确认消息
r.xack(stream_key, group_name, msg_id)
except Exception as e:
# 处理失败,消息保持在待处理队列
print(f"Error processing {msg_id}: {e}")
# 可以选择记录错误或重试
except Exception as e:
print(f"Consumer error: {e}")
time.sleep(1)
2. 处理超时消息
def claim_timeout_messages(stream_key, group_name, consumer_name, timeout_ms=60000):
"""接管超时消息"""
# 获取待处理消息
pending = r.xpending_range(stream_key, group_name, '-', '+', 100)
for msg in pending:
msg_id = msg['message_id']
idle_time = msg['time_since_delivered']
# 空闲时间超过阈值
if idle_time > timeout_ms:
# 转移消息
claimed = r.xclaim(stream_key, group_name, consumer_name, timeout_ms, [msg_id])
for claim_id, data in claimed:
# 重新处理
print(f"Reprocessing timeout message: {claim_id}")
# ... 处理逻辑
r.xack(stream_key, group_name, claim_id)
3. 优雅关闭
import signal
import sys
running = True
def signal_handler(sig, frame):
global running
print("Shutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def consumer_loop():
while running:
messages = r.xreadgroup(
groupname='mygroup',
consumername='worker1',
streams={'mystream': '>'},
count=10,
block=1000
)
# ... 处理消息
# 关闭前处理完待处理消息
pending = r.xpending_range('mystream', 'mygroup', '-', '+', 100, 'worker1')
for msg in pending:
# 处理待处理消息
pass
性能优化
1. 合理设置 MAXLEN
# 精确限制(每次写入都检查)
XADD mystream MAXLEN 10000 * field value
# 近似限制(性能更好,数量可能略多)
XADD mystream MAXLEN ~ 10000 * field value
2. 批量读取
# 一次读取多条消息
XREADGROUP GROUP mygroup consumer1 COUNT 100 STREAMS mystream >
3. 使用 Pipeline
pipe = r.pipeline()
for i in range(100):
pipe.xadd('mystream', {'field': f'value{i}'})
pipe.execute()
小结
本章我们学习了:
- Stream 概述:有序的追加日志结构,支持持久化和多消费者
- 基本命令:XADD、XLEN、XRANGE、XREAD、XDEL、XTRIM
- 消费者组:XGROUP、XREADGROUP、XACK、XPENDING、XCLAIM
- 应用场景:消息队列、事件溯源、日志收集、聊天室
- 最佳实践:消息处理模式、超时处理、优雅关闭
练习
- 创建一个 Stream 并添加多条消息,使用 XRANGE 和 XREAD 读取
- 创建一个消费者组,模拟多消费者协作处理消息
- 实现一个简单的消息队列系统
- 处理消费者崩溃后的待处理消息