github: https://github.com/sindresorhus/p-limit
使用方式
1 2 3 4 5 6 7 8 9 10 11 12
| import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [ limit(() => fetchSomething('foo')), limit(() => fetchSomething('bar')), limit(() => doSomething()) ];
const result = await Promise.all(input); console.log(result);
|
函数源码
初始化(pLimit)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import Queue from "yocto-queue";
export default function pLimit(concurrency) { const queue = new Queue(); let activeCount = 0;
const enqueue = (fn, resolve, args) => { queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => { await Promise.resolve()
if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); } })(); };
const generator = (fn, ...args) => new Promise((resolve) => { enqueue(fn, resolve, args); });
return generator; }
|
pLimit
函数的入参 concurrency 是最大并发数,调用一次 pLimit 会生成一个限制并发的函数 generator
依赖yocto-queue
的队列能力,每次调用generator
会返回个promise
- 会向队列入队一个
run
(执行函数)
- 如果当前在执行
promise
数量小于concurrency
(并发数),就出队并执行
执行函数(run)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| const next = () => { activeCount--;
if (queue.size > 0) { queue.dequeue()(); } };
const run = async (fn, resolve, args) => { activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try { await result; } catch {}
next(); };
|
run函数执行
activeCount
加一
- 执行异步函数
fn
,并将结果传递给resolve
await
了result
,使得next
执行有序
- 执行
next
时表示promise
结果已经返回,activeCount
-1,并开始执行下一个promise
思考
为什么使用队列而不是数组
相关issus:Improve performance
shift
方法每次调用时, 都需要遍历一次数组, 将数组进行一次平移, 时间复杂度是O(n)
队列的dequeue
时间复杂度则是O(1)
为什么在入队并且执行的时候,判断执行前需要await Promise.resolve()
相关issus:Always run limited functions asynchronously
不加的话,有时候执行是同步的,有时候执行是异步的,有可能会导致在下一行代码执行之前状态就已经改变了,让程序运行结果不可预测
1 2 3 4 5 6 7
| (async () => { await Promise.resolve()
if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); } })();
|
加上可以保证所有出队执行都是异步的
如何添加超时逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| let timer = null; const timerPromise = new Promise((resolve, reject) => { timer = setTimeout(() => { reject('time out'); }, 1000); });
Promise.all([ timerPromise, fetchPromise, ]) .then(res => clearTimeout(timer)) .catch(err => console.error(err));
|
更正规的写法可以参考:p-timeout
参考文章
Node.js 并发能力总结