나는 두 가지 기능을 가지고 scheduleScan()
와 scan()
.
scan()
호출은 scheduleScan()
새 검사 일정을 제외 할 다른 아무것도 할 때 , 그래서 scheduleScan()
을 예약 할 수 있습니다 scan()
. 그러나 문제가 있습니다. 일부 작업은 두 번 실행됩니다.
주어진 시간에 하나의 작업 만 처리되도록하고 싶습니다. 어떻게하면 되나요? 나는 그것이 done()
scan ()에 있었고 지금은 제거 된 것과 관련이 있다고 생각 하지만 해결책을 찾지 못했습니다.
불 버전 : 3.12.1
중요한 늦은 편집 : scan()
다른 함수를 호출하고 다른 함수를 호출하거나 호출하지 않을 수 있지만 모두 동기화 함수이므로 자신의 작업이 완료되었을 때만 함수를 호출하므로 한 가지 방법 만 있습니다. “트리”의 끝에서 마지막 함수는 scheduleScan ()을 호출하지만 두 개의 동시 작업을 실행할 수는 없습니다. 모든 단일 작업은에서 시작하여 scan()
끝납니다.scheduleScan(stock, period, milliseconds, 'called by file.js')
export function update(job) {
// does some calculations, then it may call scheduleScan() or
// it may call another function, and that could be the one calling
// scheduleScan() function.
// For instance, a function like finalize()
}
export function scan(job) {
update(job)
}
import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)
queue.process(1, (job) => {
job.progress(100).then(() => {
scan(job)
})
})
export function scheduleScan (stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ':' + period)
queue.getJob(uniqueId).then(job => {
if (!job) {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
} else {
job.getState().then(state => {
if (state === 'completed') {
job.remove().then(() => {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
}).catch(err => {
if (err) {
// console.log(err)
}
})
}
}).catch(err => {
// console.log(err)
})
}
})
}
답변
문제는 귀하의 scan
기능이 비동기 라고 생각합니다 . 따라서 job.progress
함수 호출 scan
후 즉시 호출 done
하여 큐가 다른 작업을 처리 할 수 있도록합니다.
해결책은 done
콜백을 매개 변수로 함수 scan
와 scheduleScan
함수에 전달하고 작업을 완료하거나 오류가 발생하면 호출하는 것입니다.
또 다른 (더 나은) 솔루션은 항상를 반환하도록 할 수 Promise
에서 scan
하고 scheduleScan
, 다음 해결 한 후 전화 약속을 기다리고 있습니다 done
. 이 작업을 수행하는 경우 모든 약속 약속을 scheduleScan
기능 에 연결해야 합니다.
queue.process(1, (job, done) => {
job.progress(100).then(() => {
scan(job)
.then(done)
.catch(done)
})
})
export function scan() {
// business logic
return scheduleScan()
}
// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
return queue.getJob(..).then(() => {
....
return queue.add()...
....
return queue.add(...)
.catch(e => {
console.log(e);
// propogate errors!
throw e;
})
}
답변
스캔 기능은 비동기 기능입니다. 당신이 queue.process()
기능을 사용하면 스캔 기능을 기다리고 다음 호출 할 필요가 done()
콜백을.
export async function scan(job) {
// it does some calculations, then it creates a new schedule.
return scheduleScan(stock, period, milliseconds, "scan.js");
}
queue.process(1, (job, done) => {
job.progress(100).then(async() => {
await scan(job);
done();
});
});
export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ":" + period);
try {
const existingJob = await queue.getJob(uniqueId);
if (!existingJob) {
const job = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return job;
} else {
const jobState = await existingJob.getState();
if (jobState === "completed") {
await existingJob.remove();
const newJob = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return newJob;
}
}
} catch (err) {
throw new Error(err);
}
}
export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
if (milliseconds) {
return queue.add(
{ stock, period, triggeredBy },
{ delay: milliseconds, jobId: uniqueId }
);
} else {
return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
}
}
이 시도! async-await를 사용하여 코드를 약간 리팩토링하려고했습니다.