From d3b91386e30d7205efdc19bcd18fe1e5fefa3542 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 23 Feb 2022 12:06:42 +0800 Subject: [PATCH] fix: better handling of maxRetriesPerRequest --- lib/errors.js | 2 +- lib/queue.js | 40 ++++++++------ test/test_connection.js | 113 +++++++++++++++++----------------------- test/test_queue.js | 30 ++++++----- 4 files changed, 89 insertions(+), 96 deletions(-) diff --git a/lib/errors.js b/lib/errors.js index c1fbb47..fe2c1fd 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -5,7 +5,7 @@ module.exports.Messages = { RETRY_JOB_IS_LOCKED: "Couldn't retry job: The job is locked", RETRY_JOB_NOT_FAILED: "Couldn't retry job: The job has been already retried or has not failed", - MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest is not permitted. + MISSING_REDIS_OPTS: `Using a redis instance with enableReadyCheck or maxRetriesPerRequest for bclient/subscriber is not permitted. see https://github.com/OptimalBits/bull/issues/1873 ` }; diff --git a/lib/queue.js b/lib/queue.js index f9bfd46..2869100 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,6 +1,6 @@ 'use strict'; -const redis = require('ioredis'); +const Redis = require('ioredis'); const EventEmitter = require('events'); const _ = require('lodash'); @@ -120,9 +120,8 @@ const Queue = function Queue(name, url, opts) { this.token = uuid.v4(); opts.redis = { - ...opts.redis, - maxRetriesPerRequest: null, - enableReadyCheck: false + enableReadyCheck: false, + ...opts.redis }; _.defaults(opts.redis, { @@ -272,7 +271,11 @@ function redisClientGetter(queue, options, initCallback) { const createClient = _.isFunction(options.createClient) ? options.createClient : function(type, config) { - return new redis(config); + if (['bclient', 'subscriber'].includes(type)) { + return new Redis({ ...config, maxRetriesPerRequest: null }); + } else { + return new Redis(config); + } }; const connections = {}; @@ -289,7 +292,10 @@ function redisClientGetter(queue, options, initCallback) { const opts = client.options.redisOptions || client.options; - if (opts.enableReadyCheck || opts.maxRetriesPerRequest) { + if ( + ['bclient', 'subscriber'].includes(type) && + (opts.enableReadyCheck || opts.maxRetriesPerRequest) + ) { throw new Error(errors.Messages.MISSING_REDIS_OPTS); } @@ -542,13 +548,18 @@ async function redisClientDisconnect(client) { } }), 500 - ).finally(() => { - client.once('error', _reject); - client.disconnect(); - if (['connecting', 'reconnecting'].includes(client.status)) { - resolve(); - } - }); + ) + .catch(() => { + // Ignore timeout error + }) + .finally(() => { + client.once('error', _reject); + + client.disconnect(); + if (['connecting', 'reconnecting'].includes(client.status)) { + resolve(); + } + }); }).finally(() => { client.removeListener('end', _resolve); client.removeListener('error', _reject); @@ -572,8 +583,7 @@ Queue.prototype.close = function(doNotWaitJobs) { return (this.closing = this.isReady() .then(this._initializingProcess) - .catch(err => { - console.error(err); + .catch(() => { isReady = false; }) .then(() => isReady && this.pause(true, doNotWaitJobs)) diff --git a/test/test_connection.js b/test/test_connection.js index f2acaa2..8d19374 100644 --- a/test/test_connection.js +++ b/test/test_connection.js @@ -3,19 +3,15 @@ const expect = require('expect.js'); const utils = require('./utils'); const { isRedisReady } = require('../lib/utils'); -const redis = require('ioredis'); +const Redis = require('ioredis'); const Queue = require('../lib/queue'); describe('connection', () => { - let queue; let client; beforeEach(() => { - client = new redis(); - return client.flushdb().then(() => { - queue = utils.buildQueue(); - return queue; - }); + client = new Redis(); + return client.flushdb(); }); afterEach(() => { @@ -24,55 +20,41 @@ describe('connection', () => { it('should fail if reusing connections with invalid options', () => { const errMsg = Queue.ErrorMessages.MISSING_REDIS_OPTS; - { - try { - const client = new redis(); - const opts = { - createClient(type) { - switch (type) { - case 'client': - return client; - default: - return new redis(); - } - } - }; - utils.buildQueue('external connections', opts); - throw new Error('should fail with invalid redis options'); - } catch (err) { - expect(err.message).to.be.equal(errMsg); - } - } - { - const subscriber = new redis(); + const client = new Redis(); - const opts = { - createClient(type) { - switch (type) { - case 'subscriber': - return subscriber; - default: - return new redis({ - maxRetriesPerRequest: null, - enableReadyCheck: false - }); - } + const opts = { + createClient(type) { + switch (type) { + case 'client': + return client; + default: + return new Redis(); } - }; - - const testQueue = utils.buildQueue('external connections', opts); - - try { - testQueue.on('global:completed', () => {}); - } catch (err) { - expect(err.message).to.be.equal(errMsg); - testQueue.close(); } + }; + const queue = utils.buildQueue('external connections', opts); + expect(queue).to.be.ok(); + + try { + // eslint-disable-next-line no-unused-vars + const _ = queue.bclient; + throw new Error('should fail with invalid redis options'); + } catch (err) { + expect(err.message).to.be.equal(errMsg); + } + + try { + // eslint-disable-next-line no-unused-vars + const _ = queue.eclient; + throw new Error('should fail with invalid redis options'); + } catch (err) { + expect(err.message).to.be.equal(errMsg); } }); it('should recover from a connection loss', async () => { + const queue = utils.buildQueue(); queue.on('error', () => { // error event has to be observed or the exception will bubble up }); @@ -104,6 +86,8 @@ describe('connection', () => { it('should handle jobs added before and after a redis disconnect', done => { let count = 0; + const queue = utils.buildQueue(); + queue .process((job, jobDone) => { if (count == 0) { @@ -141,8 +125,8 @@ describe('connection', () => { enableReadyCheck: false }; - const client = new redis(redisOpts); - const subscriber = new redis(redisOpts); + const client = new Redis(redisOpts); + const subscriber = new Redis(redisOpts); const opts = { createClient(type) { @@ -152,7 +136,7 @@ describe('connection', () => { case 'subscriber': return subscriber; default: - return new redis(); + return new Redis(); } } }; @@ -184,31 +168,28 @@ describe('connection', () => { }); }); - it('should fail if redis connection fails and does not reconnect', done => { - queue = utils.buildQueue('connection fail', { + it('should fail if redis connection fails and does not reconnect', async () => { + const queue = utils.buildQueue('connection fail 123', { redis: { host: 'localhost', port: 1234, retryStrategy: () => false } }); - - isRedisReady(queue.client).then( - () => { - done(new Error('Did not fail connecting to invalid redis instance')); - }, - err => { - expect(err.code).to.be.eql('ECONNREFUSED'); - queue.close().then(done, done); - } - ); + try { + await isRedisReady(queue.client); + new Error('Did not fail connecting to invalid redis instance'); + } catch (err) { + expect(err.code).to.be.eql('ECONNREFUSED'); + await queue.close(); + } }); it('should close cleanly if redis connection fails', async () => { - queue = utils.buildQueue('connection fail', { + const queue = new Queue('connection fail', { redis: { host: 'localhost', - port: 1234, + port: 1235, retryStrategy: () => false } }); @@ -217,7 +198,7 @@ describe('connection', () => { }); it('should accept ioredis options on the query string', async () => { - queue = new Queue( + const queue = new Queue( 'connection query string', 'redis://localhost?tls=RedisCloudFixed' ); diff --git a/test/test_queue.js b/test/test_queue.js index 0ccacb5..87fc4ea 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -294,6 +294,8 @@ describe('Queue', () => { return client; case 'subscriber': return subscriber; + case 'bclient': + return new redis({ ...opts, ...redisOpts }); default: return new redis(opts); } @@ -615,15 +617,15 @@ describe('Queue', () => { describe('.retryJobs', () => { it('should retry all failed jobs', async () => { const jobCount = 8; - + let fail = true; queue.process(async () => { await delay(10); - if (fail) { - throw new Error('failed'); - } + if (fail) { + throw new Error('failed'); + } }); - + let order = 0; const failing = new Promise(resolve => { queue.on('failed', job => { @@ -634,16 +636,16 @@ describe('Queue', () => { order++; }); }); - + for (const index of Array.from(Array(jobCount).keys())) { await queue.add({ idx: index }); } - + await failing; - + const failedCount = await queue.getJobCounts('failed'); expect(failedCount.failed).to.be.equal(jobCount); - + order = 0; const completing = new Promise(resolve => { queue.on('completed', job => { @@ -654,17 +656,17 @@ describe('Queue', () => { order++; }); }); - + fail = false; await queue.retryJobs({ count: 2 }); - + await completing; - + const CompletedCount = await queue.getJobCounts('completed'); expect(CompletedCount.completed).to.be.equal(jobCount); - }); + }); }); - + it('should keep specified number of jobs after completed with removeOnComplete', async () => { const keepJobs = 3; await testRemoveOnFinish(keepJobs, keepJobs);