跳到主要内容

Redis Streams

Redis Stream 是 Redis 5.0 引入的数据结构,它类似于一个只能追加的日志文件,但提供了更强大的功能:支持 O(1) 时间复杂度的随机访问、消费者组、消息确认等特性。Stream 特别适合用于消息队列、事件溯源、日志收集等场景。

概述

什么是 Stream?

Stream 是一个有序的、只能追加的数据结构,每条消息都有唯一的时间戳 ID。与 Pub/Sub 不同,Stream 中的消息会持久化存储,支持多消费者、消息确认和历史消息回溯。

Stream 结构示意:

Stream (race:france)
┌──────────────────────────────────────────────────────┐
│ ID: 1692632086370-0 │ rider: Castilla, speed: 30.2 │
│ ID: 1692632086370-1 │ rider: Miller, speed: 28.5 │
│ ID: 1692632147973-0 │ rider: Zhang, speed: 29.9 │
│ ... │ ... │
└──────────────────────────────────────────────────────┘

时间有序的 ID

Stream vs Pub/Sub vs List

特性StreamPub/SubList
消息持久化支持不支持支持
多消费者支持(消费者组)支持需手动实现
消息确认支持不支持不支持
历史消息支持不支持支持
阻塞读取支持支持支持
消费进度支持不支持需手动记录
适用场景消息队列、事件溯源实时通知简单队列

典型应用场景

  • 消息队列:可靠的消息传递,支持消费者组
  • 事件溯源:记录系统状态变更事件
  • 日志收集:收集和处理日志数据
  • 传感器数据:收集物联网设备的时序数据
  • 通知系统:存储用户通知记录

基本命令

XADD 添加消息

向 Stream 添加一条新消息:

# 语法
XADD key [NOMKSTREAM] [<MAXLEN|MINID threshold [LIMIT length]> | LIMIT threshold length] *|ID field value [field value ...]

# 添加消息(自动生成 ID)
127.0.0.1:6379> XADD mystream * name "张三" age 25
"1692632086370-0" # 返回生成的消息 ID

# 添加消息并指定最大长度
127.0.0.1:6379> XADD mystream MAXLEN 1000 * name "李四"
"1692632086370-1"

# 使用 MINID 修剪(保留 ID 大于指定值的消息)
127.0.0.1:6379> XADD mystream MINID 1692632000000 * name "王五"
"1692632147973-0"

# 添加消息但不存在时不创建 Stream(NOMKSTREAM)
127.0.0.1:6379> XADD nonexistent NOMKSTREAM * field value
(nil)

ID 格式说明

格式:<毫秒时间戳>-<序列号>

示例:1692632086370-0
└──────┬──────┘ └┬┘
时间戳 序列号

特点:
- 时间戳:Redis 服务器本地时间(毫秒级)
- 序列号:同一毫秒内的序号(64位,足够大)
- 保证单调递增

MAXLEN 与 MINID

选项说明使用场景
MAXLEN限制消息数量固定大小的流
MINID保留 ID 大于指定值的消息基于时间的清理
LIMIT配合 MAXLEN/MINID 使用,限制每次修剪数量避免一次性修剪过多
# 限制最多 1000 条消息
XADD mystream MAXLEN 1000 * field value

# 大约限制 1000 条(性能更好,实际数量可能略多)
XADD mystream MAXLEN ~ 1000 * field value

# 保留最近 1 小时的消息
XADD mystream MINID 1692632000000 * field value

XLEN 获取长度

获取 Stream 中的消息数量:

127.0.0.1:6379> XLEN mystream
(integer) 5

XRANGE 获取范围消息

获取指定 ID 范围内的消息:

# 语法
XRANGE key start end [COUNT count]

# 获取所有消息
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
3) "age"
4) "25"
2) 1) "1692632086370-1"
2) 1) "name"
2) "李四"

# 获取指定范围的消息
127.0.0.1:6379> XRANGE mystream 1692632086370-0 1692632086370-1

# 获取前 2 条消息
127.0.0.1:6379> XRANGE mystream - + COUNT 2

