feat(metrics): add support for collecting queue metrics

This commit is contained in:
Manuel Astudillo
2022-03-01 22:46:35 +08:00
parent a0d79463fc
commit 886d764381
10 changed files with 651 additions and 158 deletions
+27 -1
View File
@@ -34,6 +34,7 @@
- [Queue#getCompleted](#queuegetcompleted)
- [Queue#getFailed](#queuegetfailed)
- [Queue#getWorkers](#queuegetworkers)
- [Queue#getMetrics](#queuegetmetrics)
- [Job](#job)
@@ -71,11 +72,18 @@ interface QueueOptions {
limiter?: RateLimiter;
redis?: RedisOpts;
prefix?: string = 'bull'; // prefix for all queue keys.
metrics?: MetricsOpts; // Configure metrics
defaultJobOptions?: JobOpts;
settings?: AdvancedSettings;
}
```
```typescript
interface MetricsOpts {
maxDataPoints?: number; // Max number of data points to collect, granularity is fixed at one minute.
}
```
```typescript
interface RateLimiter {
max: number; // Max number of jobs processed
@@ -796,11 +804,29 @@ Returns a promise that will return an array with the failed jobs between start a
getWorkers() : Promise<Array<Object>>
```
Returns a promise that will return an array workers currently listening or processing jobs.
Returns a promise that will resolve to an array workers currently listening or processing jobs.
The object includes the same fields as [Redis CLIENT LIST](https://redis.io/commands/client-list) command.
---
### Queue#getMetrics
```ts
getMetrics(type: 'completed' | 'failed', start = 0, end = -1) : Promise<{
meta: {
count: number;
prevTS: number;
prevCount: number;
};
data: number[];
count: number;
}>
```
Returns a promise that resolves to a Metrics object.
---
### Queue#clean
```ts
+1
View File
@@ -2,3 +2,4 @@
module.exports = require('./lib/queue');
module.exports.Job = require('./lib/job');
module.exports.utils = require('./lib/utils');
-118
View File
@@ -1,118 +0,0 @@
--[[
Move job from active to a finished status (completed or failed)
A job can only be moved to completed if it was active.
The job must be locked before it can be moved to a finished status,
and the lock must be released in this script.
Input:
KEYS[1] active key
KEYS[2] completed/failed key
KEYS[3] jobId key
KEYS[4] wait key
KEYS[5] priority key
KEYS[6] active event key
KEYS[7] delayed key
KEYS[8] stalled key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] msg property
ARGV[4] return value / failed reason
ARGV[5] token
ARGV[6] shouldRemove
ARGV[7] event data (? maybe just send jobid).
ARGV[8] should fetch next job
ARGV[9] base key
Output:
0 OK
-1 Missing key.
-2 Missing lock.
Events:
'completed/failed'
]]
local rcall = redis.call
if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
if ARGV[5] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[8], ARGV[1])
else
return -2
end
end
-- Remove from active list
rcall("LREM", KEYS[1], -1, ARGV[1])
-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
local maxAge = keepJobs['age']
local targetSet = KEYS[2]
local timestamp = ARGV[2]
if maxCount ~= 0 then
-- Add to complete/failed set
rcall("ZADD", targetSet, timestamp, ARGV[1])
rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn"
local function removeJobs(jobIds)
for i, jobId in ipairs(jobIds) do
local jobKey = ARGV[9] .. jobId
local jobLogKey = jobKey .. ':logs'
rcall("DEL", jobKey, jobLogKey)
end
end
-- Remove old jobs?
if maxAge ~= nil then
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
removeJobs(jobIds)
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end
if maxCount ~= nil and maxCount > 0 then
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
removeJobs(jobIds)
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1));
end
else
local jobLogKey = KEYS[3] .. ':logs'
rcall("DEL", KEYS[3], jobLogKey)
end
rcall("PUBLISH", targetSet, ARGV[7])
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if(ARGV[8] == "1") then
-- move from wait to active
local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1])
if jobId then
local jobKey = ARGV[9] .. jobId
local lockKey = jobKey .. ':lock'
-- get a lock
rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
rcall("ZREM", KEYS[5], jobId) -- remove from priority
rcall("PUBLISH", KEYS[6], jobId)
rcall("HSET", jobKey, "processedOn", ARGV[2])
return {rcall("HGETALL", jobKey), jobId} -- get job data
end
end
return 0
else
return -1
end
+168
View File
@@ -0,0 +1,168 @@
--[[
Move job from active to a finished status (completed or failed)
A job can only be moved to completed if it was active.
The job must be locked before it can be moved to a finished status,
and the lock must be released in this script.
Input:
KEYS[1] active key
KEYS[2] completed/failed key
KEYS[3] jobId key
KEYS[4] wait key
KEYS[5] priority key
KEYS[6] active event key
KEYS[7] delayed key
KEYS[8] stalled key
KEYS[9] metrics key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] msg property
ARGV[4] return value / failed reason
ARGV[5] token
ARGV[6] shouldRemove
ARGV[7] event data (? maybe just send jobid).
ARGV[8] should fetch next job
ARGV[9] base key
ARGV[10] lock token
ARGV[11] lock duration in milliseconds
ARGV[12] maxMetricsSize
Output:
0 OK
-1 Missing key.
-2 Missing lock.
Events:
'completed/failed'
]]
local rcall = redis.call
--[[
Functions to collect metrics based on a current and previous count of jobs.
Granualarity is fixed at 1 minute.
]]
local function collectMetrics(metaKey, dataPointsList, maxDataPoints, timestamp)
-- Increment current count
local count = rcall("HINCRBY", metaKey, "count", 1) - 1
-- Compute how many data points we need to add to the list, N.
local prevTS = rcall("HGET", metaKey, "prevTS")
if not prevTS then
-- If prevTS is nil, set it to the current timestamp
rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0)
return
end
local N = math.floor((timestamp - prevTS) / 60000)
if N > 0 then
local delta = count - rcall("HGET", metaKey, "prevCount")
-- If N > 1, add N-1 zeros to the list
if N > 1 then
local points = {}
points[1] = delta
for i = 2, N do points[i] = 0 end
rcall("LPUSH", dataPointsList, unpack(points))
else
-- LPUSH delta to the list
rcall("LPUSH", dataPointsList, delta)
end
-- LTRIM to keep list to its max size
rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1)
-- update prev count with current count
rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp)
end
end
if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
if ARGV[5] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[5] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[8], ARGV[1])
else
return -2
end
end
-- Remove from active list
rcall("LREM", KEYS[1], -1, ARGV[1])
-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
local maxAge = keepJobs['age']
local targetSet = KEYS[2]
local timestamp = ARGV[2]
if maxCount ~= 0 then
-- Add to complete/failed set
rcall("ZADD", targetSet, timestamp, ARGV[1])
rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn"
local function removeJobs(jobIds)
for i, jobId in ipairs(jobIds) do
local jobKey = ARGV[9] .. jobId
local jobLogKey = jobKey .. ':logs'
rcall("DEL", jobKey, jobLogKey)
end
end
-- Remove old jobs?
if maxAge ~= nil then
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
removeJobs(jobIds)
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end
if maxCount ~= nil and maxCount > 0 then
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
removeJobs(jobIds)
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1));
end
else
local jobLogKey = KEYS[3] .. ':logs'
rcall("DEL", KEYS[3], jobLogKey)
end
-- Collect metrics
if ARGV[12] ~= "" then
collectMetrics(KEYS[9], KEYS[9]..':data', ARGV[12], timestamp)
end
rcall("PUBLISH", targetSet, ARGV[7])
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if (ARGV[8] == "1") then
-- move from wait to active
local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1])
if jobId then
local jobKey = ARGV[9] .. jobId
local lockKey = jobKey .. ':lock'
-- get a lock
rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])
rcall("ZREM", KEYS[5], jobId) -- remove from priority
rcall("PUBLISH", KEYS[6], jobId)
rcall("HSET", jobKey, "processedOn", ARGV[2])
return {rcall("HGETALL", jobKey), jobId} -- get job data
end
end
return 0
else
return -1
end
+14 -26
View File
@@ -7,12 +7,10 @@
ARGV[1] count
ARGV[2] force
]]
]]
-- This command completely destroys a queue including all of its jobs, current or past
-- leaving no trace of its existence. Since this script needs to iterate to find all the job
-- keys, consider that this call may be slow for very large queues.
-- The queue needs to be "paused" or it will return an error
-- If the queue has currently active jobs then the script by default will return error,
-- however this behaviour can be overrided using the `force` option.
@@ -45,15 +43,11 @@ end
local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(keyName, jobs)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
end
local function removeLockKeys(keys)
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key .. ':lock')
end
for i, key in ipairs(keys) do rcall("DEL", baseKey .. key .. ':lock') end
end
-- 1) Check if paused, if not return with error.
@@ -65,7 +59,7 @@ end
local activeKey = baseKey .. 'active'
local activeJobs = getListItems(activeKey, maxCount)
if (#activeJobs > 0) then
if(ARGV[2] == "") then
if (ARGV[2] == "") then
return -2 -- Error, ExistsActiveJobs
end
end
@@ -73,35 +67,25 @@ end
removeLockKeys(activeJobs)
removeJobs(activeKey, activeJobs)
rcall("LTRIM", activeKey, #activeJobs, -1)
if(maxCount <= 0) then
return 1
end
if (maxCount <= 0) then return 1 end
local waitKey = baseKey .. 'paused'
removeListJobs(waitKey, maxCount)
if(maxCount <= 0) then
return 1
end
if (maxCount <= 0) then return 1 end
local delayedKey = baseKey .. 'delayed'
removeZSetJobs(delayedKey, maxCount)
if(maxCount <= 0) then
return 1
end
if (maxCount <= 0) then return 1 end
local completedKey = baseKey .. 'completed'
removeZSetJobs(completedKey, maxCount)
if(maxCount <= 0) then
return 1
end
if (maxCount <= 0) then return 1 end
local failedKey = baseKey .. 'failed'
removeZSetJobs(failedKey, maxCount)
if(maxCount <= 0) then
return 1
end
if (maxCount <= 0) then return 1 end
if(maxCount > 0) then
if (maxCount > 0) then
rcall("DEL", baseKey .. 'priority')
rcall("DEL", baseKey .. 'stalled-check')
rcall("DEL", baseKey .. 'stalled')
@@ -109,6 +93,10 @@ if(maxCount > 0) then
rcall("DEL", baseKey .. 'meta')
rcall("DEL", baseKey .. 'id')
rcall("DEL", baseKey .. 'repeat')
rcall("DEL", baseKey .. 'metrics:completed')
rcall("DEL", baseKey .. 'metrics:completed:data')
rcall("DEL", baseKey .. 'metrics:failed')
rcall("DEL", baseKey .. 'metrics:failed:data')
return 0
else
return 1
+42
View File
@@ -199,6 +199,48 @@ module.exports = function(Queue) {
};
});
};
/**
* Get queue metrics related to the queue.
*
* This method returns the gathered metrics for the queue.
* The metrics are represented as an array of job counts
* per unit of time (1 minute).
*
* @param start - Start point of the metrics, where 0
* is the newest point to be returned.
* @param end - End poinf of the metrics, where -1 is the
* oldest point to be returned.
*
* @returns - Returns an object with queue metrics.
*/
Queue.prototype.getMetrics = async function(type, start = 0, end = -1) {
const metricsKey = this.toKey(`metrics:${type}`);
const dataKey = `${metricsKey}:data`;
const multi = this.multi();
multi.hmget(metricsKey, 'count', 'prevTS', 'prevCount');
multi.lrange(dataKey, start, end);
multi.llen(dataKey);
const [hmget, range, len] = await multi.exec();
const [err, [count, prevTS, prevCount]] = hmget;
const [err2, data] = range;
const [err3, numPoints] = len;
if (err || err2) {
throw err || err2 || err3;
}
return {
meta: {
count: parseInt(count || '0', 10),
prevTS: parseInt(prevTS || '0', 10),
prevCount: parseInt(prevCount || '0', 10)
},
data,
count: numPoints
};
};
};
function parseTypeArg(args) {
+2
View File
@@ -219,6 +219,8 @@ const Queue = function Queue(name, url, opts) {
isSharedChildPool: false
});
this.metrics = opts.metrics;
this.settings.lockRenewTime =
this.settings.lockRenewTime || this.settings.lockDuration / 2;
+10 -13
View File
@@ -114,27 +114,20 @@ const scripts = {
});
},
retryJobsArgs(
queue,
count,
) {
const keys = [
queue.toKey(''),
queue.toKey('failed'),
queue.toKey('wait'),
];
retryJobsArgs(queue, count) {
const keys = [queue.toKey(''), queue.toKey('failed'), queue.toKey('wait')];
const args = [count];
return keys.concat(args);
},
async retryJobs(queue, count = 1000){
async retryJobs(queue, count = 1000) {
const client = await queue.client;
const args = this.retryJobsArgs(queue, count);
return (client).retryJobs(args);
return client.retryJobs(args);
},
moveToFinishedArgs(
@@ -149,6 +142,8 @@ const scripts = {
const queue = job.queue;
const queueKeys = queue.keys;
const metricsKey = queue.toKey(`metrics:${target}`);
const keys = [
queueKeys.active,
queueKeys[target],
@@ -157,7 +152,8 @@ const scripts = {
queueKeys.priority,
queueKeys.active + '@' + queue.token,
queueKeys.delayed,
queueKeys.stalled
queueKeys.stalled,
metricsKey
];
const keepJobs = pack(
@@ -179,7 +175,8 @@ const scripts = {
notFetch || queue.paused || queue.closing || queue.limiter ? 0 : 1,
queueKeys[''],
queue.settings.lockDuration,
queue.token
queue.token,
queue.metrics?.maxDataPoints
];
return keys.concat(args);
+11
View File
@@ -57,3 +57,14 @@ module.exports.emitSafe = function(emitter, event, ...args) {
}
}
};
module.exports.MetricsTime = {
ONE_MINUTE: 1,
FIVE_MINUTES: 5,
FIFTEEN_MINUTES: 15,
THIRTY_MINUTES: 30,
ONE_HOUR: 60,
ONE_WEEK: 60 * 24 * 7,
TWO_WEEKS: 60 * 24 * 7 * 2,
ONE_MONTH: 60 * 24 * 7 * 2 * 4
};
+376
View File
@@ -0,0 +1,376 @@
'use strict';
const expect = require('chai').expect;
const utils = require('./utils');
const sinon = require('sinon');
const redis = require('ioredis');
const ONE_SECOND = 1000;
const ONE_MINUTE = 60 * ONE_SECOND;
const ONE_HOUR = 60 * ONE_MINUTE;
const { MetricsTime } = require('../lib/utils');
describe('metrics', function() {
beforeEach(async function() {
this.clock = sinon.useFakeTimers();
const client = new redis();
//await client.flushdb();
return client.quit();
});
it('should gather metrics for completed jobs', async function() {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
this.clock.tick(0);
const timmings = [
0,
0, // For the fixtures to work we need to use 0 as first timing
ONE_MINUTE / 2,
ONE_MINUTE / 2,
0,
0,
ONE_MINUTE,
ONE_MINUTE,
ONE_MINUTE * 3,
ONE_HOUR,
ONE_MINUTE
];
const fixture = [
'1',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'1',
'0',
'0',
'1',
'1',
'3',
'3'
];
const numJobs = timmings.length;
const queue = utils.buildQueue('metrics', {
metrics: {
maxDataPoints: MetricsTime.ONE_HOUR * 2
}
});
queue.process(job => {
this.clock.tick(timmings[job.data.index]);
});
let processed = 0;
const completing = new Promise(resolve => {
queue.on('completed', async () => {
processed++;
if (processed === numJobs) {
resolve();
}
});
});
for (let i = 0; i < numJobs; i++) {
await queue.add({ index: i });
}
await completing;
const metrics = await queue.getMetrics('completed');
const numPoints = Math.floor(
timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE
);
expect(metrics.meta.count).to.be.equal(numJobs);
expect(metrics.data.length).to.be.equal(numPoints);
expect(metrics.count).to.be.equal(metrics.data.length);
expect(processed).to.be.equal(numJobs);
expect(metrics.data).to.be.deep.equal(fixture);
this.clock.restore();
await queue.close();
});
it('should only keep metrics for "maxDataPoints"', async function() {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
this.clock.tick(0);
const timmings = [
0, // For the fixtures to work we need to use 0 as first timing
0,
ONE_MINUTE / 2,
ONE_MINUTE / 2,
0,
0,
ONE_MINUTE,
ONE_MINUTE,
ONE_MINUTE * 3,
ONE_HOUR,
0,
0,
ONE_MINUTE,
ONE_MINUTE
];
const fixture = [
'1',
'3',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0',
'0'
];
const numJobs = timmings.length;
const queue = utils.buildQueue('metrics', {
metrics: {
maxDataPoints: MetricsTime.FIFTEEN_MINUTES
}
});
queue.process(job => {
this.clock.tick(timmings[job.data.index]);
});
let processed = 0;
const completing = new Promise(resolve => {
queue.on('completed', async () => {
processed++;
if (processed === numJobs) {
resolve();
}
});
});
for (let i = 0; i < numJobs; i++) {
await queue.add({ index: i });
}
await completing;
const metrics = await queue.getMetrics('completed');
expect(metrics.meta.count).to.be.equal(numJobs);
expect(metrics.data.length).to.be.equal(MetricsTime.FIFTEEN_MINUTES);
expect(metrics.count).to.be.equal(metrics.data.length);
expect(processed).to.be.equal(numJobs);
expect(metrics.data).to.be.deep.equal(fixture);
this.clock.restore();
await queue.close();
});
it('should gather metrics for failed jobs', async function() {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
this.clock.tick(0);
const timmings = [
0, // For the fixtures to work we need to use 0 as first timing
ONE_MINUTE,
ONE_MINUTE / 5,
ONE_MINUTE / 2,
0,
ONE_MINUTE,
ONE_MINUTE * 3,
0
];
const fixture = ['0', '0', '1', '4', '1'];
const numJobs = timmings.length;
const queue = utils.buildQueue('metrics', {
metrics: {
maxDataPoints: MetricsTime.ONE_HOUR * 2
}
});
queue.process(async job => {
this.clock.tick(timmings[job.data.index]);
throw new Error('test');
});
let processed = 0;
const completing = new Promise(resolve => {
queue.on('failed', async () => {
processed++;
if (processed === numJobs) {
resolve();
}
});
});
for (let i = 0; i < numJobs; i++) {
await queue.add({ index: i });
}
await completing;
const metrics = await queue.getMetrics('failed');
const numPoints = Math.floor(
timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE
);
expect(metrics.meta.count).to.be.equal(numJobs);
expect(metrics.data.length).to.be.equal(numPoints);
expect(metrics.count).to.be.equal(metrics.data.length);
expect(processed).to.be.equal(numJobs);
expect(metrics.data).to.be.deep.equal(fixture);
this.clock.restore();
await queue.close();
});
it('should get metrics with pagination', async function() {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
this.clock.tick(0);
const timmings = [
0,
0, // For the fixtures to work we need to use 0 as first timing
ONE_MINUTE / 2,
ONE_MINUTE / 2,
0,
0,
ONE_MINUTE,
ONE_MINUTE,
ONE_MINUTE * 3,
ONE_HOUR,
ONE_MINUTE
];
const numJobs = timmings.length;
const queue = utils.buildQueue('metrics', {
metrics: {
maxDataPoints: MetricsTime.ONE_HOUR * 2
}
});
queue.process(async job => {
this.clock.tick(timmings[job.data.index]);
});
let processed = 0;
const completing = new Promise(resolve => {
queue.on('completed', async () => {
processed++;
if (processed === numJobs) {
resolve();
}
});
});
for (let i = 0; i < numJobs; i++) {
await queue.add({ index: i });
}
await completing;
expect(processed).to.be.equal(numJobs);
const numPoints = Math.floor(
timmings.reduce((sum, timing) => sum + timing, 0) / ONE_MINUTE
);
const pageSize = 10;
const data = [];
let skip = 0;
while (skip < numPoints) {
const metrics = await queue.getMetrics(
'completed',
skip,
skip + pageSize - 1
);
expect(metrics.meta.count).to.be.equal(numJobs);
expect(metrics.data.length).to.be.equal(
Math.min(numPoints - skip, pageSize)
);
data.push(...metrics.data);
skip += pageSize;
}
const metrics = await queue.getMetrics('completed');
expect(data).to.be.deep.equal(metrics.data);
this.clock.restore();
await queue.close();
});
});