Предыстория

В одной из моих предыдущих организаций требовалось загружать N файлов. Первоначально общее количество было жестко задано таким образом, что одновременно можно было загрузить только 10, 100 или 200 файлов. Однако позже требование изменилось, и теперь можно загружать любое количество файлов или папок (сглаженный массив BFS). Это был поворотный момент, потому что выполнение более 10 000 запросов в конечном итоге приведет к сбою браузера, поэтому я придумал решение использовать очередь для загрузки N файлов или запросов.

Введение в массовую загрузку

Вы когда-нибудь сталкивались с проблемой загрузки большого количества файлов или папок и сбоем браузера из-за слишком большого количества запросов? Если это так, вы будете рады узнать, что есть решение этой проблемы. Представляем Bulk Upload, превосходную библиотеку для одновременной загрузки большого количества файлов или папок (сглаженный массив BFS, аналогичный процессу загрузки папок на Google Диск), чтобы избежать ударов по производительности. С помощью массовой загрузки вы можете установить уровень параллелизма в соответствии с вашими потребностями, а библиотека позаботится обо всем остальном. Например, если вы установите уровень параллелизма равным 10 и у вас есть 1000 файлов для загрузки, 990 файлов будут находиться в очереди, а остальные 10 — в пуле одновременных запросов. Таким образом, вы можете быть уверены, что ваш браузер не выйдет из строя, а ваши загрузки будут завершены эффективно.

Обзор

Давайте пошагово

  1. UI Wrapper – это простая интерфейсная оболочка для этой библиотеки. Процесс начинается с создания класса экземпляра массовой загрузки и предоставления файлов или папок в плоском формате поиска в ширину. На начальном этапе файлы разделяются на очередь и незавершенный пул на основе параллелизма и общего количества предоставленных файлов.
  2. очередь — это простая структура данных карты, которая отслеживает файлы, которые не выполняются, и сбрасывает их, если размер пула выполняемых операций меньше размера параллелизма.
  3. Пул in-progress — это место, где начинается процесс загрузки. Он использует Axios для обработки запросов и отправляет обратный вызов в оболочку пользовательского интерфейса. Затем оболочка пользовательского интерфейса возвращает полезную нагрузку для запроса Axios, предоставляя ему контроль над всем потоком запросов. Незавершенный пул также отвечает за отправку событий, таких как сбой, отмена, завершение и ход загрузки/выгрузки, диспетчеру состояний.
  4. Менеджер состояния играет решающую роль в этом процессе, определяя, когда очередь должна быть сброшена в текущий пул. Если размер выполняемого пула меньше размера параллелизма, очередь освобождается и цикл завершается. Диспетчер состояния также получает события из текущего пула и информирует об этом очередь, а также отправляет обновления в оболочку пользовательского интерфейса, чтобы он мог просматривать в режиме реального времени структуры данных о выполняемых, незавершенных событиях. очередь, завершенные и неудачные запросы.
  5. Оболочка пользовательского интерфейса также контролирует запрос на отмену, уничтожение, повторную попытку, поскольку объекты в JavaScript передаются по ссылке, статус = FAILED | COMPLETED | IN_PROGRESS и т. д. совместно используются при переходе к различным пулам.

Таким образом, файлы организованы в очередь и пул в процессе выполнения на основе параллелизма. Процесс загрузки начинается с использованием Axios, а менеджер состояния обновляет очередь и информирует оболочку пользовательского интерфейса о состоянии запроса в режиме реального времени. UI Wrapper также может управлять отменой, уничтожением и повторными запросами. Размер выполняемого пула сравнивается с параллелизмом, чтобы определить, когда очищать очередь.

