JavaScript 自定义限流队列 fetch

本文最后更新于:2020年12月30日 下午

为什么需要它

有些时候不得不需要限制并发 fetch 的请求数量,避免请求过快导致 IP 封禁

需要做到什么

  • 允许限制 fetch 请求同时存在的数量
  • 时间过久便认为是超时了

如何实现

暂停请求

该方法的请求是无序的!

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前判断当前请求的数量,添加请求等待数量
    1. 如果请求数量已满,则进行等待
    2. 如果请求数量未满,则删除一个请求等待数量
  3. 请求完成,删除当前请求数量

等待队列:循环监听

该方法需要使用回调函数

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 使用 setInterval 函数持续监听队列和当前执行的请求数
    • 发现请求数量没有到达最大值,且等待队列中还有值,那么就执行一次请求

等待队列:触发钩子

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 添加完成,等待当前请求数量未满
  4. 尝试启动等待队列(钩子)

实现代码

暂停请求实现

/**
 * 等待指定的时间/等待指定表达式成立
 * @param {Number|Function} param 等待时间/等待条件
 * @returns {Promise} Promise 对象
 */
function wait(param) {
  return new Promise((resolve) => {
    if (typeof param === "number") {
      setTimeout(resolve, param);
    } else if (typeof param === "function") {
      var timer = setInterval(() => {
        if (param()) {
          clearInterval(timer);
          resolve();
        }
      }, 100);
    } else {
      resolve();
    }
  });
}
/**
 * 为 fetch 请求添加超时选项
 * 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
 * @param {Promise} fetchPromise fetch 请求的 Promise
 * @param {Number} timeout 超时时间
 * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
 */
function promiseTimeout(fetchPromise, timeout) {
  var abortFn = null;
  //这是一个可以被reject的promise
  var abortPromise = new Promise(function (resolve, reject) {
    abortFn = function () {
      reject("abort promise");
    };
  });
  var abortablePromise = Promise.race([fetchPromise, abortPromise]);
  setTimeout(function () {
    abortFn();
  }, timeout);

  return abortablePromise;
}
/**
 * 限制并发请求数量的 fetch 封装
 */
class RequestLimiting {
  constructor({ timeout = 10000, limit = 10 }) {
    this.timeout = timeout;
    this.limit = limit;
    this.execCount = 0;
    this.waitCount = 0;
  }

  /**
   * 执行一个请求
   * 如果到达最大并发限制时就进行等待
   * 注:该方法的请求顺序是无序的,与代码里的顺序无关
   * @param {RequestInfo} url 请求 url 信息
   * @param {RequestInit} init 请求的其他可选项
   * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
   */
  async _fetch(url, init) {
    this.waitCount++;
    await wait(() => this.execCount < this.limit);
    this.waitCount--;
    this.execCount++;
    try {
      return await promiseTimeout(fetch(url, init), this.timeout);
    } finally {
      this.execCount--;
    }
  }
}

使用示例

const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((i) =>
  requestLimiting
    ._fetch("/")
    .then((res) => console.log(res))
    .catch((err) => console.log(err))
);

等待队列:循环监听实现

/**
 * 等待指定的时间/等待指定表达式成立
 * @param {Number|Function} param 等待时间/等待条件
 * @returns {Promise} Promise 对象
 */
function wait(param) {
  return new Promise((resolve) => {
    if (typeof param === "number") {
      setTimeout(resolve, param);
    } else if (typeof param === "function") {
      var timer = setInterval(() => {
        if (param()) {
          clearInterval(timer);
          resolve();
        }
      }, 100);
    } else {
      resolve();
    }
  });
}
/**
 * 为 fetch 请求添加超时选项
 * 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
 * @param {Promise} fetchPromise fetch 请求的 Promise
 * @param {Number} timeout 超时时间
 * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
 */
function promiseTimeout(fetchPromise, timeout) {
  var abortFn = null;

  //这是一个可以被reject的promise
  var abortPromise = new Promise(function (resolve, reject) {
    abortFn = function () {
      reject("abort promise");
    };
  });

  var abortablePromise = Promise.race([fetchPromise, abortPromise]);

  setTimeout(function () {
    abortFn();
  }, timeout);

  return abortablePromise;
}
/**
 * 限制并发请求数量的 fetch 封装
 */
