mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 16:37:42 +08:00
feat(job): support debouncing (#2760)
This commit is contained in:
@@ -33,6 +33,9 @@
|
||||
ARGV[9] priority
|
||||
ARGV[10] LIFO
|
||||
ARGV[11] token
|
||||
ARGV[12] debounce key
|
||||
ARGV[13] debounceId
|
||||
ARGV[14] debounceTtl
|
||||
]]
|
||||
local jobId
|
||||
local jobIdKey
|
||||
@@ -40,6 +43,7 @@ local rcall = redis.call
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/addJobWithPriority"
|
||||
--- @include "includes/debounceJob"
|
||||
--- @include "includes/getTargetQueueList"
|
||||
|
||||
local jobCounter = rcall("INCR", KEYS[4])
|
||||
@@ -56,10 +60,28 @@ else
|
||||
end
|
||||
end
|
||||
|
||||
local debounceKey = ARGV[12]
|
||||
|
||||
local opts = cmsgpack.unpack(ARGV[5])
|
||||
|
||||
-- Store the job.
|
||||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
|
||||
local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14],
|
||||
jobId, debounceKey, ARGV[11])
|
||||
if debouncedJobId then
|
||||
return debouncedJobId
|
||||
end
|
||||
|
||||
local debounceId = ARGV[13]
|
||||
|
||||
local optionalValues = {}
|
||||
|
||||
if debounceId ~= "" then
|
||||
table.insert(optionalValues, "deid")
|
||||
table.insert(optionalValues, debounceId)
|
||||
end
|
||||
|
||||
-- Store the job.
|
||||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp",
|
||||
ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues))
|
||||
|
||||
-- Check if job is delayed
|
||||
local delayedTimestamp = tonumber(ARGV[8])
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
KEYS[2] priority key
|
||||
KEYS[3] rate limiter key
|
||||
|
||||
ARGV[1] jobId
|
||||
ARGV[1] prefix key
|
||||
ARGV[2] maxTimestamp
|
||||
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
|
||||
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
|
||||
@@ -16,7 +16,7 @@ local setKey = KEYS[1]
|
||||
local priorityKey = KEYS[2]
|
||||
local rateLimiterKey = KEYS[3]
|
||||
|
||||
local jobKeyPrefix = ARGV[1]
|
||||
local prefixKey = ARGV[1]
|
||||
local maxTimestamp = ARGV[2]
|
||||
local limitStr = ARGV[3]
|
||||
local setName = ARGV[4]
|
||||
@@ -24,6 +24,9 @@ local setName = ARGV[4]
|
||||
local isList = false
|
||||
local rcall = redis.call
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/removeDebounceKey"
|
||||
|
||||
if setName == "wait" or setName == "active" or setName == "paused" then
|
||||
isList = true
|
||||
end
|
||||
@@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
|
||||
break
|
||||
end
|
||||
|
||||
local jobKey = jobKeyPrefix .. jobId
|
||||
local jobKey = prefixKey .. jobId
|
||||
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
|
||||
-- Find the right timestamp of the job to compare to maxTimestamp:
|
||||
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
|
||||
@@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
|
||||
rcall("ZREM", setKey, jobId)
|
||||
end
|
||||
rcall("ZREM", priorityKey, jobId)
|
||||
|
||||
if setName ~= "completed" and setName ~= "failed" then
|
||||
removeDebounceKey(prefixKey, jobKey)
|
||||
end
|
||||
|
||||
rcall("DEL", jobKey)
|
||||
rcall("DEL", jobKey .. ":logs")
|
||||
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
--[[
|
||||
Function to debounce a job.
|
||||
]]
|
||||
|
||||
local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token)
|
||||
if debounceId ~= "" then
|
||||
local debounceKeyExists
|
||||
if ttl ~= "" then
|
||||
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
|
||||
else
|
||||
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
|
||||
end
|
||||
if debounceKeyExists then
|
||||
local currentDebounceJobId = rcall('GET', debounceKey)
|
||||
rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId)
|
||||
|
||||
return currentDebounceJobId
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,12 @@
|
||||
|
||||
--[[
|
||||
Function to remove debounce key.
|
||||
]]
|
||||
|
||||
local function removeDebounceKey(prefixKey, jobKey)
|
||||
local debounceId = rcall("HGET", jobKey, "deid")
|
||||
if debounceId then
|
||||
local debounceKey = prefixKey .. "de:" .. debounceId
|
||||
rcall("DEL", debounceKey)
|
||||
end
|
||||
end
|
||||
@@ -0,0 +1,14 @@
|
||||
--[[
|
||||
Function to remove debounce key if needed.
|
||||
]]
|
||||
|
||||
local function removeDebounceKeyIfNeeded(prefixKey, debounceId)
|
||||
if debounceId then
|
||||
local debounceKey = prefixKey .. "de:" .. debounceId
|
||||
local pttl = rcall("PTTL", debounceKey)
|
||||
|
||||
if pttl == 0 or pttl == -1 then
|
||||
rcall("DEL", debounceKey)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -25,6 +25,7 @@ local rcall = redis.call
|
||||
-- Includes
|
||||
--- @include "includes/batches"
|
||||
--- @include "includes/getTargetQueueList"
|
||||
--- @include "includes/removeDebounceKeyIfNeeded"
|
||||
|
||||
local function removeJob(jobId, baseKey)
|
||||
local jobKey = baseKey .. jobId
|
||||
@@ -78,12 +79,13 @@ if(#stalling > 0) then
|
||||
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
|
||||
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
|
||||
if(stalledCount > MAX_STALLED_JOB_COUNT) then
|
||||
local rawOpts = rcall("HGET", jobKey, "opts")
|
||||
local opts = cjson.decode(rawOpts)
|
||||
local jobAttributes = rcall("HMGET", jobKey, "opts", "deid")
|
||||
local opts = cjson.decode(jobAttributes[1])
|
||||
local removeOnFailType = type(opts["removeOnFail"])
|
||||
rcall("ZADD", KEYS[4], ARGV[3], jobId)
|
||||
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
|
||||
"finishedOn", ARGV[3])
|
||||
removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2])
|
||||
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}')
|
||||
|
||||
if removeOnFailType == "number" then
|
||||
|
||||
@@ -84,6 +84,7 @@ end
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/removeLock"
|
||||
--- @include "includes/removeDebounceKeyIfNeeded"
|
||||
|
||||
if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
|
||||
local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])
|
||||
@@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
|
||||
|
||||
if numRemovedElements < 1 then return -3 end
|
||||
|
||||
local debounceId = rcall("HGET", KEYS[3], "deid")
|
||||
removeDebounceKeyIfNeeded(ARGV[9], debounceId)
|
||||
|
||||
-- Remove job?
|
||||
local keepJobs = cmsgpack.unpack(ARGV[6])
|
||||
local maxCount = keepJobs['count']
|
||||
|
||||
@@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1])
|
||||
local baseKey = KEYS[2]
|
||||
|
||||
local rcall = redis.call
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/removeDebounceKey"
|
||||
|
||||
local function getListItems(keyName, max)
|
||||
return rcall('LRANGE', keyName, 0, max - 1)
|
||||
end
|
||||
@@ -26,23 +30,24 @@ local function getZSetItems(keyName, max)
|
||||
return rcall('ZRANGE', keyName, 0, max - 1)
|
||||
end
|
||||
|
||||
local function removeJobs(parentKey, keys)
|
||||
local function removeJobs(baseKey, keys)
|
||||
for i, key in ipairs(keys) do
|
||||
rcall("DEL", baseKey .. key)
|
||||
rcall("DEL", baseKey .. key .. ':logs')
|
||||
local jobKey = baseKey .. key
|
||||
rcall("DEL", jobKey, jobKey .. ':logs')
|
||||
removeDebounceKey(baseKey, jobKey)
|
||||
end
|
||||
maxCount = maxCount - #keys
|
||||
end
|
||||
|
||||
local function removeListJobs(keyName, max)
|
||||
local jobs = getListItems(keyName, max)
|
||||
removeJobs(keyName, jobs)
|
||||
removeJobs(baseKey, jobs)
|
||||
rcall("LTRIM", keyName, #jobs, -1)
|
||||
end
|
||||
|
||||
local function removeZSetJobs(keyName, max)
|
||||
local jobs = getZSetItems(keyName, max)
|
||||
removeJobs(keyName, jobs)
|
||||
removeJobs(baseKey, jobs)
|
||||
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
|
||||
end
|
||||
|
||||
@@ -65,7 +70,7 @@ if (#activeJobs > 0) then
|
||||
end
|
||||
|
||||
removeLockKeys(activeJobs)
|
||||
removeJobs(activeKey, activeJobs)
|
||||
removeJobs(baseKey, activeJobs)
|
||||
rcall("LTRIM", activeKey, #activeJobs, -1)
|
||||
if (maxCount <= 0) then return 1 end
|
||||
|
||||
|
||||
@@ -3,16 +3,17 @@
|
||||
In order to be able to remove a job, it must be unlocked.
|
||||
|
||||
Input:
|
||||
KEYS[1] 'active',
|
||||
KEYS[2] 'wait',
|
||||
KEYS[3] 'delayed',
|
||||
KEYS[4] 'paused',
|
||||
KEYS[5] 'completed',
|
||||
KEYS[6] 'failed',
|
||||
KEYS[7] 'priority',
|
||||
KEYS[8] jobId
|
||||
KEYS[9] job logs
|
||||
KEYS[1] 'active',
|
||||
KEYS[2] 'wait',
|
||||
KEYS[3] 'delayed',
|
||||
KEYS[4] 'paused',
|
||||
KEYS[5] 'completed',
|
||||
KEYS[6] 'failed',
|
||||
KEYS[7] 'priority',
|
||||
KEYS[8] jobId key
|
||||
KEYS[9] job logs
|
||||
KEYS[10] rate limiter index table
|
||||
KEYS[11] prefix key
|
||||
|
||||
ARGV[1] jobId
|
||||
ARGV[2] lock token
|
||||
@@ -24,8 +25,12 @@
|
||||
-- TODO PUBLISH global event 'removed'
|
||||
|
||||
local rcall = redis.call
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/removeDebounceKey"
|
||||
|
||||
local lockKey = KEYS[8] .. ':lock'
|
||||
local lock = redis.call("GET", lockKey)
|
||||
local lock = rcall("GET", lockKey)
|
||||
if not lock then -- or (lock == ARGV[2])) then
|
||||
local jobId = ARGV[1]
|
||||
rcall("LREM", KEYS[1], 0, jobId)
|
||||
@@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then
|
||||
rcall("ZREM", KEYS[5], jobId)
|
||||
rcall("ZREM", KEYS[6], jobId)
|
||||
rcall("ZREM", KEYS[7], jobId)
|
||||
|
||||
removeDebounceKey(KEYS[11], KEYS[8])
|
||||
|
||||
rcall("DEL", KEYS[8])
|
||||
rcall("DEL", KEYS[9])
|
||||
|
||||
+8
-1
@@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) {
|
||||
this.attemptsMade = 0;
|
||||
|
||||
this.toKey = _.bind(queue.toKey, queue);
|
||||
this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined;
|
||||
};
|
||||
|
||||
function setDefaultOpts(opts) {
|
||||
@@ -82,7 +83,8 @@ function addJob(queue, client, job) {
|
||||
return scripts.addJob(client, queue, jobData, {
|
||||
lifo: opts.lifo,
|
||||
customJobId: opts.jobId,
|
||||
priority: opts.priority
|
||||
priority: opts.priority,
|
||||
debounce: opts.debounce
|
||||
});
|
||||
}
|
||||
|
||||
@@ -182,6 +184,7 @@ Job.prototype.toJSON = function() {
|
||||
failedReason: this.failedReason,
|
||||
stacktrace: this.stacktrace || null,
|
||||
returnvalue: this.returnvalue || null,
|
||||
debounceId: this.debounceId || null,
|
||||
finishedOn: this.finishedOn || null,
|
||||
processedOn: this.processedOn || null
|
||||
};
|
||||
@@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) {
|
||||
job.returnvalue = getReturnValue(json.returnvalue);
|
||||
}
|
||||
|
||||
if (json.deid) {
|
||||
job.debounceId = json.deid;
|
||||
}
|
||||
|
||||
return job;
|
||||
};
|
||||
|
||||
|
||||
+20
-3
@@ -276,7 +276,8 @@ const Queue = function Queue(name, url, opts) {
|
||||
'limiter',
|
||||
'drained',
|
||||
'duplicated',
|
||||
'progress'
|
||||
'progress',
|
||||
'de' // debounce key
|
||||
],
|
||||
key => {
|
||||
keys[key] = this.toKey(key);
|
||||
@@ -418,6 +419,7 @@ Queue.prototype._setupQueueEventListeners = function() {
|
||||
const failedKey = this.keys.failed;
|
||||
const drainedKey = this.keys.drained;
|
||||
const duplicatedKey = this.keys.duplicated;
|
||||
const debouncedKey = this.keys.de + 'bounced';
|
||||
|
||||
const pmessageHandler = (pattern, channel, message) => {
|
||||
const keyAndToken = channel.split('@');
|
||||
@@ -445,6 +447,12 @@ Queue.prototype._setupQueueEventListeners = function() {
|
||||
}
|
||||
utils.emitSafe(this, 'global:duplicated', message);
|
||||
break;
|
||||
case debouncedKey:
|
||||
if (this.token === token) {
|
||||
utils.emitSafe(this, 'debounced', message);
|
||||
}
|
||||
utils.emitSafe(this, 'global:debounced', message);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -513,7 +521,7 @@ Queue.prototype._setupQueueEventListeners = function() {
|
||||
};
|
||||
|
||||
Queue.prototype._registerEvent = function(eventName) {
|
||||
const internalEvents = ['waiting', 'delayed', 'duplicated'];
|
||||
const internalEvents = ['waiting', 'delayed', 'duplicated', 'debounced'];
|
||||
|
||||
if (
|
||||
eventName.startsWith('global:') ||
|
||||
@@ -531,7 +539,7 @@ Queue.prototype._registerEvent = function(eventName) {
|
||||
.isRedisReady(this.eclient)
|
||||
.then(() => {
|
||||
const channel = this.toKey(_eventName);
|
||||
if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) {
|
||||
if (['active', 'waiting', 'stalled', 'duplicated', 'debounced'].indexOf(_eventName) !== -1) {
|
||||
return (this.registeredEvents[_eventName] = this.eclient.psubscribe(
|
||||
channel + '*'
|
||||
));
|
||||
@@ -782,6 +790,15 @@ Queue.prototype.retryJobs = async function(opts = {}) {
|
||||
} while (cursor);
|
||||
};
|
||||
|
||||
/**
|
||||
* Removes a debounce key.
|
||||
*
|
||||
* @param id - identifier
|
||||
*/
|
||||
Queue.prototype.removeDebounceKey = (id) => {
|
||||
return this.client.del(`${this.keys.de}:${id}`);
|
||||
}
|
||||
|
||||
/**
|
||||
Adds an array of jobs to the queue.
|
||||
@method add
|
||||
|
||||
+6
-2
@@ -43,7 +43,10 @@ const scripts = {
|
||||
job.delay ? job.timestamp + job.delay : 0,
|
||||
opts.priority || 0,
|
||||
opts.lifo ? 'RPUSH' : 'LPUSH',
|
||||
queue.token
|
||||
queue.token,
|
||||
job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null,
|
||||
opts.debounce ? opts.debounce.id : null,
|
||||
opts.debounce ? opts.debounce.ttl : null,
|
||||
];
|
||||
keys = keys.concat(args);
|
||||
return client.addJob(keys);
|
||||
@@ -401,7 +404,8 @@ const scripts = {
|
||||
queue.keys.priority,
|
||||
queue.toKey(jobId),
|
||||
queue.toKey(`${jobId}:logs`),
|
||||
queue.keys.limiter
|
||||
queue.keys.limiter,
|
||||
queue.toKey(''),
|
||||
];
|
||||
return queue.client.removeJob(keys.concat([jobId, queue.token]));
|
||||
},
|
||||
|
||||
@@ -1137,6 +1137,146 @@ describe('Queue', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('when job is debounced when added again with same debounce id', () => {
|
||||
describe('when ttl is provided', () => {
|
||||
it('used a fixed time period and emits debounced event', async () => {
|
||||
const job = await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
|
||||
let debouncedCounter = 0;
|
||||
let secondJob = null;
|
||||
queue.on('debounced', (jobId) => {
|
||||
if (debouncedCounter > 1) {
|
||||
expect(jobId).to.be.equal(secondJob.id);
|
||||
} else {
|
||||
expect(jobId).to.be.equal(job.id);
|
||||
}
|
||||
debouncedCounter++;
|
||||
});
|
||||
|
||||
await delay(1000);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await delay(1100);
|
||||
secondJob = await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await delay(100);
|
||||
|
||||
expect(debouncedCounter).to.be.equal(4);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when removing debounced job', () => {
|
||||
it('removes debounce key', async ()=> {
|
||||
const job = await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
|
||||
let debouncedCounter = 0;
|
||||
queue.on('debounced', () => {
|
||||
debouncedCounter++;
|
||||
});
|
||||
await job.remove();
|
||||
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await delay(1000);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await delay(1100);
|
||||
const secondJob = await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await secondJob.remove();
|
||||
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1', ttl: 2000 } },
|
||||
);
|
||||
await delay(100);
|
||||
|
||||
expect(debouncedCounter).to.be.equal(2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when ttl is not provided', ()=> {
|
||||
it('waits until job is finished before removing debounce key', async ()=> {
|
||||
queue.process(
|
||||
async () => {
|
||||
await delay(100);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1' } },
|
||||
);
|
||||
await delay(100);
|
||||
await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1' } },
|
||||
);
|
||||
await delay(100);
|
||||
}
|
||||
);
|
||||
|
||||
let debouncedCounter = 0;
|
||||
|
||||
const completing = new Promise(resolve => {
|
||||
queue.once('completed', ({ id }) => {
|
||||
expect(id).to.be.equal('1');
|
||||
resolve();
|
||||
});
|
||||
|
||||
queue.on('debounced', () => {
|
||||
debouncedCounter++;
|
||||
});
|
||||
});
|
||||
|
||||
await queue.add({ foo: 'bar' }, { debounce: { id: 'a1' } });
|
||||
|
||||
await completing;
|
||||
|
||||
const secondJob = await queue.add(
|
||||
{ foo: 'bar' },
|
||||
{ debounce: { id: 'a1' } },
|
||||
);
|
||||
|
||||
const count = await queue.getJobCountByTypes();
|
||||
|
||||
expect(count).to.be.eql(2);
|
||||
|
||||
expect(debouncedCounter).to.be.equal(2);
|
||||
expect(secondJob.id).to.be.equal('4');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('process a job that updates progress', done => {
|
||||
queue.process((job, jobDone) => {
|
||||
expect(job.data.foo).to.be.equal('bar');
|
||||
|
||||
Reference in New Issue
Block a user