import axios, { AxiosProgressEvent, AxiosRequestConfig } from "axios";
export default class BulkUpload {
  /**
   * @param {number} concurrency - The number of concurrent file uploads allowed.
   * @param {File[]} files - The array of File objects to be uploaded.
   * @param {function} onUpdate - A callback function that is called whenever there is an update in the upload status.
   * @param {boolean} [requestOptions.downloadProgress=false] - Whether to report download progress
   * @param {boolean} [requestOptions.uploadProgress=false] - Whether to report upload progress
   * @param {function} requestArguments - callback function which returns payload for axios request along side fileObject as an argument
   * @param {function} onUploadComplete - callback function when pending and queue is finished
   * @param {number} lastProgressUpload - how frequest onUpdate callback should be invoked, whenever upload/download progress is updated
   * @param {boolean} isFileHierarchy - For fetching & uploading folder-hierarchy please use this package : https://www.npmjs.com/package/files-hierarchy
   */
  constructor({
    concurrency,
    // files,
    onUpdate,
    requestOptions,
    requestArguments,
    onUploadComplete,
    lastProgressUpload,
    isFileHierarchy,
  }: Constructor) {
      // INTIAL SETUP 
  }
  /**
   * getControls to override upload flow
   * @returns {Object} {cancel, retry, destroy, updateQueue}
   */
  public getControls() {
    return {
      cancel: this.cancelOperation,
      retry: this.retryFailedOperation,
      updateQueue: this.updateQueue,
      destroy: this.destroy,
    };
  }
  /**
   * @param {Array} File or FileHierarchy objects
   * start the queue progress segregates queue and inProgress pool based on concurrency limit
   */
  public start(files: File[] | FileHierarchy[]) {
    if (!this.initiated) {
      //if request is already initiated and more files are there to be processed
      //those extra files are pushed into queue 
      return this.updateQueue(files);
    }
    this.initiated = true;
    for (let i = 0; i < files.length; i++) {
      const file = files[i]!;
      const value = {
        status: FileStatus.IN_PROGRESS,
        id: this.getTargetValue(file),
        ...this.getFileTargetVal(file),
      };
      if (i < this._concurrency) {
        value.status = FileStatus.IN_PROGRESS;
        this.inProgress.set(value.id, value);
      } else {
        value.status = FileStatus.IN_QUEUE;
        this.inQueue.set(value.id, value);
      }
    }
    this.sendUpdateEvent();
    this.startInitialProgress();
  }
  private startInitialProgress() {
    for (const [_, fileObj] of this.inProgress) {
      this.uploadFile(fileObj as FileObj);
    }
  }

  /***
  updateProgressEvent is responsible for attaching a callback 
  that is invoked by axios XHR request that has totoal & loaded count
  */
  private updateProgressEvent({
    fileObj,
    axiosRequestArgs,
    type,
  }: {
    type: "DOWNLOAD" | "UPLOAD";
    fileObj: FileObj;
    axiosRequestArgs: any;
  }) {
    try {
      const isDownload = type === "DOWNLOAD";
      const progressType = isDownload
        ? "onDownloadProgress"
        : "onUploadProgress";
      axiosRequestArgs[progressType] = ({
        loaded,
        total,
      }: AxiosProgressEvent) => {
        loaded = isNaN(Number(loaded)) ? 0 : Number(loaded);
        total = isNaN(Number(total)) ? 0 : Number(total);
        fileObj[isDownload ? "downloadCount" : "uploadCount"] = Math.floor(
          (loaded / total) * 100
        );
        if (typeof fileObj?.lastProgressUpdated !== "number") {
          fileObj.lastProgressUpdated = Date.now();
        }
        //send event callback after updating lastProgressUpload if frequency 
        //is more than requested one
        if (
          typeof this._lastProgressUpload === "number" &&
          Date.now() - fileObj?.lastProgressUpdated >= this._lastProgressUpload
        ) {
          this.sendUpdateEvent();
          fileObj.lastProgressUpdated = Date.now();
        }
      };
    } catch (e) {
      console.error(e);
    }
  }

 //axios upload method where arguments is gathered using a callback function
//to the calle by sending fileObj consisting of all the request info.
  private uploadFile(fileObj: FileObj) {
    try {
      const axiosRequestArgs: AxiosRequestConfig =
        this._requestArguments(fileObj);
      if (this._downloadProgress) {
        this.updateProgressEvent({
          fileObj,
          type: "DOWNLOAD",
          axiosRequestArgs,
        });
      }
      if (this._uploadProgress) {
        this.updateProgressEvent({ fileObj, type: "UPLOAD", axiosRequestArgs });
      }
      //preserve the canceltoken within the fileObj of the map.
      axiosRequestArgs.cancelToken = new axios.CancelToken((cancel) => {
        fileObj.cancel = cancel;
      });
      axios(axiosRequestArgs)
        .then(() => {
          //here progress is deleted from map and status is updated
          //later queue is informed about the status change
          if (this.destroyed) return;
          this.inProgress.delete(fileObj.id);
          fileObj.status = FileStatus.SUCCESS;
          this.completedUploads += 1; //.set(fileObj.id, fileObj);
          this.sendUpdateEvent();
          this.freeQueue();
        })
        .catch((requestError) => {
          if (this.destroyed) return;
          fileObj.isCancelled = !!axios.isCancel(requestError);
          this.uploadFailed(fileObj);
        });
    } catch (e) {
      if (this.destroyed) return;
      this.uploadFailed(fileObj);
    }
  }