# 从指定 ID 开始获取 2 条
127.0.0.1:6379> XRANGE mystream 1692632086370-0 + COUNT 2

ID 范围符号说明

符号说明
-最小 ID(开头)
+最大 ID(结尾)
(不包含指定 ID(开区间)
# 不包含结束 ID
XRANGE mystream - (1692632086370-1

XREVRANGE 反向获取消息

从后向前获取消息:

# 语法
XREVRANGE key end start [COUNT count]

# 获取最后 2 条消息
127.0.0.1:6379> XREVRANGE mystream + - COUNT 2
1) 1) "1692632147973-0"
2) 1) "name"
2) "王五"
2) 1) "1692632086370-1"
2) 1) "name"
2) "李四"

XREAD 读取消息

读取一条或多条消息(支持阻塞):

# 语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

# 从开头读取 2 条消息
127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"

# 读取新消息($ 表示最新消息 ID)
127.0.0.1:6379> XREAD STREAMS mystream $
(nil) # 没有新消息

# 阻塞等待新消息(最多等待 5000 毫秒)
127.0.0.1:6379> XREAD BLOCK 5000 STREAMS mystream $
# 如果 5 秒内有新消息,返回消息
# 否则返回 (nil)

ID 参数说明

ID说明
0从开头读取
$只读取新消息(当前最新 ID 之后)
具体ID从指定 ID 之后开始读取

XDEL 删除消息

删除指定消息:

127.0.0.1:6379> XDEL mystream 1692632086370-0
(integer) 1 # 返回删除的消息数量

注意:XDEL 只是标记删除,内存不会立即释放。只有在 Stream 被修剪(XTRIM)时才会真正释放。

XTRIM 修剪 Stream

修剪 Stream 到指定大小:

# 语法
XTRIM key <MAXLEN|MINID> threshold [LIMIT length]

# 限制最多 1000 条
127.0.0.1:6379> XTRIM mystream MAXLEN 1000
(integer) 50 # 返回删除的消息数量

# 近似修剪(性能更好)
127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000
(integer) 45

# 保留 ID 大于指定值的消息
127.0.0.1:6379> XTRIM mystream MINID 1692632000000
(integer) 30

消费者组

消费者组是 Stream 最强大的特性之一,它允许多个消费者协作处理同一个 Stream 中的消息。

概念说明

消费者组工作原理:

Stream: mystream
┌────────────────────────────────────────────────────────┐
│ msg1 │ msg2 │ msg3 │ msg4 │ msg5 │ msg6 │ msg7 │ msg8 │
└────────────────────────────────────────────────────────┘
│ │ │ │ │ │ │ │
└──────┴──────┼──────┴──────┼──────┴──────┘
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ Consumer1 │ │ Consumer2 │ │ Consumer3 │
│ [msg1,msg2]│ │ [msg3,msg4]│ │ [msg5,msg6]│
└───────────┘ └───────────┘ └───────────┘

消费者组保证:
1. 每条消息只被一个消费者处理
2. 消费者可以独立处理消息
3. 支持消息确认和重新投递

XGROUP 创建消费者组

# 语法
XGROUP CREATE key groupname ID|$ [MKSTREAM]

# 从头开始消费
127.0.0.1:6379> XGROUP CREATE mystream mygroup 0
OK

# 只消费新消息
127.0.0.1:6379> XGROUP CREATE mystream newgroup $
OK

# 如果 Stream 不存在则创建(MKSTREAM)
127.0.0.1:6379> XGROUP CREATE newstream mygroup 0 MKSTREAM
OK

XGROUP 其他操作

# 创建消费者(通常不需要手动创建,XREADGROUP 会自动创建)
127.0.0.1:6379> XGROUP CREATECONSUMER mystream mygroup consumer1
(integer) 1

# 删除消费者
127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup consumer1
(integer) 0 # 返回该消费者待处理消息数量

# 删除消费者组
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1

# 设置消费者组的起始 ID
127.0.0.1:6379> XGROUP SETID mystream mygroup $
OK

