跳到主要内容

Redis 发布订阅

发布订阅(Pub/Sub)是一种消息传递模式,发送者(发布者)将消息发送到频道,接收者(订阅者)订阅频道接收消息。Redis 提供了完整的发布订阅功能。

发布订阅概述

什么是发布订阅?

发布订阅是一种消息传递范式,发布者和订阅者之间解耦,通过频道(Channel)进行通信。

┌─────────────────────────────────────────────────────────────┐
│ 发布订阅模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 发布者 订阅者 │
│ ┌─────────┐ ┌─────────┐ │
│ │Publisher│ │Sub 1 │ │
│ └─────────┘ └─────────┘ │
│ │ ▲ │
│ │ PUBLISH │ │
│ ▼ │ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Channel (频道) │ │
│ │ news.sports │ │
│ └─────────────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │Publisher│ │Sub 2 │ │
│ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ - 发布者不知道订阅者存在 │
│ - 订阅者不知道发布者存在 │
│ - 消息实时推送,不持久化 │
│ │
└─────────────────────────────────────────────────────────────┘

Redis Pub/Sub 的特点

特性说明
实时推送消息即时发送给所有订阅者
不持久化订阅者离线时消息丢失
至多一次每条消息最多送达一次
解耦合发布者和订阅者互不感知

与消息队列的区别

特性Redis Pub/SubRedis Stream专业 MQ
消息持久化
离线消息不支持支持支持
消息确认
消费组
适用场景实时通知日志流企业消息

基本命令

SUBSCRIBE 订阅频道

订阅一个或多个频道:

# 订阅单个频道
127.0.0.1:6379> SUBSCRIBE news.sports
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 消息类型
2) "news.sports" # 频道名称
3) (integer) 1 # 当前订阅的频道数

# 订阅多个频道
127.0.0.1:6379> SUBSCRIBE news.sports news.tech
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "subscribe"
2) "news.tech"
3) (integer) 2

订阅后,客户端进入订阅状态,只能接收消息,不能执行其他命令。

PUBLISH 发布消息

向频道发布消息:

127.0.0.1:6379> PUBLISH news.sports "中国队获胜!"
(integer) 2 # 返回收到消息的订阅者数量

127.0.0.1:6379> PUBLISH news.tech "Redis 7.0 发布"
(integer) 1

UNSUBSCRIBE 取消订阅

取消订阅频道:

# 取消特定频道
127.0.0.1:6379> UNSUBSCRIBE news.sports
1) "unsubscribe"
2) "news.sports"
3) (integer) 1 # 剩余订阅数

# 取消所有订阅
127.0.0.1:6379> UNSUBSCRIBE
1) "unsubscribe"
2) "news.sports"
3) (integer) 0

模式订阅

PSUBSCRIBE 订阅模式

使用通配符订阅匹配的频道:

# 订阅所有 news 开头的频道
127.0.0.1:6379> PSUBSCRIBE news.*
1) "psubscribe"
2) "news.*"
3) (integer) 1

# 匹配的频道:
# news.sports、news.tech、news.weather 等

通配符规则

符号说明
*匹配任意字符(包括空)
?匹配单个字符
[abc]匹配 a、b 或 c
# 示例
PSUBSCRIBE user:*:msg # 匹配 user:123:msg, user:456:msg
PSUBSCRIBE log:?:error # 匹配 log:a:error, log:b:error
PSUBSCRIBE news:[abc] # 匹配 news:a, news:b, news:c

PUNSUBSCRIBE 取消模式订阅

# 取消特定模式
127.0.0.1:6379> PUNSUBSCRIBE news.*

# 取消所有模式订阅
127.0.0.1:6379> PUNSUBSCRIBE

消息格式

频道订阅消息格式

订阅频道时收到的消息格式:

# 订阅确认
1) "subscribe" # 消息类型
2) "channel_name" # 频道名称
3) 1 # 订阅数量

# 收到消息
1) "message" # 消息类型
2) "channel_name" # 频道名称
3) "message_body" # 消息内容

