mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 08:27:43 +08:00
fix(scripts): throw error when moving non-active job to delayed (#2740)
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
local function removeLock(jobKey, stalledKey, token, jobId)
|
||||
if token ~= "0" then
|
||||
local lockKey = jobKey .. ':lock'
|
||||
local lockToken = rcall("GET", lockKey)
|
||||
if lockToken == token then
|
||||
rcall("DEL", lockKey)
|
||||
rcall("SREM", stalledKey, jobId)
|
||||
else
|
||||
if lockToken then
|
||||
-- Lock exists but token does not match
|
||||
return -6
|
||||
else
|
||||
-- Lock is missing completely
|
||||
return -2
|
||||
end
|
||||
end
|
||||
end
|
||||
return 0
|
||||
end
|
||||
@@ -21,22 +21,21 @@
|
||||
]]
|
||||
local rcall = redis.call
|
||||
|
||||
-- Includes
|
||||
--- @include "includes/removeLock"
|
||||
|
||||
if rcall("EXISTS", KEYS[3]) == 1 then
|
||||
-- Check for job lock
|
||||
if ARGV[3] ~= "0" then
|
||||
local lockKey = KEYS[3] .. ':lock'
|
||||
if rcall("GET", lockKey) == ARGV[3] then
|
||||
rcall("DEL", lockKey)
|
||||
rcall("SREM", KEYS[4], ARGV[2])
|
||||
else
|
||||
return -2
|
||||
end
|
||||
local errorCode = removeLock(KEYS[3], KEYS[4], ARGV[3], ARGV[2])
|
||||
if errorCode < 0 then
|
||||
return errorCode
|
||||
end
|
||||
|
||||
local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[2])
|
||||
if numRemovedElements < 1 then return -3 end
|
||||
|
||||
local score = tonumber(ARGV[1])
|
||||
rcall("ZADD", KEYS[2], score, ARGV[2])
|
||||
rcall("PUBLISH", KEYS[2], (score / 0x1000))
|
||||
rcall("LREM", KEYS[1], 0, ARGV[2])
|
||||
|
||||
return 0
|
||||
else
|
||||
|
||||
+1
-1
@@ -339,7 +339,7 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) {
|
||||
const results = await multi.exec();
|
||||
const code = _.last(results)[1];
|
||||
if (code < 0) {
|
||||
throw scripts.finishedErrors(code, this.id, command);
|
||||
throw scripts.finishedErrors(code, this.id, command, 'active');
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
+6
-2
@@ -238,7 +238,7 @@ const scripts = {
|
||||
);
|
||||
return job.queue.client.moveToFinished(args).then(result => {
|
||||
if (result < 0) {
|
||||
throw scripts.finishedErrors(result, job.id, 'finished');
|
||||
throw scripts.finishedErrors(result, job.id, 'finished', 'active');
|
||||
} else if (result) {
|
||||
return raw2jobData(result);
|
||||
}
|
||||
@@ -246,12 +246,16 @@ const scripts = {
|
||||
});
|
||||
},
|
||||
|
||||
finishedErrors(code, jobId, command) {
|
||||
finishedErrors(code, jobId, command, state) {
|
||||
switch (code) {
|
||||
case -1:
|
||||
return new Error('Missing key for job ' + jobId + ' ' + command);
|
||||
case -2:
|
||||
return new Error('Missing lock for job ' + jobId + ' ' + command);
|
||||
case -3:
|
||||
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
|
||||
case -6:
|
||||
return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
+9
-2
@@ -726,6 +726,9 @@ describe('Job', () => {
|
||||
.then(isFailed => {
|
||||
expect(isFailed).to.be(false);
|
||||
})
|
||||
.then(() => {
|
||||
return scripts.moveToActive(queue);
|
||||
})
|
||||
.then(() => {
|
||||
return job.moveToFailed(new Error('test error'), true);
|
||||
})
|
||||
@@ -893,7 +896,9 @@ describe('Job', () => {
|
||||
})
|
||||
.then(state => {
|
||||
expect(state).to.be('completed');
|
||||
return client.zrem(queue.toKey('completed'), job.id);
|
||||
return client.zrem(queue.toKey('completed'), job.id).then(()=>{
|
||||
return client.lpush(queue.toKey('active'), job.id)
|
||||
});
|
||||
})
|
||||
.then(() => {
|
||||
return job.moveToDelayed(Date.now() + 10000, true);
|
||||
@@ -907,7 +912,9 @@ describe('Job', () => {
|
||||
})
|
||||
.then(state => {
|
||||
expect(state).to.be('delayed');
|
||||
return client.zrem(queue.toKey('delayed'), job.id);
|
||||
return client.zrem(queue.toKey('delayed'), job.id).then(()=>{
|
||||
return client.lpush(queue.toKey('active'), job.id)
|
||||
});
|
||||
})
|
||||
.then(() => {
|
||||
return job.moveToFailed(new Error('test'), true);
|
||||
|
||||
Reference in New Issue
Block a user