From 0376dcc128d9af13ecbd658d8ea7ff19fce56915 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 11 Jun 2024 19:24:48 -0600 Subject: [PATCH] feat(queue): add getCountsPerPriority method (#2746) --- index.d.ts | 7 +++ lib/commands/getCountsPerPriority-2.lua | 25 ++++++++++ lib/getters.js | 16 +++++++ lib/scripts.js | 23 ++++++++- test/test_getters.js | 23 +++++++++ test/test_job.js | 64 ++++++++++++------------- test/test_queue.js | 2 +- 7 files changed, 125 insertions(+), 35 deletions(-) create mode 100644 lib/commands/getCountsPerPriority-2.lua diff --git a/index.d.ts b/index.d.ts index 07e3f14..f183ac7 100644 --- a/index.d.ts +++ b/index.d.ts @@ -750,6 +750,13 @@ declare namespace Bull { */ close(doNotWaitJobs?: boolean): Promise; + /** + * Returns the number of jobs per priority. + */ + getCountsPerPriority(priorities: number[]): Promise<{ + [index: string]: number; + }>; + /** * Returns a promise that will return the job instance associated with the jobId parameter. * If the specified job cannot be located, the promise callback parameter will be set to null. diff --git a/lib/commands/getCountsPerPriority-2.lua b/lib/commands/getCountsPerPriority-2.lua new file mode 100644 index 0000000..32e7dd8 --- /dev/null +++ b/lib/commands/getCountsPerPriority-2.lua @@ -0,0 +1,25 @@ +--[[ + Get counts per provided states + + Input: + KEYS[1] wait key + KEYS[2] priority key + + ARGV[1...] priorities +]] +local rcall = redis.call +local results = {} +local waitKey = KEYS[1] +local prioritizedKey = KEYS[2] + +for i = 1, #ARGV do + local priority = tonumber(ARGV[i]) + if priority == 0 then + results[#results+1] = rcall("LLEN", waitKey) - rcall("ZCARD", prioritizedKey) + else + results[#results+1] = rcall("ZCOUNT", prioritizedKey, + priority, priority) + end +end + +return results diff --git a/lib/getters.js b/lib/getters.js index a23ae9f..af381de 100644 --- a/lib/getters.js +++ b/lib/getters.js @@ -2,6 +2,7 @@ const _ = require('lodash'); const Job = require('./job'); +const scripts = require('./scripts'); module.exports = function(Queue) { Queue.prototype.getJob = async function(jobId) { @@ -9,6 +10,21 @@ module.exports = function(Queue) { return Job.fromId(this, jobId); }; + Queue.prototype.getCountsPerPriority = async function(priorities) { + const uniquePriorities = [...new Set(priorities)]; + const responses = await scripts.getCountsPerPriority( + this, + uniquePriorities + ); + + const counts = {}; + responses.forEach((res, index) => { + counts[`${uniquePriorities[index]}`] = res || 0; + }); + + return counts; + }; + Queue.prototype._commandByType = function(types, count, callback) { return _.map(types, type => { type = type === 'waiting' ? 'wait' : type; // alias diff --git a/lib/scripts.js b/lib/scripts.js index 3436626..3b5172d 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -83,6 +83,21 @@ const scripts = { return result; }, + getCountsPerPriorityArgs(queue, priorities) { + const keys = [queue.keys.wait, queue.keys.priority]; + + const args = priorities; + + return keys.concat(args); + }, + + async getCountsPerPriority(queue, priorities) { + const client = await queue.client; + const args = this.getCountsPerPriorityArgs(queue, priorities); + + return client.getCountsPerPriority(args); + }, + moveToActive(queue, jobId) { const queueKeys = queue.keys; const keys = [queueKeys.wait, queueKeys.active, queueKeys.priority]; @@ -253,9 +268,13 @@ const scripts = { case -2: return new Error('Missing lock for job ' + jobId + ' ' + command); case -3: - return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + return new Error( + `Job ${jobId} is not in the ${state} state. ${command}` + ); case -6: - return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`); + return new Error( + `Lock mismatch for job ${jobId}. Cmd ${command} from ${state}` + ); } }, diff --git a/test/test_getters.js b/test/test_getters.js index 43db18b..acfde9c 100644 --- a/test/test_getters.js +++ b/test/test_getters.js @@ -171,6 +171,29 @@ describe('Jobs getters', function() { queue.add({ baz: 'qux' }); }); + describe('.getCountsPerPriority', () => { + it('returns job counts per priority', done => { + const jobsArray = Array.from(Array(42).keys()).map(index => ({ + name: 'test', + data: {}, + opts: { + priority: index % 4 + } + })); + queue.addBulk(jobsArray).then(() => { + queue.getCountsPerPriority([0, 1, 2, 3]).then(counts => { + expect(counts).to.be.eql({ + '0': 11, + '1': 11, + '2': 10, + '3': 10 + }); + done(); + }); + }); + }); + }); + it('fails jobs that exceed their specified timeout', done => { queue.process((job, jobDone) => { setTimeout(jobDone, 200); diff --git a/test/test_job.js b/test/test_job.js index 88131f3..b8bdc9f 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -759,35 +759,35 @@ describe('Job', () => { it('applies stacktrace limit on failure', () => { const stackTraceLimit = 1; - return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then( - job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be(false); - }) - .then(() => { - return scripts.moveToActive(queue); - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return scripts.moveToActive(queue); - }) - .then(() => { - return job - .moveToFailed(new Error('test error'), true) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be(true); - expect(job.stacktrace).not.be(null); - expect(job.stacktrace.length).to.be(stackTraceLimit); - }); - }); + return Job.create( + queue, + { foo: 'bar' }, + { stackTraceLimit, attempts: 2 } + ).then(job => { + return job + .isFailed() + .then(isFailed => { + expect(isFailed).to.be(false); + }) + .then(() => { + return scripts.moveToActive(queue); + }) + .then(() => { + return job.moveToFailed(new Error('test error'), true); + }) + .then(() => { + return scripts.moveToActive(queue); + }) + .then(() => { + return job.moveToFailed(new Error('test error'), true).then(() => { + return job.isFailed().then(isFailed => { + expect(isFailed).to.be(true); + expect(job.stacktrace).not.be(null); + expect(job.stacktrace.length).to.be(stackTraceLimit); + }); }); - } - ); + }); + }); }); }); @@ -914,8 +914,8 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('completed'); - return client.zrem(queue.toKey('completed'), job.id).then(()=>{ - return client.lpush(queue.toKey('active'), job.id) + return client.zrem(queue.toKey('completed'), job.id).then(() => { + return client.lpush(queue.toKey('active'), job.id); }); }) .then(() => { @@ -930,8 +930,8 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('delayed'); - return client.zrem(queue.toKey('delayed'), job.id).then(()=>{ - return client.lpush(queue.toKey('active'), job.id) + return client.zrem(queue.toKey('delayed'), job.id).then(() => { + return client.lpush(queue.toKey('active'), job.id); }); }) .then(() => { diff --git a/test/test_queue.js b/test/test_queue.js index 1a2031c..7b8c663 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1771,7 +1771,7 @@ describe('Queue', () => { }); queue2 - .add({ foo: 'bar' }, {removeOnFail: true}) + .add({ foo: 'bar' }, { removeOnFail: true }) .then(job => { expect(job.id).to.be.ok; expect(job.data.foo).to.be.eql('bar');