Merge pull request #537 from OptimalBits/movetofinished-errors

better error propagation moveToFinished #499
This commit is contained in:
Manuel Astudillo
2017-05-24 08:51:32 +02:00
committed by GitHub
5 changed files with 61 additions and 22 deletions
-2
View File
@@ -7,8 +7,6 @@
KEYS[3] jobId
KEYS[4] 'added'
pushCmd,
jobId
ARGV[1] pushCmd
ARGV[2] jobId
ARGV[3] token
+12 -13
View File
@@ -158,6 +158,7 @@ Job.prototype.discard = function(){
Job.prototype.moveToFailed = function(err, ignoreLock){
var _this = this;
return new Promise(function(resolve, reject){
var command;
var multi = _this.queue.client.multi();
_this._saveAttempt(multi, err);
@@ -169,27 +170,25 @@ Job.prototype.moveToFailed = function(err, ignoreLock){
// If so, move to delayed (need to unlock job in this case!)
var args = scripts.moveToDelayedArgs(_this.queue, _this.id, Date.now() + backoff, ignoreLock);
multi.moveToDelayed(args);
command = 'delayed';
}else{
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(_this, ignoreLock), function(err, result){
if(err){
reject();
}else{
switch(result){
case -1:
reject(new Error('Missing Job ' + _this.id + ' during retry'));
case -2:
reject(new Error('Missing Job lock for ' + _this.id + ' during retry'));
}
}
});
multi.retryJob(scripts.retryJobArgs(_this, ignoreLock));
command = 'retry';
}
} else {
// If not, move to failed
var args = scripts.moveToFailedArgs(_this, err.message, _this.opts.removeOnFail, ignoreLock);
multi.moveToFinished(args);
command = 'failed';
}
return multi.exec().then(resolve, reject);
return multi.exec().then(function(results){
var code = _.last(results)[1];
if (code < 0){
return reject(scripts.finishedErrors(code, _this.id, command));
}
resolve();
}, reject);
});
};
+4 -2
View File
@@ -714,7 +714,10 @@ Queue.prototype.processJobs = function(index, resolve){
(_this.paused || Promise.resolve()).then(function(){
return _this.processing[index] = _this.getNextJob()
.then(_this.processJob)
.then(processJobs, function(/*err*/){
.then(processJobs, function(err){
_this.emit('error', err, 'Error processing job');
//
// Wait before trying to process again.
//
@@ -783,7 +786,6 @@ Queue.prototype.processJob = function(job){
function handleFailed(err){
var error = err.cause || err; //Handle explicit rejection
// See https://github.com/OptimalBits/bull/pull/415#issuecomment-269744735
return job.moveToFailed(err).then(function(){
_this.emit('failed', job, error, 'active');
});
+12 -1
View File
@@ -124,7 +124,18 @@ var scripts = {
moveToFinished: function(job, val, propVal, shouldRemove, target, ignoreLock){
var args = scripts.moveToFinishedArgs(job, val, propVal, shouldRemove, target, ignoreLock);
return job.queue.client.moveToFinished(args);
return job.queue.client.moveToFinished(args).then(function(result){
if(result < 0){
throw scripts.finishedErrors(result, job.id, 'finished');
}
});
},
finishedErrors: function(code, jobId, command){
switch(code){
case -1: return new Error('Missing key for job ' + jobId + ' ' + command);
case -2: return new Error('Missing lock for job ' + jobId + ' ' + command);
}
},
// TODO: add a retention argument for completed and finished jobs (in time).
+33 -4
View File
@@ -1495,16 +1495,18 @@ describe('Queue', function () {
it('an unlocked job should not be moved to delayed', function(done) {
var queue = new Queue('delayed queue');
var job;
queue.process(function(job, callback) {
queue.process(function(_job, callback) {
// Release the lock to simulate the event loop stalling (so failure to renew the lock).
job = _job;
job.releaseLock().then(function() {
// Once it's failed, it should NOT be moved to delayed since this worker lost the lock.
callback(new Error('retry this job'));
});
});
queue.on('failed', function(job) {
queue.on('error', function(err){
job.isDelayed().then(function(isDelayed) {
expect(isDelayed).to.be.equal(false);
queue.close().then(done, done);
@@ -1516,8 +1518,10 @@ describe('Queue', function () {
it('an unlocked job should not be moved to waiting', function(done) {
var queue = new Queue('delayed queue');
var job;
queue.process(function(job, callback) {
queue.process(function(_job, callback) {
job = _job;
// Release the lock to simulate the event loop stalling (so failure to renew the lock).
job.releaseLock().then(function() {
// Once it's failed, it should NOT be moved to waiting since this worker lost the lock.
@@ -1525,7 +1529,7 @@ describe('Queue', function () {
});
});
queue.on('failed', function(job) {
queue.on('error', function(err){
job.isWaiting().then(function(isWaiting) {
expect(isWaiting).to.be.equal(false);
queue.close().then(done, done);
@@ -1900,6 +1904,31 @@ describe('Queue', function () {
});
queue.add({ foo: 'bar' }).then(addedHandler);
});
it('an unlocked job should not be moved to failed', function(done) {
queue = utils.buildQueue('test unlocked failed');
queue.process(function(job, callback) {
// Release the lock to simulate the event loop stalling (so failure to renew the lock).
job.releaseLock().then(function() {
// Once it's failed, it should NOT be moved to failed since this worker lost the lock.
callback(new Error('retry this job'));
});
});
queue.on('failed', function(job) {
job.isFailed().then(function(isFailed) {
expect(isFailed).to.be.equal(false);
});
});
queue.on('error', function(err){
queue.close().then(done, done);
});
// Note that backoff:0 should immediately retry the job upon failure (ie put it in 'waiting')
queue.add({ foo: 'bar' }, { backoff: 0, attempts: 2 });
});
});
describe('Jobs getters', function () {