JavaScript 异步数组

JavaScript 异步数组

场景

吾辈是一只在飞向太阳的萤火虫

JavaScript 中的数组是一个相当泛用性的数据结构,能当数组,元组,队列,栈进行操作,更好的是 JavaScript 提供了很多原生的高阶函数,便于我们对数组整体操作。
然而,JavaScript 中的高阶函数仍有缺陷 – 异步!当你把它们放在一起使用时,就会感觉到这种问题的所在。

例如现在,有一组 id,我们要根据 id 获取到远端服务器 id 对应的值,然后将之打印出来。那么,我们要怎么做呢?

1
2
3
4
5
6
7
8
9
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

async function get(id) {
// 这里只是为了模拟每个请求的时间可能是不定的
await wait(Math.random() * id * 100)
return '内容: ' + id.toString()
}

const ids = [1, 2, 3, 4]

你或许会下意识地写出下面的代码

1
ids.forEach(async id => console.log(await get(id)))

事实上,控制台输出是无序的,而并非想象中的 1, 2, 3, 4 依次输出

1
2
3
4
内容: 2 ​​​​​
内容: 3 ​​​​​
内容: 1 ​​​​​
内容: 4

这是为什么呢?原因便是 JavaScript 中数组的高阶函数并不会等待异步函数的返回!当你在网络上搜索时,会发现很多人会说可以使用 for-of, for-in 解决这个问题。

1
2
3
4
5
;(async () => {
for (let id of ids) {
console.log(await get(id))
}
})()

或者,使用 Promise.all 也是一种解决方案

1
2
3
;(async () => {
;(await Promise.all(ids.map(get))).forEach(v => console.log(v))
})()

然而,第一种方式相当于丢弃了 Array 的所有高阶函数,再次重返远古 for 循环时代了。第二种则一定会执行所有的异步函数,即便你需要使用的是 find/findIndex/some/every 这些高阶函数。那么,有没有更好的解决方案呢?

思考

既然原生的 Array 不支持完善的异步操作,那么,为什么不由我们来实现一个呢?

实现思路:

  1. 创建异步数组类型 AsyncArray
  2. 内置一个数组保存当前异步操作数组的值
  3. 实现数组的高阶函数并实现支持异步函数顺序执行
  4. 获取到内置的数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
this._task = []
}
async forEach(fn) {
const arr = this._arr
for (let i = 0, len = arr.length; i < len; i++) {
await fn(arr[i], i, this)
}
}
}

new AsyncArray(...ids).forEach(async id => console.log(await get(id)))

打印结果确实有顺序了,看似一切很美好?

然而,当我们再实现一个 map 试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
}
async forEach(fn) {
const arr = this._arr
for (let i = 0, len = arr.length; i < len; i++) {
await fn(arr[i], i, this)
}
}
async map(fn) {
const arr = this._arr
const res = []
for (let i = 0, len = arr.length; i < len; i++) {
res.push(await fn(arr[i], i, this))
}
return this
}
}

调用一下

1
2
3
new AsyncArray(...ids).map(get).forEach(async res => console.log(res))
// 抛出错误
// (intermediate value).map(...).forEach is not a function

然而会有问题,实际上 map 返回的是 Promise,所以我们还必须使用 await 进行等待

1
2
3
4
5
;(async () => {
;(await new AsyncArray(...ids).map(get)).forEach(async res =>
console.log(res),
)
})()

是不是感觉超级蠢?吾辈也是这样认为的!

链式调用加延迟执行

我们可以尝试使用链式调用加延迟执行修改这个 AsyncArray

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 保存高阶函数传入的异步操作
*/
class Action {
constructor(type, args) {
/**
* @field 异步操作的类型
* @type {string}
*/
this.type = type
/**
* @field 异步操作的参数数组
* @type {Function}
*/
this.args = args
}
}

/**
* 所有的操作类型
*/
Action.Type = {
forEach: 'forEach',
map: 'map',
filter: 'filter',
}

/**
* 真正实现的异步数组
*/
class InnerAsyncArray {
constructor(arr) {
this._arr = arr
}
async forEach(fn) {
const arr = this._arr
for (let i = 0, len = arr.length; i < len; i++) {
await fn(arr[i], i, this)
}
this._arr = []
}
async map(fn) {
const arr = this._arr
const res = []
for (let i = 0, len = arr.length; i < len; i++) {
res.push(await fn(arr[i], i, this))
}
this._arr = res
return this
}
async filter(fn) {
const arr = this._arr
const res = []
for (let i = 0, len = arr.length; i < len; i++) {
if (await fn(arr[i], i, this)) {
res.push(arr[i])
}
}
this._arr = res
return this
}
}

