feat(job): support debouncing (#2760)

This commit is contained in:
Rogger Valverde
2024-07-30 07:49:44 -06:00
committed by GitHub
parent f6d29fcc7f
commit 603befe439
13 changed files with 292 additions and 29 deletions
+24 -2
View File
@@ -33,6 +33,9 @@
ARGV[9] priority
ARGV[10] LIFO
ARGV[11] token
ARGV[12] debounce key
ARGV[13] debounceId
ARGV[14] debounceTtl
]]
local jobId
local jobIdKey
@@ -40,6 +43,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/debounceJob"
--- @include "includes/getTargetQueueList"
local jobCounter = rcall("INCR", KEYS[4])
@@ -56,10 +60,28 @@ else
end
end
local debounceKey = ARGV[12]
local opts = cmsgpack.unpack(ARGV[5])
-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14],
jobId, debounceKey, ARGV[11])
if debouncedJobId then
return debouncedJobId
end
local debounceId = ARGV[13]
local optionalValues = {}
if debounceId ~= "" then
table.insert(optionalValues, "deid")
table.insert(optionalValues, debounceId)
end
-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp",
ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues))
-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
+11 -3
View File
@@ -6,7 +6,7 @@
KEYS[2] priority key
KEYS[3] rate limiter key
ARGV[1] jobId
ARGV[1] prefix key
ARGV[2] maxTimestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
@@ -16,7 +16,7 @@ local setKey = KEYS[1]
local priorityKey = KEYS[2]
local rateLimiterKey = KEYS[3]
local jobKeyPrefix = ARGV[1]
local prefixKey = ARGV[1]
local maxTimestamp = ARGV[2]
local limitStr = ARGV[3]
local setName = ARGV[4]
@@ -24,6 +24,9 @@ local setName = ARGV[4]
local isList = false
local rcall = redis.call
-- Includes
--- @include "includes/removeDebounceKey"
if setName == "wait" or setName == "active" or setName == "paused" then
isList = true
end
@@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
break
end
local jobKey = jobKeyPrefix .. jobId
local jobKey = prefixKey .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Find the right timestamp of the job to compare to maxTimestamp:
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
@@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
rcall("ZREM", setKey, jobId)
end
rcall("ZREM", priorityKey, jobId)
if setName ~= "completed" and setName ~= "failed" then
removeDebounceKey(prefixKey, jobKey)
end
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")
+20
View File
@@ -0,0 +1,20 @@
--[[
Function to debounce a job.
]]
local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token)
if debounceId ~= "" then
local debounceKeyExists
if ttl ~= "" then
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
else
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
end
if debounceKeyExists then
local currentDebounceJobId = rcall('GET', debounceKey)
rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId)
return currentDebounceJobId
end
end
end
@@ -0,0 +1,12 @@
--[[
Function to remove debounce key.
]]
local function removeDebounceKey(prefixKey, jobKey)
local debounceId = rcall("HGET", jobKey, "deid")
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
rcall("DEL", debounceKey)
end
end
@@ -0,0 +1,14 @@
--[[
Function to remove debounce key if needed.
]]
local function removeDebounceKeyIfNeeded(prefixKey, debounceId)
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
local pttl = rcall("PTTL", debounceKey)
if pttl == 0 or pttl == -1 then
rcall("DEL", debounceKey)
end
end
end
+4 -2
View File
@@ -25,6 +25,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeDebounceKeyIfNeeded"
local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
@@ -78,12 +79,13 @@ if(#stalling > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local jobAttributes = rcall("HMGET", jobKey, "opts", "deid")
local opts = cjson.decode(jobAttributes[1])
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
"finishedOn", ARGV[3])
removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2])
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}')
if removeOnFailType == "number" then
+4
View File
@@ -84,6 +84,7 @@ end
-- Includes
--- @include "includes/removeLock"
--- @include "includes/removeDebounceKeyIfNeeded"
if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])
@@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
if numRemovedElements < 1 then return -3 end
local debounceId = rcall("HGET", KEYS[3], "deid")
removeDebounceKeyIfNeeded(ARGV[9], debounceId)
-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
+11 -6
View File
@@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1])
local baseKey = KEYS[2]
local rcall = redis.call
-- Includes
--- @include "includes/removeDebounceKey"
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end
@@ -26,23 +30,24 @@ local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end
local function removeJobs(parentKey, keys)
local function removeJobs(baseKey, keys)
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key)
rcall("DEL", baseKey .. key .. ':logs')
local jobKey = baseKey .. key
rcall("DEL", jobKey, jobKey .. ':logs')
removeDebounceKey(baseKey, jobKey)
end
maxCount = maxCount - #keys
end
local function removeListJobs(keyName, max)
local jobs = getListItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
rcall("LTRIM", keyName, #jobs, -1)
end
local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
end
@@ -65,7 +70,7 @@ if (#activeJobs > 0) then
end
removeLockKeys(activeJobs)
removeJobs(activeKey, activeJobs)
removeJobs(baseKey, activeJobs)
rcall("LTRIM", activeKey, #activeJobs, -1)
if (maxCount <= 0) then return 1 end
@@ -3,16 +3,17 @@
In order to be able to remove a job, it must be unlocked.
Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId
KEYS[9] job logs
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId key
KEYS[9] job logs
KEYS[10] rate limiter index table
KEYS[11] prefix key
ARGV[1] jobId
ARGV[2] lock token
@@ -24,8 +25,12 @@
-- TODO PUBLISH global event 'removed'
local rcall = redis.call
-- Includes
--- @include "includes/removeDebounceKey"
local lockKey = KEYS[8] .. ':lock'
local lock = redis.call("GET", lockKey)
local lock = rcall("GET", lockKey)
if not lock then -- or (lock == ARGV[2])) then
local jobId = ARGV[1]
rcall("LREM", KEYS[1], 0, jobId)
@@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)
removeDebounceKey(KEYS[11], KEYS[8])
rcall("DEL", KEYS[8])
rcall("DEL", KEYS[9])
+8 -1
View File
@@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) {
this.attemptsMade = 0;
this.toKey = _.bind(queue.toKey, queue);
this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined;
};
function setDefaultOpts(opts) {
@@ -82,7 +83,8 @@ function addJob(queue, client, job) {
return scripts.addJob(client, queue, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
priority: opts.priority,
debounce: opts.debounce
});
}
@@ -182,6 +184,7 @@ Job.prototype.toJSON = function() {
failedReason: this.failedReason,
stacktrace: this.stacktrace || null,
returnvalue: this.returnvalue || null,
debounceId: this.debounceId || null,
finishedOn: this.finishedOn || null,
processedOn: this.processedOn || null
};
@@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) {
job.returnvalue = getReturnValue(json.returnvalue);
}
if (json.deid) {
job.debounceId = json.deid;
}
return job;
};
+20 -3
View File
@@ -276,7 +276,8 @@ const Queue = function Queue(name, url, opts) {
'limiter',
'drained',
'duplicated',
'progress'
'progress',
'de' // debounce key
],
key => {
keys[key] = this.toKey(key);
@@ -418,6 +419,7 @@ Queue.prototype._setupQueueEventListeners = function() {
const failedKey = this.keys.failed;
const drainedKey = this.keys.drained;
const duplicatedKey = this.keys.duplicated;
const debouncedKey = this.keys.de + 'bounced';
const pmessageHandler = (pattern, channel, message) => {
const keyAndToken = channel.split('@');
@@ -445,6 +447,12 @@ Queue.prototype._setupQueueEventListeners = function() {
}
utils.emitSafe(this, 'global:duplicated', message);
break;
case debouncedKey:
if (this.token === token) {
utils.emitSafe(this, 'debounced', message);
}
utils.emitSafe(this, 'global:debounced', message);
break;
}
};
@@ -513,7 +521,7 @@ Queue.prototype._setupQueueEventListeners = function() {
};
Queue.prototype._registerEvent = function(eventName) {
const internalEvents = ['waiting', 'delayed', 'duplicated'];
const internalEvents = ['waiting', 'delayed', 'duplicated', 'debounced'];
if (
eventName.startsWith('global:') ||
@@ -531,7 +539,7 @@ Queue.prototype._registerEvent = function(eventName) {
.isRedisReady(this.eclient)
.then(() => {
const channel = this.toKey(_eventName);
if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) {
if (['active', 'waiting', 'stalled', 'duplicated', 'debounced'].indexOf(_eventName) !== -1) {
return (this.registeredEvents[_eventName] = this.eclient.psubscribe(
channel + '*'
));
@@ -782,6 +790,15 @@ Queue.prototype.retryJobs = async function(opts = {}) {
} while (cursor);
};
/**
* Removes a debounce key.
*
* @param id - identifier
*/
Queue.prototype.removeDebounceKey = (id) => {
return this.client.del(`${this.keys.de}:${id}`);
}
/**
Adds an array of jobs to the queue.
@method add
+6 -2
View File
@@ -43,7 +43,10 @@ const scripts = {
job.delay ? job.timestamp + job.delay : 0,
opts.priority || 0,
opts.lifo ? 'RPUSH' : 'LPUSH',
queue.token
queue.token,
job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null,
opts.debounce ? opts.debounce.id : null,
opts.debounce ? opts.debounce.ttl : null,
];
keys = keys.concat(args);
return client.addJob(keys);
@@ -401,7 +404,8 @@ const scripts = {
queue.keys.priority,
queue.toKey(jobId),
queue.toKey(`${jobId}:logs`),
queue.keys.limiter
queue.keys.limiter,
queue.toKey(''),
];
return queue.client.removeJob(keys.concat([jobId, queue.token]));
},
+140
View File
@@ -1137,6 +1137,146 @@ describe('Queue', () => {
});
});
describe('when job is debounced when added again with same debounce id', () => {
describe('when ttl is provided', () => {
it('used a fixed time period and emits debounced event', async () => {
const job = await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
let debouncedCounter = 0;
let secondJob = null;
queue.on('debounced', (jobId) => {
if (debouncedCounter > 1) {
expect(jobId).to.be.equal(secondJob.id);
} else {
expect(jobId).to.be.equal(job.id);
}
debouncedCounter++;
});
await delay(1000);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await delay(1100);
secondJob = await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await delay(100);
expect(debouncedCounter).to.be.equal(4);
});
});
describe('when removing debounced job', () => {
it('removes debounce key', async ()=> {
const job = await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
let debouncedCounter = 0;
queue.on('debounced', () => {
debouncedCounter++;
});
await job.remove();
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await delay(1000);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await delay(1100);
const secondJob = await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await secondJob.remove();
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1', ttl: 2000 } },
);
await delay(100);
expect(debouncedCounter).to.be.equal(2);
});
});
describe('when ttl is not provided', ()=> {
it('waits until job is finished before removing debounce key', async ()=> {
queue.process(
async () => {
await delay(100);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1' } },
);
await delay(100);
await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1' } },
);
await delay(100);
}
);
let debouncedCounter = 0;
const completing = new Promise(resolve => {
queue.once('completed', ({ id }) => {
expect(id).to.be.equal('1');
resolve();
});
queue.on('debounced', () => {
debouncedCounter++;
});
});
await queue.add({ foo: 'bar' }, { debounce: { id: 'a1' } });
await completing;
const secondJob = await queue.add(
{ foo: 'bar' },
{ debounce: { id: 'a1' } },
);
const count = await queue.getJobCountByTypes();
expect(count).to.be.eql(2);
expect(debouncedCounter).to.be.equal(2);
expect(secondJob.id).to.be.equal('4');
});
});
});
it('process a job that updates progress', done => {
queue.process((job, jobDone) => {
expect(job.data.foo).to.be.equal('bar');