feat(queue): add getCountsPerPriority method (#2746)

This commit is contained in:
Rogger Valverde
2024-06-11 19:24:48 -06:00
committed by GitHub
parent 66ffd9d660
commit 0376dcc128
7 changed files with 125 additions and 35 deletions
Vendored
+7
View File
@@ -750,6 +750,13 @@ declare namespace Bull {
*/
close(doNotWaitJobs?: boolean): Promise<void>;
/**
* Returns the number of jobs per priority.
*/
getCountsPerPriority(priorities: number[]): Promise<{
[index: string]: number;
}>;
/**
* Returns a promise that will return the job instance associated with the jobId parameter.
* If the specified job cannot be located, the promise callback parameter will be set to null.
+25
View File
@@ -0,0 +1,25 @@
--[[
Get counts per provided states
Input:
KEYS[1] wait key
KEYS[2] priority key
ARGV[1...] priorities
]]
local rcall = redis.call
local results = {}
local waitKey = KEYS[1]
local prioritizedKey = KEYS[2]
for i = 1, #ARGV do
local priority = tonumber(ARGV[i])
if priority == 0 then
results[#results+1] = rcall("LLEN", waitKey) - rcall("ZCARD", prioritizedKey)
else
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
priority, priority)
end
end
return results
+16
View File
@@ -2,6 +2,7 @@
const _ = require('lodash');
const Job = require('./job');
const scripts = require('./scripts');
module.exports = function(Queue) {
Queue.prototype.getJob = async function(jobId) {
@@ -9,6 +10,21 @@ module.exports = function(Queue) {
return Job.fromId(this, jobId);
};
Queue.prototype.getCountsPerPriority = async function(priorities) {
const uniquePriorities = [...new Set(priorities)];
const responses = await scripts.getCountsPerPriority(
this,
uniquePriorities
);
const counts = {};
responses.forEach((res, index) => {
counts[`${uniquePriorities[index]}`] = res || 0;
});
return counts;
};
Queue.prototype._commandByType = function(types, count, callback) {
return _.map(types, type => {
type = type === 'waiting' ? 'wait' : type; // alias
+21 -2
View File
@@ -83,6 +83,21 @@ const scripts = {
return result;
},
getCountsPerPriorityArgs(queue, priorities) {
const keys = [queue.keys.wait, queue.keys.priority];
const args = priorities;
return keys.concat(args);
},
async getCountsPerPriority(queue, priorities) {
const client = await queue.client;
const args = this.getCountsPerPriorityArgs(queue, priorities);
return client.getCountsPerPriority(args);
},
moveToActive(queue, jobId) {
const queueKeys = queue.keys;
const keys = [queueKeys.wait, queueKeys.active, queueKeys.priority];
@@ -253,9 +268,13 @@ const scripts = {
case -2:
return new Error('Missing lock for job ' + jobId + ' ' + command);
case -3:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`
);
case -6:
return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`
);
}
},
+23
View File
@@ -171,6 +171,29 @@ describe('Jobs getters', function() {
queue.add({ baz: 'qux' });
});
describe('.getCountsPerPriority', () => {
it('returns job counts per priority', done => {
const jobsArray = Array.from(Array(42).keys()).map(index => ({
name: 'test',
data: {},
opts: {
priority: index % 4
}
}));
queue.addBulk(jobsArray).then(() => {
queue.getCountsPerPriority([0, 1, 2, 3]).then(counts => {
expect(counts).to.be.eql({
'0': 11,
'1': 11,
'2': 10,
'3': 10
});
done();
});
});
});
});
it('fails jobs that exceed their specified timeout', done => {
queue.process((job, jobDone) => {
setTimeout(jobDone, 200);
+32 -32
View File
@@ -759,35 +759,35 @@ describe('Job', () => {
it('applies stacktrace limit on failure', () => {
const stackTraceLimit = 1;
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then(
job => {
return job
.isFailed()
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job
.moveToFailed(new Error('test error'), true)
.then(() => {
return job.isFailed().then(isFailed => {
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
expect(job.stacktrace.length).to.be(stackTraceLimit);
});
});
return Job.create(
queue,
{ foo: 'bar' },
{ stackTraceLimit, attempts: 2 }
).then(job => {
return job
.isFailed()
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true).then(() => {
return job.isFailed().then(isFailed => {
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
expect(job.stacktrace.length).to.be(stackTraceLimit);
});
});
}
);
});
});
});
});
@@ -914,8 +914,8 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('completed');
return client.zrem(queue.toKey('completed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
return client.zrem(queue.toKey('completed'), job.id).then(() => {
return client.lpush(queue.toKey('active'), job.id);
});
})
.then(() => {
@@ -930,8 +930,8 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('delayed');
return client.zrem(queue.toKey('delayed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
return client.zrem(queue.toKey('delayed'), job.id).then(() => {
return client.lpush(queue.toKey('active'), job.id);
});
})
.then(() => {
+1 -1
View File
@@ -1771,7 +1771,7 @@ describe('Queue', () => {
});
queue2
.add({ foo: 'bar' }, {removeOnFail: true})
.add({ foo: 'bar' }, { removeOnFail: true })
.then(job => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');