Redis Stack 模块详解
Redis Stack 是 Redis 官方推出的扩展包,在 Redis 核心功能的基础上,增加了 JSON 文档存储、全文搜索、时序数据处理、概率数据结构等高级能力。这些模块让 Redis 从一个简单的键值存储进化为一个功能完整的多模型数据库。
什么是 Redis Stack?
Redis Stack 将多个 Redis 模块打包在一起,提供了开箱即用的完整解决方案:
| 模块 | 功能 | 典型应用场景 |
|---|---|---|
| RedisJSON | JSON 文档存储与操作 | 用户配置、商品信息、嵌套数据 |
| RediSearch | 全文搜索与二级索引 | 商品搜索、文档检索、自动补全 |
| RedisTimeSeries | 时序数据存储与分析 | 监控指标、IoT 传感器数据 |
| RedisBloom | 概率数据结构 | 布隆过滤器、基数估算、频率统计 |
为什么需要 Redis Stack?
在 Redis Stack 出现之前,很多场景需要组合多个系统:
传统方案:
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
├─────────────┬─────────────┬─────────────┬─────────────────┤
│ Redis │ Elasticsearch│ InfluxDB │ 其他系统 │
│ (缓存) │ (全文搜索) │ (时序数据) │ │
└─────────────┴─────────────┴─────────────┴─────────────────┘
问题:数据同步复杂、运维成本高、延迟增加
Redis Stack 方案:
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
├─────────────────────────────────────────────────────────────┤
│ Redis Stack │
│ ┌─────────┬──────────┬────────────┬──────────────┐ │
│ │ JSON │ Search │ TimeSeries │ Bloom │ │
│ └─────────┴──────────┴────────────┴──────────────┘ │
└─────────────────────────────────────────────────────────────┘
优势:单一数据源、低延迟、简化架构
安装 Redis Stack
Docker 方式(推荐):
# 拉取 Redis Stack 镜像
docker pull redis/redis-stack:latest
# 启动容器
docker run -d --name redis-stack \
-p 6379:6379 \
-p 8001:8001 \
redis/redis-stack:latest
# 6379: Redis 服务端口
# 8001: RedisInsight 图形界面端口
其他安装方式:
# Ubuntu/Debian
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update
sudo apt-get install redis-stack-server
# macOS (Homebrew)
brew tap redis-stack/redis-stack
brew install redis-stack
# 启动服务
redis-stack-server
RedisJSON:JSON 文档存储
RedisJSON 让 Redis 能够原生存储和操作 JSON 文档。与使用 String 存储 JSON 字符串相比,RedisJSON 支持 JSONPath 语法进行部分读写,无需读取和覆盖整个文档。
JSONPath 语法基础
RedisJSON 使用 JSONPath 表达式定位文档中的元素:
| 表达式 | 说明 | 示例 |
|---|---|---|
$ | 根节点 | $ |
. | 子节点 | $.name |
.. | 递归查找 | $..name |
[] | 数组索引 | $.items[0] |
[*] | 所有数组元素 | $.items[*] |
[start:end] | 数组切片 | $.items[0:3] |
基本操作
设置和获取 JSON 文档
# 设置完整的 JSON 文档
127.0.0.1:6379> JSON.SET user:1001 $ '{"name":"张三","age":25,"email":"[email protected]"}'
OK
# 获取完整文档
127.0.0.1:6379> JSON.GET user:1001
"{\"name\":\"张三\",\"age\":25,\"email\":\"[email protected]\"}"
# 获取特定字段(返回 JSON 数组)
127.0.0.1:6379> JSON.GET user:1001 $.name
"[\"张三\"]"
# 获取多个字段
127.0.0.1:6379> JSON.GET user:1001 $.name $.age
"{\"$.name\":[\"张三\"],\"$.age\":[25]}"
解释:$ 表示文档根路径,$.name 表示根节点下的 name 字段。返回值总是 JSON 数组格式,因为 JSONPath 可能匹配多个值。
部分更新
# 更新单个字段
127.0.0.1:6379> JSON.SET user:1001 $.age 26
OK
# 更新嵌套对象
127.0.0.1:6379> JSON.SET user:1001 $.address '{"city":"北京","zip":"100000"}'
OK
# 批量更新多个字段
127.0.0.1:6379> JSON.MSET user:1001 $.name "李四" user:1001 $.age 30
OK
数组操作
# 创建包含数组的文档
127.0.0.1:6379> JSON.SET product:2001 $ '{"name":"Redis实战","tags":["redis","database","nosql"]}'
OK
# 追加元素到数组
127.0.0.1:6379> JSON.ARRAPPEND product:2001 $.tags '"cache"'
(integer) 4
# 获取数组长度
127.0.0.1:6379> JSON.ARRLEN product:2001 $.tags
1) (integer) 4
# 获取数组特定索引
127.0.0.1:6379> JSON.GET product:2001 $.tags[0]
"[\"redis\"]"
# 从数组头部弹出元素
127.0.0.1:6379> JSON.ARRPOP product:2001 $.tags 0
"\"redis\""
# 在指定位置插入元素
127.0.0.1:6379> JSON.ARRINSERT product:2001 $.tags 0 '"new_tag"'
(integer) 4
数值操作
# 创建数值字段
127.0.0.1:6379> JSON.SET counter $ '{"views":100}'
OK
# 增加数值
127.0.0.1:6379> JSON.NUMINCRBY counter $.views 10
"[110]"
# 乘法(用于浮点数)
127.0.0.1:6379> JSON.SET price $ '{"value":100.0}'
OK
127.0.0.1:6379> JSON.NUMMULTBY price $.value 1.1
"[110.0]"
删除操作
# 删除特定字段
127.0.0.1:6379> JSON.DEL user:1001 $.email
(integer) 1
# 删除整个文档
127.0.0.1:6379> JSON.DEL user:1001
(integer) 1
实际应用示例
存储用户配置
import redis
import json
r = redis.Redis(decode_responses=True)
def save_user_config(user_id, config):
"""保存用户配置"""
key = f"config:{user_id}"
r.execute_command('JSON.SET', key, '$', json.dumps(config))
def update_theme(user_id, theme):
"""更新主题设置"""
key = f"config:{user_id}"
r.execute_command('JSON.SET', key, '$.theme', json.dumps(theme))
def get_notification_settings(user_id):
"""获取通知设置"""
key = f"config:{user_id}"
result = r.execute_command('JSON.GET', key, '$.notifications')
return json.loads(result)[0] if result else None
# 使用示例
config = {
"theme": "dark",
"language": "zh-CN",
"notifications": {
"email": True,
"push": False,
"frequency": "daily"
}
}
save_user_config(1001, config)
update_theme(1001, "light")
print(get_notification_settings(1001))
存储商品信息
import redis
import json
r = redis.Redis(decode_responses=True)
def add_product(product_id, name, price, tags, stock):
"""添加商品"""
product = {
"name": name,
"price": price,
"tags": tags,
"stock": stock,
"sales": 0,
"created_at": "2024-01-15T10:00:00Z"
}
r.execute_command('JSON.SET', f'product:{product_id}', '$', json.dumps(product))
def update_stock(product_id, quantity):
"""更新库存"""
r.execute_command('JSON.NUMINCRBY', f'product:{product_id}', '$.stock', quantity)
def add_tag(product_id, tag):
"""添加标签"""
r.execute_command('JSON.ARRAPPEND', f'product:{product_id}', '$.tags', json.dumps(tag))
def record_sale(product_id, quantity):
"""记录销售"""
key = f'product:{product_id}'
r.execute_command('JSON.NUMINCRBY', key, '$.sales', quantity)
r.execute_command('JSON.NUMINCRBY', key, '$.stock', -quantity)
# 使用示例
add_product(2001, "Redis实战", 89.0, ["技术", "数据库"], 100)
update_stock(2001, 50) # 补货 50
add_tag(2001, "畅销")
record_sale(2001, 3) # 卖出 3 本
RediSearch:全文搜索与索引
RediSearch 为 Redis 提供了强大的全文搜索、二级索引和聚合查询能力,可以替代 Elasticsearch 处理中小规模的搜索场景。
核心概念
| 概念 | 说明 |
|---|---|
| 索引(Index) | 定义可搜索的字段和类型 |
| 文档(Document) | 被索引的数据,可以来自 Hash 或 JSON |
| 字段类型 | TEXT(文本)、NUMERIC(数值)、TAG(标签)、GEO(地理) |
| 分词器 | 将文本拆分为可搜索的词元 |
创建索引
基于 Hash 的索引
# 先添加一些 Hash 数据
127.0.0.1:6379> HSET product:1 name "Redis实战指南" price 89 category "技术" stock 100
127.0.0.1:6379> HSET product:2 name "Python编程从入门到精通" price 79 category "技术" stock 50
127.0.0.1:6379> HSET product:3 name "JavaScript高级程序设计" price 99 category "技术" stock 80
127.0.0.1:6379> HSET product:4 name "深入理解计算机系统" price 129 category "计算机" stock 30
# 创建索引
127.0.0.1:6379> FT.CREATE product_idx
ON HASH
PREFIX 1 product:
SCHEMA
name TEXT WEIGHT 2.0
price NUMERIC SORTABLE
category TAG
stock NUMERIC
OK
参数说明:
ON HASH:索引的数据来源是 Hash 类型PREFIX 1 product::只索引product:开头的键SCHEMA:定义字段结构TEXT WEIGHT 2.0:文本字段,权重 2.0(搜索时更重要)NUMERIC SORTABLE:数值字段,可排序TAG:标签字段,精确匹配
基于 JSON 的索引
# 添加 JSON 文档
127.0.0.1:6379> JSON.SET article:1 $ '{"title":"Redis集群部署实践","author":"张三","views":1500,"tags":["redis","cluster"],"status":"published"}'
127.0.0.1:6379> JSON.SET article:2 $ '{"title":"MySQL性能优化指南","author":"李四","views":2300,"tags":["mysql","performance"],"status":"published"}'
127.0.0.1:6379> JSON.SET article:3 $ '{"title":"Docker容器化部署","author":"张三","views":980,"tags":["docker","devops"],"status":"draft"}'
# 创建 JSON 索引
127.0.0.1:6379> FT.CREATE article_idx
ON JSON
PREFIX 1 article:
SCHEMA
$.title AS title TEXT
$.author AS author TAG
$.views AS views NUMERIC SORTABLE
$.tags AS tags TAG SEPARATOR ","
$.status AS status TAG
OK
搜索查询
基本搜索
# 搜索所有文档
127.0.0.1:6379> FT.SEARCH product_idx "*"
# 搜索包含 "Redis" 的文档
127.0.0.1:6379> FT.SEARCH product_idx "Redis"
1) (integer) 1
2) "product:1"
3) 1) "name"
2) "Redis实战指南"
3) "price"
4) "89"
5) "category"
6) "技术"
7) "stock"
8) "100"
# 指定返回字段
127.0.0.1:6379> FT.SEARCH product_idx "Redis" RETURN 2 name price
模糊搜索和通配符
# 前缀匹配
127.0.0.1:6379> FT.SEARCH product_idx "Pyt*"
# 模糊匹配(允许 1 个字符差异)
127.0.0.1:6379> FT.SEARCH product_idx "%Reds%"
# 正则表达式
127.0.0.1:6379> FT.SEARCH product_idx "/Red.s/"
字段过滤
# 在特定字段中搜索
127.0.0.1:6379> FT.SEARCH product_idx "@name:Redis"
# 数值范围过滤
127.0.0.1:6379> FT.SEARCH product_idx "*" FILTER price 50 100
# 标签精确匹配
127.0.0.1:6379> FT.SEARCH product_idx "@category:{技术}"
# 多标签匹配(OR)
127.0.0.1:6379> FT.SEARCH product_idx "@category:{技术|计算机}"
组合查询
# AND 查询(空格分隔)
127.0.0.1:6379> FT.SEARCH product_idx "Redis 技术"
# OR 查询(| 分隔)
127.0.0.1:6379> FT.SEARCH product_idx "Redis|Python"
# NOT 查询(- 前缀)
127.0.0.1:6379> FT.SEARCH product_idx "技术 -Redis"
# 复杂组合
127.0.0.1:6379> FT.SEARCH product_idx "(@name:Python|Redis) @category:{技术}"
排序和分页
# 按价格升序排序
127.0.0.1:6379> FT.SEARCH product_idx "*" SORTBY price ASC
# 按价格降序排序,限制返回 5 条
127.0.0.1:6379> FT.SEARCH product_idx "*" SORTBY price DESC LIMIT 0 5
# 分页(LIMIT offset count)
127.0.0.1:6379> FT.SEARCH product_idx "*" LIMIT 10 10
聚合查询
聚合查询用于数据分析和统计:
# 按类别分组统计数量
127.0.0.1:6379> FT.AGGREGATE product_idx "*" GROUPBY 1 @category REDUCE COUNT 0 AS count
# 计算平均价格
127.0.0.1:6379> FT.AGGREGATE product_idx "*" GROUPBY 1 @category REDUCE AVG 1 @price AS avg_price
# 多级聚合
127.0.0.1:6379> FT.AGGREGATE article_idx "*"
GROUPBY 2 @author @status
REDUCE COUNT 0 AS count
REDUCE SUM 1 @views AS total_views
中文搜索
RediSearch 支持中文分词,需要指定中文分词器:
# 创建支持中文的索引
127.0.0.1:6379> FT.CREATE chinese_idx
ON HASH
PREFIX 1 doc:
SCHEMA
title TEXT WITHSUFFIXTRIE
content TEXT CHINESE
# 搜索中文
127.0.0.1:6379> FT.SEARCH chinese_idx "分布式系统"
注意:中文分词需要 Redis Stack 7.2+ 版本。
自动补全
RediSearch 提供了自动补全功能:
# 创建自动补全字典
127.0.0.1:6379> FT.SUGADD autocomplete "Redis数据库" 100
127.0.0.1:6379> FT.SUGADD autocomplete "Redis集群" 90
127.0.0.1:6379> FT.SUGADD autocomplete "Redis哨兵" 80
# 获取补全建议
127.0.0.1:6379> FT.SUGGET autocomplete "Red"
1) "Redis数据库"
2) "Redis集群"
3) "Redis哨兵"
索引管理
# 查看索引信息
127.0.0.1:6379> FT.INFO product_idx
# 列出所有索引
127.0.0.1:6379> FT._LIST
# 删除索引(不删除数据)
127.0.0.1:6379> FT.DROPINDEX product_idx
# 删除索引和数据
127.0.0.1:6379> FT.DROPINDEX product_idx DD
Python 客户端示例
import redis
from redis.commands.search.field import TextField, NumericField, TagField
from redis.commands.search.indexDefinition import IndexDefinition
from redis.commands.search.query import Query
r = redis.Redis(decode_responses=True)
# 创建索引
def create_product_index():
"""创建商品索引"""
schema = (
TextField("name", weight=2.0),
NumericField("price", sortable=True),
TagField("category"),
NumericField("stock")
)
definition = IndexDefinition(prefix=["product:"], index_type="hash")
r.ft("product_idx").create_index(schema, definition=definition)
# 搜索商品
def search_products(keyword, min_price=None, max_price=None, category=None):
"""搜索商品"""
query = Query(keyword).no_content()
if min_price is not None and max_price is not None:
query = query.add_filter(NumericFilter("price", min_price, max_price))
if category:
query = query.add_filter(TagFilter("category", [category]))
result = r.ft("product_idx").search(query)
return result.total, result.docs
# 使用示例
# create_product_index()
# total, docs = search_products("Redis", min_price=50, max_price=100)
RedisBloom:概率数据结构
RedisBloom 提供了多种概率数据结构,它们的特点是:空间占用极小,但结果有一定误差。适用于大规模数据的快速判断场景。
布隆过滤器(Bloom Filter)
布隆过滤器可以快速判断一个元素一定不存在或可能存在于集合中。
工作原理
布隆过滤器结构:
┌─────────────────────────────────────────────────────┐
│ 位数组 (m 位) │
│ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐ │
│ │ 0 │ 1 │ 0 │ 1 │ 0 │ 0 │ 1 │ 0 │ 1 │ 0 │ 0 │ 1 │ │
│ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘ │
└─────────────────────────────────────────────────────┘
添加元素 "user:123":
hash1("user:123") % m = 2 → 设置第 2 位
hash2("user:123") % m = 5 → 设置第 5 位
hash3("user:123") % m = 8 → 设置第 8 位
判断:
- 如果所有对应的位都是 1 → 可能存在(有误判率)
- 如果有任何一位是 0 → 一定不存在
基本操作
# 添加元素(自动创建过滤器)
127.0.0.1:6379> BF.ADD spam_emails "[email protected]"
(integer) 1
# 批量添加
127.0.0.1:6379> BF.MADD spam_emails "[email protected]" "[email protected]"
1) (integer) 1
2) (integer) 1
# 检查是否存在
127.0.0.1:6379> BF.EXISTS spam_emails "[email protected]"
(integer) 1 # 可能存在
127.0.0.1:6379> BF.EXISTS spam_emails "[email protected]"
(integer) 0 # 一定不存在
# 批量检查
127.0.0.1:6379> BF.MEXISTS spam_emails "[email protected]" "[email protected]"
1) (integer) 1
2) (integer) 0
预创建过滤器(推荐)
预先创建可以指定参数,获得更好的性能和准确率:
# BF.RESERVE key error_rate capacity [expansion] [NONSCALING]
# error_rate: 期望的误判率(如 0.01 表示 1%)
# capacity: 预期存储的元素数量
127.0.0.1:6379> BF.RESERVE emails 0.01 1000000
OK
# 然后添加元素
127.0.0.1:6379> BF.ADD emails "[email protected]"
(integer) 1
应用场景
场景一:垃圾邮件过滤
import redis
r = redis.Redis(decode_responses=True)
def is_spam(email_address):
"""检查是否是垃圾邮件地址"""
return r.execute_command('BF.EXISTS', 'spam_emails', email_address)
def add_spam(email_address):
"""添加垃圾邮件地址"""
r.execute_command('BF.ADD', 'spam_emails', email_address)
# 使用
add_spam("[email protected]")
if is_spam("[email protected]"):
print("拦截垃圾邮件")
场景二:推荐系统去重
import redis
r = redis.Redis(decode_responses=True)
# 初始化(预期 1000 万用户,误判率 0.1%)
r.execute_command('BF.RESERVE', 'recommended_users', 0.001, 10000000)
def should_recommend(user_id, target_user_id):
"""判断是否应该推荐"""
key = f"rec:{user_id}"
if r.execute_command('BF.EXISTS', key, target_user_id):
return False # 可能已经推荐过
r.execute_command('BF.ADD', key, target_user_id)
return True
场景三:缓存穿透防护
import redis
r = redis.Redis(decode_responses=True)
def get_user(user_id):
"""获取用户信息,防止缓存穿透"""
# 先检查布隆过滤器
if not r.execute_command('BF.EXISTS', 'valid_user_ids', user_id):
return None # 一定不存在,直接返回
# 查询缓存和数据库
cache_key = f"user:{user_id}"
user = r.get(cache_key)
if user:
return user
# 查询数据库...
return None
# 初始化:加载所有有效的用户 ID
def init_valid_users():
# 从数据库加载所有用户 ID
for user_id in get_all_user_ids_from_db():
r.execute_command('BF.ADD', 'valid_user_ids', user_id)
Cuckoo 过滤器
Cuckoo 过滤器相比布隆过滤器的优势:
- 支持删除操作
- 更高的空间效率
- 更低的误判率
# 创建 Cuckoo 过滤器
127.0.0.1:6379> CF.RESERVE allowed_ips 1000000
OK
# 添加元素
127.0.0.1:6379> CF.ADD allowed_ips "192.168.1.100"
(integer) 1
# 检查存在
127.0.0.1:6379> CF.EXISTS allowed_ips "192.168.1.100"
(integer) 1
# 删除元素(布隆过滤器不支持)
127.0.0.1:6379> CF.DEL allowed_ips "192.168.1.100"
(integer) 1
Count-Min Sketch
用于估计元素出现频率:
# 创建 Count-Min Sketch
127.0.0.1:6379> CMS.INITBYPROB page_views 0.01 0.99
# 增加计数
127.0.0.1:6379> CMS.INCRBY page_views "home" 1
127.0.0.1:6379> CMS.INCRBY page_views "home" 1
127.0.0.1:6379> CMS.INCRBY page_views "about" 1
# 查询频率
127.0.0.1:6379> CMS.QUERY page_views "home"
(integer) 2
Top-K
用于找出最频繁的 K 个元素:
# 创建 Top-K 过滤器
127.0.0.1:6379> TOPK.RESERVE trending_topics 10
# 添加元素
127.0.0.1:6379> TOPK.ADD trending_topics "redis" "redis" "mysql" "redis" "python"
# 获取 Top K
127.0.0.1:6379> TOPK.LIST trending_topics
1) "redis"
2) "mysql"
3) "python"
RedisTimeSeries:时序数据
RedisTimeSeries 专门用于存储和查询时间序列数据,非常适合监控系统、IoT 传感器数据等场景。
基本概念
| 概念 | 说明 |
|---|---|
| 时间序列(Time Series) | 带时间戳的数据点序列 |
| 标签(Labels) | 用于分类和查询的键值对 |
| 规则(Rules) | 自动降采样配置 |
| 聚合(Aggregation) | 数据统计方法(avg、sum、max、min 等) |
基本操作
创建时间序列
# 创建时间序列,保留 1 小时数据
127.0.0.1:6379> TS.CREATE sensor:temp:1 RETENTION 3600000 LABELS location "room1" type "temperature"
OK
# 创建时间序列,保留 24 小时数据
127.0.0.1:6379> TS.CREATE sensor:humidity:1 RETENTION 86400000 LABELS location "room1" type "humidity"
OK
添加数据点
# 添加数据点(自动使用当前时间)
127.0.0.1:6379> TS.ADD sensor:temp:1 * 25.5
(integer) 1705312800000
# 添加指定时间戳的数据
127.0.0.1:6379> TS.ADD sensor:temp:1 1705312860000 26.0
(integer) 1705312860000
# 批量添加
127.0.0.1:6379> TS.MADD sensor:temp:1 * 25.8 sensor:humidity:1 * 65.2
1) (integer) 1705312920000
2) (integer) 1705312920000
查询数据
# 查询时间范围内的数据
127.0.0.1:6379> TS.RANGE sensor:temp:1 1705312800000 1705312920000
1) 1) (integer) 1705312800000
2) 25.5
2) 1) (integer) 1705312860000
2) 26
# 查询最新数据
127.0.0.1:6379> TS.GET sensor:temp:1
1) (integer) 1705312920000
2) "25.8"
# 按标签查询多个时间序列
127.0.0.1:6379> TS.MRANGE - + WITHLABELS FILTER location=room1
聚合查询
# 每 5 分钟计算平均值
127.0.0.1:6379> TS.RANGE sensor:temp:1 1705312800000 1705313100000 AGGREGATION avg 300000
# 聚合类型:
# avg - 平均值
# sum - 求和
# min - 最小值
# max - 最大值
# count - 计数
# first - 第一个值
# last - 最后一个值
# std.p / std.s - 标准差
降采样规则
自动将高精度数据聚合为低精度数据:
# 创建源时间序列(1 秒精度)
127.0.0.1:6379> TS.CREATE sensor:raw RETENTION 86400000
# 创建目标时间序列(1 分钟精度)
127.0.0.1:6379> TS.CREATE sensor:1min RETENTION 604800000
# 创建降采样规则:每 60 秒取平均值写入 sensor:1min
127.0.0.1:6379> TS.CREATERULE sensor:raw sensor:1min AGGREGATION avg 60000
OK
# 现在添加数据到 sensor:raw,会自动聚合到 sensor:1min
127.0.0.1:6379> TS.ADD sensor:raw * 25.5
Python 客户端示例
import redis
import time
r = redis.Redis(decode_responses=True)
class SensorMonitor:
def __init__(self, sensor_id, location):
self.sensor_id = sensor_id
self.temp_key = f"sensor:temp:{sensor_id}"
self.humidity_key = f"sensor:humidity:{sensor_id}"
# 创建时间序列
try:
r.execute_command('TS.CREATE', self.temp_key,
'RETENTION', 86400000, # 24 小时
'LABELS', 'location', location, 'type', 'temperature')
except:
pass
try:
r.execute_command('TS.CREATE', self.humidity_key,
'RETENTION', 86400000,
'LABELS', 'location', location, 'type', 'humidity')
except:
pass
def record(self, temp, humidity):
"""记录传感器数据"""
timestamp = int(time.time() * 1000)
r.execute_command('TS.MADD',
self.temp_key, timestamp, temp,
self.humidity_key, timestamp, humidity)
def get_stats(self, duration_ms=3600000):
"""获取统计信息"""
now = int(time.time() * 1000)
start = now - duration_ms
temp_stats = r.execute_command('TS.RANGE', self.temp_key, start, now,
'AGGREGATION', 'avg', duration_ms)
humidity_stats = r.execute_command('TS.RANGE', self.humidity_key, start, now,
'AGGREGATION', 'avg', duration_ms)
return {
'avg_temp': float(temp_stats[0][1]) if temp_stats else None,
'avg_humidity': float(humidity_stats[0][1]) if humidity_stats else None
}
# 使用示例
monitor = SensorMonitor("sensor-001", "server-room")
monitor.record(25.5, 65.2)
monitor.record(26.0, 64.8)
print(monitor.get_stats())
模块选择指南
| 场景 | 推荐模块 | 理由 |
|---|---|---|
| 存储用户配置、商品信息 | RedisJSON | 支持部分更新,结构化存储 |
| 商品搜索、文档检索 | RediSearch | 全文索引,高性能查询 |
| 垃圾邮件过滤、去重 | RedisBloom | 空间效率高,查询快速 |
| 监控指标、IoT 数据 | RedisTimeSeries | 专为时序数据优化 |
| 复杂应用 | 组合使用 | JSON + Search 常见组合 |
最佳实践
1. 索引设计
# 好的设计:只索引需要的字段
FT.CREATE product_idx ON HASH PREFIX 1 product: SCHEMA name TEXT price NUMERIC
# 避免:索引过多字段
FT.CREATE product_idx ON HASH PREFIX 1 product: SCHEMA
name TEXT description TEXT content TEXT ... # 索引过大
2. 内存优化
# 使用布隆过滤器预判断,减少数据库查询
def get_with_bloom_filter(key):
if not bf.exists(key):
return None # 一定不存在
return db.query(key) # 可能存在,查询数据库
3. 时序数据规划
# 根据数据特点设置保留时间
# 高频数据:短期保留 + 降采样
# 低频数据:长期保留
# 高频传感器数据:保留 1 小时原始 + 7 天聚合
TS.CREATE sensor:raw RETENTION 3600000
TS.CREATE sensor:1h RETENTION 604800000
TS.CREATERULE sensor:raw sensor:1h AGGREGATION avg 3600000
4. JSON 文档设计
# 好的设计:扁平结构
{
"user_id": 1001,
"name": "张三",
"profile": {
"city": "北京",
"age": 25
}
}
# 避免:过深嵌套
{
"data": {
"user": {
"info": {
"profile": {
"detail": {
"name": "张三" # 层级太深
}
}
}
}
}
}
小结
本章我们学习了:
- Redis Stack 概述:模块组合的价值和安装方式
- RedisJSON:JSONPath 语法、文档操作、实际应用
- RediSearch:索引创建、搜索查询、聚合分析、中文支持
- RedisBloom:布隆过滤器、Cuckoo 过滤器、Top-K、Count-Min Sketch
- RedisTimeSeries:时序数据存储、聚合查询、降采样规则
- 最佳实践:索引设计、内存优化、数据规划
练习
- 使用 RedisJSON 存储一个包含嵌套结构的商品信息
- 创建一个支持中文搜索的 RediSearch 索引
- 使用布隆过滤器实现一个简单的 URL 去重系统
- 使用 RedisTimeSeries 记录和查询 CPU 使用率