前端前端
fabric
算法
jsBlock
styleBlock
工具其他
Vue、React相关
canvas
webgl
GitHub
fabric
算法
jsBlock
styleBlock
工具其他
Vue、React相关
canvas
webgl
GitHub
  • 1_1_带并发数限制的请求调度器
  • parseInt
  • preventDefault报错
  • promise
  • requestIdleCallback
  • undefined
  • webworker
  • 代码块
  • 前后端自动刷新
  • 多个标签页通讯
  • 实现个简单爬虫
  • 执行上下文
  • 控制台
  • 查找文件中的图片并下载
  • 深度广度优先
  • 累加函数
  • 触摸事件和键盘事件

带并发数限制的请求调度器

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title></title>
  </head>
  <body></body>

  <script>
    class ConcurrentScheduler {
      constructor(maxConcurrency = 3) {
        this.maxConcurrency = maxConcurrency; // 最大并发数
        this.activeCount = 0; // 当前活跃任务数
        this.queue = []; // 等待队列
        this.abortControllers = new Map(); // 存储每个任务的AbortController
      }

      /**
       * 添加任务到调度器
       * @param {Function} task - 返回Promise的异步函数
       * @returns {Promise} 包含任务结果的Promise,可被外部中止
       */
      add(task) {
        // 为每个任务创建AbortController
        const controller = new AbortController();
        const { signal } = controller;

        // 包装原始任务
        const wrappedTask = async () => {
          try {
            this.activeCount++;
            const result = await task({ signal }); // 将signal传给任务
            return result;
          } finally {
            this.activeCount--;
            this.#runNext(); // 任务完成后触发下一个任务
          }
        };

        // 创建一个对象,包含 Promise 和 signal
        const taskPromise = new Promise((resolve, reject) => {
          this.queue.push({
            task: wrappedTask,
            resolve,
            reject,
            signal,
          });
          this.abortControllers.set(signal, controller);
          this.#runNext();
        });

        // 将 signal 附加到返回的 Promise 上
        taskPromise.signal = signal;
        return taskPromise;
      }

      // 执行下一个任务(私有方法)
      #runNext() {
        if (this.activeCount >= this.maxConcurrency) return;

        const nextTask = this.queue.shift();
        if (!nextTask) return;

        // 监听外部中止信号
        if (nextTask.signal.aborted) {
          nextTask.reject(new DOMException("Aborted", "AbortError"));
          return this.#runNext();
        }

        // 执行任务并处理结果
        nextTask
          .task()
          .then(nextTask.resolve)
          .catch(nextTask.reject)
          .finally(() => {
            this.abortControllers.delete(nextTask.signal);
          });

        this.activeCount++;
        this.#runNext(); // 递归执行以填充并发槽位
      }

      /**
       * 中止特定任务
       * @param {AbortSignal} signal - 任务对应的AbortSignal
       */
      abort(signal) {
        const controller = this.abortControllers.get(signal);
        if (controller) {
          controller.abort();

          // 从队列中删除尚未执行的对应任务
          const index = this.queue.findIndex((item) => item.signal === signal);
          if (index !== -1) {
            const [removedTask] = this.queue.splice(index, 1);
            removedTask.reject(new DOMException("Aborted", "AbortError"));
            this.abortControllers.delete(signal);
          }
        }
      }

      /**
       * 中止所有任务
       */
      abortAll() {
        this.queue = []; // 清空等待队列
        for (const controller of this.abortControllers.values()) {
          controller.abort();
        }
      }
    }

    const createTask =
      (id, delay) =>
      async ({ signal }) => {
        try {
          console.log(`Task ${id} started`);
          await new Promise((resolve, reject) => {
            const timeout = setTimeout(() => {
              console.log(`Task ${id} completed`);
              resolve();
            }, delay);

            // 监听中止信号
            signal.addEventListener("abort", () => {
              clearTimeout(timeout);
              reject(new Error(`Task ${id} aborted`));
            });
          });
          return `Result ${id}`;
        } catch (err) {
          throw err;
        }
      };

    // 1. 创建调度器(最大并发数=2)
    const scheduler = new ConcurrentScheduler(2);

    // 3. 添加任务
    const task1 = scheduler.add(createTask(1, 2000));
    const task2 = scheduler.add(createTask(2, 1000));
    const task3 = scheduler.add(createTask(3, 1500));

    // 4. 执行中止操作(演示用)
    setTimeout(() => {
      scheduler.abort(task2.signal); // 中止第2个任务
    }, 500);

    // 5. 处理结果
    Promise.allSettled([task1, task2, task3]).then((results) => {
      results.forEach((result, index) => {
        console.log(`Task ${index + 1}:`, result.status);
      });
    });

    /* 输出:
    Task 1 started
    Task 1 completed
    Task 3 started
    Task 3 completed

    Task 1: fulfilled
    Task 2: rejected
    Task 3: fulfilled
    */
  </script>
</html>
Edit this page
最近更新:: 2025/12/10 16:53
Contributors: wu.hui
Next
parseInt