Files
bull/test/test_pause.js
Manuel Astudillo 3ade8e6727 fix: force options to guarantee correct reconnects
BREAKING CHANGE:
If redis opts are missing:
 { maxRetriesPerRequest: null,
   enableReadyCheck: false }
then a exception will be thrown.
2021-10-27 18:42:38 +08:00

436 lines
12 KiB
JavaScript

'use strict';
const Queue = require('../');
const expect = require('chai').expect;
const redis = require('ioredis');
const utils = require('./utils');
const delay = require('delay');
const sinon = require('sinon');
describe('.pause', () => {
let client;
beforeEach(() => {
client = new redis({
maxRetriesPerRequest: null,
enableReadyCheck: false
});
return client.flushdb();
});
afterEach(async () => {
sinon.restore();
await utils.cleanupQueues();
await client.flushdb();
return client.quit();
});
describe('globally', () => {
it('should pause a queue until resumed', () => {
let ispaused = false,
counter = 2;
return utils.newQueue().then(queue => {
const resultPromise = new Promise(resolve => {
queue.process((job, jobDone) => {
expect(ispaused).to.be.eql(false);
expect(job.data.foo).to.be.equal('paused');
jobDone();
counter--;
if (counter === 0) {
resolve(queue.close());
}
});
});
return Promise.all([
queue
.pause()
.then(() => {
ispaused = true;
return queue.isPaused().then(paused => {
expect(paused).to.be.equal(true);
return queue.add({ foo: 'paused' });
});
})
.then(() => {
return queue.add({ foo: 'paused' });
})
.then(() => {
ispaused = false;
return queue.resume().then(() => {
return queue.isPaused().then(paused => {
expect(paused).to.be.equal(false);
});
});
}),
resultPromise
]);
});
});
it('should be able to pause a running queue and emit relevant events', done => {
let ispaused = false,
isresumed = true,
first = true;
utils.newQueue().then(queue => {
queue.process(job => {
expect(ispaused).to.be.eql(false);
expect(job.data.foo).to.be.equal('paused');
if (first) {
first = false;
ispaused = true;
return queue.pause();
} else {
expect(isresumed).to.be.eql(true);
queue.close().then(done, done);
}
});
queue.add({ foo: 'paused' });
queue.add({ foo: 'paused' });
queue.on('paused', () => {
ispaused = false;
queue.resume().catch((/*err*/) => {
// Swallow error.
});
});
queue.on('resumed', () => {
isresumed = true;
});
});
});
it('should not processed delayed jobs', function(done) {
this.timeout(5000);
const queue = new Queue('pause-test');
queue.process(() => {
done(new Error('should not process delayed jobs in paused queue.'));
});
queue.pause().then(() => {
queue
.add(
{},
{
delay: 500
}
)
.then(() => {
return queue.getJobCounts();
})
.then(counts => {
expect(counts).to.have.property('paused', 0);
expect(counts).to.have.property('waiting', 0);
expect(counts).to.have.property('delayed', 1);
return delay(1000);
})
.then(() => {
return queue.getJobCounts();
})
.then(counts => {
expect(counts).to.have.property('paused', 1);
expect(counts).to.have.property('waiting', 0);
done();
});
});
});
});
describe('locally', () => {
it('should pause the queue locally', done => {
let counter = 2;
const queue = utils.buildQueue();
queue
.pause(true /* Local */)
.then(() => {
return queue.isPaused(true).then(paused => {
expect(paused).to.be.equal(true);
});
})
.then(() => {
// Add the worker after the queue is in paused mode since the normal behavior is to pause
// it after the current lock expires. This way, we can ensure there isn't a lock already
// to test that pausing behavior works.
queue
.process((job, jobDone) => {
expect(queue.paused).not.to.be.ok;
jobDone();
counter--;
if (counter === 0) {
queue.close().then(done);
}
})
.catch(done);
})
.then(() => {
return queue.add({ foo: 'paused' });
})
.then(() => {
return queue.add({ foo: 'paused' });
})
.then(() => {
expect(counter).to.be.eql(2);
expect(queue.paused).to.be.ok; // Parameter should exist.
return queue.resume(true /* Local */);
})
.then(() => {
return queue.isPaused(true).then(paused => {
expect(paused).to.be.equal(false);
});
})
.catch(done);
});
it('should wait until active jobs are finished before resolving pause', done => {
const queue = utils.buildQueue();
const startProcessing = new Promise(resolve => {
queue.process((/*job*/) => {
resolve();
return delay(200);
});
});
queue.isReady().then(() => {
const jobs = [];
for (let i = 0; i < 10; i++) {
jobs.push(queue.add(i));
}
//
// Add start processing so that we can test that pause waits for this job to be completed.
//
jobs.push(startProcessing);
Promise.all(jobs)
.then(() => {
return queue
.pause(true)
.then(() => {
const active = queue
.getJobCountByTypes(['active'])
.then(count => {
expect(count).to.be.eql(0);
expect(queue.paused).to.be.ok;
return null;
});
// One job from the 10 posted above will be processed, so we expect 9 jobs pending
const paused = queue
.getJobCountByTypes(['delayed', 'wait'])
.then(count => {
expect(count).to.be.eql(9);
return null;
});
return Promise.all([active, paused]);
})
.then(() => {
return queue.add({});
})
.then(() => {
const active = queue
.getJobCountByTypes(['active'])
.then(count => {
expect(count).to.be.eql(0);
return null;
});
const paused = queue
.getJobCountByTypes(['paused', 'wait', 'delayed'])
.then(count => {
expect(count).to.be.eql(10);
return null;
});
return Promise.all([active, paused]);
})
.then(() => {
return queue.close().then(done, done);
});
})
.catch(done);
});
});
it('should pause the queue locally when more than one worker is active', () => {
const queue1 = utils.buildQueue('pause-queue');
const queue1IsProcessing = new Promise(resolve => {
queue1.process((job, jobDone) => {
resolve();
setTimeout(jobDone, 200);
});
});
const queue2 = utils.buildQueue('pause-queue');
const queue2IsProcessing = new Promise(resolve => {
queue2.process((job, jobDone) => {
resolve();
setTimeout(jobDone, 200);
});
});
queue1.add(1);
queue1.add(2);
queue1.add(3);
queue1.add(4);
return Promise.all([queue1IsProcessing, queue2IsProcessing]).then(() => {
return Promise.all([
queue1.pause(true /* local */),
queue2.pause(true /* local */)
]).then(() => {
const active = queue1.getJobCountByTypes(['active']).then(count => {
expect(count).to.be.eql(0);
});
const pending = queue1.getJobCountByTypes(['wait']).then(count => {
expect(count).to.be.eql(2);
});
const completed = queue1
.getJobCountByTypes(['completed'])
.then(count => {
expect(count).to.be.eql(2);
});
return Promise.all([active, pending, completed]).then(() => {
return Promise.all([queue1.close(), queue2.close()]);
});
});
});
});
it('should wait for blocking job retrieval to complete before pausing', () => {
const queue = utils.buildQueue();
const startsProcessing = new Promise(resolve => {
queue.process((/*job*/) => {
resolve();
return delay(200);
});
});
return queue
.add(1)
.then(() => {
return startsProcessing;
})
.then(() => {
return queue.pause(true);
})
.then(() => {
return queue.add(2);
})
.then(() => {
const active = queue.getJobCountByTypes(['active']).then(count => {
expect(count).to.be.eql(0);
});
const pending = queue.getJobCountByTypes(['wait']).then(count => {
expect(count).to.be.eql(1);
});
const completed = queue
.getJobCountByTypes(['completed'])
.then(count => {
expect(count).to.be.eql(1);
});
return Promise.all([active, pending, completed]).then(() => {
return queue.close();
});
});
});
it('should not initialize blocking client if not already initialized', async () => {
const createClient = sinon.spy(() => client);
const queue = utils.buildQueue('pause-queue', { createClient });
await queue.pause(true);
const bClientCalls = createClient
.getCalls()
.filter(c => c.args[0] === 'bclient');
expect(bClientCalls).to.have.lengthOf(0);
});
it('pauses fast when queue is drained', function(done) {
this.timeout(10000);
const queue = new Queue('test');
queue.process((/*job*/) => {
Promise.resolve();
});
queue.add({});
queue.on('drained', () => {
delay(500).then(() => {
const start = new Date().getTime();
return queue.pause(true).finally(() => {
const finish = new Date().getTime();
expect(finish - start).to.be.lt(1000);
queue.close().then(done, done);
});
});
});
});
describe('with doNotWaitActive=true', () => {
it('should not wait for active jobs to finish', async () => {
const queue = utils.buildQueue();
await queue.add({});
let finishJob;
// wait for us to start processing job
await new Promise(resolve => {
queue.process(() => {
// resolve promise, but continue processing job forever
resolve();
return new Promise(resolve => {
finishJob = resolve;
});
});
});
return queue.pause(true, true).then(() => finishJob());
});
it('should not process new jobs', async () => {
const queue = utils.buildQueue();
// block on brpoplpush
const nextJobPromise = queue.getNextJob();
await queue.pause(true, true);
// Add job. This should not be processed
await queue.add({});
const nextJob = await nextJobPromise;
expect(nextJob).to.equal(
undefined,
'getNextJob should return without getting job'
);
});
it('should not initialize blocking client if not already initialized', async () => {
const createClient = sinon.spy(() => client);
const queue = utils.buildQueue('pause-queue', { createClient });
await queue.pause(true, true);
const bClientCalls = createClient
.getCalls()
.filter(c => c.args[0] === 'bclient');
expect(bClientCalls).to.have.lengthOf(0);
});
});
});
});