跳到主要内容

Redis Lua 脚本

Lua 脚本允许在 Redis 服务器端执行自定义逻辑,实现复杂操作的原子性执行。Redis 内置 Lua 5.1 解释器。

Lua 脚本概述

为什么使用 Lua 脚本?

┌─────────────────────────────────────────────────────────────┐
│ Lua 脚本的优势 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 原子性执行 │
│ ├── 脚本执行期间,其他命令等待 │
│ ├── 无需使用事务 │
│ └── 避免竞态条件 │
│ │
│ 2. 减少网络开销 │
│ ├── 多个命令一次发送 │
│ ├── 数据在服务端处理 │
│ └── 适合复杂逻辑 │
│ │
│ 3. 数据本地性 │
│ ├── 脚本在数据所在位置执行 │
│ └── 减少数据传输 │
│ │
│ 4. 扩展功能 │
│ ├── 实现自定义命令 │
│ └── 组合现有命令 │
│ │
└─────────────────────────────────────────────────────────────┘

Lua 脚本与事务对比

特性Lua 脚本事务
原子性完全保证保证
复杂逻辑支持(条件、循环)不支持
性能更高(一次执行)需多次交互
调试有调试器支持
灵活性

基本命令

EVAL 执行脚本

EVAL 命令直接执行 Lua 脚本:

# 语法
EVAL script numkeys key [key ...] arg [arg ...]

# 简单示例
127.0.0.1:6379> EVAL "return 'Hello, Redis!'" 0
"Hello, Redis!"

# 使用键参数
127.0.0.1:6379> EVAL "return redis.call('GET', KEYS[1])" 1 mykey
"value"

# 使用额外参数
127.0.0.1:6379> EVAL "return {KEYS[1], ARGV[1], ARGV[2]}" 1 key1 arg1 arg2
1) "key1"
2) "arg1"
3) "arg2"

参数说明

  • script:Lua 脚本代码
  • numkeys:键参数的数量
  • key [key ...]:键名列表(通过 KEYS 访问)
  • arg [arg ...]:额外参数(通过 ARGV 访问)

EVALSHA 执行缓存脚本

使用脚本的 SHA1 校验和执行缓存脚本:

# 先加载脚本
127.0.0.1:6379> SCRIPT LOAD "return redis.call('GET', KEYS[1])"
"a5290e74dc518b648c16d0b3b0b4b6b0e0e8d6d6"

# 使用 SHA1 执行
127.0.0.1:6379> EVALSHA a5290e74dc518b648c16d0b3b0b4b6b0e0e8d6d6 1 mykey
"value"

SCRIPT 命令

管理脚本的命令:

# 加载脚本到缓存
SCRIPT LOAD script

# 检查脚本是否存在
SCRIPT EXISTS sha1 [sha1 ...]

# 清空脚本缓存
SCRIPT FLUSH

# 终止运行中的脚本
SCRIPT KILL

# 调试模式
SCRIPT DEBUG YES|SYNC|NO
# 示例
127.0.0.1:6379> SCRIPT LOAD "return 1"
"e0e1f950fc57447d78c3c868843f8c8e8c8e8c8e"

127.0.0.1:6379> SCRIPT EXISTS e0e1f950fc57447d78c3c868843f8c8e8c8e8c8e
1) (integer) 1

127.0.0.1:6379> SCRIPT FLUSH
OK

KEYS 和 ARGV

参数传递

脚本通过全局变量 KEYSARGV 访问参数:

# KEYS: 键名数组,从 KEYS[1] 开始
# ARGV: 额外参数数组,从 ARGV[1] 开始

127.0.0.1:6379> EVAL "
return {
KEYS[1],
KEYS[2],
ARGV[1],
ARGV[2],
#KEYS, -- 键的数量
#ARGV -- 参数的数量
}
" 2 key1 key2 arg1 arg2 arg3

1) "key1"
2) "key2"
3) "arg1"
4) "arg2"
5) (integer) 2
6) (integer) 3

为什么区分 KEYS 和 ARGV?

