JavaScript 自定义限流队列 fetch

本文最后更新于:2020年12月30日 凌晨

为什么需要它

有些时候不得不需要限制并发 fetch 的请求数量,避免请求过快导致 IP 封禁

需要做到什么

  • 允许限制 fetch 请求同时存在的数量
  • 时间过久便认为是超时了

如何实现

暂停请求

该方法的请求是无序的!

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前判断当前请求的数量,添加请求等待数量
    1. 如果请求数量已满,则进行等待
    2. 如果请求数量未满,则删除一个请求等待数量
  3. 请求完成,删除当前请求数量

等待队列:循环监听

该方法需要使用回调函数

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 使用 setInterval 函数持续监听队列和当前执行的请求数
    • 发现请求数量没有到达最大值,且等待队列中还有值,那么就执行一次请求

等待队列:触发钩子

  1. 使用 class 定义默认超时设置和请求数量限制的构造函数
  2. 在请求前将请求 argments 添加到等待队列中
  3. 添加完成,等待当前请求数量未满
  4. 尝试启动等待队列(钩子)

实现代码

暂停请求实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;
//这是一个可以被reject的promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject("abort promise");
};
});
var abortablePromise = Promise.race([fetchPromise, abortPromise]);
setTimeout(function() {
abortFn();
}, timeout);

return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
this.waitCount = 0;
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
this.waitCount++;
await wait(() => this.execCount < this.limit);
this.waitCount--;
this.execCount++;
try {
return await promiseTimeout(fetch(url, init), this.timeout);
} finally {
this.execCount--;
}
}
}

使用示例

1
2
3
4
5
6
7
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach(i =>
requestLimiting
._fetch("/")
.then(res => console.log(res))
.catch(err => console.log(err))
);

等待队列:循环监听实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;

//这是一个可以被reject的promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject("abort promise");
};
});

var abortablePromise = Promise.race([fetchPromise, abortPromise]);

setTimeout(function() {
abortFn();
}, timeout);

return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
// 等待队列
this.waitArr = [];

// 监视 execCount 的值
setInterval(async () => {
if (this.execCount >= this.limit) {
return;
}
console.debug(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`
);
const args = this.waitArr.shift(0);
if (!args) {
return;
}
this.execCount++;
const callback = args[2];
try {
// 如果没有错误就返回 res
callback({ res: await promiseTimeout(fetch(...args), this.timeout) });
} catch (err) {
// 否则返回 err
callback({
err: err
});
} finally {
this.execCount--;
}
}, 100);
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @param {Function} callback 回调函数
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init, callback) {
this.waitArr.push(arguments);
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
requestLimiting._fetch(
"/",
{
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i
}
},
// 这里使用了回调函数,参数使用解构得到
({ res, err }) => {
console.log(`res: ${res}, err: ${err}`);
}
)
);

等待队列:触发钩子实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 等待指定的时间/等待指定表达式成立
* @param {Number|Function} param 等待时间/等待条件
* @returns {Promise} Promise 对象
*/
function wait(param) {
return new Promise(resolve => {
if (typeof param === "number") {
setTimeout(resolve, param);
} else if (typeof param === "function") {
var timer = setInterval(() => {
if (param()) {
clearInterval(timer);
resolve();
}
}, 100);
} else {
resolve();
}
});
}
/**
* 为 fetch 请求添加超时选项
* 注:超时选项并非真正意义上的超时即取消请求,请求依旧正常执行完成,但会提前返回 reject 结果
* @param {Promise} fetchPromise fetch 请求的 Promise
* @param {Number} timeout 超时时间
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
function promiseTimeout(fetchPromise, timeout) {
var abortFn = null;
//这是一个可以被 reject 的 Promise
var abortPromise = new Promise(function(resolve, reject) {
abortFn = function() {
reject("abort promise");
};
});
// 有一个 Promise 完成就立刻结束
var abortablePromise = Promise.race([fetchPromise, abortPromise]);
setTimeout(function() {
abortFn();
}, timeout);
return abortablePromise;
}
/**
* 限制并发请求数量的 fetch 封装
*/
class RequestLimiting {
constructor({ timeout = 10000, limit = 10 }) {
this.timeout = timeout;
this.limit = limit;
this.execCount = 0;
// 等待队列
this.waitArr = [];
}

/**
* 执行一个请求
* 如果到达最大并发限制时就进行等待
* 注:该方法的请求顺序是无序的,与代码里的顺序无关
* @param {RequestInfo} url 请求 url 信息
* @param {RequestInit} init 请求的其他可选项
* @returns {Promise} 如果超时就提前返回 reject, 否则正常返回 fetch 结果
*/
async _fetch(url, init) {
const _innerFetch = async () => {
console.log(
`执行 execCount: ${this.execCount}, waitArr length: ${
this.waitArr.length
}, index: ${JSON.stringify(this.waitArr[0])}`
);
this.execCount++;
const args = this.waitArr.shift(0);
try {
return await promiseTimeout(fetch(...args), this.timeout);
} finally {
this.execCount--;
}
};
this.waitArr.push(arguments);
await wait(() => this.execCount < this.limit);
// 尝试启动等待队列
return _innerFetch();
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
const requestLimiting = new RequestLimiting({ timeout: 500, limit: 1 });
new Array(100).fill(0).forEach((item, i) =>
requestLimiting
._fetch("/", {
// 这里设置添加时的 index,用于验证是否真的顺序执行了
headers: {
index: i
}
})
.then(res => console.log(res))
.catch(err => console.log(err))
);

总结

目前而言,最后一种实现是最好的,同时实现了两种规范

  • 返回 Promise,避免使用回调函数
  • 请求执行与添加顺序相同

JavaScript 自定义限流队列 fetch
https://blog.rxliuli.com/p/4842e93e045a4299a84d7d04b02038c4/
作者
rxliuli
发布于
2020年2月2日
许可协议