feat(queue): add retryJobs for failed status

This commit is contained in:
rogger andré valverde flores
2022-01-31 19:38:11 -05:00
committed by Manuel Astudillo
parent 7665c8329c
commit 501b2cc49c
4 changed files with 146 additions and 0 deletions
+57
View File
@@ -0,0 +1,57 @@
--[[
Attempts to retry all failed jobs
Input:
KEYS[1] base key
KEYS[2] failed state key
KEYS[3] wait state key
ARGV[1] count
Output:
1 means the operation is not completed
0 means the operation is completed
]]
local baseKey = KEYS[1]
local maxCount = tonumber(ARGV[1])
local rcall = redis.call;
local function batches(n, batchSize)
local i = 0
return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end
local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end
local jobs = getZSetItems(KEYS[2], maxCount)
if (#jobs > 0) then
for i, key in ipairs(jobs) do
local jobKey = baseKey .. key
rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
end
for from, to in batches(#jobs, 7000) do
rcall("ZREM", KEYS[2], unpack(jobs, from, to))
rcall("LPUSH", KEYS[3], unpack(jobs, from, to))
end
end
maxCount = maxCount - #jobs
if(maxCount <= 0) then
return 1
end
return 0
+13
View File
@@ -725,6 +725,19 @@ Queue.prototype.add = function(name, data, opts) {
}
};
/**
* Retry all the failed jobs.
*
* @param opts.count - number to limit how many jobs will be moved to wait status per iteration
* @returns
*/
Queue.prototype.retryJobs = async function(opts = {}) {
let cursor = 0;
do {
cursor = await scripts.retryJobs(this, opts.count);
} while (cursor);
};
/**
Adds an array of jobs to the queue.
@method add
+23
View File
@@ -114,6 +114,29 @@ const scripts = {
});
},
retryJobsArgs(
queue,
count,
) {
const keys = [
queue.toKey(''),
queue.toKey('failed'),
queue.toKey('wait'),
];
const args = [count];
return keys.concat(args);
},
async retryJobs(queue, count = 1000){
const client = await queue.client;
const args = this.retryJobsArgs(queue, count);
return (client).retryJobs(args);
},
moveToFinishedArgs(
job,
val,
+53
View File
@@ -612,6 +612,59 @@ describe('Queue', () => {
.catch(done);
});
describe('.retryJobs', () => {
it('should retry all failed jobs', async () => {
const jobCount = 8;
let fail = true;
queue.process(async () => {
await delay(10);
if (fail) {
throw new Error('failed');
}
});
let order = 0;
const failing = new Promise(resolve => {
queue.on('failed', job => {
expect(order).to.be.eql(job.data.idx);
if (order === jobCount - 1) {
resolve();
}
order++;
});
});
for (const index of Array.from(Array(jobCount).keys())) {
await queue.add({ idx: index });
}
await failing;
const failedCount = await queue.getJobCounts('failed');
expect(failedCount.failed).to.be.equal(jobCount);
order = 0;
const completing = new Promise(resolve => {
queue.on('completed', job => {
expect(order).to.be.eql(job.data.idx);
if (order === jobCount - 1) {
resolve();
}
order++;
});
});
fail = false;
await queue.retryJobs({ count: 2 });
await completing;
const CompletedCount = await queue.getJobCounts('completed');
expect(CompletedCount.completed).to.be.equal(jobCount);
});
});
it('should keep specified number of jobs after completed with removeOnComplete', async () => {
const keepJobs = 3;
await testRemoveOnFinish(keepJobs, keepJobs);