rearranged and fixed some unit tests

This commit is contained in:
Manuel Astudillo
2015-01-27 10:41:01 +01:00
parent 11bc03e784
commit 4cc8bd2f2b
+171 -172
View File
@@ -67,87 +67,89 @@ describe('Queue', function(){
});
});
it('creates a queue with standard redis opts', function(done){
queue = Queue('standard');
describe('instantiation', function(){
it('should create a queue with standard redis opts', function(done){
queue = Queue('standard');
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
done();
done();
});
});
});
it('creates a queue using the supplied redis DB', function(done){
queue = Queue('custom', {redis: {DB: 1}});
it('creates a queue using the supplied redis DB', function(done){
queue = Queue('custom', {redis: {DB: 1}});
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.selected_db).to.be(1);
expect(queue.bclient.selected_db).to.be(1);
expect(queue.client.selected_db).to.be(1);
expect(queue.bclient.selected_db).to.be(1);
done();
done();
});
});
});
it('creates a queue using custom the supplied redis host', function(done){
queue = Queue('custom', {redis: {host: 'localhost'}});
it('creates a queue using custom the supplied redis host', function(done){
queue = Queue('custom', {redis: {host: 'localhost'}});
queue.once('ready', function(){
expect(queue.client.host).to.be('localhost');
expect(queue.bclient.host).to.be('localhost');
queue.once('ready', function(){
expect(queue.client.host).to.be('localhost');
expect(queue.bclient.host).to.be('localhost');
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
done();
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
done();
});
});
});
it('creates a queue with dots in its name', function(){
queue = Queue('using. dots. in.name.');
it('creates a queue with dots in its name', function(){
queue = Queue('using. dots. in.name.');
return queue.add({foo: 'bar'}).then(function(job){
return queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.foo).to.be('bar')
})
.then(function(){
}).then(function(){
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
jobDone();
});
});
});
});
it('should recover from a connection loss', function(done){
queue = Queue('test connection loss');
queue.on('error', function(err){
// error event has to be observed or the exception will bubble up
}).process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone();
done();
describe('connection', function(){
it('should recover from a connection loss', function(done){
queue = Queue('test connection loss');
queue.on('error', function(err){
// error event has to be observed or the exception will bubble up
}).process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone();
done();
});
// Simulate disconnect
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
// add something to the queue
queue.add({'foo': 'bar'});
});
// Simulate disconnect
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
// add something to the queue
queue.add({'foo': 'bar'});
});
it('should reconnect when the blocking client triggers an "end" event', function (done) {
it('should reconnect when the blocking client triggers an "end" event', function (done) {
queue = buildQueue();
var runSpy = sandbox.spy(queue, 'run');
@@ -161,22 +163,24 @@ describe('Queue', function(){
queue.add({'foo': 'bar'});
queue.bclient.emit('end');
});
it('should not try to reconnect when the blocking client triggers an "end" event and no process have been called', function (done) {
queue = buildQueue();
var runSpy = sandbox.spy(queue, 'run');
queue.bclient.emit('end');
setTimeout(function() {
expect(runSpy.callCount).to.be(0);
done()
}, 100)
});
});
it('should not try to reconnect when the blocking client triggers an "end" event and no process have been called', function (done) {
queue = buildQueue();
var runSpy = sandbox.spy(queue, 'run');
queue.bclient.emit('end');
setTimeout(function() {
expect(runSpy.callCount).to.be(0);
done()
}, 100)
});
it('process a job', function(done){
describe(' a worker', function(){
it('should process a job', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
@@ -476,91 +480,6 @@ describe('Queue', function(){
}
});
it('count added, unprocessed jobs', function(){
var counter = 1;
var maxJobs = 100;
var added = [];
queue = buildQueue();
for(var i=1; i<=maxJobs; i++){
added.push(queue.add({foo: 'bar', num: i}));
}
return Promise.all(added)
.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(100);
})
.then(queue.empty.bind(queue))
.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(0);
});
});
it('add jobs to a paused queue', function(done){
var ispaused = false, counter = 2;
queue = buildQueue();
queue.process(function(job, jobDone){
expect(ispaused).to.be(false);
expect(job.data.foo).to.be.equal('paused');
jobDone();
counter--;
if(counter === 0) done();
});
queue.pause();
ispaused = true;
queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
setTimeout(function(){
ispaused = false;
queue.resume();
}, 100); // We hope that this was enough to trigger a process if
// we were not paused.
});
it('paused a running queue', function(done){
var ispaused = false, isresumed = true, first = true;
queue = buildQueue();
queue.process(function(job, jobDone){
expect(ispaused).to.be(false);
expect(job.data.foo).to.be.equal('paused');
jobDone();
if(first){
first = false;
queue.pause();
ispaused = true;
}else{
expect(isresumed).to.be(true);
done();
}
});
queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
queue.on('paused', function(){
setTimeout(function(){
ispaused = false;
queue.resume();
}, 100); // We hope that this was enough to trigger a process if
});
queue.on('resumed', function(){
isresumed = true;
});
});
it('process a lifo queue', function(done){
var currentValue = 0, first = true;
queue = Queue('test lifo');
@@ -595,6 +514,98 @@ describe('Queue', function(){
});
});
});
it('count added, unprocessed jobs', function(){
var counter = 1;
var maxJobs = 100;
var added = [];
queue = buildQueue();
for(var i=1; i<=maxJobs; i++){
added.push(queue.add({foo: 'bar', num: i}));
}
return Promise.all(added)
.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(100);
})
.then(queue.empty.bind(queue))
.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(0);
});
});
describe(".pause", function(){
it('should pause a queue until resumed', function(){
var ispaused = false, counter = 2;
queue = buildQueue();
var resultPromise = new Promise(function(resolve, reject){
queue.process(function(job, jobDone){
expect(ispaused).to.be(false);
expect(job.data.foo).to.be.equal('paused');
jobDone();
counter--;
if(counter === 0){
resolve();
}
});
});
return Promise.join(queue.pause().then(function(){
ispaused = true;
return queue.add({foo: 'paused'});
}).then(function(){
return queue.add({foo: 'paused'});
}).then(function(){
ispaused = false;
queue.resume();
}), resultPromise);;
})
it('should be able to pause a running queue and emit relevant events', function(done){
var ispaused = false, isresumed = true, first = true;
queue = buildQueue();
queue.empty().then(function(){
queue.process(function(job, jobDone){
expect(ispaused).to.be(false);
expect(job.data.foo).to.be.equal('paused');
jobDone();
if(first){
first = false;
ispaused = true;
queue.pause();
}else{
expect(isresumed).to.be(true);
done();
}
});
queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
queue.on('paused', function(){
ispaused = false;
queue.resume();
});
queue.on('resumed', function(){
isresumed = true;
});
});
});
});
it('should publish a message when a new message is added to the queue', function(done) {
var client = redis.createClient(6379, '127.0.0.1', {});
client.select(0);
@@ -618,37 +629,25 @@ describe('Queue', function(){
queue.process(function(job, jobDone){
jobDone();
});
queue.on('completed', function(){
expect(Date.now() > timestamp + delay);
queue.getWaiting().then(function(jobs){
expect(jobs.length).to.be.equal(0);
}).then(function(){
return queue.getActive().then(function(jobs){
expect(jobs.length).to.be.equal(0);
})
}).then(function(){
return queue.getDelayed().then(function(jobs){
expect(jobs.length).to.be.equal(0);
})
}).then(function(){
return queue.getCompleted().then(function(jobs){
//expect(jobs.length).to.be.equal(1);
//console.log("COMPLETED", jobs)
})
}).then(function(){
return queue.getFailed().then(function(jobs){
expect(jobs.length).to.be.equal(0);
})
}).then(function(){
return queue.empty();
}).then(done)
});
}).then(done);
})
return queue.add({delayed: 'foobar'}, {delay: delay}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.delayed).to.be('foobar')
expect(job.delay).to.be(delay)
})
});
});
it("should process delayed jobs in correct order", function(done){