Nano Hash - криптовалюты, майнинг, программирование

Асинхронные/ожидающие задачи и WaitHandle

Скажем, у меня есть 10N элементов (мне нужно получить их по http-протоколу), в коде запускается N задач для получения данных, каждая задача последовательно принимает 10 элементов. Я помещаю предметы в ConcurrentQueue<Item>. После этого элементы обрабатываются потокобезопасным методом один за другим.

async Task<Item> GetItemAsync()
{
    //fetch one item from the internet
}

async Task DoWork()
{
    var tasks = new List<Task>();
    var items = new ConcurrentQueue<Item>();
    var handles = new List<ManualResetEvent>();

    for i 1 -> N
    {
        var handle = new ManualResetEvent(false);
        handles.Add(handle);

        tasks.Add(Task.Factory.StartNew(async delegate
        {
            for j 1 -> 10
            {
                var item = await GetItemAsync();
                items.Enqueue(item);
            }
            handle.Set();
        });
    }

    //begin to process the items when any handle is set
    WaitHandle.WaitAny(handles);

    while(true)
    {
         if (all handles are set && items collection is empty) //***
           break; 
         //in another word: all tasks are really completed

         while(items.TryDequeue(out item))          
         {
              AThreadUnsafeMethod(item);    //process items one by one
         }
    }
}

Я не знаю, что, если условие может быть помещено в оператор, помеченный ***. Я не могу использовать здесь свойство Task.IsCompleted, потому что я использую в задаче await, поэтому задача выполняется очень быстро. И bool[], указывающий, выполнена ли задача до конца, выглядит действительно уродливо, потому что я думаю, что ManualResetEvent может сделать ту же работу. Может ли кто-нибудь дать мне предложение?


Ответы:


1

Что ж, вы могли создать это самостоятельно, но я думаю, что это намного проще с Поток данных TPL.

Что-то типа:

static async Task DoWork()
{
  // By default, ActionBlock uses MaxDegreeOfParallelism == 1,
  //  so AThreadUnsafeMethod is not called in parallel.
  var block = new ActionBlock<Item>(AThreadUnsafeMethod);

  // Start off N tasks, each asynchronously acquiring 10 items.
  // Each item is sent to the block as it is received.
  var tasks = Enumerable.Range(0, N).Select(Task.Run(
      async () =>
      {
        for (int i = 0; i != 10; ++i)
          block.Post(await GetItemAsync());
      })).ToArray();

  // Complete the block when all tasks have completed.
  Task.WhenAll(tasks).ContinueWith(_ => { block.Complete(); });

  // Wait for the block to complete.
  await block.Completion;
}
24.08.2012

2

Вы можете сделать WaitOne с нулевым тайм-аутом, чтобы проверить состояние. Что-то вроде этого должно работать:

if (handles.All(handle => handle.WaitOne(TimeSpan.Zero)) && !items.Any())
    break;

http://msdn.microsoft.com/en-us/library/cc190477.aspx

24.08.2012
  • Блокирует текущий поток до тех пор, пока текущий экземпляр не получит сигнал, поэтому у вас будет 10 блокирующих потоков, ожидающих сигнала. 03.09.2015
  • @Sebastian Примечания: если время ожидания равно нулю, метод не блокируется. Он проверяет состояние дескриптора ожидания и немедленно возвращается. 11.04.2017
  • @jaggedSpire хорошо, это информация, которую я пропустил. Спасибо, теперь это мое предпочтительное решение для проверки WaitHandles. 12.04.2017

  • 3

    Спасибо всем. Наконец я обнаружил, что CountDownEvent очень подходит для этот сценарий. Общая реализация выглядит так: (для информации других)

    for i 1 -> N
    {
        //start N tasks
        //invoke CountDownEvent.Signal() at the end of each task
    }
    
    //see if CountDownEvent.IsSet here
    
    27.08.2012
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..