mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-04 17:37:42 +08:00
feat(queue): add retryJobs for failed status
This commit is contained in:
committed by
Manuel Astudillo
parent
7665c8329c
commit
501b2cc49c
@@ -0,0 +1,57 @@
|
||||
--[[
|
||||
Attempts to retry all failed jobs
|
||||
|
||||
Input:
|
||||
KEYS[1] base key
|
||||
KEYS[2] failed state key
|
||||
KEYS[3] wait state key
|
||||
|
||||
ARGV[1] count
|
||||
|
||||
Output:
|
||||
1 means the operation is not completed
|
||||
0 means the operation is completed
|
||||
]]
|
||||
local baseKey = KEYS[1]
|
||||
local maxCount = tonumber(ARGV[1])
|
||||
|
||||
local rcall = redis.call;
|
||||
|
||||
local function batches(n, batchSize)
|
||||
local i = 0
|
||||
|
||||
return function()
|
||||
local from = i * batchSize + 1
|
||||
i = i + 1
|
||||
if (from <= n) then
|
||||
local to = math.min(from + batchSize - 1, n)
|
||||
return from, to
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
local function getZSetItems(keyName, max)
|
||||
return rcall('ZRANGE', keyName, 0, max - 1)
|
||||
end
|
||||
|
||||
local jobs = getZSetItems(KEYS[2], maxCount)
|
||||
|
||||
if (#jobs > 0) then
|
||||
for i, key in ipairs(jobs) do
|
||||
local jobKey = baseKey .. key
|
||||
rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
|
||||
end
|
||||
|
||||
for from, to in batches(#jobs, 7000) do
|
||||
rcall("ZREM", KEYS[2], unpack(jobs, from, to))
|
||||
rcall("LPUSH", KEYS[3], unpack(jobs, from, to))
|
||||
end
|
||||
end
|
||||
|
||||
maxCount = maxCount - #jobs
|
||||
|
||||
if(maxCount <= 0) then
|
||||
return 1
|
||||
end
|
||||
|
||||
return 0
|
||||
@@ -725,6 +725,19 @@ Queue.prototype.add = function(name, data, opts) {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Retry all the failed jobs.
|
||||
*
|
||||
* @param opts.count - number to limit how many jobs will be moved to wait status per iteration
|
||||
* @returns
|
||||
*/
|
||||
Queue.prototype.retryJobs = async function(opts = {}) {
|
||||
let cursor = 0;
|
||||
do {
|
||||
cursor = await scripts.retryJobs(this, opts.count);
|
||||
} while (cursor);
|
||||
};
|
||||
|
||||
/**
|
||||
Adds an array of jobs to the queue.
|
||||
@method add
|
||||
|
||||
@@ -114,6 +114,29 @@ const scripts = {
|
||||
});
|
||||
},
|
||||
|
||||
retryJobsArgs(
|
||||
queue,
|
||||
count,
|
||||
) {
|
||||
const keys = [
|
||||
queue.toKey(''),
|
||||
queue.toKey('failed'),
|
||||
queue.toKey('wait'),
|
||||
];
|
||||
|
||||
const args = [count];
|
||||
|
||||
return keys.concat(args);
|
||||
},
|
||||
|
||||
async retryJobs(queue, count = 1000){
|
||||
const client = await queue.client;
|
||||
|
||||
const args = this.retryJobsArgs(queue, count);
|
||||
|
||||
return (client).retryJobs(args);
|
||||
},
|
||||
|
||||
moveToFinishedArgs(
|
||||
job,
|
||||
val,
|
||||
|
||||
@@ -612,6 +612,59 @@ describe('Queue', () => {
|
||||
.catch(done);
|
||||
});
|
||||
|
||||
describe('.retryJobs', () => {
|
||||
it('should retry all failed jobs', async () => {
|
||||
const jobCount = 8;
|
||||
|
||||
let fail = true;
|
||||
queue.process(async () => {
|
||||
await delay(10);
|
||||
if (fail) {
|
||||
throw new Error('failed');
|
||||
}
|
||||
});
|
||||
|
||||
let order = 0;
|
||||
const failing = new Promise(resolve => {
|
||||
queue.on('failed', job => {
|
||||
expect(order).to.be.eql(job.data.idx);
|
||||
if (order === jobCount - 1) {
|
||||
resolve();
|
||||
}
|
||||
order++;
|
||||
});
|
||||
});
|
||||
|
||||
for (const index of Array.from(Array(jobCount).keys())) {
|
||||
await queue.add({ idx: index });
|
||||
}
|
||||
|
||||
await failing;
|
||||
|
||||
const failedCount = await queue.getJobCounts('failed');
|
||||
expect(failedCount.failed).to.be.equal(jobCount);
|
||||
|
||||
order = 0;
|
||||
const completing = new Promise(resolve => {
|
||||
queue.on('completed', job => {
|
||||
expect(order).to.be.eql(job.data.idx);
|
||||
if (order === jobCount - 1) {
|
||||
resolve();
|
||||
}
|
||||
order++;
|
||||
});
|
||||
});
|
||||
|
||||
fail = false;
|
||||
await queue.retryJobs({ count: 2 });
|
||||
|
||||
await completing;
|
||||
|
||||
const CompletedCount = await queue.getJobCounts('completed');
|
||||
expect(CompletedCount.completed).to.be.equal(jobCount);
|
||||
});
|
||||
});
|
||||
|
||||
it('should keep specified number of jobs after completed with removeOnComplete', async () => {
|
||||
const keepJobs = 3;
|
||||
await testRemoveOnFinish(keepJobs, keepJobs);
|
||||
|
||||
Reference in New Issue
Block a user