  /**
    heart of the concurrency mechanish
    here progress size is first checked before flushing queue to progress pool
   * inform queue to remove items and push to progress Pool
   */
  private freeQueue(): void {
    if (this.inQueue.size === 0 || this.destroyed) {
      this.sendUpdateEvent();
      if (!this.uploadCompleted) {
        this._onUploadComplete?.();
        this.uploadCompleted = true;
      }
      return;
    }
    if (this.inProgress.size === this._concurrency) {
      return this.sendUpdateEvent();
    }
    for (let [_, file] of this.inQueue) {
      file.status = FileStatus.IN_PROGRESS;
      this.inQueue.delete(file.id!);
      this.inProgress.set(file.id!, file);
      this.sendUpdateEvent();
      this.uploadFile(file as FileObj);
      //we only what top of the queue that's why break the loop post every 
      // iteration
      break;
    }
  }

  //same thing here update the status to FAILED
  // remove from progress pool update failedUploads
  //later send the event and inform queue
  private uploadFailed(fileObj: FileObj): void {
    fileObj.status = FileStatus.FAILED;
    this.inProgress.delete(fileObj.id);
    this.failedUploads.set(fileObj.id, fileObj);
    this.sendUpdateEvent();
    this.freeQueue();
  }

  /** */
  //  onUpdateCallback is catched by the callee (UI wrapper)
  // this callback is invoked whenever any 
  //map data-structure(progress, queue, failed, completed etc) is changed
  //such as queue flushed, failed, completed etc.
  private sendUpdateEvent(): void {
    this._onUpdate?.({
      IN_PROGRESS: this.inProgress,
      IN_QUEUE: this.inQueue,
      COMPLETED_UPLOADS: this.completedUploads,
      FAILED_UPLOADS: this.failedUploads,
    });
  }

  private cancelOperation = (file: FileObj) => {
    if (file.status === FileStatus.IN_PROGRESS) {
      file.cancel?.();
    }
  };

  //flip the destroyed flag this will stop all the queue-progress-flushing 
  //and halt the process,
  // post halting cancel all on-going request
  private destroy = () => {
    this.destroyed = true;
    const now = Date.now();
    for (let [, file] of this.inProgress as Map<string, FileObj>) {
      if (file.status === FileStatus.IN_PROGRESS) {
        this.cancelOperation(file);
        file = {
          status: FileStatus.FAILED,
          id: `${this.getTargetValue(
            file.fileHierarchy || (file.file as File)
          )}-${now}`,
          ...this.getFileTargetVal(file.fileHierarchy || (file.file as File)),
        };
        this.inProgress.delete(file.id);
        this.failedUploads.set(file.id, file);
      }
    }
    this.sendUpdateEvent();
  };

  private retryFailedOperation = (fileObjs: FileObj[]) => {
    if (!Array.isArray(fileObjs))
      throw new Error("Retry Argument must be an array");
    const retries: (File | FileHierarchy)[] = [];
    const isFile = this.isFileType();
    for (let file of fileObjs) {
      if (file.status === FileStatus.FAILED) {
        this.failedUploads.delete(file.id);
        retries.push(isFile ? file.file! : file.fileHierarchy!);
      }
    }
    this.updateQueue(retries);
  };
  //this method takes care of the new incoming files during an ongoing request
  //take those new files and push them to queue pool
  //later inform queue about the new files which need processing
  private updateQueue = (files: (File | FileHierarchy)[]) => {
    this.uploadCompleted = false;
    this.destroyed = false;
    const now = Date.now();
    for (let i = 0; i < files.length; i++) {
      const file = files[i]!;
      const value = {
        status: FileStatus.IN_QUEUE,
        id: `${this.getTargetValue(file)}-${now}`,
        ...this.getFileTargetVal(file),
      };
      value.status = FileStatus.IN_QUEUE;
      this.inQueue.set(value.id, value);
      this.freeQueue();
    }
    this.sendUpdateEvent();
  };

  private getTargetValue(fileObj: File | FileHierarchy) {
    if (fileObj instanceof File) {
      return fileObj.name;
    }
    return fileObj.path;
  }

  private getFileTargetVal(file: File | FileHierarchy): {
    file: File | null;
    fileHierarchy: FileHierarchy | null;
    isCancelled: boolean;
  } {
    const isFile = this.isFileType();
    return {
      file: isFile ? (file as File) : null,
      fileHierarchy: !isFile ? (file as FileHierarchy) : null,
      isCancelled: false,
    };
  }

  private isFileType(): boolean {
    return !!!this._isFileHierarchy;
  }
}

Зачем использовать структуру данных карты? Потому что его легче поддерживать во время циклов по сравнению с простым массивом, и он быстрее извлекает и удаляет элементы.

