1070 Add discard config for rate-limiter

- Meant for Manually Fetching Jobs pattern
This commit is contained in:
Alec Brunelle
2018-10-01 15:16:17 -04:00
parent 9c439d150d
commit b96597cb51
6 changed files with 205 additions and 134 deletions
+1
View File
@@ -2,3 +2,4 @@ node_modules
tmp
coverage
*.rdb
.vscode
+68 -63
View File
@@ -1,6 +1,4 @@
Patterns
========
# Patterns
Here are a few examples of useful patterns that are often implemented with Bull:
@@ -14,28 +12,26 @@ Here are a few examples of useful patterns that are often implemented with Bull:
If you have any other common patterns you want to add, pull request them!
Message Queue
-------------
## Message Queue
Bull can also be used for persistent message queues. This is a quite useful
feature in some use cases. For example, you can have two servers that need to
communicate with each other. By using a queue the servers do not need to be online at the same time, so this creates a very robust communication channel. You can treat `add` as *send* and `process` as *receive*:
feature in some usecases. For example, you can have two servers that need to
communicate with each other. By using a queue the servers do not need to be online at the same time, this creates a very robust communication channel. You can treat `add` as _send_ and `process` as _receive_:
Server A:
```js
var Queue = require('bull');
var sendQueue = new Queue("Server B");
var receiveQueue = new Queue("Server A");
var sendQueue = new Queue('Server B');
var receiveQueue = new Queue('Server A');
receiveQueue.process(function(job, done){
console.log("Received message", job.data.msg);
receiveQueue.process(function(job, done) {
console.log('Received message', job.data.msg);
done();
});
sendQueue.add({msg:"Hello"});
sendQueue.add({ msg: 'Hello' });
```
Server B:
@@ -43,40 +39,36 @@ Server B:
```js
var Queue = require('bull');
var sendQueue = new Queue("Server A");
var receiveQueue = new Queue("Server B");
var sendQueue = new Queue('Server A');
var receiveQueue = new Queue('Server B');
receiveQueue.process(function(job, done){
console.log("Received message", job.data.msg);
receiveQueue.process(function(job, done) {
console.log('Received message', job.data.msg);
done();
});
sendQueue.add({msg:"World"});
sendQueue.add({ msg: 'World' });
```
Returning Job Completions
-------------------------
## Returning Job Completions
A common pattern is where you have a cluster of queue processors that just process jobs as fast as they can, and some other services that need to take the result of this processors and do something with it, maybe storing results in a database.
The most robust and scalable way to accomplish this is by combining the standard job queue with the message queue pattern: a service sends jobs to the cluster just by opening a job queue and adding jobs to it, and the cluster will start processing as fast as it can. Everytime a job gets completed in the cluster a message is sent to a results message queue with the result data, and this queue is listened by some other service that stores the results in a database.
Reusing Redis Connections
-------------------------
## Reusing Redis Connections
A standard queue requires **3 connections** to the Redis server. In some situations you might want to re-use connections—for example on Heroku where the connection count is restricted. You can do this with the `createClient` option in the `Queue` constructor:
```js
var {REDIS_URL} = process.env
var { REDIS_URL } = process.env;
var Redis = require('ioredis')
var Redis = require('ioredis');
var client = new Redis(REDIS_URL);
var subscriber = new Redis(REDIS_URL);
var opts = {
createClient: function (type) {
createClient: function(type) {
switch (type) {
case 'client':
return client;
@@ -86,14 +78,13 @@ var opts = {
return new Redis();
}
}
}
};
var queueFoo = new Queue('foobar', opts);
var queueQux = new Queue('quxbaz', opts);
```
Redis cluster
-------------
## Redis cluster
Bull internals requires atomic operations that spans different keys. This fact breaks Redis'
rules for cluster configurations. However it is still possible to use a cluster environment
@@ -106,16 +97,15 @@ substring to determine in which hash slot the key will be placed. So to make bul
cluster, just use a queue prefix inside brackets, for example:
```js
var queue = new Queue('cluster', {
prefix: '{myprefix}'
})
var queue = new Queue('cluster', {
prefix: '{myprefix}'
});
```
If you use several queues in the same cluster, you should use different prefixes so that the
queues are evenly placed in the cluster nodes.
Debugging
---------
## Debugging
To see debug statements set or add `bull` to the `NODE_DEBUG` environment variable:
@@ -127,18 +117,17 @@ export NODE_DEBUG=bull
NODE_DEBUG=bull node ./your-script.js
```
Custom backoff strategy
-----------------------
## Custom backoff strategy
When the builtin backoff strategies on retries are not sufficient, a custom strategy can be defined. Custom backoff strategies are defined by a function on the queue. The number of attempts already made to process the job is passed to this function as the first parameter, and the error that the job failed with as the second parameter.
```js
var Queue = require('bull');
var myQueue = new Queue("Server B", {
var myQueue = new Queue('Server B', {
settings: {
backoffStrategies: {
jitter: function (attemptsMade, err) {
jitter: function(attemptsMade, err) {
return 5000 + Math.random() * 500;
}
}
@@ -149,15 +138,19 @@ var myQueue = new Queue("Server B", {
The new backoff strategy can then be specified on the job, using the name defined above:
```js
myQueue.add({foo: 'bar'}, {
attempts: 3,
backoff: {
type: 'jitter'
myQueue.add(
{ foo: 'bar' },
{
attempts: 3,
backoff: {
type: 'jitter'
}
}
});
);
```
You may base your backoff strategy on the error that the job throws:
You may base you backoff strategy on the error that the job throws:
```js
var Queue = require('bull');
@@ -166,7 +159,7 @@ function MySpecificError() {}
var myQueue = new Queue('Server C', {
settings: {
backoffStrategies: {
foo: function (attemptsMade, err) {
foo: function(attemptsMade, err) {
if (err instanceof MySpecificError) {
return 10000;
}
@@ -176,7 +169,7 @@ var myQueue = new Queue('Server C', {
}
});
myQueue.process(function(job, done){
myQueue.process(function(job, done) {
if (job.data.msg === 'Specific Error') {
throw new MySpecificError();
} else {
@@ -184,23 +177,28 @@ myQueue.process(function(job, done){
}
});
myQueue.add({msg: 'Hello'}, {
attempts: 3,
backoff: {
type: 'foo'
myQueue.add(
{ msg: 'Hello' },
{
attempts: 3,
backoff: {
type: 'foo'
}
}
});
);
myQueue.add({msg: 'Specific Error'}, {
attempts: 3,
backoff: {
type: 'foo'
}
});
myQueue.add(
{ msg: 'Specific Error' },
{
attempts: 3,
backoff: {
type: 'foo'
}
}
);
```
Manually fetching jobs
----------------------------------
## Manually fetching jobs
If you want the actual job processing to be done in a seperate repo/service than where `bull` is running, this pattern may be for you.
@@ -209,10 +207,17 @@ Manually transitioning states for jobs can be done with a few simple methods.
1. Adding a job to the 'waiting' queue. Grab the queue and call `add`.
```typescript
import Queue from "bull";
import Queue from 'bull';
const queue = new Queue(description, queueOptions);
queue.add({ random_attr: "random_value" });
const queue = new Queue(description, {
limiter: {
max: 5,
duration: 5000,
discard: true // important
},
...queueOptions
});
queue.add({ random_attr: 'random_value' });
```
2. Pulling a job from 'waiting' and moving it to 'active'.
+65 -62
View File
@@ -1,8 +1,7 @@
Reference
=========
# Reference
- [Queue](#queue)
- [Queue#process](#queueprocess)
- [Queue#add](#queueadd)
- [Queue#pause](#queuepause)
@@ -29,6 +28,7 @@ Reference
- [Queue#getFailed](#queuegetfailed)
- [Job](#job)
- [Job#progress](#jobprogress)
- [Job#getState](#jobgetstate)
- [Job#update](#jobupdate)
@@ -43,9 +43,7 @@ Reference
- [Events](#events)
- [Global events](#global-events)
Queue
-----
## Queue
```ts
Queue(queueName: string, url?: string, opts?: QueueOptions): Queue
@@ -55,8 +53,8 @@ This is the Queue constructor. It creates a new Queue that is persisted in
Redis. Everytime the same queue is instantiated it tries to process all the
old jobs that may exist from a previous unfinished session.
The optional ```url``` argument, allows to specify a redis connection string such as for example:
```redis://mypassword@myredis.server.com:1234```
The optional `url` argument, allows to specify a redis connection string such as for example:
`redis://mypassword@myredis.server.com:1234`
```typescript
interface QueueOptions {
@@ -68,14 +66,17 @@ interface QueueOptions {
}
```
When a job gets hit with the rate limiter, it is moved to the delayed queue.
```typescript
interface RateLimiter {
max: number, // Max number of jobs processed
duration: number, // per duration in milliseconds
max: number; // Max number of jobs processed
duration: number; // per duration in milliseconds
discard: boolean = false; // When jobs get rate limited, nothing happens
}
```
```RedisOpts``` are passed directly to ioredis constructor, check [ioredis](https://github.com/luin/ioredis/blob/master/API.md)
`RedisOpts` are passed directly to ioredis constructor, check [ioredis](https://github.com/luin/ioredis/blob/master/API.md)
for details. We document here just the most important ones.
```typescript
@@ -99,9 +100,9 @@ interface AdvancedSettings {
}
```
__Advanced Settings__
**Advanced Settings**
__Warning:__ Do not override these advanced settings unless you understand the internals of the queue.
**Warning:** Do not override these advanced settings unless you understand the internals of the queue.
`lockDuration`: Time in milliseconds to acquire the job lock. Set this to a higher value if you find that your jobs are being stalled because your job processor is CPU-intensive and blocking the event loop (see note below about stalled jobs). Set this to a lower value if your jobs are extremely time-sensitive and it might be OK if they get double-processed (due to them be falsly considered stalled).
@@ -160,45 +161,48 @@ You can specify a `concurrency` argument. Bull will then call your handler in pa
A process function can also be declared as a separate process. This will make a better use of the available CPU cores
and run the jobs in parallel. This is a perfect way to run blocking code. Just specify an absolute path to a processor module.
i.e. a file exporting the process function like this:
```js
// my-processor.js
module.exports = function(job){
module.exports = function(job) {
// do some job
return value;
}
};
```
You can return a value or a promise to signal that the job has been completed.
A `name` argument can be provided so that multiple process functions can be defined per queue. A named process will only process jobs that matches the given name. However, if you define multiple named process functions in one Queue, the defined concurrency for each process function stacks up for the Queue. See the following examples:
```js
/***
* For each named processor, concurrency stacks up, so any of these three process functions
* can run with a concurrency of 125. To avoid this behaviour you need to create an own queue
* for each process function.
*/
const loadBalancerQueue = new Queue('loadbalancer')
loadBalancerQueue.process('requestProfile', 100, requestProfile)
loadBalancerQueue.process('sendEmail', 25, sendEmail)
loadBalancerQueue.process('sendInvitation', 0, sendInvite)
const loadBalancerQueue = new Queue('loadbalancer');
loadBalancerQueue.process('requestProfile', 100, requestProfile);
loadBalancerQueue.process('sendEmail', 25, sendEmail);
loadBalancerQueue.process('sendInvitation', 0, sendInvite);
const profileQueue = new Queue('profile')
const profileQueue = new Queue('profile');
// Max concurrency for requestProfile is 100
profileQueue.process('requestProfile', 100, requestProfile)
profileQueue.process('requestProfile', 100, requestProfile);
const emailQueue = new Queue('email')
const emailQueue = new Queue('email');
// Max concurrency for sendEmail is 25
emailQueue.process('sendEmail', 25, sendEmail)
emailQueue.process('sendEmail', 25, sendEmail);
```
Specifying `*` as the process name will make it the default processor for all named jobs.
It is frequently used to process all named jobs from one process function:
```js
const differentJobsQueue = new Queue('differentJobsQueue')
differentJobsQueue.process('*', processFunction)
differentJobsQueue.add('jobA', data, opts)
differentJobsQueue.add('jobB', data, opts)
const differentJobsQueue = new Queue('differentJobsQueue');
differentJobsQueue.process('*', processFunction);
differentJobsQueue.add('jobA', data, opts);
differentJobsQueue.add('jobB', data, opts);
```
**Note:** in order to determine whether job completion is signaled by
@@ -208,7 +212,8 @@ So watch out, as the following won't work:
```js
// THIS WON'T WORK!!
queue.process(function(job, done) { // Oops! done callback here!
queue.process(function(job, done) {
// Oops! done callback here!
return Promise.resolve();
});
```
@@ -216,7 +221,8 @@ queue.process(function(job, done) { // Oops! done callback here!
This, however, will:
```js
queue.process(function(job) { // No done callback here :)
queue.process(function(job) {
// No done callback here :)
return Promise.resolve();
});
```
@@ -234,15 +240,15 @@ Creates a new job and adds it to the queue. If the queue is empty the job will b
An optional name can be added, so that only process functions defined for that name (also called job type) will process the job.
**Note:**
You need to define *processors* for all the named jobs that you add to your queue or the queue will complain that you are missing a processor for the given job, unless you use the ```*``` as job name when defining the processor.
You need to define _processors_ for all the named jobs that you add to your queue or the queue will complain that you are missing a processor for the given job, unless you use the `*` as job name when defining the processor.
```typescript
interface JobOpts{
interface JobOpts {
priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
// using priorities has a slight impact on performance, so do not use it if not required.
// using priorities has a slight impact on performance, so do not use it if not required.
delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both
// server and clients should have their clocks synchronized. [optional].
// server and clients should have their clocks synchronized. [optional].
attempts: number; // The total number of attempts to try the job until it completes.
@@ -254,26 +260,25 @@ interface JobOpts{
timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]
jobId: number | string; // Override the job ID - by default, the job ID is a unique
// integer, but you can use this setting to override it.
// If you use this option, it is up to you to ensure the
// jobId is unique. If you attempt to add a job with an id that
// already exists, it will not be added.
// integer, but you can use this setting to override it.
// If you use this option, it is up to you to ensure the
// jobId is unique. If you attempt to add a job with an id that
// already exists, it will not be added.
removeOnComplete: boolean; // If true, removes the job when it successfully
// completes. Default behavior is to keep the job in the completed set.
// completes. Default behavior is to keep the job in the completed set.
removeOnFail: boolean; // If true, removes the job when it fails after all attempts.
// Default behavior is to keep the job in the failed set.
// Default behavior is to keep the job in the failed set.
stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}
```
```typescript
interface RepeatOpts{
interface RepeatOpts {
cron?: string; // Cron string
tz?: string, // Timezone
startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
endDate?: Date | string | number; // End date when the repeat job should stop repeating.
tz?: string; // Timezone
endDate?: Date | string | number; // End data when the repeat job should stop repeating.
limit?: number; // Number of times the job should repeat at max.
every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
}
@@ -281,9 +286,8 @@ interface RepeatOpts{
More information regarding the [cron expression](https://github.com/harrisiirak/cron-parser)
```typescript
interface BackoffOpts{
interface BackoffOpts {
type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings.
delay: number; // Backoff delay, in milliseconds.
}
@@ -309,7 +313,7 @@ Pausing a queue that is already paused does nothing.
resume(isLocal?: boolean): Promise
```
Returns a promise that resolves when the queue is resumed after being paused. The resume can be either local or global. If global, all workers in all queue instances for a given queue will be resumed. If local, only this worker will be resumed. Note that resuming a queue globally will *not* resume workers that have been paused locally; for those, `resume(true)` must be called directly on their instances.
Returns a promise that resolves when the queue is resumed after being paused. The resume can be either local or global. If global, all workers in all queue instances for a given queue will be resumed. If local, only this worker will be resumed. Note that resuming a queue globally will _not_ resume workers that have been paused locally; for those, `resume(true)` must be called directly on their instances.
Resuming a queue that is not paused does nothing.
@@ -347,19 +351,21 @@ Closes the underlying Redis client. Use this to perform a graceful shutdown.
var Queue = require('bull');
var queue = Queue('example');
var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
var after100 = _.after(100, function() {
queue.close().then(function() {
console.log('done');
});
});
queue.on('completed', after100);
```
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until *after*
from within a job handler the queue won't close until _after_
the job has been processed, so the following won't work:
```js
queue.process(function (job, jobDone) {
queue.process(function(job, jobDone) {
handle(job);
queue.close().then(jobDone);
});
@@ -368,7 +374,7 @@ queue.process(function (job, jobDone) {
Instead, do this:
```js
queue.process(function (job, jobDone) {
queue.process(function(job, jobDone) {
handle(job);
queue.close();
jobDone();
@@ -408,6 +414,7 @@ Returns a promise that will return an array of job instances of the given types.
---
### Queue#getRepeatableJobs
```ts
getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise <Job[]>
```
@@ -488,7 +495,6 @@ Returns a promise that will return the active job counts for the given queue.
---
### Queue#getWaitingCount
```ts
@@ -539,7 +545,6 @@ Returns a promise that will return an array with the delayed jobs between start
---
### Queue#getCompleted
```ts
@@ -575,7 +580,7 @@ Tells the queue remove jobs of a specific type created outside of a grace period
queue.clean(5000);
//clean all jobs that failed over 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
queue.on('cleaned', function(job, type) {
console.log('Cleaned %s %s jobs', job.length, type);
});
```
@@ -601,9 +606,7 @@ The cleaner emits the `cleaned` event anytime the queue is cleaned.
---
Job
---
## Job
A job includes all data needed to perform its execution, as well as the progress method needed to update its progress.
@@ -717,9 +720,11 @@ Moves a job to the `failed` queue. Pulls a job from 'waiting' to 'active' and re
---
## Events
Events
------
=======
## Events
A queue emits also some useful events:
@@ -773,7 +778,6 @@ A queue emits also some useful events:
.on('removed', function(job){
// A job successfully removed.
});
```
### Global events
@@ -793,7 +797,6 @@ When working with global events whose local counterparts pass a `Job` instance t
If you need to access the `Job` instance in a global listener, use [Queue#getJob](#queuegetjob) to retrieve it. However, remember that if `removeOnComplete` is enabled when adding the job, the job will no longer be available after completion. Should you need to both access the job and remove it after completion, you can use [Job#remove](#jobremove) to remove it in the listener.
```js
// Local events pass the job instance...
queue.on('progress', function(job, progress) {
console.log(`Job ${job.id} is ${progress * 100}% ready!`);
+6 -1
View File
@@ -27,7 +27,8 @@
ARGV[5] optional jobid
ARGV[6] optional jobs per time unit (rate limiter)
ARGV[7] optional time unit
ARGV[7] optional time unit (rate limiter)
ARGV[8] optional discard delayed rate limit
]]
local jobId
@@ -48,8 +49,12 @@ if jobId then
if(ARGV[6]) then
local jobCounter
local maxJobs = tonumber(ARGV[6])
local discard = ARGV[8]
jobCounter = tonumber(rcall("GET", KEYS[6]))
if jobCounter ~= nil and jobCounter >= maxJobs then
if discard == 'true' then
return
end
local delay = tonumber(rcall("PTTL", KEYS[6]))
local timestamp = delay + tonumber(ARGV[4])
+5 -1
View File
@@ -92,7 +92,11 @@ var scripts = {
];
if (queue.limiter) {
args.push(queue.limiter.max, queue.limiter.duration);
args.push(
queue.limiter.max,
queue.limiter.duration,
queue.limiter.discard ? true : false
);
}
return queue.client.moveToActive(keys.concat(args)).then(raw2jobData);
},
+60 -7
View File
@@ -31,7 +31,7 @@ describe('Rate limiter', function() {
it('should throw exception if missing duration option', function(done) {
try {
var queue = utils.buildQueue('rate limiter fail', {
utils.buildQueue('rate limiter fail', {
limiter: {
max: 5
}
@@ -44,7 +44,7 @@ describe('Rate limiter', function() {
it('should throw exception if missing max option', function(done) {
try {
var queue = utils.buildQueue('rate limiter fail', {
utils.buildQueue('rate limiter fail', {
limiter: {
duration: 5000
}
@@ -69,18 +69,71 @@ describe('Rate limiter', function() {
queue.on(
'completed',
// after every job has been completed
_.after(numJobs, function() {
try {
expect(new Date().getTime() - startTime).to.be.above(
(numJobs - 1) * 1000
);
var timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.above((numJobs - 1) * 1000);
done();
} catch (e) {
done(e);
assert.fail(e);
}
})
);
queue.on('failed', done);
queue.on('failed', function(e) {
assert.fail(e);
});
});
it('should put a job into the delayed queue when limit is hit', function() {
queue.on('failed', function(e) {
assert.fail(e);
});
return Promise.all(
[1, 2, 3, 4].map(function() {
return queue.add({});
})
).then(function() {
Promise.all(
[1, 2, 3, 4].map(function() {
return queue.getNextJob();
})
).then(function() {
return queue.getDelayedCount().then(function(delayedCount) {
expect(delayedCount).to.eq(3);
});
});
});
});
it('should not put a job into the delayed queue when discard is true', function() {
var newQueue = utils.buildQueue('test rate limiter', {
limiter: {
max: 1,
duration: 1000,
discard: true
}
});
newQueue.on('failed', function(e) {
assert.fail(e);
});
return Promise.all(
[1, 2, 3, 4].map(function() {
return newQueue.add({});
})
).then(function() {
Promise.all(
[1, 2, 3, 4].map(function() {
return newQueue.getNextJob();
})
).then(function() {
return newQueue.getDelayedCount().then(function(delayedCount) {
expect(delayedCount).to.eq(0);
});
});
});
});
});