fix(retry-job): consider priority (#2737) fixes #1755

This commit is contained in:
Rogger Valverde
2024-05-14 23:12:02 -06:00
committed by GitHub
parent 8c3bf1f8a4
commit 09ce146563
12 changed files with 167 additions and 115 deletions
+2
View File
@@ -22,6 +22,8 @@ jobs:
node-version: 12
- name: Install dependencies
run: yarn install --frozen-lockfile --non-interactive
- name: Generate scripts
run: yarn pretest
- name: Release
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}
+3 -3
View File
@@ -26,11 +26,11 @@ class RawScriptLoader extends ScriptLoader {
for (const command of scripts) {
const {
name,
options: { numberOfKeys, lua },
options: { numberOfKeys, lua }
} = command;
await writeFile(
path.join(writeFilenamePath, `${name}-${numberOfKeys}.lua`),
lua,
lua
);
}
}
@@ -41,5 +41,5 @@ const scriptLoader = new RawScriptLoader();
scriptLoader.transpileScripts(
path.join(__dirname, './lib/commands'),
path.join(__dirname, './rawScripts'),
path.join(__dirname, './rawScripts')
);
@@ -0,0 +1,16 @@
--[[
Function to add job considering priority.
]]
local function addJobWithPriority(priorityKey, priority, jobId, targetKey)
rcall("ZADD", priorityKey, priority, jobId)
local count = rcall("ZCOUNT", priorityKey, 0, priority)
local len = rcall("LLEN", targetKey)
local id = rcall("LINDEX", targetKey, len - (count - 1))
if id then
rcall("LINSERT", targetKey, "BEFORE", id, jobId)
else
rcall("RPUSH", targetKey, jobId)
end
end
@@ -0,0 +1,12 @@
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("EXISTS", queueMetaKey) ~= 1 then
return waitKey, false
else
return pausedKey, true
end
end
+3 -2
View File
@@ -4,5 +4,6 @@ const { ScriptLoader } = require('./script-loader');
const scriptLoader = new ScriptLoader();
module.exports = {
ScriptLoader, scriptLoader
}
ScriptLoader,
scriptLoader
};
@@ -5,7 +5,8 @@
KEYS[1] 'delayed'
KEYS[2] 'wait'
KEYS[3] 'paused'
KEYS[4] 'priority'
KEYS[4] 'meta-paused'
KEYS[5] 'priority'
ARGV[1] queue.toKey('')
ARGV[2] jobId
@@ -17,30 +18,20 @@
local rcall = redis.call;
local jobId = ARGV[2]
if redis.call("ZREM", KEYS[1], jobId) == 1 then
-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
if rcall("ZREM", KEYS[1], jobId) == 1 then
local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0
local target = KEYS[2];
if rcall("EXISTS", KEYS[3]) == 1 then
target = KEYS[3]
end
local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[3])
if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)
local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
addJobWithPriority(KEYS[5], priority, jobId, target)
end
-- Emit waiting event (wait..ing@token)
@@ -4,10 +4,11 @@
Input:
KEYS[1] 'active',
KEYS[2] 'wait'
KEYS[3] jobId
KEYS[3] jobId key
KEYS[4] 'meta-paused'
KEYS[5] 'paused'
KEYS[6] stalled key
KEYS[7] 'priority'
ARGV[1] pushCmd
ARGV[2] jobId
@@ -22,6 +23,11 @@
-2 - Job Not locked
]]
local rcall = redis.call
-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
if rcall("EXISTS", KEYS[3]) == 1 then
-- Check for job lock
@@ -37,14 +43,16 @@ if rcall("EXISTS", KEYS[3]) == 1 then
rcall("LREM", KEYS[1], 0, ARGV[2])
local target
if rcall("EXISTS", KEYS[4]) ~= 1 then
target = KEYS[2]
else
target = KEYS[5]
end
local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[5])
rcall(ARGV[1], target, ARGV[2])
local priority = tonumber(rcall("HGET", KEYS[3], "priority")) or 0
if priority == 0 then
-- LIFO or FIFO
rcall(ARGV[1], target, ARGV[2])
else
addJobWithPriority(KEYS[7], priority, ARGV[2], target)
end
return 0
else
+38 -69
View File
@@ -16,13 +16,7 @@ class ScriptLoaderError extends Error {
* The include stack
*/
constructor(
message,
path,
stack = [],
line,
position = 0,
) {
constructor(message, path, stack = [], line, position = 0) {
super(message);
// Ensure the name of this error is the same as the class name
this.name = this.constructor.name;
@@ -33,8 +27,7 @@ class ScriptLoaderError extends Error {
}
}
const isPossiblyMappedPath = (path) =>
path && ['~', '<'].includes(path[0]);
const isPossiblyMappedPath = path => path && ['~', '<'].includes(path[0]);
/**
* Lua script loader with include support
@@ -97,7 +90,7 @@ class ScriptLoader {
throw new ScriptLoaderError(
`No path mapping found for "${name}"`,
scriptName,
stack,
stack
);
}
scriptName = path.join(mappedPath, scriptName.substring(p + 1));
@@ -114,19 +107,14 @@ class ScriptLoader {
* multiple times, we make sure to load it only once.
* @param stack - internal stack to prevent circular references
*/
async resolveDependencies(
file,
cache,
isInclude = false,
stack = [],
) {
async resolveDependencies(file, cache, isInclude = false, stack = []) {
cache = cache ? cache : new Map();
if (stack.includes(file.path)) {
throw new ScriptLoaderError(
`circular reference: "${file.path}"`,
file.path,
stack,
stack
);
}
stack.push(file.path);
@@ -136,7 +124,7 @@ class ScriptLoader {
const arr = content.slice(0, pos).split('\n');
return {
line: arr.length,
column: arr[arr.length - 1].length + match.indexOf('@include') + 1,
column: arr[arr.length - 1].length + match.indexOf('@include') + 1
};
}
@@ -144,7 +132,7 @@ class ScriptLoader {
const pos = findPos(file.content, match);
throw new ScriptLoaderError(msg, file.path, stack, pos.line, pos.column);
}
// eslint-disable-next-line node/no-unpublished-require
// eslint-disable-next-line node/no-unpublished-require
const minimatch = require('minimatch');
if (!minimatch) {
@@ -152,7 +140,7 @@ class ScriptLoader {
}
const Minimatch = minimatch.Minimatch || class Empty {};
// eslint-disable-next-line node/no-unpublished-require
// eslint-disable-next-line node/no-unpublished-require
const fg = require('fast-glob');
if (!fg) {
@@ -162,21 +150,21 @@ class ScriptLoader {
const nonOp = () => {
return [''];
};
const glob = (fg) ? fg.glob : nonOp;
const glob = fg ? fg.glob : nonOp;
const hasMagic = (pattern) => {
const hasMagic = pattern => {
if (!Array.isArray(pattern)) {
pattern = [pattern];
}
for (const p of pattern) {
if ((new Minimatch(p, GlobOptions)).hasMagic()) {
if (new Minimatch(p, GlobOptions).hasMagic()) {
return true;
}
}
return false;
};
const hasFilenamePattern = (path) => hasMagic(path);
const hasFilenamePattern = path => hasMagic(path);
async function getFilenamesByPattern(pattern) {
return glob(pattern, { dot: true });
@@ -198,14 +186,12 @@ class ScriptLoader {
if (hasFilenamePattern(includeFilename)) {
const filesMatched = await getFilenamesByPattern(includeFilename);
includePaths = filesMatched.map((x) => path.resolve(x));
includePaths = filesMatched.map(x => path.resolve(x));
} else {
includePaths = [includeFilename];
}
includePaths = includePaths.filter(
(file) => path.extname(file) === '.lua',
);
includePaths = includePaths.filter(file => path.extname(file) === '.lua');
if (includePaths.length === 0) {
raiseError(`include not found: "${reference}"`, match);
@@ -216,9 +202,7 @@ class ScriptLoader {
for (let i = 0; i < includePaths.length; i++) {
const includePath = includePaths[i];
const hasInclude = file.includes.find(
(x) => x.path === includePath,
);
const hasInclude = file.includes.find(x => x.path === includePath);
if (hasInclude) {
/**
@@ -229,7 +213,7 @@ class ScriptLoader {
*/
raiseError(
`file "${reference}" already included in "${file.path}"`,
match,
match
);
}
@@ -243,7 +227,7 @@ class ScriptLoader {
const buf = await readFile(includePath, { flag: 'r' });
childContent = buf.toString();
} catch (err) {
if ((err).code === 'ENOENT') {
if (err.code === 'ENOENT') {
raiseError(`include not found: "${reference}"`, match);
} else {
throw err;
@@ -257,7 +241,7 @@ class ScriptLoader {
path: includePath,
content: childContent,
token,
includes: [],
includes: []
};
cache.set(includePath, includeMetadata);
} else {
@@ -292,13 +276,9 @@ class ScriptLoader {
* @param content - the content of the script
* @param cache - cache
*/
async parseScript(
filename,
content,
cache,
) {
async parseScript(filename, content, cache) {
const { name, numberOfKeys } = splitFilename(filename);
const meta = cache ? cache.get(name):undefined;
const meta = cache ? cache.get(name) : undefined;
if (meta && meta.content === content) {
return meta;
}
@@ -308,7 +288,7 @@ class ScriptLoader {
content,
name,
numberOfKeys,
includes: [],
includes: []
};
await this.resolveDependencies(fileInfo, cache);
@@ -323,8 +303,8 @@ class ScriptLoader {
interpolate(file, processed) {
processed = processed || new Set();
let content = file.content;
file.includes.forEach((child) => {
const emitted = processed? processed.has(child.path):undefined;
file.includes.forEach(child => {
const emitted = processed ? processed.has(child.path) : undefined;
const fragment = this.interpolate(child, processed);
const replacement = emitted ? '' : fragment;
@@ -337,7 +317,7 @@ class ScriptLoader {
content = replaceAll(content, child.token, '');
}
if(processed){
if (processed) {
processed.add(child.path);
}
});
@@ -345,14 +325,11 @@ class ScriptLoader {
return content;
}
async loadCommand(
filename,
cache,
) {
async loadCommand(filename, cache) {
filename = path.resolve(filename);
const { name: scriptName } = splitFilename(filename);
let script = cache? cache.get(scriptName) : undefined;
let script = cache ? cache.get(scriptName) : undefined;
if (!script) {
const content = (await readFile(filename)).toString();
script = await this.parseScript(filename, content, cache);
@@ -363,7 +340,7 @@ class ScriptLoader {
return {
name,
options: { numberOfKeys: numberOfKeys, lua },
options: { numberOfKeys: numberOfKeys, lua }
};
}
@@ -379,10 +356,7 @@ class ScriptLoader {
* moveToFinish-3.lua
*
*/
async loadScripts(
dir,
cache,
) {
async loadScripts(dir, cache) {
dir = path.normalize(dir || __dirname);
let commands = this.commandCache.get(dir);
@@ -392,9 +366,7 @@ class ScriptLoader {
const files = await readdir(dir);
const luaFiles = files.filter(
(file) => path.extname(file) === '.lua',
);
const luaFiles = files.filter(file => path.extname(file) === '.lua');
if (luaFiles.length === 0) {
/**
@@ -424,11 +396,7 @@ class ScriptLoader {
* @param client - redis client to attach script to
* @param pathname - the path to the directory containing the scripts
*/
async load(
client,
pathname,
cache,
) {
async load(client, pathname, cache) {
let paths = this.clientScripts.get(client);
if (!paths) {
paths = new Set();
@@ -438,11 +406,11 @@ class ScriptLoader {
paths.add(pathname);
const scripts = await this.loadScripts(
pathname,
cache ? cache : new Map(),
cache ? cache : new Map()
);
scripts.forEach((command) => {
scripts.forEach(command => {
// Only define the command if not already defined
if (!(client)[command.name]) {
if (!client[command.name]) {
client.defineCommand(command.name, command.options);
}
});
@@ -501,9 +469,9 @@ function getCallerFile() {
try {
Error.prepareStackTrace = (_, stack) => stack;
const sites = (new Error().stack);
const sites = new Error().stack;
const shiftResponse = sites.shift();
const currentFile = shiftResponse ? shiftResponse.getFileName():undefined;
const currentFile = shiftResponse ? shiftResponse.getFileName() : undefined;
while (sites.length) {
const newShiftResponse = sites.shift();
@@ -523,7 +491,8 @@ function getCallerFile() {
}
function sha1(data) {
return createHash('sha1').update(data)
return createHash('sha1')
.update(data)
.digest('hex');
}
@@ -542,4 +511,4 @@ function removeEmptyLines(str) {
module.exports = {
ScriptLoaderError,
ScriptLoader
}
};
+6 -7
View File
@@ -146,18 +146,17 @@ const Queue = function Queue(name, url, opts) {
this.clients = [];
const loadCommands = (providedScripts, client) => {
const finalScripts =
providedScripts || (scripts);
const finalScripts = providedScripts || scripts;
for (const property in finalScripts) {
// Only define the command if not already defined
if (!(client)[finalScripts[property].name]) {
(client).defineCommand(finalScripts[property].name, {
if (!client[finalScripts[property].name]) {
client.defineCommand(finalScripts[property].name, {
numberOfKeys: finalScripts[property].keys,
lua: finalScripts[property].content,
lua: finalScripts[property].content
});
}
}
}
};
const lazyClient = redisClientGetter(this, opts, (type, client) => {
// bubble up Redis error events
@@ -166,7 +165,7 @@ const Queue = function Queue(name, url, opts) {
this.once('close', () => client.removeListener('error', handler));
if (type === 'client') {
this._initializing = (async() => loadCommands(commands, client))().then(
this._initializing = (async () => loadCommands(commands, client))().then(
() => {
debuglog(name + ' queue ready');
},
+4 -6
View File
@@ -109,7 +109,7 @@ const scripts = {
progressJson,
JSON.stringify({ jobId: job.id, progress })
])
.then((code) => {
.then(code => {
if (code < 0) {
throw scripts.finishedErrors(code, job.id, 'updateProgress');
}
@@ -124,10 +124,7 @@ const scripts = {
});
const dataJson = JSON.stringify(data);
return queue.client
.updateData(keys, [
dataJson
]);
return queue.client.updateData(keys, [dataJson]);
},
retryJobsArgs(queue, count) {
@@ -426,6 +423,7 @@ const scripts = {
queue.keys.delayed,
queue.keys.wait,
queue.keys.paused,
queue.keys['meta-paused'],
queue.keys.priority
];
@@ -478,7 +476,7 @@ const scripts = {
const jobId = job.id;
const keys = _.map(
['active', 'wait', jobId, 'meta-paused', 'paused', 'stalled'],
['active', 'wait', jobId, 'meta-paused', 'paused', 'stalled', 'priority'],
name => {
return queue.toKey(name);
}
+4 -2
View File
@@ -206,7 +206,7 @@ describe('Job', () => {
it('throws an error', async () => {
const job = await Job.create(queue, { foo: 'bar' });
await job.remove();
await job.update({baz: 'qux'}).catch(err => {
await job.update({ baz: 'qux' }).catch(err => {
expect(err.message).to.be.equal('Missing key for job 1 updateData');
});
});
@@ -548,7 +548,9 @@ describe('Job', () => {
const job = await Job.create(queue, { foo: 'bar' });
await job.remove();
await job.progress({ total: 120, completed: 40 }).catch(err => {
expect(err.message).to.be.equal('Missing key for job 1 updateProgress');
expect(err.message).to.be.equal(
'Missing key for job 1 updateProgress'
);
});
});
});
+54
View File
@@ -2454,6 +2454,60 @@ describe('Queue', () => {
});
});
describe('when job has more priority than delayed jobs', () => {
it('executes retried job first', done => {
queue = utils.buildQueue('test retries and priority');
let id = 0;
queue.isReady().then(() => {
queue.process(async job => {
await delay(200);
if (job.attemptsMade === 0) {
id++;
expect(job.id).to.be.eql(`${id}`);
}
if (job.id == '1' && job.attemptsMade < 1) {
throw new Error('Not yet!');
}
});
queue.add(
{ foo: 'bar' },
{
attempts: 2,
priority: 1
}
);
queue.add(
{},
{
delay: 200,
priority: 2
}
);
queue.add(
{},
{
delay: 200,
priority: 2
}
);
queue.add(
{},
{
delay: 200,
priority: 2
}
);
});
let count = 0;
queue.on('completed', () => {
if (count++ === 3) {
done();
}
});
});
});
it('should not retry a failed job more than the number of given attempts times', done => {
queue = utils.buildQueue('test retries and backoffs');
let tries = 0;