mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 00:17:41 +08:00
fix(sandbox): broken processor files should fail jobs
This commit is contained in:
@@ -58,9 +58,14 @@ ChildPool.prototype.retain = function(processFile) {
|
||||
|
||||
child.on('exit', _this.remove.bind(_this, child));
|
||||
|
||||
return initChild(child, child.processFile).then(() => {
|
||||
return child;
|
||||
});
|
||||
return initChild(child, child.processFile)
|
||||
.then(() => {
|
||||
return child;
|
||||
})
|
||||
.catch(err => {
|
||||
this.remove(child);
|
||||
throw new Error(err);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
@@ -106,12 +111,14 @@ ChildPool.prototype.getAllFree = function() {
|
||||
};
|
||||
|
||||
async function initChild(child, processFile) {
|
||||
const onComplete = new Promise(resolve => {
|
||||
const onComplete = new Promise((resolve, reject) => {
|
||||
const onMessageHandler = msg => {
|
||||
if (msg.cmd === 'init-complete') {
|
||||
resolve();
|
||||
child.off('message', onMessageHandler);
|
||||
} else {
|
||||
reject(msg.value);
|
||||
}
|
||||
child.off('message', onMessageHandler);
|
||||
};
|
||||
child.on('message', onMessageHandler);
|
||||
});
|
||||
@@ -122,11 +129,11 @@ async function initChild(child, processFile) {
|
||||
await onComplete;
|
||||
}
|
||||
function ChildPoolSingleton(isSharedChildPool = false) {
|
||||
if(isSharedChildPool === false) {
|
||||
if (isSharedChildPool === false) {
|
||||
return new ChildPool();
|
||||
}
|
||||
else if (
|
||||
(!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined)
|
||||
} else if (
|
||||
!(this instanceof ChildPool) &&
|
||||
ChildPoolSingleton.instance === undefined
|
||||
) {
|
||||
ChildPoolSingleton.instance = new ChildPool();
|
||||
}
|
||||
|
||||
+11
-2
@@ -52,7 +52,16 @@ process.on('SIGINT', waitForCurrentJobAndExit);
|
||||
process.on('message', msg => {
|
||||
switch (msg.cmd) {
|
||||
case 'init':
|
||||
processor = require(msg.value);
|
||||
try {
|
||||
processor = require(msg.value);
|
||||
} catch (err) {
|
||||
status = 'Errored';
|
||||
return process.send({
|
||||
cmd: 'error',
|
||||
value: `Error loading process file ${msg.value}. ${err.toString()}`
|
||||
});
|
||||
}
|
||||
|
||||
if (processor.default) {
|
||||
// support es2015 module.
|
||||
processor = processor.default;
|
||||
@@ -119,7 +128,7 @@ process.on('uncaughtException', err => {
|
||||
cmd: 'failed',
|
||||
value: err
|
||||
});
|
||||
|
||||
|
||||
// An uncaughException leaves this process in a potentially undetermined state so
|
||||
// we must exit
|
||||
process.exit(-1);
|
||||
|
||||
+2
-1
@@ -217,7 +217,7 @@ const Queue = function Queue(name, url, opts) {
|
||||
retryProcessDelay: 5000,
|
||||
drainDelay: 5,
|
||||
backoffStrategies: {},
|
||||
isSharedChildPool: false,
|
||||
isSharedChildPool: false
|
||||
});
|
||||
|
||||
this.settings.lockRenewTime =
|
||||
@@ -285,6 +285,7 @@ function redisClientGetter(queue, options, initCallback) {
|
||||
}
|
||||
const clientOptions = _.assign({}, options.redis);
|
||||
clientOptions.connectionName = this.clientName();
|
||||
|
||||
const client = (connections[type] = createClient(type, clientOptions));
|
||||
|
||||
const opts = client.options.redisOptions || client.options;
|
||||
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
'use strict';
|
||||
throw new Error('Broken file processor');
|
||||
@@ -462,27 +462,38 @@ describe('sandboxed process', () => {
|
||||
]);
|
||||
|
||||
const processFile = __dirname + '/fixtures/fixture_processor.js';
|
||||
queueA.process(processFile)
|
||||
queueB.process(processFile)
|
||||
queueA.process(processFile);
|
||||
queueB.process(processFile);
|
||||
|
||||
await Promise.all([queueA.add(), queueB.add()]);
|
||||
|
||||
|
||||
expect(queueA.childPool).to.be.eql(queueB.childPool);
|
||||
});
|
||||
|
||||
it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => {
|
||||
it("should not share childPool across different queues if isSharedChildPool isn't specified", async () => {
|
||||
const [queueA, queueB] = await Promise.all([
|
||||
utils.newQueue('queueA', { settings: { isSharedChildPool: false } }),
|
||||
utils.newQueue('queueB')
|
||||
]);
|
||||
|
||||
const processFile = __dirname + '/fixtures/fixture_processor.js';
|
||||
queueA.process(processFile)
|
||||
queueB.process(processFile)
|
||||
queueA.process(processFile);
|
||||
queueB.process(processFile);
|
||||
|
||||
await Promise.all([queueA.add(), queueB.add()]);
|
||||
|
||||
expect(queueA.childPool).to.not.be.equal(queueB.childPool);
|
||||
})
|
||||
});
|
||||
|
||||
it('should fail if the process file is broken', async () => {
|
||||
const processFile = __dirname + '/fixtures/fixture_processor_broken.js';
|
||||
queue.process(processFile);
|
||||
await queue.add('test', {});
|
||||
|
||||
return new Promise(resolve => {
|
||||
queue.on('failed', () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user