fix: better handling of maxRetriesPerRequest

This commit is contained in:
Manuel Astudillo
2022-02-23 12:06:42 +08:00
committed by GitHub
parent 848378dbda
commit d3b91386e3
4 changed files with 89 additions and 96 deletions
+1 -1
View File
@@ -5,7 +5,7 @@ module.exports.Messages = {
RETRY_JOB_IS_LOCKED: "Couldn't retry job: The job is locked",
RETRY_JOB_NOT_FAILED:
"Couldn't retry job: The job has been already retried or has not failed",
MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest is not permitted.
MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest for bclient/subscriber is not permitted.
see https://github.com/OptimalBits/bull/issues/1873
`
};
+25 -15
View File
@@ -1,6 +1,6 @@
'use strict';
const redis = require('ioredis');
const Redis = require('ioredis');
const EventEmitter = require('events');
const _ = require('lodash');
@@ -120,9 +120,8 @@ const Queue = function Queue(name, url, opts) {
this.token = uuid.v4();
opts.redis = {
...opts.redis,
maxRetriesPerRequest: null,
enableReadyCheck: false
enableReadyCheck: false,
...opts.redis
};
_.defaults(opts.redis, {
@@ -272,7 +271,11 @@ function redisClientGetter(queue, options, initCallback) {
const createClient = _.isFunction(options.createClient)
? options.createClient
: function(type, config) {
return new redis(config);
if (['bclient', 'subscriber'].includes(type)) {
return new Redis({ ...config, maxRetriesPerRequest: null });
} else {
return new Redis(config);
}
};
const connections = {};
@@ -289,7 +292,10 @@ function redisClientGetter(queue, options, initCallback) {
const opts = client.options.redisOptions || client.options;
if (opts.enableReadyCheck || opts.maxRetriesPerRequest) {
if (
['bclient', 'subscriber'].includes(type) &&
(opts.enableReadyCheck || opts.maxRetriesPerRequest)
) {
throw new Error(errors.Messages.MISSING_REDIS_OPTS);
}
@@ -542,13 +548,18 @@ async function redisClientDisconnect(client) {
}
}),
500
).finally(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
});
)
.catch(() => {
// Ignore timeout error
})
.finally(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
@@ -572,8 +583,7 @@ Queue.prototype.close = function(doNotWaitJobs) {
return (this.closing = this.isReady()
.then(this._initializingProcess)
.catch(err => {
console.error(err);
.catch(() => {
isReady = false;
})
.then(() => isReady && this.pause(true, doNotWaitJobs))
+47 -66
View File
@@ -3,19 +3,15 @@
const expect = require('expect.js');
const utils = require('./utils');
const { isRedisReady } = require('../lib/utils');
const redis = require('ioredis');
const Redis = require('ioredis');
const Queue = require('../lib/queue');
describe('connection', () => {
let queue;
let client;
beforeEach(() => {
client = new redis();
return client.flushdb().then(() => {
queue = utils.buildQueue();
return queue;
});
client = new Redis();
return client.flushdb();
});
afterEach(() => {
@@ -24,55 +20,41 @@ describe('connection', () => {
it('should fail if reusing connections with invalid options', () => {
const errMsg = Queue.ErrorMessages.MISSING_REDIS_OPTS;
{
try {
const client = new redis();
const opts = {
createClient(type) {
switch (type) {
case 'client':
return client;
default:
return new redis();
}
}
};
utils.buildQueue('external connections', opts);
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}
}
{
const subscriber = new redis();
const client = new Redis();
const opts = {
createClient(type) {
switch (type) {
case 'subscriber':
return subscriber;
default:
return new redis({
maxRetriesPerRequest: null,
enableReadyCheck: false
});
}
const opts = {
createClient(type) {
switch (type) {
case 'client':
return client;
default:
return new Redis();
}
};
const testQueue = utils.buildQueue('external connections', opts);
try {
testQueue.on('global:completed', () => {});
} catch (err) {
expect(err.message).to.be.equal(errMsg);
testQueue.close();
}
};
const queue = utils.buildQueue('external connections', opts);
expect(queue).to.be.ok();
try {
// eslint-disable-next-line no-unused-vars
const _ = queue.bclient;
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}
try {
// eslint-disable-next-line no-unused-vars
const _ = queue.eclient;
throw new Error('should fail with invalid redis options');
} catch (err) {
expect(err.message).to.be.equal(errMsg);
}
});
it('should recover from a connection loss', async () => {
const queue = utils.buildQueue();
queue.on('error', () => {
// error event has to be observed or the exception will bubble up
});
@@ -104,6 +86,8 @@ describe('connection', () => {
it('should handle jobs added before and after a redis disconnect', done => {
let count = 0;
const queue = utils.buildQueue();
queue
.process((job, jobDone) => {
if (count == 0) {
@@ -141,8 +125,8 @@ describe('connection', () => {
enableReadyCheck: false
};
const client = new redis(redisOpts);
const subscriber = new redis(redisOpts);
const client = new Redis(redisOpts);
const subscriber = new Redis(redisOpts);
const opts = {
createClient(type) {
@@ -152,7 +136,7 @@ describe('connection', () => {
case 'subscriber':
return subscriber;
default:
return new redis();
return new Redis();
}
}
};
@@ -184,31 +168,28 @@ describe('connection', () => {
});
});
it('should fail if redis connection fails and does not reconnect', done => {
queue = utils.buildQueue('connection fail', {
it('should fail if redis connection fails and does not reconnect', async () => {
const queue = utils.buildQueue('connection fail 123', {
redis: {
host: 'localhost',
port: 1234,
retryStrategy: () => false
}
});
isRedisReady(queue.client).then(
() => {
done(new Error('Did not fail connecting to invalid redis instance'));
},
err => {
expect(err.code).to.be.eql('ECONNREFUSED');
queue.close().then(done, done);
}
);
try {
await isRedisReady(queue.client);
new Error('Did not fail connecting to invalid redis instance');
} catch (err) {
expect(err.code).to.be.eql('ECONNREFUSED');
await queue.close();
}
});
it('should close cleanly if redis connection fails', async () => {
queue = utils.buildQueue('connection fail', {
const queue = new Queue('connection fail', {
redis: {
host: 'localhost',
port: 1234,
port: 1235,
retryStrategy: () => false
}
});
@@ -217,7 +198,7 @@ describe('connection', () => {
});
it('should accept ioredis options on the query string', async () => {
queue = new Queue(
const queue = new Queue(
'connection query string',
'redis://localhost?tls=RedisCloudFixed'
);
+16 -14
View File
@@ -294,6 +294,8 @@ describe('Queue', () => {
return client;
case 'subscriber':
return subscriber;
case 'bclient':
return new redis({ ...opts, ...redisOpts });
default:
return new redis(opts);
}
@@ -615,15 +617,15 @@ describe('Queue', () => {
describe('.retryJobs', () => {
it('should retry all failed jobs', async () => {
const jobCount = 8;
let fail = true;
queue.process(async () => {
await delay(10);
if (fail) {
throw new Error('failed');
}
if (fail) {
throw new Error('failed');
}
});
let order = 0;
const failing = new Promise(resolve => {
queue.on('failed', job => {
@@ -634,16 +636,16 @@ describe('Queue', () => {
order++;
});
});
for (const index of Array.from(Array(jobCount).keys())) {
await queue.add({ idx: index });
}
await failing;
const failedCount = await queue.getJobCounts('failed');
expect(failedCount.failed).to.be.equal(jobCount);
order = 0;
const completing = new Promise(resolve => {
queue.on('completed', job => {
@@ -654,17 +656,17 @@ describe('Queue', () => {
order++;
});
});
fail = false;
await queue.retryJobs({ count: 2 });
await completing;
const CompletedCount = await queue.getJobCounts('completed');
expect(CompletedCount.completed).to.be.equal(jobCount);
});
});
});
it('should keep specified number of jobs after completed with removeOnComplete', async () => {
const keepJobs = 3;
await testRemoveOnFinish(keepJobs, keepJobs);