feat: better rate limiter (#1816)

This commit is contained in:
Manuel Astudillo
2020-08-11 11:07:30 +02:00
committed by GitHub
parent 09b9a7f437
commit 923ef0c17b
17 changed files with 1423 additions and 432 deletions
+16 -12
View File
@@ -5,26 +5,30 @@ name: Node.js CI
on:
push:
branches: [ develop ]
branches: [develop]
pull_request:
branches: [ develop ]
branches: [develop]
jobs:
build:
runs-on: ubuntu-latest
services:
redis:
image: redis
ports:
- 6379:6379
strategy:
matrix:
node-version: [10.x, 12.x, 14.x]
steps:
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- run: yarn install --frozen-lockfile --non-interactive
- run: yarn prettier -- --list-different
- run: yarn test
- run: yarn coveralls
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- run: yarn install --frozen-lockfile --non-interactive
- run: yarn prettier -- --list-different
- run: yarn test
+1
View File
@@ -4,3 +4,4 @@ coverage
*.rdb
.vscode
package-lock.json
.nyc_output
-31
View File
@@ -1,31 +0,0 @@
language: node_js
sudo: false
cache:
yarn: true
# test on node.js versions
node_js:
- '14'
- '12'
- '10'
services:
- redis-server
before_install:
# Use a specific version of yarn in CI. This ensures yarn.lock format doesn't change.
- curl -o- -L https://yarnpkg.com/install.sh | bash -s -- --version 1.19.1
- export PATH="$HOME/.yarn/bin:$PATH"
install:
# ensure unexpected changes to yarn.lock break the build
- yarn install --frozen-lockfile --non-interactive
script:
- npm run prettier -- --list-different
- npm run test
after_script:
- npm run coveralls
-47
View File
@@ -1,47 +0,0 @@
--[[
Remove jobs from the specific set.
Input:
KEYS[1] set key,
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
]]
local command = "ZRANGE"
local isList = false
if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then
command = "LRANGE"
isList = true
end
local jobs = redis.call(command, KEYS[1], 0, -1)
local deleted = {}
local deletedCount = 0
local limit = tonumber(ARGV[3])
local jobTS
for _, job in ipairs(jobs) do
if limit > 0 and deletedCount >= limit then
break
end
local jobKey = ARGV[1] .. job
if (redis.call("EXISTS", jobKey .. ":lock") == 0) then
jobTS = redis.call("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < ARGV[2]) then
if isList then
redis.call("LREM", KEYS[1], 0, job)
else
redis.call("ZREM", KEYS[1], job)
end
redis.call("DEL", jobKey)
redis.call("DEL", jobKey .. ":logs")
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
end
end
return deleted
+60
View File
@@ -0,0 +1,60 @@
--[[
Remove jobs from the specific set.
Input:
KEYS[1] set key,
KEYS[2] rate limiter key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
]]
local command = "ZRANGE"
local isList = false
local rcall = redis.call
if ARGV[4] == "wait" or ARGV[4] == "active" or ARGV[4] == "paused" then
command = "LRANGE"
isList = true
end
local jobIds = rcall(command, KEYS[1], 0, -1)
local deleted = {}
local deletedCount = 0
local limit = tonumber(ARGV[3])
local jobTS
for _, jobId in ipairs(jobIds) do
if limit > 0 and deletedCount >= limit then
break
end
local jobKey = ARGV[1] .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
jobTS = rcall("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < ARGV[2]) then
if isList then
rcall("LREM", KEYS[1], 0, jobId)
else
rcall("ZREM", KEYS[1], jobId)
end
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")
-- delete keys related to rate limiter
-- NOTE: this code is unncessary for other sets than wait, paused and delayed.
local limiterIndexTable = KEYS[2] .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)
if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end
deletedCount = deletedCount + 1
table.insert(deleted, jobId)
end
end
end
return deleted
+75 -37
View File
@@ -32,12 +32,77 @@
ARGV[9] optional rate limit by key
]]
local jobId
local rcall = redis.call
if(ARGV[5] ~= "") then
jobId = ARGV[5]
local rateLimit = function(jobId, maxJobs)
local rateLimiterKey = KEYS[6];
local limiterIndexTable = rateLimiterKey .. ":index"
-- Rate limit by group?
if(ARGV[9]) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
end
end
-- -- key for storing rate limited jobs
local limitedSetKey = rateLimiterKey .. ":limited"
local delay = 0
-- -- Check if job was already limited
local isLimited = rcall("SISMEMBER", limitedSetKey, jobId);
if isLimited == 1 then
-- Remove from limited zset since we are going to try to process it
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
else
-- If not, check if there are any limited jobs
local numLimitedJobs = rcall("SCARD", limitedSetKey)
if numLimitedJobs > 0 then
-- Note, add some slack to compensate for drift.
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
end
end
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
-- check if rate limit hit
if (delay == 0) and (jobCounter > maxJobs) then
local exceedingJobs = jobCounter - maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs - 1) * ARGV[7]) / maxJobs
elseif jobCounter == 1 then
rcall("PEXPIRE", rateLimiterKey, ARGV[7])
end
if delay > 0 then
local bounceBack = ARGV[8]
if bounceBack == 'false' then
local timestamp = delay + tonumber(ARGV[4])
-- put job into delayed queue
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", KEYS[7], timestamp)
rcall("SADD", limitedSetKey, jobId)
-- store index so that we can delete rate limited data
rcall("HSET", limiterIndexTable, jobId, limitedSetKey)
end
-- remove from active queue
rcall("LREM", KEYS[2], 1, jobId)
return true
else
-- false indicates not rate limited
return false
end
end
local jobId = ARGV[5]
if jobId ~= '' then
-- clean stalled key
rcall("SREM", KEYS[5], jobId)
else
@@ -49,47 +114,19 @@ if jobId then
-- Check if we need to perform rate limiting.
local maxJobs = tonumber(ARGV[6])
if(maxJobs) then
local rateLimiterKey = KEYS[6];
if(ARGV[9]) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
end
end
-- local jobCounter = tonumber(rcall("GET", rateLimiterKey))
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
local bounceBack = ARGV[8]
-- check if rate limit hit
if jobCounter > maxJobs then
if bounceBack == 'false' then
local exceedingJobs = jobCounter - maxJobs
local delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs - 1) * ARGV[7]) / maxJobs
local timestamp = delay + tonumber(ARGV[4])
-- put job into delayed queue
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", KEYS[7], timestamp)
end
-- remove from active queue
rcall("LREM", KEYS[2], 1, jobId)
return
else
if jobCounter == 1 then
rcall("PEXPIRE", rateLimiterKey, ARGV[7])
end
if maxJobs then
if rateLimit(jobId, maxJobs) then
return
end
end
-- get a lock
local jobKey = ARGV[1] .. jobId
local lockKey = jobKey .. ':lock'
-- get a lock
rcall("SET", lockKey, ARGV[2], "PX", ARGV[3])
rcall("ZREM", KEYS[3], jobId) -- remove from priority
-- remove from priority
rcall("ZREM", KEYS[3], jobId)
rcall("PUBLISH", KEYS[4], jobId)
rcall("HSET", jobKey, "processedOn", ARGV[4])
@@ -97,3 +134,4 @@ if jobId then
else
rcall("PUBLISH", KEYS[8], "")
end
@@ -13,6 +13,8 @@
KEYS[5] priority key
KEYS[6] active event key
KEYS[7] delayed key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] msg property
@@ -71,6 +73,25 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
rcall("PUBLISH", KEYS[2], ARGV[7])
-- -- Check if we should get from the delayed set instead of the waiting list
-- local delayedJobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1)[1]
-- if delayedJobId ~= nil then
-- local jobId = delayedJobId
-- if jobId then
-- local jobKey = ARGV[9] .. jobId
-- local lockKey = jobKey .. ':lock'
-- -- get a lock
-- rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
-- rcall("ZREM", KEYS[5], jobId) -- remove from priority
-- rcall("PUBLISH", KEYS[6], jobId)
-- rcall("HSET", jobKey, "processedOn", ARGV[2])
-- return {rcall("HGETALL", jobKey), jobId} -- get job data
-- end
-- end
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if(ARGV[8] == "1") then
@@ -12,6 +12,7 @@
KEYS[7] 'priority',
KEYS[8] jobId
KEYS[9] job logs
KEYS[10] rate limiter index table
ARGV[1] jobId
ARGV[2] lock token
@@ -22,18 +23,28 @@
-- TODO PUBLISH global event 'removed'
local rcall = redis.call
local lockKey = KEYS[8] .. ':lock'
local lock = redis.call("GET", lockKey)
if not lock then -- or (lock == ARGV[2])) then
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("LREM", KEYS[2], 0, ARGV[1])
redis.call("ZREM", KEYS[3], ARGV[1])
redis.call("LREM", KEYS[4], 0, ARGV[1])
redis.call("ZREM", KEYS[5], ARGV[1])
redis.call("ZREM", KEYS[6], ARGV[1])
redis.call("ZREM", KEYS[7], ARGV[1])
redis.call("DEL", KEYS[8])
redis.call("DEL", KEYS[9])
local jobId = ARGV[1]
rcall("LREM", KEYS[1], 0, jobId)
rcall("LREM", KEYS[2], 0, jobId)
rcall("ZREM", KEYS[3], jobId)
rcall("LREM", KEYS[4], 0, jobId)
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)
rcall("DEL", KEYS[8])
rcall("DEL", KEYS[9])
-- delete keys related to rate limiter
local limiterIndexTable = KEYS[10] .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)
if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end
return 1
else
return 0
@@ -10,6 +10,7 @@
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] 'rate-limiter'
ARGV[1] prefix
ARGV[2] pattern
@@ -44,9 +45,17 @@ for i, jobKey in ipairs(jobKeys) do
rcall("ZREM", KEYS[7], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
-- delete keys related to rate limiter
local limiterIndexTable = KEYS[8] .. ":index"
local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)
if limitedSetKey then
rcall("SREM", limitedSetKey, jobId)
rcall("HDEL", limiterIndexTable, jobId)
end
table.insert(removed, jobId)
end
end
end
return {cursor, removed}
+63 -72
View File
@@ -276,7 +276,7 @@ function redisClientGetter(queue, options, initCallback) {
return function(type) {
return function() {
// getter function
// Memoized connection
if (connections[type] != null) {
return connections[type];
}
@@ -496,48 +496,37 @@ Queue.prototype.isReady = function() {
});
};
function redisClientDisconnect(client) {
if (client.status === 'end') {
return Promise.resolve();
}
let _resolve, _reject;
return new Promise((resolve, reject) => {
_resolve = resolve;
_reject = reject;
client.once('end', resolve);
client.once('error', reject);
async function redisClientDisconnect(client) {
if (client.status !== 'end') {
let _resolve, _reject;
return new Promise((resolve, reject) => {
_resolve = resolve;
_reject = reject;
client.once('end', _resolve);
pTimeout(
client.quit().catch(err => {
if (err.message !== 'Connection is closed.') {
throw err;
pTimeout(
client.quit().catch(err => {
if (err.message !== 'Connection is closed.') {
throw err;
}
}),
500
).catch(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
}),
500
).catch(() => {
client.disconnect();
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
});
}
}
Queue.prototype.disconnect = function() {
//
// TODO: Only quit clients that we "own".
//
const clients = this.clients.filter(client => {
return client.status !== 'end';
});
return Promise.all(clients.map(redisClientDisconnect))
.catch(err => {
return console.error(err);
})
.then(() => {
return null;
});
Queue.prototype.disconnect = async function() {
await Promise.all(this.clients.map(redisClientDisconnect));
};
Queue.prototype.removeJobs = function(pattern) {
@@ -545,25 +534,21 @@ Queue.prototype.removeJobs = function(pattern) {
};
Queue.prototype.close = function(doNotWaitJobs) {
let isReady = true;
if (this.closing) {
return this.closing;
}
return (this.closing = this.isReady()
.then(
() => {
return this._initializingProcess;
},
(/*err*/) => {
// Ignore this error and try to close anyway.
() => this._initializingProcess,
err => {
console.error(err);
isReady = false;
}
)
.finally(() => {
return this._clearTimers();
})
.then(() => {
return this.pause(true, doNotWaitJobs);
})
.finally(() => this._clearTimers())
.then(() => isReady && this.pause(true, doNotWaitJobs))
.then(() => {
if (!this.childPool) {
return;
@@ -577,12 +562,8 @@ Queue.prototype.close = function(doNotWaitJobs) {
return cleanPromise;
})
.then(
() => {
return this.disconnect();
},
(/*err*/) => {
// Ignore this error and try to close anyway.
}
async () => this.disconnect(),
err => console.error(err)
)
.finally(() => {
this.closed = true;
@@ -758,31 +739,43 @@ Queue.prototype.addBulk = function(jobs) {
TODO: Use EVAL to make this operation fully atomic.
*/
Queue.prototype.empty = function() {
// Get all jobids and empty all lists atomically.
const queueKeys = this.keys;
let multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('delayed'));
multi.del(this.toKey('priority'));
multi.lrange(queueKeys.wait, 0, -1);
multi.lrange(queueKeys.paused, 0, -1);
multi.keys(this.toKey('*:limited'));
multi.del(
queueKeys.wait,
queueKeys.paused,
queueKeys['meta-paused'],
queueKeys.delayed,
queueKeys.priority,
queueKeys.limiter,
`${queueKeys.limiter}:index`
);
return multi.exec().then(res => {
let waiting = res[0],
paused = res[1];
let [waiting, paused, limited] = res;
waiting = waiting[1];
paused = paused[1];
limited = limited[1];
const jobKeys = paused.concat(waiting).map(this.toKey, this);
if (jobKeys.length) {
if (jobKeys.length || limited.length) {
multi = this.multi();
for (let i = 0; i < jobKeys.length; i += 10000) {
multi.del.apply(multi, jobKeys.slice(i, i + 10000));
}
for (let i = 0; i < limited.length; i += 10000) {
multi.del.apply(multi, limited.slice(i, i + 10000));
}
return multi.exec();
}
});
@@ -819,9 +812,9 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) {
if (doNotWaitActive) {
// Force reconnection of blocking connection to abort blocking redis call immediately.
return redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});
return redisClientDisconnect(this.bclient).then(() =>
this.bclient.connect()
);
}
return this.whenCurrentJobsFinished();
@@ -1238,9 +1231,7 @@ Queue.prototype.whenCurrentJobsFinished = function() {
return this.bclient.connect();
});
return Promise.all(this.processing).then(() => {
return forcedReconnection;
});
return Promise.all(this.processing).then(() => forcedReconnection);
};
//
+25 -32
View File
@@ -121,7 +121,8 @@ const scripts = {
queue.toKey(job.id),
queueKeys.wait,
queueKeys.priority,
queueKeys.active + '@' + queue.token
queueKeys.active + '@' + queue.token,
queueKeys.delayed
];
if (typeof shouldRemove === 'boolean') {
@@ -281,41 +282,32 @@ const scripts = {
},
remove(queue, jobId) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority',
jobId,
jobId + ':logs'
],
name => {
return queue.toKey(name);
}
);
const keys = [
queue.keys.active,
queue.keys.wait,
queue.keys.delayed,
queue.keys.paused,
queue.keys.completed,
queue.keys.failed,
queue.keys.priority,
queue.toKey(jobId),
queue.toKey(`${jobId}:logs`),
queue.keys.limiter
];
return queue.client.removeJob(keys.concat([jobId, queue.token]));
},
async removeWithPattern(queue, pattern) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority'
],
name => {
return queue.toKey(name);
}
);
const keys = [
queue.keys.active,
queue.keys.wait,
queue.keys.delayed,
queue.keys.paused,
queue.keys.completed,
queue.keys.failed,
queue.keys.priority,
queue.keys.limiter
];
const allRemoved = [];
let cursor = '0',
@@ -411,6 +403,7 @@ const scripts = {
cleanJobsInSet(queue, set, ts, limit) {
return queue.client.cleanJobsInSet([
queue.toKey(set),
queue.keys.limiter,
queue.toKey(''),
ts,
limit || 0,
+9 -8
View File
@@ -24,18 +24,18 @@
"debuglog": "^1.0.0",
"get-port": "^5.1.1",
"ioredis": "^4.14.1",
"lodash": "^4.17.15",
"lodash": "^4.17.19",
"p-timeout": "^3.2.0",
"promise.prototype.finally": "^3.1.2",
"semver": "^6.3.0",
"semver": "^7.3.2",
"util.promisify": "^1.0.1",
"uuid": "^8.2.0"
"uuid": "^8.3.0"
},
"devDependencies": {
"@commitlint/cli": "^7.6.1",
"@commitlint/config-conventional": "^7.6.0",
"chai": "^4.2.0",
"coveralls": "^3.0.9",
"coveralls": "^3.1.0",
"delay": "^4.3.0",
"eslint": "^7.4.0",
"eslint-plugin-mocha": "^7.0.1",
@@ -44,9 +44,10 @@
"husky": "^1.3.1",
"istanbul": "^0.4.5",
"lint-staged": "^8.2.1",
"mocha": "^6.2.2",
"mocha": "^8.1.1",
"mocha-lcov-reporter": "^1.3.0",
"moment": "^2.24.0",
"nyc": "^15.1.0",
"p-reflect": "^1.0.0",
"prettier": "^1.19.1",
"sinon": "^7.5.0"
@@ -54,9 +55,9 @@
"scripts": {
"pretest": "npm run lint",
"lint": "eslint lib test *.js",
"test": "NODE_ENV=test mocha 'test/test_*'",
"test:nolint": "NODE_ENV=test mocha 'test/test_*'",
"coveralls": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- --exit -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage",
"test": "NODE_ENV=test nyc mocha -- 'test/test_*' --recursive --exit",
"test:nolint": "NODE_ENV=test mocha 'test/test_*' --recursive --exit",
"coverage": "nyc report --reporter=text-lcov | coveralls",
"postpublish": "git push && git push --tags",
"prettier": "prettier --config package.json --write '**/*.js'",
"precommit": "lint-staged",
+33 -31
View File
@@ -120,7 +120,7 @@ describe('Queue', () => {
});
describe('instantiation', () => {
it('should create a queue with standard redis opts', done => {
it('should create a queue with standard redis opts', () => {
const queue = new Queue('standard');
expect(queue.client.options.host).to.be.eql('127.0.0.1');
@@ -132,7 +132,7 @@ describe('Queue', () => {
expect(queue.client.options.db).to.be.eql(0);
expect(queue.eclient.options.db).to.be.eql(0);
queue.close().then(done, done);
return queue.close();
});
it('should create a queue with a redis connection string', () => {
@@ -147,8 +147,8 @@ describe('Queue', () => {
expect(queue.client.options.db).to.be.eql(2);
expect(queue.eclient.options.db).to.be.eql(2);
queue.close();
});
return queue.close();
}).timeout(5000);
it('should create a queue with only a hostname', () => {
const queue = new Queue('connstring', 'redis://127.2.3.4');
@@ -187,7 +187,7 @@ describe('Queue', () => {
});
});
it('creates a queue using the supplied redis DB', done => {
it('creates a queue using the supplied redis DB', () => {
const queue = new Queue('custom', { redis: { DB: 1 } });
expect(queue.client.options.host).to.be.eql('127.0.0.1');
@@ -199,10 +199,10 @@ describe('Queue', () => {
expect(queue.client.options.db).to.be.eql(1);
expect(queue.eclient.options.db).to.be.eql(1);
queue.close().then(done, done);
return queue.close();
});
it('creates a queue using the supplied redis host', done => {
it('creates a queue using the supplied redis host', () => {
const queue = new Queue('custom', { redis: { host: 'localhost' } });
expect(queue.client.options.host).to.be.eql('localhost');
@@ -211,7 +211,7 @@ describe('Queue', () => {
expect(queue.client.options.db).to.be.eql(0);
expect(queue.eclient.options.db).to.be.eql(0);
queue.close().then(done, done);
return queue.close();
});
it('creates a queue with dots in its name', () => {
@@ -1646,29 +1646,6 @@ describe('Queue', () => {
});
});
it('count added, unprocessed jobs', () => {
const maxJobs = 100;
const added = [];
const queue = utils.buildQueue();
for (let i = 1; i <= maxJobs; i++) {
added.push(queue.add({ foo: 'bar', num: i }));
}
return Promise.all(added)
.then(queue.count.bind(queue))
.then(count => {
expect(count).to.be.eql(maxJobs);
})
.then(queue.empty.bind(queue))
.then(queue.count.bind(queue))
.then(count => {
expect(count).to.be.eql(0);
return queue.close();
});
});
describe('Delayed jobs', () => {
let queue;
@@ -2483,6 +2460,31 @@ describe('Queue', () => {
});
});
describe('Drain queue', () => {
it('should count zero after draining the queue', () => {
const maxJobs = 100;
const added = [];
const queue = utils.buildQueue();
for (let i = 1; i <= maxJobs; i++) {
added.push(queue.add({ foo: 'bar', num: i }));
}
return Promise.all(added)
.then(queue.count.bind(queue))
.then(count => {
expect(count).to.be.eql(maxJobs);
})
.then(queue.empty.bind(queue))
.then(queue.count.bind(queue))
.then(count => {
expect(count).to.be.eql(0);
return queue.close();
});
});
});
describe('Cleaner', () => {
let queue;
+81 -2
View File
@@ -55,7 +55,7 @@ describe('Rate limiter', () => {
}
});
it.skip('should obey the rate limit', done => {
it('should obey the rate limit', done => {
const startTime = new Date().getTime();
const numJobs = 4;
@@ -84,7 +84,86 @@ describe('Rate limiter', () => {
queue.on('failed', err => {
done(err);
});
});
}).timeout(5000);
// Skip because currently job priority is maintained in a best effort way, but cannot
// be guaranteed for rate limited jobs.
it.skip('should obey job priority', async () => {
const newQueue = utils.buildQueue('test rate limiter', {
limiter: {
max: 1,
duration: 150
}
});
const numJobs = 20;
const priorityBuckets = {
1: 0,
2: 0,
3: 0,
4: 0
};
const numPriorities = Object.keys(priorityBuckets).length;
newQueue.process(job => {
const priority = job.opts.priority;
priorityBuckets[priority] = priorityBuckets[priority] - 1;
for (let p = 1; p < priority; p++) {
if (priorityBuckets[p] > 0) {
const before = JSON.stringify(priorityBucketsBefore);
const after = JSON.stringify(priorityBuckets);
throw new Error(
`Priority was not enforced, job with priority ${priority} was processed before all jobs with priority ${p} were processed. Bucket counts before: ${before} / after: ${after}`
);
}
}
});
const result = new Promise((resolve, reject) => {
newQueue.on('failed', (job, err) => {
reject(err);
});
const afterNumJobs = _.after(numJobs, () => {
try {
expect(_.every(priorityBuckets, value => value === 0)).to.eq(true);
resolve();
} catch (err) {
reject(err);
}
});
newQueue.on('completed', () => {
afterNumJobs();
});
});
await newQueue.pause();
const promises = [];
for (let i = 0; i < numJobs; i++) {
const opts = { priority: (i % numPriorities) + 1 };
priorityBuckets[opts.priority] = priorityBuckets[opts.priority] + 1;
promises.push(newQueue.add({ id: i }, opts));
}
const priorityBucketsBefore = _.reduce(
priorityBuckets,
(acc, value, key) => {
acc[key] = value;
return acc;
},
{}
);
await Promise.all(promises);
await newQueue.resume();
return result;
}).timeout(60000);
it('should put a job into the delayed queue when limit is hit', () => {
const newQueue = utils.buildQueue('test rate limiter', {
+1 -1
View File
@@ -604,7 +604,7 @@ describe('repeat', () => {
done(Error('should not repeat more than 5 times'));
}
});
});
}).timeout(5000);
it('should processes delayed jobs by priority', function(done) {
const _this = this;
+1 -1
View File
@@ -177,7 +177,7 @@ describe('sandboxed process', () => {
]).then(() => {
queue.process(__dirname + '/fixtures/fixture_processor_slow.js');
});
});
}).timeout(5000);
it('should process and complete using done', done => {
queue.process(__dirname + '/fixtures/fixture_processor_callback.js');
+1007 -148
View File
File diff suppressed because it is too large Load Diff