引言
前段時間組內有個投票的產品,上線前考慮欠缺,導致被刷票嚴重。后來,通過研究,發現可以通過 redis lua 腳本實現限流,這里將 redis lua 腳本相關的知識分享出來,講的不到位的地方還望斧正。
redis lua 腳本相關命令
這一小節的內容是基本命令,可粗略閱讀后跳過,等使用的時候再回來查詢
redis 自 2.6.0 加入了 lua 腳本相關的命令,EVAL
、EVALSHA
、SCRIPT EXISTS
、SCRIPT FLUSH
、SCRIPT KILL
、SCRIPT LOAD
,自 3.2.0 加入了 lua 腳本的調試功能和命令SCRIPT DEBUG
。這里對命令做下簡單的介紹。
EVAL
執行一段lua腳本,每次都需要將完整的lua腳本傳遞給redis服務器。
SCRIPT LOAD
將一段lua腳本緩存到redis中并返回一個tag串,并不會執行。
EVALSHA
執行一個腳本,不過傳入參數是「2」中返回的tag,節省網絡帶寬
- 。
SCRIPT EXISTS
判斷「2」返回的tag串是否存在服務器中。
SCRIPT FLUSH
清除服務器上的所有緩存的腳本。
SCRIPT KILL
殺死正在運行的腳本。
SCRIPT DEBUG
設置調試模式,可設置同步、異步、關閉,同步會阻塞所有請求。
生產環境中,推薦使用EVALSHA
,相較于EVAL
的每次發送腳本主體、浪費帶寬,會更高效。這里要注意SCRIPT KILL
,殺死正在運行腳本的時候,如果腳本執行過寫操作了,這里會殺死失敗,因為這違反了 redis lua 腳本的原子性。調試盡量放在測試環境完成之后再發布到生產環境,在生產環境調試千萬不要使用同步模式,原因下文會詳細討論。
Redis 中 lua 腳本的書寫和調試
redis lua 腳本是對其現有命令的擴充,單個命令不能完成、需要多個命令,但又要保證原子性的動作可以用腳本來實現。腳本中的邏輯一般比較簡單,不要加入太復雜的東西,因為 redis 是單線程的,當腳本執行的時候,其他命令、腳本需要等待直到當前腳本執行完成。因此,對 lua 的語法也不需完全了解,了解基本的使用就足夠了,這里對 lua 語法不做過多介紹,會穿插到腳本示例里面。
一個秒殺搶購示例
假設有一個秒殺活動,商品庫存 100,每個用戶 uid 只能搶購一次。設計搶購流程如下:
- 先通過 uid 判斷是否已經搶過,已經搶過返回
0
結束。
- 判斷商品剩余庫存是否大于0,是的話進入「3」,否的話返回
0
結束。
- 將用戶 uid 加入已購用戶set中。
- 物品數量減一,返回成功
1
結束。
local goodsSurplus
local flag
-- 判斷用戶是否已搶過
local buyMembersKey = tostring(KEYS[1])
local memberUid = tonumber(ARGV[1])
local goodsSurplusKey = tostring(KEYS[2])
local hasBuy = redis.call("sIsMember", buyMembersKey, memberUid)
-- 已經搶購過,返回0
if hasBuy ~= 0 then
return 0
end
-- 準備搶購
goodsSurplus = redis.call("GET", goodsSurplusKey)
if goodsSurplus == false then
return 0
end
-- 沒有剩余可搶購物品
goodsSurplus = tonumber(goodsSurplus)
if goodsSurplus = 0 then
return 0
end
flag = redis.call("SADD", buyMembersKey, memberUid)
flag = redis.call("DECR", goodsSurplusKey)
return 1
即使不了解 lua,相信你也可以將上面的腳本看個一二,其中--
開始的是單行注釋。local
用來聲明局部變量,redis lua 腳本中的所有變量都應該聲明為local xxx
,避免在持久化、復制的時候產生各種問題。KEYS
和ARGV
是兩個全局變量,就像 PHP 中的$argc
、$argv
一樣,腳本執行時傳入的參數會寫入這兩個變量,供我們在腳本中使用。redis.call
用來執行 redis 現有命令,傳參跟 redis 命令行執行時傳入參數順序一致。
另外 redis lua 腳本中用到 lua table 的地方還比較多,這里要注意,lua 腳本中的 table 下標是從 1 開始的,比如KEYS
、ARGV
,這里跟其他語言不一樣,需要注意。
對于主要使用 PHP 這種弱類型語言開發同學來說,一定要注意變量的類型,不同類型比較的時候可能會出現類似attempt to compare string with number
的提示,這個時候使用 lua 的tonumber
將字符串轉換為數字在進行比較即可。比如我們使用GET
去獲取一個值,然后跟 0 比較大小,就需要將獲取出來的字符串轉換為數字。
在調試之前呢,我們先看看效果,將上面的代碼保存到 lua 文件中/path/to/buy.lua
,然后運行redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742984
即可執行腳本,執行之后返回-1
,因為我們未設置商品數量,set goodsSurplus 5
之后再次執行,效果如下:
➜ ~ redis-cli set goodsSurplus 5
OK
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742984
(integer) 1
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742984
(integer) 0
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742983
(integer) 1
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742982
(integer) 1
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742981
(integer) 1
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742980
(integer) -1
➜ ~ redis-cli --eval /path/to/buy.lua hadBuyUids goodsSurplus , 58247
(integer) -1
在命令行運行腳本的時候,腳本后面傳入的是參數,通過 ,
分隔為兩組,前面是鍵,后面是值,這兩組分別寫入KEYS
和ARGV
。分隔符一定要看清楚了,逗號前后都有空格,漏掉空格會讓腳本解析傳入參數異常。
debug 調試
上一小節,我們寫了很長一段 redis lua 腳本,怎么調試呢,有沒有像 GDB 那樣的調試工具呢,答案是肯定的。redis 從 v3.2.0 開始支持 lua debugger,可以加斷點、print 變量信息、展示正在執行的代碼......我們結合上一小節的腳本,來詳細說說 redis 中 lua 腳本的調試。
如何進入調試模式
執行redis-cli --ldb --eval /path/to/buy.lua hadBuyUids goodsSurplus , 5824742984
,進入調試模式,比之前執行的時候多了參數--ldb
,這個參數是開啟 lua dubegger 的意思,這個模式下 redis 會 fork 一個進程進入隔離環境,不會影響 redis 正常提供服務,但調試期間,原始 redis 執行命令、腳本的結果也不會體現到 fork 之后的隔離環境之中。因此呢,還有另外一種調試模式--ldb-sync-mode
,也就是前面提到的同步模式,這個模式下,會阻塞 redis 上所有的命令、腳本,直到腳本退出,完全模擬了正式環境使用時候的情況,使用的時候務必注意這點。
調試命令詳解
這一小節的內容是調試時候的詳細命令,可以粗略閱讀后跳過,等使用的時候再回來查詢
幫助信息
[h]elp
調試模式下,輸入h
或者help
展示調試模式下的全部可用指令。
流程相關
[s]tep 、 [n]ext 、 [c]continue
執行當前行代碼,并停留在下一行,如下所示
* Stopped at 4, stop reason = step over
-> 4 local buyMembersKey = tostring(KEYS[1])
lua debugger> n
* Stopped at 5, stop reason = step over
-> 5 local memberUid = tonumber(ARGV[1])
lua debugger> n
* Stopped at 6, stop reason = step over
-> 6 local goodsSurplusKey = tostring(KEYS[2])
lua debugger> s
* Stopped at 7, stop reason = step over
-> 7 local hasBuy = redis.call("sIsMember", buyMembersKey, memberUid)
continue
從當前行開始執行代碼直到結束或者碰到斷點。
展示相關
[l]list 、 [l]list [line] 、 [l]list [line] [ctx] 、 [w]hole
展示當前行附近的代碼,[line]
是重新指定中心行,[ctx]
是指定展示中心行周圍幾行代碼。[w]hole
是展示所有行代碼
打印相關
[p]rint 、 [p]rint var>
打印當前所有局部變量,var>
是打印指定變量,如下所示:
lua debugger> print
value> goodsSurplus = nil
value> flag = nil
value> buyMembersKey = "hadBuyUids"
value> memberUid = 58247
lua debugger> print buyMembersKey
value> "hadBuyUids"
斷點相關
[b]reak 、 [b]reak line> 、 [b]reak -line> 、 [b]reak 0
展示斷點、像指定行添加斷點、刪除指定行的斷點、刪除所有斷點
其他命令
[r]edis cmd> 、 [m]axlen [len] 、 [a]bort 、 [e]eval code> 、 [t]race
- 在調試其中執行 redis 命令
- 設置展示內容的最大長度,0表示不限制
- 退出調試模式,同步模式下(設置了參數--ldb-sync-mode)修改會保留。
- 執行一行 lua 代碼。
- 展示執行棧。
詳細說下[m]axlen [len]
命令,如下代碼:
local myTable = {}
local count = 0
while count 1000 do
myTable[count] = count
count = count + 1
end
return 1
在最后一行打印斷點,執行print
可以看到,輸出了一長串內容,我們執行maxlen 10
之后,再次執行print
可以看到打印的內容變少了,設置為maxlen 0
之后,再次執行可以看到所有的內容全部展示了。
詳細說下[t]race
命令,代碼如下:
local function func1(num)
num = num + 1
return num
end
local function func2(num)
num = func1(num)
num = num + 1
return num
end
func2(123)
執行b 2
在 func1 中打斷點,然后執行c
,斷點地方停頓,再次執行t
,可以到如下信息:
lua debugger> t
In func1:
->#3 return num
From func2:
7 num = func1(num)
From top level:
12 func2(123)
請求限流
至此,算是對 redis lua 腳本有了基本的認識,基本語法、調試也做了了解,接下來就實現一個請求限流器。流程和代碼如下:

--[[
傳入參數:
業務標識
ip
限制時間
限制時間內的訪問次數
]]--
local busIdentify = tostring(KEYS[1])
local ip = tostring(KEYS[2])
local expireSeconds = tonumber(ARGV[1])
local limitTimes = tonumber(ARGV[2])
local identify = busIdentify .. "_" .. ip
local times = redis.call("GET", identify)
--[[
獲取已經記錄的時間
獲取到繼續判斷是否超過限制
超過限制返回0
否則加1,返回1
]]--
if times ~= false then
times = tonumber(times)
if times >= limitTimes then
return 0
else
redis.call("INCR", identify)
return 1
end
end
-- 不存在的話,設置為1并設置過期時間
local flag = redis.call("SETEX", identify, expireSeconds, 1)
return 1
將上面的 lua 腳本保存到/path/to/limit.lua
,執行redis-cli --eval /path/to/limit.lua limit_vgroup 192.168.1.19 , 10 3
,表示 limit_vgroup 這個業務,192.168.1.1 這個 ip 每 10 秒鐘限制訪問三次。
好了,至此,一個請求限流功能就完成了,連續執行三次之后上面的程序會返回 0,過 10 秒鐘在執行,又可以返回 1,這樣便達到了限流的目的。
有同學可能會說了,這個請求限流功能還有值得優化的地方,如果連續的兩個計數周期,第一個周期的最后請求 3 次,接著馬上到第二個周期了,又可以請求了,這個地方如何優化呢,我們接著往下看。
請求限流優化
上面的計數器法簡單粗暴,但是存在臨界點的問題。為了解決這個問題,引入類似滑動窗口的概念,讓統計次數的周期是連續的,可以很好的解決臨界點的問題,滑動窗口原理如下圖所示:

建立一個 redis list 結構,其長度等價于訪問次數,每次請求時,判斷 list 結構長度是否超過限制次數,未超過的話,直接加到隊首返回成功,否則,判斷隊尾一條數據是否已經超過限制時間,未超過直接返回失敗,超過刪除隊尾元素,將此次請求時間插入隊首,返回成功。
local busIdentify = tostring(KEYS[1])
local ip = tostring(KEYS[2])
local expireSeconds = tonumber(ARGV[1])
local limitTimes = tonumber(ARGV[2])
-- 傳入額外參數,請求時間戳
local timestamp = tonumber(ARGV[3])
local lastTimestamp
local identify = busIdentify .. "_" .. ip
local times = redis.call("LLEN", identify)
if times limitTimes then
redis.call("RPUSH", identify, timestamp)
return 1
end
lastTimestamp = redis.call("LRANGE", identify, 0, 0)
lastTimestamp = tonumber(lastTimestamp[1])
if lastTimestamp + expireSeconds >= timestamp then
return 0
end
redis.call("LPOP", identify)
redis.call("RPUSH", identify, timestamp)
return 1
上面的 lua 腳本保存到/path/to/limit_fun.lua
,執行redis-cli --eval /path/to/limit_fun.lua limit_vgroup 192.168.1.19 , 10 3 1548660999
即可。
最開始,我想著把時間戳計算redis.call("TIME")
也放入 redis lua 腳本中,后來發現使用的時候 redis 會報錯,這是因為 redis 默認情況復制 lua 腳本到備機和持久化中,如果腳本是一個非純函數(pure function),備庫中執行的時候或者宕機恢復的時候可能產生不一致的情況,這里可以類比 mysql 中基于 SQL 語句的復制模式。redis 在 3.2 版本中加入了redis.replicate_commands
函數來解決這個問題,在腳本第一行執行這個函數,redis 會將修改數據的命令收集起來,然后用MULTI/EXEC
包裹起來,這種方式稱為script effects replication,這個類似于 mysql 中的基于行的復制模式,將非純函數的值計算出來,用來持久化和主從復制。我們這里將變動參數提到調用方這里,調用者傳入時間戳來解決這個問題。
另外,redis 從版本 5 開始,默認支持script effects replication,不需要在第一行調用開啟函數了。如果是耗時計算,這樣當然很好,同步、恢復的時候只需要計算一次后邊就不用計算了,但是如果是一個循環生成的數據,可能在同步的時候會浪費更多的帶寬,沒有腳本來的更直接,但這種情況應該比較少。
至此,腳本優化完成了,但我又想到一個問題,我們的環境是單機環境,如果是分布式環境的話,腳本怎么執行、何處理呢,接下來一節,我們來討論下這個問題。
集群環境中 lua 處理
redis 集群中,會將鍵分配的不同的槽位上,然后分配到對應的機器上,當操作的鍵為一個的時候,自然沒問題,但如果操作的鍵為多個的時候,集群如何知道這個操作落到那個機器呢?比如簡單的mget
命令,mget test1 test2 test3
,還有我們上面執行腳本時候傳入多個參數,帶著這個問題我們繼續。
首先用 docker 啟動一個 redis 集群,docker pull grokzen/redis-cluster
,拉取這個鏡像,然后執行docker run -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster-script -e "IP=0.0.0.0" grokzen/redis-cluster
啟動這個容器,這個容器啟動了一個 redis 集群,3 主 3 從。
我們從任意一個節點進入集群,比如redis-cli -c -p 7003
,進入后執行cluster nodes
可以看到集群的信息,我們鏈接的是從庫,執行set lua fun
,有同學可能會問了,從庫也可以執行寫嗎,沒問題的,集群會計算出 lua 這個鍵屬于哪個槽位,然后定向到對應的主庫。
執行mset lua fascinating redis powerful
,可以看到集群反回了錯誤信息,告訴我們本次請求的鍵沒有落到同一個槽位上
(error) CROSSSLOT Keys in request don't hash to the same slot
同樣,還是上面的 lua 腳本,我們加上集群端口號,執行redis-cli -p 7000 --eval /tmp/limit_fun.lua limit_vgroup 192.168.1.19 , 10 3 1548660999
,一樣返回上面的錯誤。
針對這個問題,redis官方為我們提供了hash tag
這個方法來解決,什么意思呢,我們取鍵中的一段來計算 hash,計算落入那個槽中,這樣同一個功能不同的 key 就可以落入同一個槽位了,hash tag 是通過{}
這對括號括起來的字符串,比如上面的,我們改為mset lua{yes} fascinating redis{yes} powerful
,就可以執行成功了,我這里 mset 這個操作落到了 7002 端口的機器。
同理,我們對傳入腳本的鍵名做 hash tag 處理就可以了,這里要注意不僅傳入鍵名要有相同的 hash tag,里面實際操作的 key 也要有相同的 hash tag,不然會報錯Lua script attempted to access a non local key in a cluster node
,什么意思呢,就拿我們上面的例子來說,執行的時候如下所示,可以看到 ,
前面的兩個鍵都加了 hash tag —— yes,這樣沒問題,因為腳本里面只是用了一個拼接的 key —— limit_vgroup{yes}_192.168.1.19{yes}
。
redis-cli -c -p 7000 --eval /tmp/limit_fun.lua limit_vgroup{yes} 192.168.1.19{yes} , 10 3 1548660999
如果我們在腳本里面加上redis.call("GET", "yesyes")
(別讓這個鍵跟我們拼接的鍵落在一個solt),可以看到就報了上面的錯誤,所以在執行腳本的時候,只要傳入參數鍵、腳本里面執行 redis 命令時候的鍵有相同的 hash tag 即可。
另外,這里有個 hash tag 規則:
鍵中包含{
字符;建中包含{
字符,并在{
字符右邊;并且{
,}
之間有至少一個字符,之間的字符就用來做鍵的 hash tag。
所以,鍵limit_vgroup{yes}_192.168.1.19{yes}
的 hash tag 是 yes
。foo{}{bar}
鍵的 hash tag就是它本身。foo{{bar}}
鍵的 hash tag 是 {bar
。
使用 golang 連接使用 redis
這里我們使用 golang 實例展示下,通過ForEachMaster
將 lua 腳本緩存到集群中的每個 node,并保存返回的 sha 值,以后通過 evalsha 去執行代碼。
package main
import (
"github.com/go-redis/redis"
"fmt"
)
func createScript() *redis.Script {
script := redis.NewScript(`
local busIdentify = tostring(KEYS[1])
local ip = tostring(KEYS[2])
local expireSeconds = tonumber(ARGV[1])
local limitTimes = tonumber(ARGV[2])
-- 傳入額外參數,請求時間戳
local timestamp = tonumber(ARGV[3])
local lastTimestamp
local identify = busIdentify .. "_" .. ip
local times = redis.call("LLEN", identify)
if times limitTimes then
redis.call("RPUSH", identify, timestamp)
return 1
end
lastTimestamp = redis.call("LRANGE", identify, 0, 0)
lastTimestamp = tonumber(lastTimestamp[1])
if lastTimestamp + expireSeconds >= timestamp then
return 0
end
redis.call("LPOP", identify)
redis.call("RPUSH", identify, timestamp)
return 1
`)
return script
}
func scriptCacheToCluster(c *redis.ClusterClient) string {
script := createScript()
var ret string
c.ForEachMaster(func(m *redis.Client) error {
if result, err := script.Load(m).Result(); err != nil {
panic("緩存腳本到主節點失敗")
} else {
ret = result
}
return nil
})
return ret
}
func main() {
redisdb := redis.NewClusterClient(redis.ClusterOptions{
Addrs: []string{
":7000",
":7001",
":7002",
":7003",
":7004",
":7005",
},
})
// 將腳本緩存到所有節點,執行一次拿到結果即可
sha := scriptCacheToCluster(redisdb)
// 執行緩存腳本
ret := redisdb.EvalSha(sha, []string{
"limit_vgroup{yes}",
"192.168.1.19{yes}",
}, 10, 3,1548660999)
if result, err := ret.Result(); err != nil {
fmt.Println("發生異常,返回值:", err.Error())
} else {
fmt.Println("返回值:", result)
}
// 示例錯誤情況,sha 值不存在
ret1 := redisdb.EvalSha(sha + "error", []string{
"limit_vgroup{yes}",
"192.168.1.19{yes}",
}, 10, 3,1548660999)
if result, err := ret1.Result(); err != nil {
fmt.Println("發生異常,返回值:", err.Error())
} else {
fmt.Println("返回值:", result)
}
}
執行上面的代碼,返回值如下:
返回值: 0
發生異常,返回值: NOSCRIPT No matching script. Please use EVAL.
好了,目前為止,相信你對 redis lua 腳本已經有了很好的了解,可以實現一些自己想要的功能了,感謝大家的閱讀。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- 詳解利用redis + lua解決搶紅包高并發的問題
- 簡介Lua腳本與Redis數據庫的結合使用
- Redis執行Lua腳本的好處與示例代碼
- redis中如何使用lua腳本讓你的靈活性提高5個逼格詳解
- 利用Lua定制Redis命令的方法詳解
- Redis如何使用lua腳本實例教程
- Nginx利用Lua+Redis實現動態封禁IP的方法
- Redis和Lua使用過程中遇到的小問題
- 通過redis的腳本lua如何實現搶紅包功能