XREADGROUP 消费消息

以消费者组模式读取消息:

# 语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

# 读取待处理消息(> 表示所有未处理的消息)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"

# 读取指定数量的消息
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 2 STREAMS mystream >

# 阻塞等待新消息
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 BLOCK 5000 STREAMS mystream >

# 读取自己的待处理消息(历史消息,不再投递给其他消费者)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0

# 读取消息但不加入待处理队列(NOACK)
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 NOACK STREAMS mystream >

ID 参数说明

ID说明
>读取新消息(未被任何消费者处理过的)
0读取自己待处理的消息(已投递但未确认的)
具体ID读取指定 ID 之后的消息

XACK 确认消息

确认消息已成功处理:

# 语法
XACK key group ID [ID ...]

# 确认单条消息
127.0.0.1:6379> XACK mystream mygroup 1692632086370-0
(integer) 1 # 返回确认的消息数量

# 确认多条消息
127.0.0.1:6379> XACK mystream mygroup 1692632086370-0 1692632086370-1
(integer) 2

XPENDING 查看待处理消息

查看消费者组中的待处理消息:

# 查看待处理消息概览
127.0.0.1:6379> XPENDING mystream mygroup
1) (integer) 3 # 待处理消息总数
2) "1692632086370-0" # 最小 ID
3) "1692632147973-0" # 最大 ID
4) 1) 1) "consumer1"
2) "2" # consumer1 有 2 条待处理消息
2) 1) "consumer2"
2) "1" # consumer2 有 1 条待处理消息

# 查看详细待处理消息列表
127.0.0.1:6379> XPENDING mystream mygroup - + 10
1) 1) "1692632086370-0" # 消息 ID
2) "consumer1" # 消费者名称
3) (integer) 123456 # 空闲时间(毫秒)
4) (integer) 1 # 投递次数

# 查看特定消费者的待处理消息
127.0.0.1:6379> XPENDING mystream mygroup - + 10 consumer1

XCLAIM 转移消息

将消息从一个消费者转移到另一个消费者:

# 语法
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

# 将空闲超过 60000 毫秒的消息转移给 consumer2
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 60000 1692632086370-0
1) 1) "1692632086370-0"
2) 1) "name"
2) "张三"

# 只返回 ID,不返回消息内容
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 60000 1692632086370-0 JUSTID
1) "1692632086370-0"

# 强制转移(不考虑空闲时间)
127.0.0.1:6379> XCLAIM mystream mygroup consumer2 0 1692632086370-0 FORCE

XAUTOCLAIM 自动转移消息

自动转移长时间未确认的消息(Redis 6.2+):

# 语法
XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]

# 自动转移空闲超过 60000 毫秒的消息
127.0.0.1:6379> XAUTOCLAIM mystream mygroup consumer2 60000 0 COUNT 10
1) "0-0" # 下一个扫描起始 ID
2) 1) "1692632086370-0" # 被转移的消息
2) 1) "name"
2) "张三"
3) (empty array) # 被删除的消息 ID

XINFO 查看信息

查看 Stream 和消费者组的详细信息:

# 查看 Stream 信息
127.0.0.1:6379> XINFO STREAM mystream
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1692632147973-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 5
13) "recorded-first-entry-id"
14) "1692632086370-0"
15) "groups"
16) (integer) 1
17) "first-entry"
18) 1) "1692632086370-0"
2) 1) "name"
2) "张三"
19) "last-entry"
20) 1) "1692632147973-0"
2) 1) "name"
2) "王五"

# 查看消费者组信息
127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1692632147973-0"
9) "entries-read"
10) (integer) 5
11) "lag"
12) (integer) 0

# 查看消费者信息
127.0.0.1:6379> XINFO CONSUMERS mystream mygroup
1) 1) "name"
2) "consumer1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 12345

实际应用示例

场景一:消息队列

import redis
import json
import threading
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 初始化消费者组
try:
r.xgroup_create('task_queue', 'workers', id='0', mkstream=True)
except redis.ResponseError:
pass # 组已存在

