mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 08:27:43 +08:00
feat: support job.update function in sandboxed processors
fixes #1279 , #1608, and #1056
This commit is contained in:
@@ -173,5 +173,15 @@ function wrapJob(job) {
|
||||
value: row
|
||||
});
|
||||
};
|
||||
/*
|
||||
* Emulate the real job `update` function.
|
||||
*/
|
||||
job.update = function(data) {
|
||||
process.send({
|
||||
cmd: 'update',
|
||||
value: data
|
||||
});
|
||||
job.data = data;
|
||||
};
|
||||
return job;
|
||||
}
|
||||
|
||||
@@ -36,6 +36,9 @@ module.exports = function(processFile, childPool) {
|
||||
case 'log':
|
||||
job.log(msg.value);
|
||||
break;
|
||||
case 'update':
|
||||
job.update(msg.value);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
+14
@@ -0,0 +1,14 @@
|
||||
/**
|
||||
* A processor file to be used in tests.
|
||||
*
|
||||
*/
|
||||
'use strict';
|
||||
|
||||
const delay = require('delay');
|
||||
|
||||
module.exports = function(job) {
|
||||
return delay(50).then(() => {
|
||||
job.update({ baz: 'qux' });
|
||||
return job.data;
|
||||
});
|
||||
};
|
||||
@@ -264,6 +264,24 @@ describe('sandboxed process', () => {
|
||||
queue.add({ foo: 'bar' });
|
||||
});
|
||||
|
||||
it('should process and update data', done => {
|
||||
queue.process(__dirname + '/fixtures/fixture_processor_data.js');
|
||||
|
||||
queue.on('completed', (job, value) => {
|
||||
try {
|
||||
expect(job.data).to.be.eql({ baz: 'qux' });
|
||||
expect(value).to.be.eql({ baz: 'qux' });
|
||||
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
|
||||
expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
|
||||
done();
|
||||
} catch (err) {
|
||||
done(err);
|
||||
}
|
||||
});
|
||||
|
||||
queue.add({ foo: 'bar' });
|
||||
});
|
||||
|
||||
it('should process and fail', done => {
|
||||
queue.process(__dirname + '/fixtures/fixture_processor_fail.js');
|
||||
|
||||
|
||||
Reference in New Issue
Block a user