# 取消订阅
1) "unsubscribe" # 消息类型
2) "channel_name" # 频道名称
3) 0 # 剩余订阅数量

模式订阅消息格式

模式订阅收到的消息格式:

# 模式订阅确认
1) "psubscribe" # 消息类型
2) "pattern" # 模式
3) 1 # 订阅数量

# 收到消息
1) "pmessage" # 消息类型
2) "pattern" # 匹配的模式
3) "channel_name" # 实际频道名称
4) "message_body" # 消息内容

消息类型详解

subscribe 消息

确认订阅成功:

1) "subscribe"
2) "news.sports"
3) (integer) 1

message 消息

普通频道消息:

1) "message"
2) "news.sports"
3) "中国队获胜!"

pmessage 消息

模式匹配的消息:

1) "pmessage"
2) "news.*" # 匹配的模式
3) "news.sports" # 实际频道
4) "中国队获胜!"

unsubscribe 消息

取消订阅确认:

1) "unsubscribe"
2) "news.sports"
3) (integer) 0

Pub/Sub 信息查询

PUBSUB CHANNELS

查看活跃频道:

# 查看所有活跃频道
127.0.0.1:6379> PUBSUB CHANNELS
1) "news.sports"
2) "news.tech"

# 查看匹配的活跃频道
127.0.0.1:6379> PUBSUB CHANNELS news.*
1) "news.sports"
2) "news.tech"

活跃频道:至少有一个订阅者的频道。

PUBSUB NUMSUB

查看频道订阅者数量:

127.0.0.1:6379> PUBSUB NUMSUB news.sports news.tech
1) "news.sports"
2) (integer) 3 # 3 个订阅者
3) "news.tech"
4) (integer) 1 # 1 个订阅者

PUBSUB NUMPAT

查看模式订阅总数:

127.0.0.1:6379> PUBSUB NUMPAT
(integer) 5 # 当前有 5 个模式订阅

分片 Pub/Sub(Redis 7.0+)

概述

在 Redis Cluster 中,普通的 Pub/Sub 会将消息广播到所有节点,可能造成网络压力。分片 Pub/Sub 将频道分配到特定槽位,消息只在对应分片内传播。

┌─────────────────────────────────────────────────────────────┐
│ 分片 Pub/Sub │
├─────────────────────────────────────────────────────────────┤
│ │
│ 普通 Pub/Sub:消息广播到所有节点 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │──>│ Node B │──>│ Node C │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ↑ ↑ ↑ │
│ └─────────────┴─────────────┘ │
│ 所有节点都收到消息 │
│ │
│ ───────────────────────────────────────────────────── │
│ │
│ 分片 Pub/Sub:消息只在分片内传播 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │ │ Node B │ │ Node C │ │
│ │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ↑ │
│ └──> 只有相关分片收到消息 │
│ │
└─────────────────────────────────────────────────────────────┘

分片命令

SSUBSCRIBE 分片订阅

# 订阅分片频道
SSUBSCRIBE myshard:channel

SPUBLISH 分片发布

# 发布分片消息
SPUBLISH myshard:channel "message"

SUNSUBSCRIBE 取消分片订阅

SUNSUBSCRIBE myshard:channel

分片规则

频道名按相同算法映射到槽位(类似键名),确保同一分片频道的订阅者在同一节点组。

应用场景

场景一:实时通知

import redis
import threading

# 发布者
def publisher():
r = redis.Redis()
while True:
message = input("输入消息: ")
r.publish('notifications', message)

# 订阅者
def subscriber(name):
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('notifications')

for message in pubsub.listen():
if message['type'] == 'message':
print(f"[{name}] 收到: {message['data'].decode()}")

# 启动订阅者
threading.Thread(target=subscriber, args=('用户A',)).start()
threading.Thread(target=subscriber, args=('用户B',)).start()

# 启动发布者
publisher()

场景二:聊天室

import redis
import threading

class ChatRoom:
def __init__(self, room_name, user_name):
self.r = redis.Redis()
self.room = room_name
self.user = user_name
self.channel = f"chat:{room_name}"

