跳到主要内容

Redis 发布订阅

发布订阅(Pub/Sub)是一种消息传递模式,发送者(发布者)将消息发送到频道,接收者(订阅者)订阅频道接收消息。Redis 提供了完整的发布订阅功能。

发布订阅概述

什么是发布订阅?

发布订阅是一种消息传递范式,发布者和订阅者之间解耦,通过频道(Channel)进行通信。

特点

  • 发布者不知道订阅者存在
  • 订阅者不知道发布者存在
  • 消息实时推送,不持久化

Redis Pub/Sub 的特点

特性说明
实时推送消息即时发送给所有订阅者
不持久化订阅者离线时消息丢失
至多一次每条消息最多送达一次
解耦合发布者和订阅者互不感知

与消息队列的区别

特性Redis Pub/SubRedis Stream专业 MQ
消息持久化
离线消息不支持支持支持
消息确认
消费者组
适用场景实时通知日志流企业消息

基本命令

SUBSCRIBE 订阅频道

订阅一个或多个频道:

# 订阅单个频道
127.0.0.1:6379> SUBSCRIBE news.sports
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 消息类型
2) "news.sports" # 频道名称
3) (integer) 1 # 当前订阅的频道数

# 订阅多个频道
127.0.0.1:6379> SUBSCRIBE news.sports news.tech
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "subscribe"
2) "news.tech"
3) (integer) 2

订阅后,客户端进入订阅状态,只能接收消息,不能执行其他命令。

PUBLISH 发布消息

向频道发布消息:

127.0.0.1:6379> PUBLISH news.sports "中国队获胜!"
(integer) 2 # 返回收到消息的订阅者数量
127.0.0.1:6379> PUBLISH news.tech "Redis 7.0 发布"
(integer) 1

UNSUBSCRIBE 取消订阅

取消订阅频道:

# 取消特定频道
127.0.0.1:6379> UNSUBSCRIBE news.sports
1) "unsubscribe"
2) "news.sports"
3) (integer) 1 # 剩余订阅数

# 取消所有订阅
127.0.0.1:6379> UNSUBSCRIBE
1) "unsubscribe"
2) "news.sports"
3) (integer) 0

模式订阅

PSUBSCRIBE 订阅模式

使用通配符订阅匹配的频道:

# 订阅所有 news 开头的频道
127.0.0.1:6379> PSUBSCRIBE news.*
1) "psubscribe"
2) "news.*"
3) (integer) 1

# 匹配的频道:
# news.sports、news.tech、news.weather 等

通配符规则

符号说明
*匹配任意字符(包括空)
?匹配单个字符
[abc]匹配 a、b 或 c
# 示例
PSUBSCRIBE user:*:msg # 匹配 user:123:msg, user:456:msg
PSUBSCRIBE log:?:error # 匹配 log:a:error, log:b:error
PSUBSCRIBE news:[abc] # 匹配 news:a, news:b, news:c

PUNSUBSCRIBE 取消模式订阅

# 取消特定模式
127.0.0.1:6379> PUNSUBSCRIBE news.*

# 取消所有模式订阅
127.0.0.1:6379> PUNSUBSCRIBE

消息格式

频道订阅消息格式

订阅频道时收到的消息格式:

# 订阅确认
1) "subscribe" # 消息类型
2) "channel_name" # 频道名称
3) 1 # 订阅数量

# 收到消息
1) "message" # 消息类型
2) "channel_name" # 频道名称
3) "message_body" # 消息内容

# 取消订阅
1) "unsubscribe" # 消息类型
2) "channel_name" # 频道名称
3) 0 # 剩余订阅数量

模式订阅消息格式

模式订阅收到的消息格式:

# 模式订阅确认
1) "psubscribe" # 消息类型
2) "pattern" # 模式
3) 1 # 订阅数量

# 收到消息
1) "pmessage" # 消息类型
2) "pattern" # 匹配的模式
3) "channel_name" # 实际频道名称
4) "message_body" # 消息内容

消息类型详解

subscribe 消息

确认订阅成功:

1) "subscribe"
2) "news.sports"
3) (integer) 1

message 消息

普通频道消息:

1) "message"
2) "news.sports"
3) "中国队获胜!"

pmessage 消息

模式匹配的消息:

1) "pmessage"
2) "news.*" # 匹配的模式
3) "news.sports" # 实际频道
4) "中国队获胜!"

unsubscribe 消息

取消订阅确认:

1) "unsubscribe"
2) "news.sports"
3) (integer) 0

Pub/Sub 信息查询

PUBSUB CHANNELS

查看活跃频道:

# 查看所有活跃频道
127.0.0.1:6379> PUBSUB CHANNELS
1) "news.sports"
2) "news.tech"

# 查看匹配的活跃频道
127.0.0.1:6379> PUBSUB CHANNELS news.*
1) "news.sports"
2) "news.tech"

活跃频道:至少有一个订阅者的频道。

PUBSUB NUMSUB

查看频道订阅者数量:

127.0.0.1:6379> PUBSUB NUMSUB news.sports news.tech
1) "news.sports"
2) (integer) 3 # 3 个订阅者
3) "news.tech"
4) (integer) 1 # 1 个订阅者

PUBSUB NUMPAT