在 Redis Cluster 中,脚本需要知道要访问哪些键,以确保所有键在同一槽位:

# 正确:使用 KEYS 传递键名
EVAL "return redis.call('GET', KEYS[1])" 1 user:{123}:name

# 错误:键名硬编码在脚本中
EVAL "return redis.call('GET', 'user:123:name')" 0
# 可能违反 Cluster 的键分布规则

最佳实践:所有键名都通过 KEYS 传递。

调用 Redis 命令

redis.call()

执行 Redis 命令,出错时返回错误:

-- 基本调用
local value = redis.call('GET', 'mykey')

-- 条件逻辑
local exists = redis.call('EXISTS', KEYS[1])
if exists == 1 then
return redis.call('GET', KEYS[1])
else
return nil
end

redis.pcall()

执行 Redis 命令,出错时返回错误对象而不是抛出:

-- 错误处理
local result = redis.pcall('INCR', KEYS[1])
if type(result) == 'table' and result.err then
-- 处理错误
return {err = result.err}
end
return result

两者的区别

-- redis.call() 出错时直接返回错误
redis.call('INCR', 'non_numeric_key')
-- ERR value is not an integer or out of range

-- redis.pcall() 出错时返回错误对象
local result = redis.pcall('INCR', 'non_numeric_key')
if result.err then
-- 可以继续执行其他逻辑
redis.call('SET', 'non_numeric_key', '0')
end

数据类型转换

Lua 到 Redis 类型转换

Lua 类型Redis 返回类型
number整数
string字符串
table(数组)数组
table(有 err 字段)错误
table(有 ok 字段)状态回复
boolean falseNil
boolean true整数 1
-- 示例
return 100 -- 整数
return "hello" -- 字符串
return {1, 2, 3} -- 数组
return {err = "错误"} -- 错误
return {ok = "成功"} -- 状态
return true -- 1
return false -- nil

Redis 到 Lua 类型转换

Redis 类型Lua 类型
整数number
字符串string
数组table
状态table(有 ok 字段)
错误table(有 err 字段)
Nilboolean false
-- 示例
local num = redis.call('INCR', 'counter') -- number
local str = redis.call('GET', 'key') -- string
local arr = redis.call('LRANGE', 'list', 0, -1) -- table

实际应用示例

示例一:限流器

-- rate_limiter.lua
-- KEYS[1]: 限流键名
-- ARGV[1]: 时间窗口(秒)
-- ARGV[2]: 最大请求数
-- 返回: 1 表示允许, 0 表示拒绝

local key = KEYS[1]
local window = tonumber(ARGV[1])
local max_requests = tonumber(ARGV[2])

local current = redis.call('GET', key)
if current == false then
current = 0
else
current = tonumber(current)
end

if current >= max_requests then
return 0
end

redis.call('INCR', key)
redis.call('EXPIRE', key, window)
return 1
# 使用
EVAL "限流器脚本" 1 rate:user:123 60 10
# 60秒内最多10次请求

示例二:分布式锁

-- acquire_lock.lua
-- KEYS[1]: 锁键名
-- ARGV[1]: 锁的值(通常是 UUID)
-- ARGV[2]: 过期时间(毫秒)
-- 返回: 1 表示成功, 0 表示失败

if redis.call('EXISTS', KEYS[1]) == 0 then
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
end
return 0

-- release_lock.lua
-- KEYS[1]: 锁键名
-- ARGV[1]: 锁的值
-- 返回: 1 表示成功, 0 表示失败

if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
# 获取锁
EVAL "获取锁脚本" 1 lock:resource "uuid-123" 30000

# 释放锁
EVAL "释放锁脚本" 1 lock:resource "uuid-123"

示例三:秒杀

-- seckill.lua
-- KEYS[1]: 库存键
-- KEYS[2]: 用户已购键(Set)
-- ARGV[1]: 用户 ID
-- ARGV[2]: 购买数量
-- 返回: 1 成功, 0 库存不足, -1 已购买