class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
/**
* @field 保存异步任务
* @type {Action[]}
*/
this._task = []
}
forEach(fn) {
this._task.push(new Action(Action.Type.forEach, [fn]))
return this
}
map(fn) {
this._task.push(new Action(Action.Type.map, [fn]))
return this
}
filter(fn) {
this._task.push(new Action(Action.Type.filter, [fn]))
return this
}
/**
* 终结整个链式操作并返回结果
*/
async value() {
const arr = new InnerAsyncArray(this._arr)
let result
for (let task of this._task) {
result = await arr[task.type](...task.args)
}
return result
}
}

使用一下

1
2
3
4
5
new AsyncArray(...ids)
.filter(async i => i % 2 === 0)
.map(get)
.forEach(async res => console.log(res))
.value()

可以看到,确实符合预期了,然而每次都要调用 value(),终归有些麻烦。

使用 then 以支持 await 自动结束

这里使用 then() 替代它以使得可以使用 await 自动计算结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class AsyncArray {
// 上面的其他内容...
/**
* 终结整个链式操作并返回结果
*/
async then(resolve) {
const arr = new InnerAsyncArray(this._arr)
let result
for (let task of this._task) {
result = await arr[task.type](...task.args)
}
// 这里使用 resolve(result) 是为了兼容 await 的调用方式
resolve(result)
return result
}
}

现在,可以使用 await 结束这次链式调用了

1
await new AsyncArray(...ids).map(get).forEach(async res => console.log(res))

突然之间,我们发现了一个问题,为什么会这么慢?一个个去进行异步操作太慢了,难道就不能一次性全部发送出去,然后有序的处理结果就好了嘛?

并发异步操作

我们可以使用 Promise.all 并发执行异步操作,然后对它们的结果进行有序地处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 并发实现的异步数组
*/
class InnerAsyncArrayParallel {
constructor(arr) {
this._arr = arr
}
async _all(fn) {
return Promise.all(this._arr.map(fn))
}
async forEach(fn) {
await this._all(fn)
this._arr = []
}
async map(fn) {
this._arr = await this._all(fn)
return this
}
async filter(fn) {
const arr = await this._all(fn)
this._arr = this._arr.filter((v, i) => arr[i])
return this
}
}

然后修改 AsyncArray,使用 _AsyncArrayParallel 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class AsyncArray {
// 上面的其他内容...
/**
* 终结整个链式操作并返回结果
*/
async then(resolve) {
const arr = new InnerAsyncArrayParallel(this._arr)
let result = this._arr
for (let task of this._task) {
result = await arr[task.type](...task.args)
}
// 这里使用 resolve(result) 是为了兼容 await 的调用方式
if (resolve) {
resolve(result)
}
return result
}
}

调用方式不变。当然,由于使用 Promise.all 实现,也同样受到它的限制 – 异步操作实际上全部执行了。

串行/并行相互转换

现在我们的 _AsyncArray_AsyncArrayParallel 两个类只能二选一,所以,我们需要添加两个函数用于互相转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
/**
* @field 保存异步任务
* @type {AsyncArrayAction[]}
*/
this._task = []
/**
* 是否并行化
*/
this._parallel = false
}
// 其他内容...

parallel() {
this._parallel = true
return this
}
serial() {
this._parallel = false
return this
}
async then() {
const arr = this._parallel
? new InnerAsyncArrayParallel(this._arr)
: new InnerAsyncArray(this._arr)
let result = this._arr
for (let task of this._task) {
result = await arr[task.type](...task.args)
}
if (resolve) {
resolve(result)
}
return result
}
}

现在,我们可以在真正执行之前在任意位置对其进行转换了

1
2
3
4
5
await new AsyncArray(...ids)
.parallel()
.filter(async i => i % 2 === 0)
.map(get)
.forEach(async res => console.log(res))

并发执行多个异步操作

