mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 08:27:43 +08:00
fix: increment jobCounter only when a job is processed. fixes #1875.
Do not count when it is rate limited (#1879)
This commit is contained in:
@@ -45,11 +45,14 @@ local rateLimit = function(jobId, maxJobs)
|
||||
rateLimiterKey = rateLimiterKey .. ":" .. group
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
-- -- key for storing rate limited jobs
|
||||
-- When a job has been previously rate limited it should be part of this set
|
||||
-- if the job is back here means that the delay time for this job has passed and now we should
|
||||
-- be able to process it again.
|
||||
local limitedSetKey = rateLimiterKey .. ":limited"
|
||||
local delay = 0
|
||||
|
||||
|
||||
-- -- Check if job was already limited
|
||||
local isLimited = rcall("SISMEMBER", limitedSetKey, jobId);
|
||||
|
||||
@@ -57,27 +60,30 @@ local rateLimit = function(jobId, maxJobs)
|
||||
-- Remove from limited zset since we are going to try to process it
|
||||
rcall("SREM", limitedSetKey, jobId)
|
||||
rcall("HDEL", limiterIndexTable, jobId)
|
||||
else
|
||||
-- If not, check if there are any limited jobs
|
||||
local numLimitedJobs = rcall("SCARD", limitedSetKey)
|
||||
else
|
||||
-- If not, check if there are any limited jobs
|
||||
-- If the job has not been rate limited, we should check if there are any other rate limited jobs, because if that
|
||||
-- is the case we do not want to process this job, just calculate a delay for it and put it to "sleep".
|
||||
local numLimitedJobs = rcall("SCARD", limitedSetKey)
|
||||
|
||||
if numLimitedJobs > 0 then
|
||||
-- Note, add some slack to compensate for drift.
|
||||
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
|
||||
end
|
||||
if numLimitedJobs > 0 then
|
||||
-- Note, add some slack to compensate for drift.
|
||||
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
|
||||
end
|
||||
end
|
||||
|
||||
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
|
||||
|
||||
local jobCounter = tonumber(rcall("GET", rateLimiterKey))
|
||||
if(jobCounter == nil) then
|
||||
jobCounter = 0
|
||||
end
|
||||
-- check if rate limit hit
|
||||
if (delay == 0) and (jobCounter > maxJobs) then
|
||||
if (delay == 0) and (jobCounter >= maxJobs) then
|
||||
-- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for this unit of time so we need to rate limit this job.
|
||||
local exceedingJobs = jobCounter - maxJobs
|
||||
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs - 1) * ARGV[7]) / maxJobs
|
||||
elseif jobCounter == 1 then
|
||||
rcall("PEXPIRE", rateLimiterKey, ARGV[7])
|
||||
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * ARGV[7]) / maxJobs
|
||||
end
|
||||
|
||||
if delay > 0 then
|
||||
if delay > 0 then
|
||||
local bounceBack = ARGV[8]
|
||||
if bounceBack == 'false' then
|
||||
local timestamp = delay + tonumber(ARGV[4])
|
||||
@@ -85,7 +91,7 @@ local rateLimit = function(jobId, maxJobs)
|
||||
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
|
||||
rcall("PUBLISH", KEYS[7], timestamp)
|
||||
rcall("SADD", limitedSetKey, jobId)
|
||||
|
||||
|
||||
-- store index so that we can delete rate limited data
|
||||
rcall("HSET", limiterIndexTable, jobId, limitedSetKey)
|
||||
|
||||
@@ -96,6 +102,12 @@ local rateLimit = function(jobId, maxJobs)
|
||||
return true
|
||||
else
|
||||
-- false indicates not rate limited
|
||||
-- increment jobCounter only when a job is not rate limited
|
||||
if (jobCounter == 0) then
|
||||
rcall("PSETEX", rateLimiterKey, ARGV[7], 1)
|
||||
else
|
||||
rcall("INCR", rateLimiterKey)
|
||||
end
|
||||
return false
|
||||
end
|
||||
end
|
||||
@@ -115,7 +127,7 @@ if jobId then
|
||||
local maxJobs = tonumber(ARGV[6])
|
||||
|
||||
if maxJobs then
|
||||
if rateLimit(jobId, maxJobs) then
|
||||
if rateLimit(jobId, maxJobs) then
|
||||
return
|
||||
end
|
||||
end
|
||||
@@ -126,7 +138,7 @@ if jobId then
|
||||
rcall("SET", lockKey, ARGV[2], "PX", ARGV[3])
|
||||
|
||||
-- remove from priority
|
||||
rcall("ZREM", KEYS[3], jobId)
|
||||
rcall("ZREM", KEYS[3], jobId)
|
||||
rcall("PUBLISH", KEYS[4], jobId)
|
||||
rcall("HSET", jobKey, "processedOn", ARGV[4])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user