-- 检查是否已购买
if redis.call('SISMEMBER', KEYS[2], ARGV[1]) == 1 then
return -1
end

-- 检查库存
local stock = tonumber(redis.call('GET', KEYS[1]) or 0)
local buy_num = tonumber(ARGV[2])

if stock < buy_num then
return 0
end

-- 扣减库存,记录用户
redis.call('DECRBY', KEYS[1], buy_num)
redis.call('SADD', KEYS[2], ARGV[1])
return 1

示例四:缓存更新

-- cache_update.lua
-- KEYS[1]: 缓存键
-- ARGV[1]: 新值
-- ARGV[2]: 过期时间(秒)
-- 返回: 旧值

local old_value = redis.call('GET', KEYS[1])
redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
return old_value

示例五:列表原子操作

-- list_atomic_push.lua
-- KEYS[1]: 列表键
-- ARGV[1]: 最大长度
-- ARGV[2...]: 要添加的元素
-- 返回: 列表当前长度

local max_len = tonumber(ARGV[1])

-- 添加元素
for i = 2, #ARGV do
redis.call('RPUSH', KEYS[1], ARGV[i])
end

-- 裁剪列表
redis.call('LTRIM', KEYS[1], -max_len, -1)

return redis.call('LLEN', KEYS[1])

Python 客户端示例

使用 eval

import redis

r = redis.Redis()

# 定义脚本
script = """
local current = redis.call('GET', KEYS[1])
if current == false then
current = 0
else
current = tonumber(current)
end

if current >= tonumber(ARGV[1]) then
return 0
end

redis.call('INCR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1
"""

# 执行脚本
result = r.eval(script, 1, 'rate:user:123', 10, 60)
print(f"允许: {result == 1}")

使用 register_script

import redis

r = redis.Redis()

# 注册脚本
limiter_script = r.register_script("""
local current = redis.call('GET', KEYS[1])
if current == false then
current = 0
else
current = tonumber(current)
end

if current >= tonumber(ARGV[1]) then
return 0
end

redis.call('INCR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1
""")

# 多次使用
result = limiter_script(keys=['rate:user:123'], args=['10', '60'])

Script 对象

import redis

r = redis.Redis()

# 使用 Script 对象
lock_script = r.register_script("""
if redis.call('EXISTS', KEYS[1]) == 0 then
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return 1
end
return 0
""")

unlock_script = r.register_script("""
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
""")

# 使用
def acquire_lock(resource, token, timeout=30000):
return lock_script(
keys=[f'lock:{resource}'],
args=[token, timeout]
)

def release_lock(resource, token):
return unlock_script(
keys=[f'lock:{resource}'],
args=[token]
)

脚本缓存

缓存机制

每个执行过的脚本都会缓存在服务器内存中:

# 查看缓存信息
127.0.0.1:6379> INFO memory | grep script
used_memory_scripts_eval:1234
number_of_cached_scripts:5

缓存特点

  1. 不持久化:重启后缓存清空
  2. 全量刷新:SCRIPT FLUSH 清空所有缓存
  3. 按需加载:EVALSHA 失败时需要重新加载

推荐模式

import redis

class ScriptManager:
def __init__(self, r):
self.r = r
self.scripts = {}

def get_script(self, name, script_code):
"""获取或加载脚本"""
if name not in self.scripts:
self.scripts[name] = self.r.register_script(script_code)
return self.scripts[name]

def execute(self, name, script_code, keys=None, args=None):
"""执行脚本(自动处理缓存)"""
script = self.get_script(name, script_code)
try:
return script(keys=keys or [], args=args or [])
except redis.exceptions.NoScriptError:
# 脚本不在缓存,重新注册
self.scripts[name] = self.r.register_script(script_code)
return self.scripts[name](keys=keys or [], args=args or [])

# 使用
r = redis.Redis()
manager = ScriptManager(r)
result = manager.execute(
'limiter',
'限流脚本代码',
keys=['rate:user:123'],
args=['10', '60']
)

调试脚本

使用 redis-cli 调试