def producer():
"""生产者:添加任务"""
tasks = [
{'type': 'email', 'to': '[email protected]', 'subject': 'Welcome'},
{'type': 'sms', 'phone': '13800138000', 'message': 'Hello'},
{'type': 'push', 'user_id': '123', 'title': 'Notification'},
]

for task in tasks:
msg_id = r.xadd('task_queue', {'data': json.dumps(task)})
print(f"Produced: {msg_id}")
time.sleep(0.1)

def consumer(name):
"""消费者:处理任务"""
while True:
# 读取新消息
messages = r.xreadgroup(
groupname='workers',
consumername=name,
streams={'task_queue': '>'},
count=1,
block=5000
)

if not messages:
continue

for stream, msgs in messages:
for msg_id, data in msgs:
try:
task = json.loads(data['data'])
print(f"[{name}] Processing: {task}")
# 模拟处理
time.sleep(0.5)
# 确认消息
r.xack('task_queue', 'workers', msg_id)
print(f"[{name}] Completed: {msg_id}")
except Exception as e:
print(f"[{name}] Error: {e}")

# 启动多个消费者
threads = []
for i in range(3):
t = threading.Thread(target=consumer, args=(f'worker{i+1}',), daemon=True)
t.start()
threads.append(t)

# 生产消息
producer()

场景二:事件溯源

import redis
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def append_event(aggregate_id, event_type, data):
"""追加事件"""
event = {
'type': event_type,
'data': json.dumps(data),
'timestamp': datetime.now().isoformat()
}
stream_key = f'events:{aggregate_id}'
return r.xadd(stream_key, event)

def get_events(aggregate_id, start='-', end='+'):
"""获取事件列表"""
stream_key = f'events:{aggregate_id}'
events = r.xrange(stream_key, start, end)
return [(e[0], json.loads(e[1]['data'])) for e in events]

# 订单事件溯源示例
order_id = 'order_001'

# 追加事件
append_event(order_id, 'OrderCreated', {'user_id': 'user_123', 'items': ['item1', 'item2']})
append_event(order_id, 'PaymentReceived', {'amount': 100, 'method': 'credit_card'})
append_event(order_id, 'OrderShipped', {'tracking': 'SF123456'})
append_event(order_id, 'OrderDelivered', {'signed_by': '张三'})

# 获取所有事件重建订单状态
events = get_events(order_id)
for event_id, data in events:
print(f"{event_id}: {data}")

场景三:日志收集

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def log(level, service, message, **kwargs):
"""写入日志"""
entry = {
'level': level,
'service': service,
'message': message,
'timestamp': time.time(),
**kwargs
}
# 使用 MAXLEN 限制日志大小
r.xadd('logs:app', entry, maxlen=10000)

def tail_logs(follow=False, count=10):
"""读取日志"""
if follow:
# 实时跟踪日志
last_id = '$'
while True:
messages = r.xread({'logs:app': last_id}, block=1000, count=10)
if messages:
for stream, msgs in messages:
for msg_id, data in msgs:
print(f"[{data['level']}] {data['service']}: {data['message']}")
last_id = msg_id
else:
# 读取最近的日志
messages = r.xrevrange('logs:app', count=count)
for msg_id, data in reversed(messages):
print(f"[{data['level']}] {data['service']}: {data['message']}")

# 写入日志
log('INFO', 'api', 'Server started', port=8080)
log('WARN', 'api', 'High memory usage', usage='85%')
log('ERROR', 'db', 'Connection failed', host='localhost', port=5432)

# 读取日志
tail_logs(count=5)

场景四:聊天室

import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class ChatRoom:
def __init__(self, room_id):
self.room_id = room_id
self.stream_key = f'chat:{room_id}'

def send_message(self, user, content):
"""发送消息"""
return r.xadd(self.stream_key, {
'user': user,
'content': content,
'timestamp': str(time.time())
}, maxlen=1000) # 保留最近 1000 条消息

def get_messages(self, count=50):
"""获取历史消息"""
messages = r.xrevrange(self.stream_key, count=count)
return [{'id': m[0], **m[1]} for m in reversed(messages)]

