mirror of
https://github.com/OptimalBits/bull.git
synced 2026-07-02 08:27:43 +08:00
feat(queue): emit internal duplicated event (#2754)
This commit is contained in:
@@ -51,7 +51,7 @@ else
|
||||
jobId = ARGV[2]
|
||||
jobIdKey = ARGV[1] .. jobId
|
||||
if rcall("EXISTS", jobIdKey) == 1 then
|
||||
rcall("PUBLISH", ARGV[1] .. "duplicated", jobId)
|
||||
rcall("PUBLISH", ARGV[1] .. "duplicated@" .. ARGV[11], jobId)
|
||||
return jobId .. "" -- convert to string
|
||||
end
|
||||
end
|
||||
|
||||
+4
-1
@@ -440,6 +440,9 @@ Queue.prototype._setupQueueEventListeners = function() {
|
||||
utils.emitSafe(this, 'global:stalled', message);
|
||||
break;
|
||||
case duplicatedKey:
|
||||
if (this.token === token) {
|
||||
utils.emitSafe(this, 'duplicated', message);
|
||||
}
|
||||
utils.emitSafe(this, 'global:duplicated', message);
|
||||
break;
|
||||
}
|
||||
@@ -510,7 +513,7 @@ Queue.prototype._setupQueueEventListeners = function() {
|
||||
};
|
||||
|
||||
Queue.prototype._registerEvent = function(eventName) {
|
||||
const internalEvents = ['waiting', 'delayed'];
|
||||
const internalEvents = ['waiting', 'delayed', 'duplicated'];
|
||||
|
||||
if (
|
||||
eventName.startsWith('global:') ||
|
||||
|
||||
+20
-1
@@ -1098,7 +1098,7 @@ describe('Queue', () => {
|
||||
});
|
||||
|
||||
describe('when job has been added again', () => {
|
||||
it('emits duplicated event', async () => {
|
||||
it('emits global duplicated event', async () => {
|
||||
queue.process(
|
||||
async () => {
|
||||
await delay(50);
|
||||
@@ -1116,6 +1116,25 @@ describe('Queue', () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('emits duplicated event', async () => {
|
||||
queue.process(
|
||||
async () => {
|
||||
await delay(50);
|
||||
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
|
||||
await delay(50);
|
||||
}
|
||||
);
|
||||
|
||||
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
|
||||
|
||||
await new Promise(resolve => {
|
||||
queue.once('duplicated', (jobId) => {
|
||||
expect(jobId).to.be.equal('a1');
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('process a job that updates progress', done => {
|
||||
|
||||
Reference in New Issue
Block a user