class RequestLimiting {
  constructor({ timeout = 10000, limit = 10 }) {
    this.timeout = timeout;
    this.limit = limit;
    this.execCount = 0;
    // 等待队列
    this.waitArr = [];

    // 监视 execCount 的值
    setInterval(async () => {
      if (this.execCount >= this.limit) {
        return;
      }
      console.debug(
        `执行 execCount: ${this.execCount}, waitArr length: ${
          this.waitArr.length
        }, index: ${JSON.stringify(this.waitArr[0])}`
      );
      const args = this.waitArr.shift(0);
      if (!args) {
        return;
      }
      this.execCount++;
      const callback = args[2];
      try {
        // 如果没有错误就返回 res
        callback({ res: await promiseTimeout(fetch(...args), this.timeout) });
      } catch (err) {
        // 否则返回 err
        callback({
          err: err,
        });
      } finally {
        this.execCount--;
      }
    }, 100);
  }

  /**
   * 执行一个请求
   * 如果到达最大并发限制时就进行等待
   * 注:该方法的请求顺序是无序的,与代码里的顺序无关
   * @param {RequestInfo} url 请求 url 信息
   * @param {RequestInit} init 请求的其他可选项
   * @param {Function} callback 回调函数
   * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
   */
  async _fetch(url, init, callback) {
    this.waitArr.push(arguments);
  }
}

使用示例

const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
  requestLimiting._fetch(
    "/",
    {
      // 这里设置添加时的 index,用于验证是否真的顺序执行了
      headers: {
        index: i,
      },
    },
    // 这里使用了回调函数,参数使用解构得到
    ({ res, err }) => {
      console.log(`res: ${res}, err: ${err}`);
    }
  )
);

等待队列:触发钩子实现

/**
 * 等待指定的时间/等待指定表达式成立
 * @param {Number|Function} param 等待时间/等待条件
 * @returns {Promise} Promise 对象
 */
function wait(param) {
  return new Promise((resolve) => {
    if (typeof param === "number") {
      setTimeout(resolve, param);
    } else if (typeof param === "function") {
      var timer = setInterval(() => {
        if (param()) {
          clearInterval(timer);
          resolve();
        }
      }, 100);
    } else {
      resolve();
    }
  });
}
/**
 * 为 fetch 请求添加超时选项
 * 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
 * @param {Promise} fetchPromise fetch 请求的 Promise
 * @param {Number} timeout 超时时间
 * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
 */
function promiseTimeout(fetchPromise, timeout) {
  var abortFn = null;
  //这是一个可以被 reject 的 Promise
  var abortPromise = new Promise(function (resolve, reject) {
    abortFn = function () {
      reject("abort promise");
    };
  });
  // 有一个 Promise 完成就立刻结束
  var abortablePromise = Promise.race([fetchPromise, abortPromise]);
  setTimeout(function () {
    abortFn();
  }, timeout);
  return abortablePromise;
}
/**
 * 限制并发请求数量的 fetch 封装
 */
class RequestLimiting {
  constructor({ timeout = 10000, limit = 10 }) {
    this.timeout = timeout;
    this.limit = limit;
    this.execCount = 0;
    // 等待队列
    this.waitArr = [];
  }

  /**
   * 执行一个请求
   * 如果到达最大并发限制时就进行等待
   * 注:该方法的请求顺序是无序的,与代码里的顺序无关
   * @param {RequestInfo} url 请求 url 信息
   * @param {RequestInit} init 请求的其他可选项
   * @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
   */
  async _fetch(url, init) {
    const _innerFetch = async () => {
      console.log(
        `执行 execCount: ${this.execCount}, waitArr length: ${
          this.waitArr.length
        }, index: ${JSON.stringify(this.waitArr[0])}`
      );
      this.execCount++;
      const args = this.waitArr.shift(0);
      try {
        return await promiseTimeout(fetch(...args), this.timeout);
      } finally {
        this.execCount--;
      }
    };
    this.waitArr.push(arguments);
    await wait(() => this.execCount < this.limit);
    // 尝试启动等待队列
    return _innerFetch();
  }
}

使用示例

const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
  requestLimiting
    ._fetch("/", {
      // 这里设置添加时的 index,用于验证是否真的顺序执行了
      headers: {
        index: i,
      },
    })
    .then((res) => console.log(res))
    .catch((err) => console.log(err))
);

总结

目前而言,最后一种实现是最好的,同时实现了两种规范

  • 返回 Promise,避免使用回调函数
  • 请求执行与添加顺序相同

JavaScript 自定义限流队列 fetch
https://blog.rxliuli.com/p/4842e93e045a4299a84d7d04b02038c4/
作者
rxliuli
发布于
2020年2月2日
许可协议