查看模式订阅总数:

127.0.0.1:6379> PUBSUB NUMPAT
(integer) 5 # 当前有 5 个模式订阅

分片 Pub/Sub(Redis 7.0+)

概述

在 Redis Cluster 中,普通的 Pub/Sub 会将消息广播到所有节点,可能造成网络压力。分片 Pub/Sub 将频道分配到特定槽位,消息只在对应分片内传播。

分片命令

SSUBSCRIBE 分片订阅

# 订阅分片频道
SSUBSCRIBE myshard:channel

SPUBLISH 分片发布

# 发布分片消息
SPUBLISH myshard:channel "message"

SUNSUBSCRIBE 取消分片订阅

SUNSUBSCRIBE myshard:channel

分片规则

频道名按相同算法映射到槽位(类似键名),确保同一分片频道的订阅者在同一节点组。

应用场景

场景一:实时通知

import redis
import threading

# 发布者
def publisher():
r = redis.Redis()
while True:
message = input("输入消息: ")
r.publish('notifications', message)

# 订阅者
def subscriber(name):
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('notifications')

for message in pubsub.listen():
if message['type'] == 'message':
print(f"[{name}] 收到: {message['data'].decode()}")

# 启动订阅者
threading.Thread(target=subscriber, args=('用户A',)).start()
threading.Thread(target=subscriber, args=('用户B',)).start()

# 启动发布者
publisher()

场景二:聊天室

import redis
import threading

class ChatRoom:
def __init__(self, room_name, user_name):
self.r = redis.Redis()
self.room = room_name
self.user = user_name
self.channel = f"chat:{room_name}"

def join(self):
"""加入聊天室"""
pubsub = self.r.pubsub()
pubsub.subscribe(self.channel)

# 发送加入消息
self.r.publish(self.channel, f"系统: {self.user} 加入了聊天室")

# 监听消息
for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'].decode())

def send(self, text):
"""发送消息"""
self.r.publish(self.channel, f"{self.user}: {text}")

def leave(self):
"""离开聊天室"""
self.r.publish(self.channel, f"系统: {self.user} 离开了聊天室")

# 使用
chat = ChatRoom('general', '张三')
threading.Thread(target=chat.join).start()
chat.send("大家好!")

场景三:配置更新通知

import redis
import json

class ConfigManager:
def __init__(self):
self.r = redis.Redis()
self.config = {}
self._load_config()
self._listen_updates()

def _load_config(self):
"""加载配置"""
config_data = self.r.get('app:config')
if config_data:
self.config = json.loads(config_data)

def _listen_updates(self):
"""监听配置更新"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe('config:updates')

for message in pubsub.listen():
if message['type'] == 'message':
self._load_config()
print("配置已更新")

import threading
threading.Thread(target=listener, daemon=True).start()

def update_config(self, new_config):
"""更新配置"""
self.r.set('app:config', json.dumps(new_config))
self.r.publish('config:updates', 'updated')

# 使用
manager = ConfigManager()
manager.update_config({'theme': 'dark', 'language': 'zh-CN'})

场景四:事件广播

import redis
import json

class EventEmitter:
def __init__(self):
self.r = redis.Redis()

def emit(self, event_name, data):
"""发送事件"""
self.r.publish(f'events:{event_name}', json.dumps(data))

def on(self, event_name, handler):
"""监听事件"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe(f'events:{event_name}')

for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
handler(data)

import threading
threading.Thread(target=listener, daemon=True).start()

# 使用
emitter = EventEmitter()

# 监听用户登录事件
def on_login(data):
print(f"用户登录: {data['user']}")

emitter.on('user.login', on_login)

# 发送事件
emitter.emit('user.login', {'user': '张三', 'time': '2024-01-01 10:00:00'})

Pub/Sub 的局限性

1. 消息不持久化

# 订阅者离线时,消息丢失
# 解决方案:使用 Redis Stream 或专业消息队列

2. 无法确认消费

# 发布者不知道订阅者是否成功处理消息
# 解决方案:使用 Redis Stream 的消费者组

3. 不支持消息回溯

# 无法重新消费历史消息
# 解决方案:使用 Redis Stream 的 XREADGROUP

何时选择 Pub/Sub vs Stream

场景推荐方案
实时通知、广播Pub/Sub
聊天室、在线状态Pub/Sub
消息队列、任务处理Stream
日志收集、事件溯源Stream
需要消息确认Stream
需要历史消息Stream

小结

本章我们学习了:

  1. 发布订阅概述:解耦的消息传递模式
  2. 基本命令:SUBSCRIBE、PUBLISH、UNSUBSCRIBE
  3. 模式订阅:PSUBSCRIBE 和通配符匹配
  4. 消息格式:subscribe、message、pmessage 等类型
  5. 分片 Pub/Sub:Redis 7.0+ 的集群优化
  6. 应用场景:实时通知、聊天室、配置更新、事件广播
  7. 局限性:消息不持久化、无法确认消费

练习

  1. 创建一个简单的发布订阅系统,实现消息广播
  2. 使用模式订阅实现多频道消息监听
  3. 实现一个简单的聊天室应用
  4. 对比 Pub/Sub 和 Stream 的使用场景

参考资料