Почему завершенные загрузки являются не картой, как другие, а счетчиком? Это связано с тем, что, поскольку он уже завершен, нет смысла хранить его в карте или массиве.

Что такое Иерархия файлов? В начале я упомянул о необходимости отправлять папки поиска в ширину (BFS) в качестве запроса на загрузку. File Hierarchy — это экземпляр другой библиотечной утилиты, которую я недавно написал (https://www.npmjs.com/package/files-hierarchy), которая преобразует формат каталога webkit в более плоский каталог BFS. Например, если путь webkitdirectory — Documents/folder1/home/some-pic.png, структура BFS будет Document/ { /Folder -> { home —› { some-pic. png }}}. File Hierarchy преобразует путь webkitdirectory в сглаженный массив BFS. Это похоже на загрузку папки на Google Диск. Если есть потребность в каталоге webkit, мы можем использовать иерархию файлов.

Применение

npm i browser-bulk-upload
import BulkUpload from "browser-bulk-upload";

const bulkUpload = new BulkUpload({
  concurrency: 2,
  //synchronous function for returning axios request args
  requestArguments: ({ file, fileHierarchy }: any) => {
    //fileHierarchy -> please refer isFileHierarchy flag comment below
    const formData = new FormData();
    formData.append("file", file);
    return {
      url: "http://localhost:3000/upload",
      method: "POST",
      headers: {
        "Content-Type": "multipart/form-data",
      },
      data: formData,
    };
  },
  lastProgressUpload: 100, //for every 100ms download/upload progress will be updated and onUpdate callback will be invoked
  onUpdate: ({
    COMPLETED_UPLOADS /**Number */,
    FAILED_UPLOADS /**MAP -> [(name) => FileObj]**/,
    IN_QUEUE /**MAP -> [(name) => FileObj]**/,
    IN_PROGRESS /**MAP -> [(name) => FileObj]**/,
  }) => {
    //on complete, failed, inQueue & inProgress structure update callback is invoked
    onUploadUpdate({
      COMPLETED_UPLOADS,
      FAILED_UPLOADS,
      IN_QUEUE,
      IN_PROGRESS,
    });
  },
  onUploadComplete: () => {
    console.log("request completed");
  },
  requestOptions: {
    uploadProgress: true, //send request upload percentage
    // downloadProgress: true, send request download percentage
  },
  isFileHierarchy: false /**enable this flag if you have a requirement of sending folders as a BFS like Google-Drive folder upload to fetch all folder path(s), 
  please use this library : https://www.npmjs.com/package/files-hierarchy 
  **/,
});
const { cancel, destroy, retry, updateQueue } = bulkUpload.getControls();
/**
 * cancel -> cancel failed request -> cancel(FileObj)
 * destroy -> cancel all inprogress and remove all inqueue request(s) -> destroy()
 * retry -> retry only failed request -> retry([FileObj])
 * updateQueue -> update existing queue upload. Please note if you start upload again internally updateQueue is been called
 */
function onUploadUpdate({
  COMPLETED_UPLOADS, //number
  FAILED_UPLOADS,
  IN_QUEUE,
  IN_PROGRESS,
}: EventType) {
  /**FAILED|IN_QUEUE, IN_PROGRESS -> 
   * MAP{ FILE_NAME_ID -> 
   * FileObj = {
      file: File | null;
      fileHierarchy: FileHierarchy | null;
      status: FileStatus;
      uploadCount?: number;
      downloadCount?: number;
      isCancelled?: boolean; //if cancelled by user else request failed
      id: string;
      lastProgressUpdated?: number;
    };
   *  }**/
  //cancel(FileObj)
  //retry([FileObj, FileObj])
  //updateQueue(FileObj.file || FileObj.fileHierarchy)
}
//start the upload
document.querySelector("input")?.addEventListener("change", (e) => {
  bulkUpload.start(e.target.files);
});

Кроме того, я создал оболочку React для этой библиотеки, так как в большинстве случаев использовать ее в оригинальной форме было бы менее практично.



Чтобы закончить это, вот небольшая демонстрация.

Ссылки на библиотеку массовой загрузки:





Надеюсь, вы нашли мою реализацию массовой загрузки информативной и полезной! Всегда есть возможности для улучшения, например, для включения веб-воркеров, и я хотел бы услышать ваши мысли и отзывы.

Спасибо.

Дополнительные материалы на PlainEnglish.io.

Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter, LinkedIn, YouTube и Discord .

Заинтересованы в масштабировании запуска вашего программного обеспечения? Ознакомьтесь с разделом Схема.