然而,上面的代码有一些隐藏的问题

  1. await 之后返回值不是一个数组

    1
    2
    3
    4
    ;(async () => {
    const asyncArray = new AsyncArray(...ids)
    console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  2. 上面的 map, filter 调用在 await 之后仍会影响到下面的调用

    1
    2
    3
    4
    5
    ;(async () => {
    const asyncArray = new AsyncArray(...ids)
    console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  3. 并发调用的顺序不能确定,会影响到内部数组,导致结果不能确定

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ;(async () => {
    const asyncArray = new AsyncArray(...ids)
    ;(async () => {
    console.log(
    await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
    ) // InnerAsyncArray { _arr: [ 2, 6 ] }
    })()
    ;(async () => {
    console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 6 ] }
    })()
    })()

先解决第一个问题,这里只需要判断一下是否为终结操作(forEach),是的话就直接返回结果,否则继续下一次循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AsyncArray {
// 其他内容...

async then(resolve, reject) {
const arr = this._parallel
? new InnerAsyncArrayParallel(this._arr)
: new InnerAsyncArray(this._arr)
let result = this._arr
for (let task of this._task) {
const temp = await arr[task.type](...task.args)
if (
temp instanceof InnerAsyncArray ||
temp instanceof InnerAsyncArrayParallel
) {
result = temp._arr
} else {
// 如果已经是终结操作就返回数组的值
if (resolve) {
resolve(temp)
}
return temp
}
}
if (resolve) {
resolve(result)
}
return result
}
}

现在,第一个问题简单解决

1
2
3
4
;(async () => {
const asyncArray = new AsyncArray(...ids)
console.log(await asyncArray.map(i => i * 2)) // [ 2, 4, 6, 8 ]
})()

第二、第三个问题看起来似乎是同一个问题?其实我们可以按照常规思维解决第一个问题。既然 await 之后仍然会影响到下面的调用,那就在 then 中把 _task 清空好了,修改 then 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class AsyncArray {
// 其他内容...

async then(resolve, reject) {
const arr = this._parallel
? new InnerAsyncArrayParallel(this._arr)
: new InnerAsyncArray(this._arr)
let result = this._arr
for (let task of this._task) {
const temp = await arr[task.type](...task.args)
if (
temp instanceof InnerAsyncArray ||
temp instanceof InnerAsyncArrayParallel
) {
result = temp._arr
} else {
// 如果已经是终结操作就返回数组的值
if (resolve) {
resolve(temp)
}
this._task = []
return temp
}
}
if (resolve) {
resolve(result)
}
this._task = []
return result
}
}

现在,第一个问题解决了,但第二个问题不会解决。究其原因,还是异步事件队列的问题,虽然 async-await 能够让我们以同步的方式写异步的代码,但千万不可忘记它们本质上还是异步的!

1
2
3
4
5
6
7
8
9
10
11
12
13
;(async () => {
await Promise.all([
(async () => {
console.log(
await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
) // [ 2, 6 ]
})(),
(async () => {
console.log(await asyncArray) // [ 2, 6 ]
})(),
])
console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

可以看到,在使用 await 进行等待之后就如同预期的 _task 被清空了。然而,并发执行的没有等待的 await asyncArray 却有奇怪的问题,因为它是在 _task 清空之前执行的。

并且,这带来一个副作用: 无法缓存操作了

1
2
3
4
5
;(async () => {
const asyncArray = new AsyncArray(...ids).map(i => i * 2)
console.log(await asyncArray) // [ 2, 4, 6, 8 ]
console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

使用不可变数据

为了解决直接修改内部数组造成的问题,我们可以使用不可变数据解决这个问题。试想:如果我们每次操作都返回一个新的 AsyncArray,他们之间没有关联,这样又如何呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
/**
* @field 保存异步任务
* @type {Action[]}
*/
this._task = []
/**
* 是否并行化
*/
this._parallel = false
}
forEach(fn) {
return this._addTask(Action.Type.forEach, [fn])
}
map(fn) {
return this._addTask(Action.Type.map, [fn])
}
filter(fn) {
return this._addTask(Action.Type.filter, [fn])
}
parallel() {
this._parallel = true
return this
}
serial() {
this._parallel = false
return this
}
_addTask(type, args) {
const result = new AsyncArray(...this._arr)
result._task = [...this._task, new Action(type, args)]
result._parallel = this._parallel
return result
}
/**
* 终结整个链式操作并返回结果
*/
async then(resolve, reject) {
const arr = this._parallel
? new InnerAsyncArrayParallel(this._arr)
: new InnerAsyncArray(this._arr)
let result = this._arr
for (let task of this._task) {
const temp = await arr[task.type](...task.args)
if (
temp instanceof InnerAsyncArray ||
temp instanceof InnerAsyncArrayParallel
) {
result = temp._arr
} else {
// 如果已经是终结操作就返回数组的值
if (resolve) {
resolve(temp)
}
return temp
}
}
if (resolve) {
resolve(result)
}
return result
}
}

再次测试上面的那第三个问题,发现已经一切正常了呢

  • 并发调用的顺序不能确定,但不会影响内部数组了,结果是确定的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
;(async () => {
const asyncArray = new AsyncArray(...ids)
await Promise.all([
(async () => {
console.log(
await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
) // [ 2, 6 ]
})(),
(async () => {
console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})(),
])
console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()
  • 操作可以被缓存
1
2
3
4
5
;(async () => {
const asyncArray = new AsyncArray(...ids).map(i => i * 2)
console.log(await asyncArray) // [ 2, 4, 6, 8 ]
console.log(await asyncArray) // [ 2, 4, 6, 8 ]
})()

完整代码

下面吾辈把完整的代码贴出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
* 保存高阶函数传入的异步操作
*/
class Action {
constructor(type, args) {
/**
* @field 异步操作的类型
* @type {string}
*/
this.type = type
/**
* @field 异步操作的参数数组
* @type {Function}
*/
this.args = args
}
}

/**
* 所有的操作类型
*/
Action.Type = {
forEach: 'forEach',
map: 'map',
filter: 'filter',
}

/**
* 真正实现的异步数组
*/
class InnerAsyncArray {
constructor(arr) {
this._arr = arr
}
async forEach(fn) {
const arr = this._arr
for (let i = 0, len = arr.length; i < len; i++) {
await fn(arr[i], i, this)
}
this._arr = []
}
async map(fn) {
const arr = this._arr
const res = []
for (let i = 0, len = arr.length; i < len; i++) {
res.push(await fn(arr[i], i, this))
}
this._arr = res
return this
}
async filter(fn) {
const arr = this._arr
const res = []
for (let i = 0, len = arr.length; i < len; i++) {
if (await fn(arr[i], i, this)) {
res.push(arr[i])
}
}
this._arr = res
return this
}
}

class InnerAsyncArrayParallel {
constructor(arr) {
this._arr = arr
}
async _all(fn) {
return Promise.all(this._arr.map(fn))
}
async forEach(fn) {
await this._all(fn)
this._arr = []
}
async map(fn) {
this._arr = await this._all(fn)
return this
}
async filter(fn) {
const arr = await this._all(fn)
this._arr = this._arr.filter((v, i) => arr[i])
return this
}
}

class AsyncArray {
constructor(...args) {
this._arr = Array.from(args)
/**
* @field 保存异步任务
* @type {Action[]}
*/
this._task = []
/**
* 是否并行化
*/
this._parallel = false
}
forEach(fn) {
return this._addTask(Action.Type.forEach, [fn])
}
map(fn) {
return this._addTask(Action.Type.map, [fn])
}
filter(fn) {
return this._addTask(Action.Type.filter, [fn])
}
parallel() {
this._parallel = true
return this
}
serial() {
this._parallel = false
return this
}
_addTask(type, args) {
const result = new AsyncArray(...this._arr)
result._task = [...this._task, new Action(type, args)]
result._parallel = this._parallel
return result
}
/**
* 终结整个链式操作并返回结果
*/
async then(resolve, reject) {
const arr = this._parallel
? new InnerAsyncArrayParallel(this._arr)
: new InnerAsyncArray(this._arr)
let result = this._arr
for (let task of this._task) {
const temp = await arr[task.type](...task.args)
if (
temp instanceof InnerAsyncArray ||
temp instanceof InnerAsyncArrayParallel
) {
result = temp._arr
} else {
// 如果已经是终结操作就返回数组的值
if (resolve) {
resolve(temp)
}
return temp
}
}
if (resolve) {
resolve(result)
}
return result
}
}

总结

那么,关于 JavaScript 中如何封装一个可以使用异步操作高阶函数的数组就先到这里了,完整的 JavaScript 异步数组请参考吾辈的 AsyncArray(使用 TypeScript 编写)。