def join(self):
"""加入聊天室"""
pubsub = self.r.pubsub()
pubsub.subscribe(self.channel)

# 发送加入消息
self.r.publish(self.channel, f"系统: {self.user} 加入了聊天室")

# 监听消息
for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'].decode())

def send(self, text):
"""发送消息"""
self.r.publish(self.channel, f"{self.user}: {text}")

def leave(self):
"""离开聊天室"""
self.r.publish(self.channel, f"系统: {self.user} 离开了聊天室")

# 使用
chat = ChatRoom('general', '张三')
threading.Thread(target=chat.join).start()
chat.send("大家好!")

场景三:配置更新通知

import redis
import json

class ConfigManager:
def __init__(self):
self.r = redis.Redis()
self.config = {}
self._load_config()
self._listen_updates()

def _load_config(self):
"""加载配置"""
config_data = self.r.get('app:config')
if config_data:
self.config = json.loads(config_data)

def _listen_updates(self):
"""监听配置更新"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe('config:updates')

for message in pubsub.listen():
if message['type'] == 'message':
self._load_config()
print("配置已更新")

import threading
threading.Thread(target=listener, daemon=True).start()

def update_config(self, new_config):
"""更新配置"""
self.r.set('app:config', json.dumps(new_config))
self.r.publish('config:updates', 'updated')

# 使用
manager = ConfigManager()
manager.update_config({'theme': 'dark', 'language': 'zh-CN'})

场景四:事件广播

import redis
import json

class EventEmitter:
def __init__(self):
self.r = redis.Redis()

def emit(self, event_name, data):
"""发送事件"""
self.r.publish(f'events:{event_name}', json.dumps(data))

def on(self, event_name, handler):
"""监听事件"""
def listener():
pubsub = self.r.pubsub()
pubsub.subscribe(f'events:{event_name}')

for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
handler(data)

import threading
threading.Thread(target=listener, daemon=True).start()

# 使用
emitter = EventEmitter()

# 监听用户登录事件
def on_login(data):
print(f"用户登录: {data['user']}")

emitter.on('user.login', on_login)

# 发送事件
emitter.emit('user.login', {'user': '张三', 'time': '2024-01-01 10:00:00'})

注意事项

1. 消息不持久化

# 订阅者离线时,消息会丢失
# 客户端 A:订阅频道
SUBSCRIBE news

# 客户端 B:发布消息
PUBLISH news "消息1" # 如果此时没有订阅者,消息丢失

# 客户端 A:只能收到之后的消息

解决方案:需要持久化时使用 Redis Stream。

2. 订阅状态限制

在 RESP2 协议下,订阅状态的客户端只能执行:

SUBSCRIBE
UNSUBSCRIBE
PSUBSCRIBE
PUNSUBSCRIBE
PING
QUIT

RESP3 协议(Redis 6.0+)取消了此限制。

3. 与数据库无关

Pub/Sub 不受数据库编号限制:

# 在 DB 0 发布
SELECT 0
PUBLISH news "消息"

# 在 DB 1 也能收到
SELECT 1
SUBSCRIBE news
# 仍然能收到消息

4. 重复订阅

同时订阅频道和匹配模式,会收到两条消息:

SUBSCRIBE news.sports
PSUBSCRIBE news.*

# 发布消息
PUBLISH news.sports "消息"

# 收到两条消息:
# 1) "message" + "news.sports" + "消息"
# 2) "pmessage" + "news.*" + "news.sports" + "消息"

小结

本章我们学习了:

  1. 基本概念:发布订阅模式和工作原理
  2. 基本命令:SUBSCRIBE、PUBLISH、UNSUBSCRIBE
  3. 模式订阅:PSUBSCRIBE 通配符匹配
  4. 消息格式:不同类型消息的结构
  5. 分片 Pub/Sub:Redis 7.0 新特性
  6. 应用场景:通知、聊天室、配置更新、事件广播

练习

  1. 实现一个简单的聊天室程序
  2. 使用模式订阅实现多级频道
  3. 对比 Pub/Sub 和 Stream 的使用场景
  4. 实现配置热更新通知机制

参考资源