merged v3.0.0

This commit is contained in:
Manuel Astudillo
2017-05-09 16:30:31 +02:00
8 changed files with 268 additions and 47 deletions
+10
View File
@@ -1,3 +1,13 @@
v.3.0.0-alpha.4
===============
- Implemented repeatable jobs. #252.
v.3.0.0-alpha.3
===============
- Simplified global events #501.
v.3.0.0-alpha.2
===============
+12 -3
View File
@@ -35,9 +35,11 @@ Features:
- Minimal CPU usage by poll-free design.
- Robust design based on Redis.
- Delayed jobs.
- Schedule and repeat jobs according to a cron specification.
- Retries.
- Priority.
- Concurrency.
- Multiple job types per queue.
- Pause/resume (globally or locally).
- Automatic recovery from process crashes.
@@ -53,12 +55,9 @@ There are a few third party UIs that can be used for easier administration of th
Roadmap:
--------
- Multiple job types per queue.
- Scheduling jobs as a cron specification.
- Rate limiter for jobs.
- Parent-child jobs relationships.
Install:
--------
@@ -528,6 +527,8 @@ interface JobOpts{
delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both
// server and clients should have their clocks synchronized. [optional].
repeat: RepeatOpts; // Define repeat options for adding jobs according to a Cron specification.
attempts: number; // The total number of attempts to try the job until it completes.
backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails
@@ -549,6 +550,14 @@ interface JobOpts{
}
```
```typescript
interface RepeatOpts{
cron: string; // Cron expression. See https://github.com/harrisiirak/cron-parser for details.
endDate?: number | Date; // Stop repeating jobs after this date.
tz?: string; // Timezone. For example: 'Europe/Athens'
}
```
```typescript
interface BackoffOpts{
type: string; // Backoff type, which can be either `fixed` or `exponential`
-3
View File
@@ -6,8 +6,6 @@
expiration time. The worker is responsible of keeping the lock fresh
so that no other worker picks this job again.
Note: This command only works in non-distributed redis deployments.
Input:
KEYS[1] wait key
KEYS[2] active key
@@ -35,4 +33,3 @@ if jobId then
return {redis.call("HGETALL", jobKey), jobId} -- get job data
end
+3 -2
View File
@@ -1,5 +1,6 @@
--[[
Updates the delay set
Updates the delay set, by picking a delayed job that should
be processed now.
Input:
KEYS[1] 'delayed'
@@ -18,7 +19,7 @@ local jobId = RESULT[1]
local score = RESULT[2]
if (score ~= nil) then
score = score / 0x1000
if (score <= tonumber(ARGV[2])) then
if (math.floor(score) <= tonumber(ARGV[2])) then
redis.call("ZREM", KEYS[1], jobId)
redis.call("LREM", KEYS[2], 0, jobId)
redis.call("LPUSH", KEYS[3], jobId) -- not sure if it is better to move the job at the begining of the queue with LPUSH
+89 -34
View File
@@ -19,6 +19,10 @@ var uuid = require('uuid');
var commands = require('./commands/');
Promise.config({
cancellation: true
});
/**
Gets or creates a new Queue with the given name.
@@ -140,7 +144,7 @@ var Queue = function Queue(name, url, opts){
this.eclient = createClient('subscriber', redisOpts);
this.handlers = {};
this.delayTimer = null;
this.delayTimer = Promise.resolve();
this.processing = [];
this.retrieving = 0;
@@ -246,33 +250,16 @@ Queue.prototype.off = Queue.prototype.removeListener;
Queue.prototype._init = function(name){
var _this = this;
var initializers = [this.client, this.eclient].map(function (client) {
var _resolve, errorHandler;
return new Promise(function(resolve, reject) {
_resolve = resolve;
errorHandler = function(err){
if(err.code !== 'ECONNREFUSED'){
reject(err);
}
};
client.once('ready', resolve);
client.on('error', errorHandler);
}).finally(function(){
client.removeListener('ready', _resolve);
client.removeListener('error', errorHandler);
});
});
this._initializing = Promise.all(initializers).then(function(){
return _this.eclient.psubscribe(_this.toKey('') + '*');
}).then(function(){
return commands(_this.client);
}).then(function(){
debuglog(name + ' queue ready');
}, function(err){
_this.emit('error', err, 'Error initializing queue');
throw err;
});
this._initializing = _this.eclient.psubscribe(_this.toKey('') + '*')
.then(function(){
return commands(_this.client);
}).then(function(){
debuglog(name + ' queue ready');
}, function(err){
_this.emit('error', err, 'Error initializing queue');
throw err;
});
};
Queue.prototype._setupQueueEventListeners = function(){
@@ -375,7 +362,7 @@ Queue.prototype.close = function( doNotWaitJobs ){
_.each(_this.errorRetryTimer, function(timer){
clearTimeout(timer);
});
clearTimeout(_this.delayTimer);
_this.delayTimer.cancel();
clearInterval(_this.guardianTimer);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
_this.timers.clearAll();
@@ -423,6 +410,58 @@ Queue.prototype.process = function(name, concurrency, handler){
});
};
//
// This code will be called everytime a job is going to be processed if the job has a repeat option. (from delay -> active).
//
var parser = require('cron-parser');
function nextRepeatableJob(queue, name, data, opts){
var repeat = opts.repeat;
var repeatKey = queue.toKey('repeat') + ':' + name + ':' + repeat.cron;
//
// Get millis for this repeatable job.
//
return queue.client.get(repeatKey).then(function(millis){
if(millis){
return parseInt(millis);
}else{
return Date.now();
}
}).then(function(millis){
var interval = parser.parseExpression(repeat.cron, _.defaults({
currentDate: new Date(millis)
}, repeat));
var nextMillis;
try{
nextMillis = interval.next();
} catch(e){
// Ignore error
}
if(nextMillis){
nextMillis = nextMillis.getTime();
var delay = nextMillis - millis;
//
// Generate unique job id for this iteration.
//
var customId = 'repeat:' + name + ':' + nextMillis;
//
// Set key and add job should be atomic.
//
return queue.client.set(repeatKey, nextMillis).then(function(){
return Job.create(queue, name, data, _.extend(_.clone(opts), {
jobId: customId,
delay: delay < 0 ? 0 : delay,
timestamp: Date.now()
}));
});
}
});
};
Queue.prototype.start = function(concurrency){
var _this = this;
return this.run(concurrency).catch(function(err){
@@ -449,6 +488,11 @@ Queue.prototype.setHandler = function(name, handler){
interface JobOptions
{
attempts: number;
repeat: {
tz?: string,
endDate?: Date | string | number
}
}
*/
@@ -459,7 +503,11 @@ interface JobOptions
@param opts: JobOptions Options for this job.
*/
Queue.prototype.add = function(name, data, opts){
return Job.create(this, name, data, opts);
if(opts && opts.repeat){
return nextRepeatableJob(this, name || DEFAULT_JOB_NAME, data, opts);
}else{
return Job.create(this, name, data, opts);
}
};
/**
@@ -588,14 +636,15 @@ Queue.prototype.run = function(concurrency){
*/
Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
var _this = this;
newDelayedTimestamp = Math.round(newDelayedTimestamp);
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){
clearTimeout(this.delayTimer);
this.delayTimer.cancel();
this.delayedTimestamp = newDelayedTimestamp;
var nextDelayedJob = newDelayedTimestamp - Date.now();
nextDelayedJob = nextDelayedJob < 0 ? 0 : nextDelayedJob;
var delay = nextDelayedJob <= 0 ? Promise.resolve() : Promise.delay(nextDelayedJob);
this.delayTimer = setTimeout(function(){
this.delayTimer = delay.then(function(){
scripts.updateDelaySet(_this, _this.delayedTimestamp).then(function(nextTimestamp){
if(nextTimestamp){
nextTimestamp = nextTimestamp < Date.now() ? Date.now() : nextTimestamp;
@@ -607,7 +656,7 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){
_this.emit('error', err, 'Error updating the delay timer');
});
_this.delayedTimestamp = Number.MAX_VALUE;
}, nextDelayedJob);
});
}
};
@@ -791,7 +840,13 @@ Queue.prototype.getNextJob = function() {
return scripts.moveToActive(this).spread(function(jobData, jobId){
if(jobData){
return Job.fromData(_this, jobData, jobId);
var job = Job.fromData(_this, jobData, jobId);
if(job.opts.repeat){
return nextRepeatableJob(_this, job.name, job.data, job.opts).then(function(){
return job;
});
}
return job;
}else{
return newJobs;
}
+17 -3
View File
@@ -1,6 +1,10 @@
{
"name": "bull",
<<<<<<< HEAD
"version": "3.0.0-alpha.3",
=======
"version": "3.0.0-alpha.4",
>>>>>>> 9fa6d0445c5c3cf5d09d049ea45d0923871adb58
"description": "Job manager",
"main": "./lib/queue",
"repository": {
@@ -18,16 +22,18 @@
"readmeFilename": "README.md",
"dependencies": {
"bluebird": "^3.5.0",
"cron-parser": "^2.4.0",
"debuglog": "^1.0.0",
"ioredis": "^3.0.0-1",
"ioredis": "^3.0.0-2",
"lodash": "^4.17.4",
"semver": "^5.3.0",
"uuid": "^3.0.1"
},
"devDependencies": {
"chai": "^3.5.0",
"eslint": "^2.13.1",
"expect.js": "^0.3.1",
"mocha": "^2.5.3",
"mocha": "^3.3.0",
"sinon": "^1.17.7"
},
"scripts": {
@@ -52,7 +58,15 @@
"camelcase": 1,
"no-unused-vars": 1,
"no-alert": 1,
"no-console": [2, {"allow": ["warn", "error"]}],
"no-console": [
2,
{
"allow": [
"warn",
"error"
]
}
],
"quotes": [
2,
"single"
+2 -2
View File
@@ -97,7 +97,7 @@ describe('Job', function(){
});
});
it('fails to remove a locked job', function(done) {
it('fails to remove a locked job', function() {
return Job.create(queue, 1, {foo: 'bar'}).then(function(job) {
return job.takeLock().then(function(lock) {
expect(lock).to.be.truthy;
@@ -108,7 +108,7 @@ describe('Job', function(){
}).then(function() {
throw new Error('Should not be able to remove a locked job');
}).catch(function(/*err*/) {
done();
// Good!
});
});
});
+135
View File
@@ -0,0 +1,135 @@
/*eslint-env node */
'use strict';
var expect = require('chai').expect;
var utils = require('./utils');
var sinon = require('sinon');
var redis = require('ioredis');
var ONE_SECOND = 1000;
var ONE_MINUTE = 60 * ONE_SECOND;
var ONE_HOUR = 60 * ONE_MINUTE;
var ONE_DAY = 24 * ONE_HOUR;
var ONE_MONTH = 31 * ONE_DAY;
describe('repeat', function () {
var sandbox = sinon.sandbox.create();
var queue;
beforeEach(function(){
this.clock = sinon.useFakeTimers();
var client = new redis();
return client.flushdb().then(function(){
queue = utils.buildQueue('repeat', {settings: {
guardInterval: Number.MAX_VALUE,
stalledInterval: Number.MAX_VALUE
}});
});
});
afterEach(function(){
this.clock.restore();
return queue.close();
});
it('should repeat every 2 seconds', function (done) {
var _this = this;
var date = new Date('2017-02-07 9:24:00');
this.clock.tick(date.getTime());
var nextTick = 2 * ONE_SECOND + 500;
queue.add('repeat', {foo: 'bar'}, { repeat: {cron: '*/2 * * * * *'}}).then(function(){
_this.clock.tick(nextTick);
});
queue.process('repeat', function(){
// dummy
});
var prev;
var counter = 0;
queue.on('completed', function(job){
_this.clock.tick(nextTick);
if(prev){
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(2000);
}
prev = job;
counter ++;
if(counter == 20){
done();
}
});
});
it('should repeat once a day for 5 days', function (done) {
var _this = this;
//this.timeout(50000);
var date = new Date('2017-05-05 13:12:00');
this.clock.tick(date.getTime());
var nextTick = ONE_DAY;
queue.add('repeat', {foo: 'bar'}, {repeat: {
cron: '0 1 * * *',
endDate: new Date('2017-05-10 13:12:00')}
}).then(function(){
_this.clock.tick(nextTick);
});
queue.process('repeat', function(){
// Dummy
});
var prev;
var counter = 0;
queue.on('completed', function(job){
_this.clock.tick(nextTick);
if(prev){
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY);
}
prev = job;
counter ++;
if(counter == 5){
queue.getWaiting().then(function(jobs){
expect(jobs.length).to.be.zero;
queue.getDelayed().then(function(jobs){
expect(jobs.length).to.be.zero;
done();
});
});
}
});
});
it('should repeat 7:th day every month at 9:25', function (done) {
var _this = this;
var date = new Date('2017-02-02 7:21:42');
this.clock.tick(date.getTime());
queue.add('repeat', {foo: 'bar'}, { repeat: {cron: '* 25 9 7 * *'}}).then(function(){
_this.clock.tick(ONE_MONTH);
});
queue.process('repeat', function(){
// Dummy
});
var counter = 20;
var prev;
queue.on('completed', function(job){
if(prev){
expect(prev.timestamp).to.be.lt(job.timestamp);
expect(job.timestamp - prev.timestamp).to.be.gte(ONE_MONTH);
}
prev = job;
counter --;
if(counter == 0){
done();
}
_this.clock.tick(ONE_MONTH);
});
});
});