# promise 并发 concurrency

promise-concurrency-limiter

/* TYPES */

/** Zero-argument function whose result is ignored; the limiter stores one per queued task. */
type Callback = () => void;

/** A unit of work: a zero-argument function that returns a promise for its result. */
type Task<T> = () => Promise<T>;

/** Construction options for the limiter. */
type Options = {
  /** Upper bound that the limiter compares its running-task count against. */
  concurrency: number
};

/* PROMISE CONCURRENCY LIMITER */

/**
 * Runs promise-returning tasks while keeping at most `concurrency` of them
 * in flight at once. Tasks added while every slot is taken wait in a queue
 * and are started, in insertion order, as running tasks settle.
 *
 * Note: queued tasks are only started when a running task settles, so with a
 * `concurrency` that `count` can never be below (e.g. `0`) queued tasks never start.
 */
class Limiter {

  /* VARIABLES */

  /** Maximum number of tasks allowed to run at the same time. */
  concurrency: number;
  /** Number of tasks currently running. */
  count: number;
  /** Starters for the tasks waiting for a free slot, in insertion order. */
  queue: Set<Callback>;

  /* CONSTRUCTOR */

  /**
   * @param options - `options.concurrency` is the maximum number of simultaneously running tasks.
   */
  constructor ( options: Options ) {

    this.concurrency = options.concurrency;
    this.count = 0;
    this.queue = new Set ();

  }

  /* API */

  /**
   * Schedules a task, starting it right away when a slot is free and queueing it otherwise.
   *
   * @param fn - The task to execute.
   * @returns When started right away, the very promise returned by `fn`; when queued,
   *   a promise that follows the task's promise once the task is started.
   * @throws Whatever `fn` throws synchronously, if the task is started right away.
   *   For a queued task a synchronous throw rejects the returned promise instead.
   */
  add <T> ( fn: Task<T> ): Promise<T> {

    if ( this.count < this.concurrency ) return this.run ( fn );

    return new Promise<T> ( ( resolve, reject ) => {

      const callback = (): void => {

        // The starter is invoked from `flush`, long after `add` returned: a
        // synchronous throw from the task must settle this promise, otherwise
        // it would stay pending forever and the error would surface elsewhere.
        try {

          resolve ( this.run ( fn ) );

        } catch ( error ) {

          reject ( error );

        }

      };

      this.queue.add ( callback );

    });

  }

  /**
   * Starts queued tasks, oldest first, until the queue is empty or every slot is taken.
   */
  flush (): void {

    for ( const callback of this.queue ) {

      if ( this.count >= this.concurrency ) break;

      // Deleting the entry currently being visited is safe while iterating a Set.
      this.queue.delete ( callback );

      callback ();

    }

  }

  /**
   * Starts a task immediately, without checking the limit, and tracks it as running
   * until its promise settles.
   *
   * @param fn - The task to execute.
   * @returns The promise returned by `fn`.
   * @throws Whatever `fn` throws synchronously; the slot is released before rethrowing.
   */
  run <T> ( fn: Task<T> ): Promise<T> {

    this.count += 1;

    let promise: Promise<T>;

    try {

      promise = fn ();

    } catch ( error ) {

      // No promise was produced, so no settle handler will ever give the slot
      // back: release it here or it would be occupied forever.
      this.count -= 1;

      throw error;

    }

    const cleanup = (): void => {

      this.count -= 1;

      this.flush ();

    };

    // Free the slot on both fulfilment and rejection. Handling the rejection on
    // this derived promise does not hide it from the caller, who gets `promise` itself.
    promise.then ( cleanup, cleanup );

    return promise;

  }

}

export default Limiter;


使用:

import Limiter from 'promise-concurrency-limiter';

const limiter = new Limiter ({
  concurrency: 2 // At most 2 tasks may be running at the same time
});

const somePromiseReturningFunction = async () => { /* ... */ };

limiter.add ( somePromiseReturningFunction ); // First task: a slot is free, so it starts immediately
limiter.add ( somePromiseReturningFunction ); // Second task: the other slot is free, so it starts immediately
limiter.add ( somePromiseReturningFunction ); // Third task: both slots are still taken here, so it is queued and starts once a running task settles

# 参考

  1. [promise-concurrency-limiter](https://www.npmjs.com/package/promise-concurrency-limiter)