def listen(self, callback):
"""监听新消息"""
last_id = '$'
while True:
messages = r.xread({self.stream_key: last_id}, block=5000, count=10)
if messages:
for stream, msgs in messages:
for msg_id, data in msgs:
callback({'id': msg_id, **data})
last_id = msg_id

# 使用示例
room = ChatRoom('general')

# 发送消息
room.send_message('张三', '大家好!')
room.send_message('李四', '欢迎新朋友!')

# 监听新消息(在另一个线程)
def on_message(msg):
print(f"[{msg['user']}]: {msg['content']}")

listener = threading.Thread(target=room.listen, args=(on_message,), daemon=True)
listener.start()

消费者组最佳实践

1. 消息处理模式

def process_messages(stream_key, group_name, consumer_name, handler):
"""标准消息处理模式"""
while True:
try:
# 1. 读取新消息
messages = r.xreadgroup(
groupname=group_name,
consumername=consumer_name,
streams={stream_key: '>'},
count=10,
block=5000
)

if not messages:
continue

for stream, msgs in messages:
for msg_id, data in msgs:
try:
# 2. 处理消息
handler(data)
# 3. 确认消息
r.xack(stream_key, group_name, msg_id)
except Exception as e:
# 处理失败,消息保持在待处理队列
print(f"Error processing {msg_id}: {e}")
# 可以选择记录错误或重试

except Exception as e:
print(f"Consumer error: {e}")
time.sleep(1)

2. 处理超时消息

def claim_timeout_messages(stream_key, group_name, consumer_name, timeout_ms=60000):
"""接管超时消息"""
# 获取待处理消息
pending = r.xpending_range(stream_key, group_name, '-', '+', 100)

for msg in pending:
msg_id = msg['message_id']
idle_time = msg['time_since_delivered']

# 空闲时间超过阈值
if idle_time > timeout_ms:
# 转移消息
claimed = r.xclaim(stream_key, group_name, consumer_name, timeout_ms, [msg_id])
for claim_id, data in claimed:
# 重新处理
print(f"Reprocessing timeout message: {claim_id}")
# ... 处理逻辑
r.xack(stream_key, group_name, claim_id)

3. 优雅关闭

import signal
import sys

running = True

def signal_handler(sig, frame):
global running
print("Shutting down...")
running = False

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def consumer_loop():
while running:
messages = r.xreadgroup(
groupname='mygroup',
consumername='worker1',
streams={'mystream': '>'},
count=10,
block=1000
)
# ... 处理消息

# 关闭前处理完待处理消息
pending = r.xpending_range('mystream', 'mygroup', '-', '+', 100, 'worker1')
for msg in pending:
# 处理待处理消息
pass

性能优化

1. 合理设置 MAXLEN

# 精确限制(每次写入都检查)
XADD mystream MAXLEN 10000 * field value

# 近似限制(性能更好,数量可能略多)
XADD mystream MAXLEN ~ 10000 * field value

2. 批量读取

# 一次读取多条消息
XREADGROUP GROUP mygroup consumer1 COUNT 100 STREAMS mystream >

3. 使用 Pipeline

pipe = r.pipeline()
for i in range(100):
pipe.xadd('mystream', {'field': f'value{i}'})
pipe.execute()

小结

本章我们学习了:

  1. Stream 概述:有序的追加日志结构,支持持久化和多消费者
  2. 基本命令:XADD、XLEN、XRANGE、XREAD、XDEL、XTRIM
  3. 消费者组:XGROUP、XREADGROUP、XACK、XPENDING、XCLAIM
  4. 应用场景:消息队列、事件溯源、日志收集、聊天室
  5. 最佳实践:消息处理模式、超时处理、优雅关闭

练习

  1. 创建一个 Stream 并添加多条消息,使用 XRANGE 和 XREAD 读取
  2. 创建一个消费者组,模拟多消费者协作处理消息
  3. 实现一个简单的消息队列系统
  4. 处理消费者崩溃后的待处理消息

参考资料