Redis 发布订阅
发布订阅(Pub/Sub)是一种消息传递模式,发送者(发布者)将消息发送到频道,接收者(订阅者)订阅频道接收消息。Redis 提供了完整的发布订阅功能。
发布订阅概述
什么是发布订阅?
发布订阅是一种消息传递范式,发布者和订阅者之间解耦,通过频道(Channel)进行通信。
┌─────────────────────────────────────────────────────────────┐
│ 发布订阅模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 发布者 订阅者 │
│ ┌─────────┐ ┌─────────┐ │
│ │Publisher│ │Sub 1 │ │
│ └─────────┘ └─────────┘ │
│ │ ▲ │
│ │ PUBLISH │ │
│ ▼ │ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Channel (频道) │ │
│ │ news.sports │ │
│ └─────────────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │Publisher│ │Sub 2 │ │
│ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ - 发布者不知道订阅者存在 │
│ - 订阅者不知道发布者存在 │
│ - 消息实时推送,不持久化 │
│ │
└─────────────────────────────────────────────────────────────┘
Redis Pub/Sub 的特点
| 特性 | 说明 |
|---|---|
| 实时推送 | 消息即时发送给所有订阅者 |
| 不持久化 | 订阅者离线时消息丢失 |
| 至多一次 | 每条消息最多送达一次 |
| 解耦合 | 发布者和订阅者互不感知 |
与消息队列的区别
| 特性 | Redis Pub/Sub | Redis Stream | 专业 MQ |
|---|---|---|---|
| 消息持久化 | 无 | 有 | 有 |
| 离线消息 | 不支持 | 支持 | 支持 |
| 消息确认 | 无 | 有 | 有 |
| 消费组 | 无 | 有 | 有 |
| 适用场景 | 实时通知 | 日志流 | 企业消息 |
基本命令
SUBSCRIBE 订阅频道
订阅一个或多个频道:
# 订阅单个频道
127.0.0.1:6379> SUBSCRIBE news.sports
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 消息类型
2) "news.sports" # 频道名称
3) (integer) 1 # 当前订阅的频道数
# 订阅多个频道
127.0.0.1:6379> SUBSCRIBE news.sports news.tech
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "subscribe"
2) "news.tech"
3) (integer) 2
订阅后,客户端进入订阅状态,只能接收消息,不能执行其他命令。
PUBLISH 发布消息
向频道发布消息:
127.0.0.1:6379> PUBLISH news.sports "中国队获胜!"
(integer) 2 # 返回收到消息的订阅者数量
127.0.0.1:6379> PUBLISH news.tech "Redis 7.0 发布"
(integer) 1
UNSUBSCRIBE 取消订阅
取消订阅频道:
# 取消特定频道
127.0.0.1:6379> UNSUBSCRIBE news.sports
1) "unsubscribe"
2) "news.sports"
3) (integer) 1 # 剩余订阅数
# 取消所有订阅
127.0.0.1:6379> UNSUBSCRIBE
1) "unsubscribe"
2) "news.sports"
3) (integer) 0
模式订阅
PSUBSCRIBE 订阅模式
使用通配符订阅匹配的频道:
# 订阅所有 news 开头的频道
127.0.0.1:6379> PSUBSCRIBE news.*
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 匹配的频道:
# news.sports、news.tech、news.weather 等
通配符规则:
| 符号 | 说明 |
|---|---|
* | 匹配任意字符(包括空) |
? | 匹配单个字符 |
[abc] | 匹配 a、b 或 c |
# 示例
PSUBSCRIBE user:*:msg # 匹配 user:123:msg, user:456:msg
PSUBSCRIBE log:?:error # 匹配 log:a:error, log:b:error
PSUBSCRIBE news:[abc] # 匹配 news:a, news:b, news:c
PUNSUBSCRIBE 取消模式订阅
# 取消特定模式
127.0.0.1:6379> PUNSUBSCRIBE news.*
# 取消所有模式订阅
127.0.0.1:6379> PUNSUBSCRIBE
消息格式
频道订阅消息格式
订阅频道时收到的消息格式:
# 订阅确认
1) "subscribe" # 消息类型
2) "channel_name" # 频道名称
3) 1 # 订阅数量
# 收到消息
1) "message" # 消息类型
2) "channel_name" # 频道名称
3) "message_body" # 消息内容
# 取消订阅
1) "unsubscribe" # 消息类型
2) "channel_name" # 频道名称
3) 0 # 剩余订阅数量
模式订阅消息格式
模式订阅收到的消息格式:
# 模式订阅确认
1) "psubscribe" # 消息类型
2) "pattern" # 模式
3) 1 # 订阅数量
# 收到消息
1) "pmessage" # 消息类型
2) "pattern" # 匹配的模式
3) "channel_name" # 实际频道名称
4) "message_body" # 消息内容
消息类型详解
subscribe 消息
确认订阅成功:
1) "subscribe"
2) "news.sports"
3) (integer) 1
message 消息
普通频道消息:
1) "message"
2) "news.sports"
3) "中国队获胜!"
pmessage 消息
模式匹配的消息:
1) "pmessage"
2) "news.*" # 匹配的模式
3) "news.sports" # 实际频道
4) "中国队获胜!"
unsubscribe 消息
取消订阅确认:
1) "unsubscribe"
2) "news.sports"
3) (integer) 0
Pub/Sub 信息查询
PUBSUB CHANNELS
查看活跃频道:
# 查看所有活跃频道
127.0.0.1:6379> PUBSUB CHANNELS
1) "news.sports"
2) "news.tech"
# 查看匹配的活跃频道
127.0.0.1:6379> PUBSUB CHANNELS news.*
1) "news.sports"
2) "news.tech"
活跃频道:至少有一个订阅者的频道。
PUBSUB NUMSUB
查看频道订阅者数量:
127.0.0.1:6379> PUBSUB NUMSUB news.sports news.tech
1) "news.sports"
2) (integer) 3 # 3 个订阅者
3) "news.tech"
4) (integer) 1 # 1 个订阅者
PUBSUB NUMPAT
查看模式订阅总数:
127.0.0.1:6379> PUBSUB NUMPAT
(integer) 5 # 当前有 5 个模式订阅
分片 Pub/Sub(Redis 7.0+)
概述
在 Redis Cluster 中,普通的 Pub/Sub 会将消息广播到所有节点,可能造成网络压力。分片 Pub/Sub 将频道分配到特定槽位,消息只在对应分片内传播。
┌─────────────────────────────────────────────────────────────┐
│ 分片 Pub/Sub │
├─────────────────────────────────────────────────────────────┤
│ │
│ 普通 Pub/Sub:消息广播到所有节点 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │──>│ Node B │──>│ Node C │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ↑ ↑ ↑ │
│ └─────────────┴─────────────┘ │
│ 所有节点都收到消息 │
│ │
│ ───────────────────────────────────────────────────── │
│ │
│ 分片 Pub/Sub:消息只在分片内传播 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │ │ Node B │ │ Node C │ │
│ │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ↑ │
│ └──> 只有相关分片收到消息 │
│ │
└─────────────────────────────────────────────────────────────┘
分片命令
SSUBSCRIBE 分片订阅:
# 订阅分片频道
SSUBSCRIBE myshard:channel
SPUBLISH 分片发布:
# 发布分片消息
SPUBLISH myshard:channel "message"
SUNSUBSCRIBE 取消分片订阅:
SUNSUBSCRIBE myshard:channel
分片规则
频道名按相同算法映射到槽位(类似键名),确保同一分片频道的订阅者在同一节点组。
应用场景
场景一:实时通知
import redis
import threading
# 发布者
def publisher():
r = redis.Redis()
while True:
message = input("输入消息: ")
r.publish('notifications', message)
# 订阅者
def subscriber(name):
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"[{name}] 收到: {message['data'].decode()}")
# 启动订阅者
threading.Thread(target=subscriber, args=('用户A',)).start()
threading.Thread(target=subscriber, args=('用户B',)).start()
# 启动发布者
publisher()
场景二:聊天室
import redis
import threading
class ChatRoom:
def __init__(self, room_name, user_name):
self.r = redis.Redis()
self.room = room_name
self.user = user_name
self.channel = f"chat:{room_name}"
def join(self):
"""加入聊天室"""
pubsub = self.r.pubsub()
pubsub.subscribe(self.channel)
# 发送加入消息
self.r.publish(self.channel, f"系统: {self.user} 加入了聊天室")
# 监听消息
for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'].decode())
def send(self, text):
"""发送消息"""
self.r.publish(self.channel, f"{self.user}: {text}")
def leave(self):
"""离开聊天室"""
self.r.publish(self.channel, f"系统: {self.user} 离开了聊天室")
# 使用
chat = ChatRoom('general', '张三')
threading.Thread(target=chat.join).start()
chat.send("大家好!")
场景三:配置更新通知
import redis
import json
class ConfigManager:
def __init__(self):
self.r = redis.Redis()
self.config = {}
self._load_config()
self._listen_updates()
def _load_config(self):
"""加载配置"""
config_data = self.r.get('app:config')
if config_data:
self.config = json.loads(config_data)
def _listen_updates(self):
"""监听配置更新"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe('config:updates')
for message in pubsub.listen():
if message['type'] == 'message':
self._load_config()
print("配置已更新")
import threading
threading.Thread(target=listener, daemon=True).start()
def update_config(self, new_config):
"""更新配置"""
self.r.set('app:config', json.dumps(new_config))
self.r.publish('config:updates', 'updated')
# 使用
manager = ConfigManager()
manager.update_config({'theme': 'dark', 'language': 'zh-CN'})
场景四:事件广播
import redis
import json
class EventEmitter:
def __init__(self):
self.r = redis.Redis()
def emit(self, event_name, data):
"""发送事件"""
self.r.publish(f'events:{event_name}', json.dumps(data))
def on(self, event_name, handler):
"""监听事件"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe(f'events:{event_name}')
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
handler(data)
import threading
threading.Thread(target=listener, daemon=True).start()
# 使用
emitter = EventEmitter()
# 监听用户登录事件
def on_login(data):
print(f"用户登录: {data['user']}")
emitter.on('user.login', on_login)
# 发送事件
emitter.emit('user.login', {'user': '张三', 'time': '2024-01-01 10:00:00'})
注意事项
1. 消息不持久化
# 订阅者离线时,消息会丢失
# 客户端 A:订阅频道
SUBSCRIBE news
# 客户端 B:发布消息
PUBLISH news "消息1" # 如果此时没有订阅者,消息丢失
# 客户端 A:只能收到之后的消息
解决方案:需要持久化时使用 Redis Stream。
2. 订阅状态限制
在 RESP2 协议下,订阅状态的客户端只能执行:
SUBSCRIBE
UNSUBSCRIBE
PSUBSCRIBE
PUNSUBSCRIBE
PING
QUIT
RESP3 协议(Redis 6.0+)取消了此限制。
3. 与数据库无关
Pub/Sub 不受数据库编号限制:
# 在 DB 0 发布
SELECT 0
PUBLISH news "消息"
# 在 DB 1 也能收到
SELECT 1
SUBSCRIBE news
# 仍然能收到消息
4. 重复订阅
同时订阅频道和匹配模式,会收到两条消息:
SUBSCRIBE news.sports
PSUBSCRIBE news.*
# 发布消息
PUBLISH news.sports "消息"
# 收到两条消息:
# 1) "message" + "news.sports" + "消息"
# 2) "pmessage" + "news.*" + "news.sports" + "消息"
小结
本章我们学习了:
- 基本概念:发布订阅模式和工作原理
- 基本命令:SUBSCRIBE、PUBLISH、UNSUBSCRIBE
- 模式订阅:PSUBSCRIBE 通配符匹配
- 消息格式:不同类型消息的结构
- 分片 Pub/Sub:Redis 7.0 新特性
- 应用场景:通知、聊天室、配置更新、事件广播
练习
- 实现一个简单的聊天室程序
- 使用模式订阅实现多级频道
- 对比 Pub/Sub 和 Stream 的使用场景
- 实现配置热更新通知机制