fix(priority): consider paused state when calling getCountsPerPriority (#2748)

This commit is contained in:
Rogger Valverde
2024-06-20 19:38:29 -06:00
committed by GitHub
parent 79113ac6f3
commit 6c2719a929
4 changed files with 65 additions and 26 deletions
-25
View File
@@ -1,25 +0,0 @@
--[[
Get counts per provided states
Input:
KEYS[1] wait key
KEYS[2] priority key
ARGV[1...] priorities
]]
local rcall = redis.call
local results = {}
local waitKey = KEYS[1]
local prioritizedKey = KEYS[2]
for i = 1, #ARGV do
local priority = tonumber(ARGV[i])
if priority == 0 then
results[#results+1] = rcall("LLEN", waitKey) - rcall("ZCARD", prioritizedKey)
else
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
priority, priority)
end
end
return results
+37
View File
@@ -0,0 +1,37 @@
--[[
Get counts per provided states
Input:
KEYS[1] wait key
KEYS[2] paused key
KEYS[3] meta-paused key
KEYS[4] priority key
ARGV[1...] priorities
]]
local rcall = redis.call
local results = {}
local prioritizedKey = KEYS[4]
-- Includes
--- @include "includes/getTargetQueueList"
for i = 1, #ARGV do
local priority = tonumber(ARGV[i])
if priority == 0 then
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
local count = rcall("LLEN", target) - rcall("ZCARD", prioritizedKey)
if count < 0 then
-- considering when last waiting job is moved to active before
-- removing priority reference
results[#results+1] = 0
else
results[#results+1] = count
end
else
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
priority, priority)
end
end
return results
+6 -1
View File
@@ -84,7 +84,12 @@ const scripts = {
},
getCountsPerPriorityArgs(queue, priorities) {
const keys = [queue.keys.wait, queue.keys.priority];
const keys = [
queue.keys.wait,
queue.keys.paused,
queue.keys['meta-paused'],
queue.keys.priority
];
const args = priorities;
+22
View File
@@ -192,6 +192,28 @@ describe('Jobs getters', function() {
});
});
});
describe('when queue is paused', () => {
it('returns job counts per priority', async () => {
await queue.pause();
const jobsArray = Array.from(Array(42).keys()).map(index => ({
name: 'test',
data: {},
opts: {
priority: index % 4
}
}));
await queue.addBulk(jobsArray);
const counts = await queue.getCountsPerPriority([0, 1, 2, 3]);
expect(counts).to.be.eql({
'0': 11,
'1': 11,
'2': 10,
'3': 10
});
});
});
});
it('fails jobs that exceed their specified timeout', done => {