mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 08:27:43 +08:00
fix(priority): consider paused state when calling getCountsPerPriority (#2748)
This commit is contained in:
@@ -1,25 +0,0 @@
|
||||
--[[
|
||||
Get counts per provided states
|
||||
|
||||
Input:
|
||||
KEYS[1] wait key
|
||||
KEYS[2] priority key
|
||||
|
||||
ARGV[1...] priorities
|
||||
]]
|
||||
local rcall = redis.call
|
||||
local results = {}
|
||||
local waitKey = KEYS[1]
|
||||
local prioritizedKey = KEYS[2]
|
||||
|
||||
for i = 1, #ARGV do
|
||||
local priority = tonumber(ARGV[i])
|
||||
if priority == 0 then
|
||||
results[#results+1] = rcall("LLEN", waitKey) - rcall("ZCARD", prioritizedKey)
|
||||
else
|
||||
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
|
||||
priority, priority)
|
||||
end
|
||||
end
|
||||
|
||||
return results
|
||||
@@ -0,0 +1,37 @@
|
||||
--[[
|
||||
Get counts per provided states
|
||||
|
||||
Input:
|
||||
KEYS[1] wait key
|
||||
KEYS[2] paused key
|
||||
KEYS[3] meta-paused key
|
||||
KEYS[4] priority key
|
||||
|
||||
ARGV[1...] priorities
|
||||
]]
|
||||
local rcall = redis.call
|
||||
local results = {}
|
||||
local prioritizedKey = KEYS[4]
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/getTargetQueueList"
|
||||
|
||||
for i = 1, #ARGV do
|
||||
local priority = tonumber(ARGV[i])
|
||||
if priority == 0 then
|
||||
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
|
||||
local count = rcall("LLEN", target) - rcall("ZCARD", prioritizedKey)
|
||||
if count < 0 then
|
||||
-- considering when last waiting job is moved to active before
|
||||
-- removing priority reference
|
||||
results[#results+1] = 0
|
||||
else
|
||||
results[#results+1] = count
|
||||
end
|
||||
else
|
||||
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
|
||||
priority, priority)
|
||||
end
|
||||
end
|
||||
|
||||
return results
|
||||
+6
-1
@@ -84,7 +84,12 @@ const scripts = {
|
||||
},
|
||||
|
||||
getCountsPerPriorityArgs(queue, priorities) {
|
||||
const keys = [queue.keys.wait, queue.keys.priority];
|
||||
const keys = [
|
||||
queue.keys.wait,
|
||||
queue.keys.paused,
|
||||
queue.keys['meta-paused'],
|
||||
queue.keys.priority
|
||||
];
|
||||
|
||||
const args = priorities;
|
||||
|
||||
|
||||
@@ -192,6 +192,28 @@ describe('Jobs getters', function() {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('when queue is paused', () => {
|
||||
it('returns job counts per priority', async () => {
|
||||
await queue.pause();
|
||||
const jobsArray = Array.from(Array(42).keys()).map(index => ({
|
||||
name: 'test',
|
||||
data: {},
|
||||
opts: {
|
||||
priority: index % 4
|
||||
}
|
||||
}));
|
||||
await queue.addBulk(jobsArray);
|
||||
const counts = await queue.getCountsPerPriority([0, 1, 2, 3]);
|
||||
|
||||
expect(counts).to.be.eql({
|
||||
'0': 11,
|
||||
'1': 11,
|
||||
'2': 10,
|
||||
'3': 10
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('fails jobs that exceed their specified timeout', done => {
|
||||
|
||||
Reference in New Issue
Block a user