前言
laravel 的隊列服務(wù)對各種不同的后臺隊列服務(wù)提供了統(tǒng)一的 API。隊列允許你延遲執(zhí)行消耗時間的任務(wù),比如發(fā)送一封郵件。這樣可以有效的降低請求響應(yīng)的時間。
發(fā)現(xiàn)問題
在 Laravel 中使用 Redis 處理隊列任務(wù),框架提供的功能非常強大,但是最近遇到一個問題,就是發(fā)現(xiàn)一個任務(wù)被多次執(zhí)行,這是為什么呢?

先說原因:
因為在 Laravel 中如果一個隊列(任務(wù))執(zhí)行時間大于 60 秒,就會被認為執(zhí)行失敗并重新加入隊列中,這樣就會導(dǎo)致重復(fù)執(zhí)行同一個任務(wù)。
這個任務(wù)的邏輯就是給用戶推送內(nèi)容,需要根據(jù)隊列內(nèi)容取出用戶并遍歷,通過請求后端 HTTP 接口發(fā)送。比如有 10000 個用戶,在用戶數(shù)量多或接口處理速度沒那么快的情況下,執(zhí)行時間肯定會大于 60 秒,于是這個任務(wù)就被重新加入隊列。情況更糟糕一點,前面的任務(wù)如果都沒有在 60 秒執(zhí)行完,就都會重新加入隊列,這樣同一個任務(wù)就不止重復(fù)執(zhí)行一次了,而是多次。
下面從 Laravel 源代碼找一下罪魁禍首。
源代碼文件:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/**
* The expiration time of a job.
*
* @var int|null
*/
protected $expire = 60;
這個 $expire 成員變量是一個固定的值,Laravel 認為一個隊列再怎么 60 秒也該執(zhí)行完了吧。取隊列方法:
public function pop($queue = null)
{
$original = $queue ?: $this->default;
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->expire)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
list($job, $reserved) = $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
);
if ($reserved) {
return new RedisJob($this->container, $this, $job, $reserved, $original);
}
}
取隊列有幾步操作,因為隊列執(zhí)行失敗,或執(zhí)行超時等都會放入另外的集合保存起來,以便重試,過程如下:
1.把因執(zhí)行失敗的隊列從 delayed 集合重新 rpush 到當前執(zhí)行的隊列中。
2.把因執(zhí)行超時的隊列從 reserved 集合重新 rpush 到當前執(zhí)行的隊列中。
3.然后才是從隊列中取任務(wù)開始執(zhí)行,同時把隊列放入 reserved 的有序集合。
這里使用了 eval 命令執(zhí)行這個過程,用到了幾個 lua 腳本。
從要執(zhí)行的隊列中取任務(wù):
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
可以看到 Laravel 在取 Redis 要執(zhí)行的隊列的時候,同時會放一份到一個有序集合中,并使用過期時間戳作為分值。
只有當這個任務(wù)完成后,再把有序集合中這個任務(wù)移除。從這個有序集合移除隊列的代碼就省略,我們看一下 Laravel 如何處理執(zhí)行時間大于 60 秒的隊列。
也就是這段 lua 腳本執(zhí)行的操作:
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return true
這里 zrangebyscore 找出分值從無限小到當前時間戳的元素,也就是 60 秒之前加入到集合的任務(wù),然后通過 zremrangebyrank 從集合移除這些元素并 rpush 到隊列中。
看到這里應(yīng)該就恍然大悟了。
如果一個隊列 60 秒沒執(zhí)行完,那么進程在取隊列的時候從 reserved 集合中把這些任務(wù)又重新 rpush 到隊列中。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
您可能感興趣的文章:- Laravel學(xué)習(xí)教程之從入口到輸出過程詳解
- Laravel框架源碼解析之反射的使用詳解
- Laravel源碼解析之路由的使用和示例詳解
- 通過源碼解析Laravel的依賴注入
- Laravel框架數(shù)據(jù)庫CURD操作、連貫操作總結(jié)
- PHP開發(fā)框架Laravel數(shù)據(jù)庫操作方法總結(jié)
- Laravel框架表單驗證詳解
- Laravel框架中擴展函數(shù)、擴展自定義類的方法
- Laravel框架路由配置總結(jié)、設(shè)置技巧大全
- PHP的Laravel框架中使用消息隊列queue及異步隊列的方法
- PHP框架Laravel插件Pagination實現(xiàn)自定義分頁
- Laravel 5框架學(xué)習(xí)之向視圖傳送數(shù)據(jù)
- Laravel框架源碼解析之入口文件原理分析