<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title></title>
</head>
<body></body>
<script>
class ConcurrentScheduler {
constructor(maxConcurrency = 3) {
this.maxConcurrency = maxConcurrency;
this.activeCount = 0;
this.queue = [];
this.abortControllers = new Map();
}
* 添加任务到调度器
* @param {Function} task - 返回Promise的异步函数
* @returns {Promise} 包含任务结果的Promise,可被外部中止
*/
add(task) {
const controller = new AbortController();
const { signal } = controller;
const wrappedTask = async () => {
try {
this.activeCount++;
const result = await task({ signal });
return result;
} finally {
this.activeCount--;
this.#runNext();
}
};
const taskPromise = new Promise((resolve, reject) => {
this.queue.push({
task: wrappedTask,
resolve,
reject,
signal,
});
this.abortControllers.set(signal, controller);
this.#runNext();
});
taskPromise.signal = signal;
return taskPromise;
}
#runNext() {
if (this.activeCount >= this.maxConcurrency) return;
const nextTask = this.queue.shift();
if (!nextTask) return;
if (nextTask.signal.aborted) {
nextTask.reject(new DOMException("Aborted", "AbortError"));
return this.#runNext();
}
nextTask
.task()
.then(nextTask.resolve)
.catch(nextTask.reject)
.finally(() => {
this.abortControllers.delete(nextTask.signal);
});
this.activeCount++;
this.#runNext();
}
* 中止特定任务
* @param {AbortSignal} signal - 任务对应的AbortSignal
*/
abort(signal) {
const controller = this.abortControllers.get(signal);
if (controller) {
controller.abort();
const index = this.queue.findIndex((item) => item.signal === signal);
if (index !== -1) {
const [removedTask] = this.queue.splice(index, 1);
removedTask.reject(new DOMException("Aborted", "AbortError"));
this.abortControllers.delete(signal);
}
}
}
* 中止所有任务
*/
abortAll() {
this.queue = [];
for (const controller of this.abortControllers.values()) {
controller.abort();
}
}
}
const createTask =
(id, delay) =>
async ({ signal }) => {
try {
console.log(`Task ${id} started`);
await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
console.log(`Task ${id} completed`);
resolve();
}, delay);
signal.addEventListener("abort", () => {
clearTimeout(timeout);
reject(new Error(`Task ${id} aborted`));
});
});
return `Result ${id}`;
} catch (err) {
throw err;
}
};
const scheduler = new ConcurrentScheduler(2);
const task1 = scheduler.add(createTask(1, 2000));
const task2 = scheduler.add(createTask(2, 1000));
const task3 = scheduler.add(createTask(3, 1500));
setTimeout(() => {
scheduler.abort(task2.signal);
}, 500);
Promise.allSettled([task1, task2, task3]).then((results) => {
results.forEach((result, index) => {
console.log(`Task ${index + 1}:`, result.status);
});
});
Task 1 started
Task 1 completed
Task 3 started
Task 3 completed
Task 1: fulfilled
Task 2: rejected
Task 3: fulfilled
*/
</script>
</html>