Nano Hash - криптовалюты, майнинг, программирование

Могу ли я запустить несколько медленных процессов в фоновом режиме, чтобы несколько задач могли выполняться параллельно?

У меня есть консольное приложение, написанное с использованием C# поверх платформы Core .NET 2.2.

Мое приложение позволяет мне запускать длительные административные задания с помощью планировщика задач Windows.

Одно из заданий администратора выполняет вызов веб-API, который загружает множество файлов перед их отправкой в ​​хранилище BLOB-объектов Azure. Вот логические шаги, которые мой код должен будет выполнить, чтобы выполнить работу.

  1. Вызовите удаленный API, который ответит сообщением Mime, где каждое сообщение представляет файл.
  2. Разберите сообщения Mime и преобразуйте каждое сообщение в MemoryStream, создав коллекцию MemoryStream.

Когда у меня будет коллекция из более чем 1000 MemoryStream, я хочу записать каждый Stream в хранилище BLOB-объектов Azure. Поскольку запись в удаленное хранилище выполняется медленно, я надеюсь, что смогу выполнить каждую итерацию записи, используя свой собственный процесс или поток. Это позволит мне одновременно параллельно выполнять более 1000+ потоков вместо того, чтобы ждать результата каждой операции записи. Каждый поток будет отвечать за регистрацию любых ошибок, которые могут возникнуть в процессе записи/загрузки. Любые зарегистрированные ошибки будут обработаны с использованием другого задания, поэтому мне не нужно беспокоиться о повторных попытках.

Насколько я понимаю, вызов кода, который асинхронно записывает/загружает поток, будет делать именно это. Другими словами, я бы сказал: «Есть Stream выполнить его и работать столько, сколько потребуется. Меня не волнует результат, пока задача выполнена».

Во время тестирования я обнаружил, что мое понимание вызова async несколько неверно. У меня сложилось впечатление, что при вызове метода, определенного с помощью async, он будет выполняться в фоновом потоке/рабочем, пока этот процесс не будет завершен. Но мое понимание потерпело неудачу, когда я тестировал код. Мой код показал мне, что без добавления ключевого слова await код async на самом деле никогда не выполняется. В то же время, когда добавляется ключевое слово await, код будет ждать, пока процесс завершит выполнение, прежде чем продолжить. Другими словами, добавление await для моих нужд лишает смысла асинхронный вызов метода.

