并发控制的概念相信大家都非常熟悉,比如浏览器请求的并发控制等。今天,我们结合 async-poolopen in new window 这个开源工具来看看如何实现一个简单的并发控制。
async-pool 的代码分为 es6 和 es7 两个版本,都非常简单,我们主要基于 es6 版本进行说明。
在去除参数校验等逻辑以后,核心代码如下,非常短小精悍:
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = [];
const executing = [];
const enqueue = function() {
if (i === array.length) {
return Promise.resolve();
}
const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
let r = Promise.resolve();
if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
asyncPool
支持三个参数,第一个是并发数量,第二个是一组请求输入,第三个是返回 promise 的迭代函数。我们举一个例子来进行说明。
假设我们现在有 500 个请求需要发送,并发数量控制是 50。那么我们可以这样使用 asyncPool
:
asyncPool(50, [/* 500 个请求的参数数据 */], () => {/* 发起请求的函数 */})
我们现在来详细说明 asyncPool
的工作原理。
首先,asyncPool
中初始化了两个数组,ret
保存返回结果,其顺序要与输入顺序一致,executing
用于记录当前正在执行的请求。
asyncPool
中创建了一个 enqueue
函数,负责具体的并发控制逻辑。
在 enqueue
函数中,通过变量 i
来逐个获取请求输入参数,调用迭代函数发起请求,然后将返回的 promise 保存在 ret
中。
const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
2
3
之后就是并发数量控制的核心逻辑:
let r = Promise.resolve();
if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
return r.then(() => enqueue());
2
3
4
5
6
7
8
9
10
11
如果并发数量限制大于要发起的请求数量,则无需通过 executing
数组来记录正在执行的请求,直接循环发起请求即可。
如果并发数量限制小于要发起的请求数量,则首先通过之前调用迭代函数返回的 promise 生成一个新的 promise,放入 executing
中。在这个新的 promise 完成时,将其从 executing
中删除。
如果 executing
数组长度大于并发数量控制,则使用 Promise.race(executing)
获取最先返回的 promsie,并通过它进行下一次迭代。
通过变量 r
我们可以看到,在整个循环过程中,enqueue
函数会形成一个 promise 链,在最后一个 promise 返回之后,asyncPool
通过 Promise.all
将所有的结果返回。
return enqueue().then(() => Promise.all(ret));
至此,async-pool
的核心逻辑我们就分析完了。上面的分析过程是基于 es6 版本的代码,es7 版本更加简洁,如下,看官们可以自行分析:
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = [];
const executing = [];
for (const item of array) {
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
await Promise.race(executing);
}
}
}
return Promise.all(ret);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
我们知道,不管是 Promise.race
还是 Promise.all
,只要有一个 promise 达到 Fufilled
或者 Rejected
状态,整个就会返回。这在接口请求的的场景中是不合适的。我们应该如何改造呢?
其实也非常简单,只要在迭代函数的调用处做一些特殊处理即可。
iteratorFn(item, array).then(resp => resp).catch(error => error);
关注微信公众号,获取最新推送~
加微信,深入交流~