# 启动调试模式
redis-cli --ldb --eval myscript.lua key1 key2 , arg1 arg2

# 调试命令
# n - 下一步
# s - 进入函数
# c - 继续执行
# l - 显示当前行
# p - 打印变量
# b - 设置断点

使用 redis.debug()

-- 在脚本中打印调试信息
local value = redis.call('GET', KEYS[1])
redis.debug('当前值:', value)
redis.debug('参数:', ARGV[1])
return value

性能注意事项

1. 避免长时间运行的脚本

-- 不好的做法:循环处理大量数据
for i = 1, 1000000 do
redis.call('SET', 'key:'..i, 'value')
end

-- 好的做法:分批处理
-- 在应用层控制批次

2. 使用局部变量

-- 不好的做法
function process()
redis.call('GET', KEYS[1])
redis.call('GET', KEYS[1]) -- 重复调用
end

-- 好的做法
function process()
local value = redis.call('GET', KEYS[1])
-- 使用缓存的 value
end

3. 合理使用 pipeline

# 脚本本身已经是原子的,不需要额外的 pipeline
# 但多个脚本调用可以使用 pipeline

pipe = r.pipeline()
pipe.eval(script1, 1, 'key1')
pipe.eval(script2, 1, 'key2')
results = pipe.execute()

Redis Functions (Redis 7.0+)

Redis Functions 是 Redis 7.0 引入的重大特性,旨在取代直接在应用中嵌入 Lua 脚本的旧模式。

为什么演进?

维度旧 EVAL/Lua 脚本新 Redis Functions
持久性重启后由于命令缓存清空导致 EVALSHA 失败存储在 RDB/AOF 中,永久生效
扩展性客户端需要不断管理 SHA1一次定义,库(Library)级别管理
调试性动态负载,难以追查模块化结构,更接近插件式开发
复用性每个客户端独立推送脚本服务器层面统一提供的 API

定义函数 (Library)

函数以 Library 为单位组织:

# 使用 FUNCTION LOAD 加载一个库 (包含 metadata)
127.0.0.1:6379> FUNCTION LOAD "#!lua name=mylib\n redis.register_function('myincr', function(keys, args) return redis.call('INCRBY', keys[1], args[1]) end)"
"mylib"

执行函数

# 语法: FCALL <function_name> <num_keys> <key> <arg>
127.0.0.1:6379> FCALL myincr 1 mykey 10
(integer) 10

管理函数

# 查看库列表
FUNCTION LIST
# 查看详情
FUNCTION LIST WITHCODE
# 删除库
FUNCTION DELETE mylib
# 清空
FUNCTION FLUSH

生产建议

  1. 迁移策略:将通用的业务逻辑(如原子扣库存、分布式锁令牌清理)封装为 Library 函数。
  2. 多语言支持:目前主要支持 Lua 脚本引擎,未来版本计划引入对其他引擎的支持。
  3. 版本控制:在库名中包含版本信息(如 order_biz_v1),方便平滑升级。

脚本标志(Redis 7.0+)

Redis 7.0 支持脚本标志声明:

#!lua flags=no-writes,allow-stale

-- 声明脚本不写入数据,允许在从库执行
local value = redis.call('GET', KEYS[1])
return value

可用标志

标志说明
no-writes脚本只读
allow-oom允许在内存不足时执行
allow-stale允许在从库执行
no-cluster不允许在集群执行

小结

本章我们学习了:

  1. 基本命令:EVAL、EVALSHA、SCRIPT 系列
  2. 参数传递:KEYS 和 ARGV 的使用
  3. 命令调用:redis.call() 和 redis.pcall()
  4. 实际应用:限流器、分布式锁、秒杀等
  5. 脚本缓存:管理机制和最佳实践
  6. 调试技巧:redis-cli 调试和 redis.debug()

练习

  1. 编写一个原子性的计数器脚本
  2. 实现一个分布式锁的获取和释放
  3. 使用 Lua 脚本优化限流器
  4. 编写一个批量获取并设置默认值的脚本

参考资源