mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 00:17:41 +08:00
Merge pull request #1536 from douglascayers/sandboxed-job-progress
Emulate job.progress function for sandboxed processors
This commit is contained in:
+32
-5
@@ -99,14 +99,41 @@ process.on('uncaughtException', err => {
|
||||
throw err;
|
||||
});
|
||||
|
||||
/**
|
||||
* Enhance the given job argument with some functions
|
||||
* that can be called from the sandboxed job processor.
|
||||
*
|
||||
* Note, the `job` argument is a JSON deserialized message
|
||||
* from the main node process to this forked child process,
|
||||
* the functions on the original job object are not in tact.
|
||||
* The wrapped job adds back some of those original functions.
|
||||
*/
|
||||
function wrapJob(job) {
|
||||
/*
|
||||
* Emulate the real job `progress` function.
|
||||
* If no argument is given, it behaves as a sync getter.
|
||||
* If an argument is given, it behaves as an async setter.
|
||||
*/
|
||||
let progressValue = job.progress;
|
||||
job.progress = function(progress) {
|
||||
process.send({
|
||||
cmd: 'progress',
|
||||
value: progress
|
||||
});
|
||||
return Promise.resolve();
|
||||
if (progress) {
|
||||
// Locally store reference to new progress value
|
||||
// so that we can return it from this process synchronously.
|
||||
progressValue = progress;
|
||||
// Send message to update job progress.
|
||||
process.send({
|
||||
cmd: 'progress',
|
||||
value: progress
|
||||
});
|
||||
return Promise.resolve();
|
||||
} else {
|
||||
// Return the last known progress value.
|
||||
return progressValue;
|
||||
}
|
||||
};
|
||||
/*
|
||||
* Emulate the real job `log` function.
|
||||
*/
|
||||
job.log = function(row) {
|
||||
process.send({
|
||||
cmd: 'log',
|
||||
|
||||
+5
-1
@@ -10,18 +10,22 @@ module.exports = function(job) {
|
||||
return delay(50)
|
||||
.then(() => {
|
||||
job.progress(10);
|
||||
job.log(job.progress());
|
||||
return delay(100);
|
||||
})
|
||||
.then(() => {
|
||||
job.progress(27);
|
||||
job.log(job.progress());
|
||||
return delay(150);
|
||||
})
|
||||
.then(() => {
|
||||
job.progress(78);
|
||||
job.log(job.progress());
|
||||
return delay(100);
|
||||
})
|
||||
.then(() => {
|
||||
return job.progress(100);
|
||||
job.progress(100);
|
||||
job.log(job.progress());
|
||||
})
|
||||
.then(() => {
|
||||
return 37;
|
||||
|
||||
@@ -208,6 +208,12 @@ describe('sandboxed process', () => {
|
||||
expect(progresses).to.be.eql([10, 27, 78, 100]);
|
||||
expect(Object.keys(queue.childPool.retained)).to.have.lengthOf(0);
|
||||
expect(queue.childPool.getAllFree()).to.have.lengthOf(1);
|
||||
queue.getJobLogs(job.id).then(logs =>
|
||||
expect(logs).to.be.eql({
|
||||
logs: ['10', '27', '78', '100'],
|
||||
count: 4
|
||||
})
|
||||
);
|
||||
done();
|
||||
} catch (err) {
|
||||
done(err);
|
||||
|
||||
Reference in New Issue
Block a user