Вот урезанная версия моего кода для объяснения того, что я пытаюсь выполнить.

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(file, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

Из приведенного выше кода вызов await Upload(file); работает и загрузит файл, как и ожидалось. Однако, поскольку я использую await при вызове метода Upload(), мой цикл НЕ перейдет к следующей итерации, пока код загрузки не завершится. В то же время, убрав ключевое слово await, цикл не ждет процесса загрузки, но Stream никогда фактически не пишет в хранилище, как будто я никогда не вызывал код.

Как я могу выполнять несколько методов Upload параллельно, чтобы у меня был один поток, работающий для каждой загрузки в фоновом режиме?


Ответы:


1

Преобразуйте список в список задач «Отправить» и дождитесь их всех с помощью Task.WhenAll():

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

Дополнительную информацию о tasks/await см. в этой публикации.

05.02.2019
  • Спасибо за эту информацию. так что же делает Task.WhenAll() в этом случае? Запускает ли он все задачи одновременно параллельно, объединяет ли он задачи в группы и распределяет группы параллельно или выполняет одну за другой в конвейере? 05.02.2019
  • Короче говоря, @MikeA планировщик задач решит, сколько потоков освободить из пула потоков для одновременного запуска, он использует наилучшее предположение, основанное на определенных факторах и эвристике. Короче говоря, он работает параллельно, насколько это возможно. 05.02.2019
  • @MichaelRandall: Насколько мне известно, вы описали Parallel.ForEach. И по моему опыту Task.WhenAll запускает все задачи сразу. Это где-нибудь задокументировано? 05.02.2019
  • @abatishchev Задачи фактически запускаются, когда вы запускаете на них команду select. Однако ваша функция продолжается и продолжается. Task.WhenAll — это асинхронный метод, который возвращает значение, когда одна задача не удалась или все завершились успешно. Если вы ждете этого, выполнение приостанавливается до тех пор, пока задача не будет завершена. 05.02.2019
  • @abatishchev да, это задокументировано в пуле задач или потоков или где-то еще, это просто то, что делает планировщик по умолчанию, однако вы можете протестировать его с кучей задач и потоков. Спящий режим, однако с асинхронной работой и ожиданием и связанной с вводом-выводом работой, он будет ждать, пока порты завершения ввода-вывода закончат свои рабочие потоки, вернутся в пул потоков. Однако в целом планировщик не будет агрессивно освобождать потоки из пула потоков и будет использовать метод наилучшего предположения, определяемый такими факторами, как максимальное количество потоков, тип выполняемой вами работы и эвристика. У Стефана Туба и Стефана Клири тоже есть блоги на эту тему. 06.02.2019
  • @FrankerZ: да, и это моя точка зрения. await Task.WhenAll(Enumerable.Range(0, Math.Pow(2,10)).Select(_ => httpClient.GetAsync("https://google.com"))) не будет корректно планировать рабочую нагрузку. Будет ли он? 06.02.2019
  • @MichaelRandall: я прочитал оба блога. Можно более конкретную ссылку? И пожалуйста, смотрите мой комментарий выше. 06.02.2019
  • @abatishchev Я признаю, что соответствующие источники о том, как .net TaskScheduler по умолчанию работает внутри, нелегко найти, и в документации есть только беглые комментарии по этому поводу. Кроме того, исходный код TPL не для слабонервных. Однако, если вы хотите копнуть достаточно глубоко, вы найдете ссылки на его внутренности, написанные такими людьми, как Стивен Клири, Стивен Тауб, Эрик Липперт и другие. Есть также много вопросов о его поведении на SO (обратите внимание, что некоторые из них устарели) 06.02.2019
  • @abatishchev Я предлагаю, если вы хотите получить более конкретный ответ о поведении планировщика заданий по умолчанию, задайте вопрос в StackOverflow. Подробное описание того, почему TaskScheduler будет разделять потоки пула потоков и потоки ввода-вывода так, как он это делает. Я буду рад сопоставить все имеющиеся у меня источники и сослаться на соответствующую (и иногда неочевидную) документацию, которую смогу найти. Я также заметил, что Стивен Клири снова отвечает на вопросы и активно отвечает на вопросы-задачи. Так что и тут вам может повезти 06.02.2019

  • 2

    Я надеюсь, что смогу выполнить каждую итерацию записи, используя свой собственный процесс или поток.

    Это не лучший способ сделать это. Процессы и потоки — это ограниченные ресурсы. Ваш ограничивающий фактор ожидает в сети выполнения действия.

    То, что вы хотите сделать, это что-то вроде:

    var tasks = new List<Task>(queue.Count);
    
    while (queue.Count > 0)
    {
      var myobject = Queue.Dequeue();
      var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
      tasks.Add(task);
    }
    await Task.WhenAll(tasks);
    

    Здесь мы просто создаем задачи так быстро, как только можем, а затем ждем их завершения. Мы просто позволим .Net framework позаботиться обо всем остальном.

    Здесь важно то, что потоки не увеличивают скорость ожидания сетевых ресурсов. Задачи — это способ делегировать то, что нужно сделать, из рук потоков, чтобы у вас было больше потоков для любых действий (например, запуск новой загрузки или ответ на завершенную загрузку). Если поток просто ждет завершения загрузки, это потраченный впустую ресурс.

    05.02.2019
  • Вот почему для этого лучше подходят задачи (что пытался сделать ОП). Позвольте планировщику задач определить, что лучше всего оптимизировать ресурсы для приложения/сервера. 05.02.2019

  • 3

    Вам, вероятно, нужно это:

    var tasks = files.Select(Upload);
    await Task.WhenAll(tasks);
    

    Просто обратите внимание, что он будет порождать столько задач, сколько у вас есть файлов, что может привести к остановке процесса/машины, если их будет слишком много. См. раздел Набор задач только с X работает одновременно в качестве примера, как решить эту проблему.

    05.02.2019

    4

    Другие ответы хороши, однако другой подход заключается в вашем TPL DataFlow, доступном в Nuget из https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

    public static async Task DoWorkLoads(List<Something> results)
    {
       var options = new ExecutionDataflowBlockOptions
                         {
                            MaxDegreeOfParallelism = 50
                         };
    
       var block = new ActionBlock<Something>(MyMethodAsync, options);
    
       foreach (var result in results)
          block.Post(result );
    
       block.Complete();
       await block.Completion;
    
    }
    
    ...
    
    public async Task MyMethodAsync(Something result)
    {       
       //  Do async work here
    }
    

    Преимущество потока данных

    1. Это естественно работает с async, как и WhenAll решения на основе задач
    2. it can also be plumbed in to a larger pipeline of tasks
      • You could retry errors by piping them back in.
      • Добавьте любые вызовы предварительной обработки в более ранние блоки
    3. Вы можете ограничить MaxDegreeOfParallelism, если вас беспокоит дросселирование.
    4. Вы можете создавать более сложные конвейеры, отсюда и название DataFlow.
    05.02.2019

    5

    Вы можете преобразовать свой код в функции Azure и разрешить Azure выполняет большую часть параллелизма, масштабирования и загрузки в хранилище BLOB-объектов Azure.

    Вы можете использовать триггер Http или триггер служебной шины, чтобы инициировать каждую задачу загрузки, обработки и загрузки.

    05.02.2019
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..