Nano Hash - криптовалюты, майнинг, программирование

Подождите, пока переменная станет равной нулю

Я пишу многопоточную программу, которая может выполнять некоторые задачи в отдельных потоках.

Некоторые операции требуют их ожидания в конце выполнения моей программы. Я написал простую защиту для таких «важных» операций:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire( &m_ullCounter );
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire( &m_ullCounter );
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        // Here is a topic of my question
        // Return false on timeout
        // Return true if wait was successful
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

Использование простое:

void ImportantTask()
{
    CPendingOperationGuard guard;
    // Do work
}

// ...

void StopExecution()
{
    if(!CPendingOperationGuard::WaitForAll( 30000 )) {
        // Handle error
    }
}

Вопрос в том, как эффективно ждать, пока m_ullCounter не станет равным нулю или пока не истечет время ожидания.

У меня есть две идеи:

  1. Чтобы запустить эту функцию в другом отдельном потоке и напишите WaitForSingleObject( hThread, dwTimeout ):

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            ;
    }
    

    Но он будет "съедать" почти 100% процессорного времени - плохая идея.

  2. Вторая идея - разрешить запуск других потоков:

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            Sleep( 0 );
    }
    

    Но это переключит контекст выполнения в режим ядра и обратно - слишком дорого для майской задачи. Тоже плохая идея

Вопрос в следующем:
Как выполнить почти нулевые накладные расходы, ожидая, пока моя переменная не станет равной нулю? Можно и без отдельного потока... Главное условие - поддержка остановки ожидания по таймауту.

Возможно, кто-то может предложить совершенно другую идею для моей задачи - ждать всех зарегистрированных операций (как в WinAPI ThreadPools - его API, например, WaitForThreadpoolWaitCallbacks выполнять ожидание ВСЕХ зарегистрированных задач).

PS: невозможно переписать мой код с помощью API ThreadPool :(


  • Есть ли конкретная причина не использовать библиотеку потоков и атомарности С++? Вы используете final, поэтому С++ 11 кажется доступным. В С++ 11 также есть std::condition_variable, который позволяет вам ждать бездействия, пока вы не получите уведомление от другого потока. 10.04.2020
  • Вы должны открыть свою книгу по C++ на главе, объясняющей, как использовать std::mutex и std::condition_variable, и прочитать ее. Использование volatile не дает абсолютно ничего полезного для достижения вашей цели. volatile не для этого. 10.04.2020
  • volatile не означает потокобезопасность. 10.04.2020
  • @JesperJuhl Возможно, это так, но используемые здесь вызовы WinAPI, похоже, требуют этого: docs.microsoft.com/en-us/previous-versions/windows/desktop/ 10.04.2020
  • @SamVarshavchik См. комментарий выше. 10.04.2020
  • @walnut - теперь все, что нужно сделать, это выяснить, как использовать эти конкретные вызовы Win API для реализации ожидания с почти нулевыми накладными расходами, и тогда будет аргумент в пользу их использования с volatile объектами. А если не можешь, значит, ты не можешь их использовать, так что volatile, опять же, абсолютно ничего не даст. 10.04.2020
  • @walnut, да, есть причина использовать продукты WinAPI - весь проект написан с WinAPI, и мой проект должен соответствовать стилю кода проекта. 10.04.2020
  • Однако пока не понимаю, как связать std::condition_variable с ожиданием обнуления переменной :( 10.04.2020
  • Я бы предложил продолжить использование InterlockedIncrement/InterlockedDecrement и просто проверить возвращаемое значение от каждого уменьшения, когда оно достигает 0, установить событие или какое-либо другое уведомление (APC, пользовательский пакет IOCP, сообщение Windows). Не пытайтесь ждать самой переменной. Помните, что возвращаемое значение каждого InterlockedDecrement — это значение после декремента, и оно гарантированно будет атомарным. 10.04.2020
  • @GeorgyFirsov весь проект написан с помощью WinAPI, и мой проект должен соответствовать стилю кода проекта -- это не убедительный аргумент в пользу того, чтобы не использовать модель памяти C++11. Если этот код, который вы нам показываете, является вашим собственным, и нет других мест, где используются функции WinAPI MultiThread (не только WinAPI), то я не вижу проблемы. Кроме того, если код, который вы показываете, изолирован от остальной части базы кода, опять же, я не вижу проблемы с использованием С++ 11. 10.04.2020
  • @SoronelHaetir - когда он достигает 0, установите событие или какое-либо другое уведомление - действительно он может много раз достигать 0 - каждый раз, когда нет активной операции. но после этого может начаться новая операция. просто счетчика здесь недостаточно 11.04.2020

Ответы:


1

Взгляните на WaitOnAddress(). и WakeByAddressSingle()/WakeByAddressAll(), представленные в Windows 8.

Например:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        ULONGLONG Captured, Now, Deadline = GetTickCount64() + dwTimeOut;
        DWORD TimeRemaining;
        do
        {
            Captured = InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0);
            if (Captured == 0) return true;
            Now = GetTickCount64();
            if (Now >= Deadline) return false;
            TimeRemaining = static_cast<DWORD>(Deadline - Now);
        }
        while (WaitOnAddress(&m_ullCounter, &Captured, sizeof(ULONGLONG), TimeRemaining));
        return false;
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

Рэймонд Чен написал серию статей в блоге об этих функциях:

WaitOnAddress позволяет создать объект синхронизации из любой переменной данных, даже из байта< /а>

Реализация критического раздела с точки зрения WaitOnAddress

Фальшивые пробуждения, условия гонки и поддельные заявки FIFO: заглянуть за кулисы WaitOnAddress

Расширение нашего критического раздела на основе WaitOnAddress для поддержки тайм-аутов

Сравнение функции WaitOnAddress с фьютексами (futexi? futexen?)

Создание семафора из WaitOnAddress

Создание семафора с максимальным количеством от WaitOnAddress

Создание события с ручным сбросом из WaitOnAddress

Создание события автоматического сброса из WaitOnAddress

Вспомогательная функция шаблона для ожидания WaitOnAddress в цикле

10.04.2020
  • это не решение, потому что ждать, когда m_ullCounter станет 0, недостаточно для задачи. предположим, что мы получили это m_ullCounter == 0 . и что ? новая операция может начаться уже после того, как m_ullCounter станет 0 11.04.2020
  • и, как примечание (офтопик), InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0); существует смысл использовать только для операции ReadModifyWrite. это не имеет смысла только для чтения, если процессор может выполнять чтение как атомарное (выровненное 32-битное целочисленное атомарное чтение на x86/x64 и думать также на любой платформе), поэтому достаточно просто прочитать m_ullCounter 11.04.2020

  • 2

    для этой задачи вам понадобится что-то вроде Run-Down Защита вместо CPendingOperationGuard

    перед началом работы вы вызываете ExAcquireRundownProtection< /a> и только если он вернет TRUE - начать выполнение операции. в конце вы должны вызвать ExReleaseRundownProtection< /а>

    поэтому шаблон должен быть следующим

    if (ExAcquireRundownProtection(&RunRef)) {
        do_operation();
        ExReleaseRundownProtection(&RunRef);
    }
    

    когда вы хотите остановить этот процесс и дождаться завершения всех активных вызовов do_operation();, вы звоните ExWaitForRundownProtectionRelease (вместо WaitWorker)

    После вызова ExWaitForRundownProtectionRelease подпрограмма ExAcquireRundownProtection вернет значение FALSE (поэтому новые операции после этого запускаться не будут). ExWaitForRundownProtectionRelease ожидает возврата до тех пор, пока all не вызовет процедуру ExReleaseRundownProtection для сброса ранее полученной защиты от выбега (поэтому, когда все текущие (если существуют) операции завершатся). Когда все незавершенные обращения завершены, ExWaitForRundownProtectionRelease возвращает значение

    к сожалению, этот API реализуется системой только в режиме ядра и не имеет аналога в пользовательском режиме. однако не сложно реализовать такую ​​идею самостоятельно

    это мой пример:

    enum RundownState {
        v_complete = 0, v_init = 0x80000000
    };
    
    template<typename T>
    class RundownProtection
    {
        LONG _Value;
    
    public:
    
        _NODISCARD BOOL IsRundownBegin()
        {
            return 0 <= _Value;
        }
    
        _NODISCARD BOOL AcquireRP()
        {
            LONG Value, NewValue;
    
            if (0 > (Value = _Value))
            {
                do 
                {
                    NewValue = InterlockedCompareExchangeNoFence(&_Value, Value + 1, Value);
    
                    if (NewValue == Value) return TRUE;
    
                } while (0 > (Value = NewValue));
            }
    
            return FALSE;
        }
    
        void ReleaseRP()
        {
            if (InterlockedDecrement(&_Value) == v_complete)
            {
                static_cast<T*>(this)->RundownCompleted();
            }
        }
    
        void Rundown_l()
        {
            InterlockedBitTestAndResetNoFence(&_Value, 31);
        }
    
        void Rundown()
        {
            if (AcquireRP())
            {
                Rundown_l();
                ReleaseRP();
            }
        }
    
        RundownProtection(RundownState Value = v_init) : _Value(Value)
        {
        }
    
        void Init()
        {
            _Value = v_init;
        }
    };
    
    ///////////////////////////////////////////////////////////////
    
    class OperationGuard : public RundownProtection<OperationGuard>
    {
        friend RundownProtection<OperationGuard>;
    
        HANDLE _hEvent;
    
        void RundownCompleted()
        {
            SetEvent(_hEvent);
        }
    
    public:
    
        OperationGuard() : _hEvent(0) {}
    
        ~OperationGuard() 
        {
            if (_hEvent)
            {
                CloseHandle(_hEvent);
            }
        }
    
        ULONG WaitComplete(ULONG dwMilliseconds = INFINITE)
        {
            return WaitForSingleObject(_hEvent, dwMilliseconds);
        }
    
        ULONG Init()
        {
            return (_hEvent = CreateEvent(0, 0, 0, 0)) ? NOERROR : GetLastError();
        }
    } g_guard;
    
    //////////////////////////////////////////////
    
    ULONG CALLBACK PendingOperationThread(void*)
    {
        while (g_guard.AcquireRP())
        {
            Sleep(1000);// do operation
            g_guard.ReleaseRP();
        }
    
        return 0;
    }
    
    void demo()
    {
        if (g_guard.Init() == NOERROR)
        {
            if (HANDLE hThread = CreateThread(0, 0, PendingOperationThread, 0, 0, 0))
            {
                CloseHandle(hThread);
            }
    
            MessageBoxW(0, 0, L"UI Thread", MB_ICONINFORMATION|MB_OK);
    
            g_guard.Rundown();
    
            g_guard.WaitComplete();
        }
    }
    

    зачем просто ждать, когда ждать, пока m_ullCounter не станет нулем, недостаточно

    если мы читаем 0 из m_ullCounter, это означает только то, что в это время нет активной операции. но отложенная операция может начаться уже после того, как мы проверим, что m_ullCounter == 0 . мы можем использовать специальный флаг (скажем, bool g_bQuit) и установить его. перед началом операции проверьте этот флаг и не начинайте, если он истинен. но этого в любом случае недостаточно

    наивный код:

    // рабочий поток

    if (!g_bQuit) // (1)
    {
        // MessageBoxW(0, 0, L"simulate delay", MB_ICONWARNING);
    
        InterlockedIncrement(&g_ullCounter); // (4)
        // do operation
        InterlockedDecrement(&g_ullCounter); // (5)
    }
    

    // здесь ждем завершения всей операции

        g_bQuit = true; // (2)
    
        // wait on g_ullCounter == 0, how - not important
        while (g_ullCounter) continue; // (3)
    
    • ожидание операции проверки флага g_bQuit (1) - пока ложно, поэтому начинается
    • рабочий поток заменен (используйте MessageBox для имитации этого)
    • устанавливаем g_bQuit = true; // (2)
    • мы проверяем/ждем g_ullCounter == 0, это 0, поэтому мы выходим (3)
    • пробуждение рабочего потока (возврат из MessageBox) и приращение g_ullCounter (4)

    проблема здесь в том, что операция может использовать некоторые ресурсы, которые мы уже начинаем уничтожать после g_ullCounter == 0

    это происходит из-за проверки флага выхода (g_Quit) и счетчика приращения после этого не атомарный - между ними может быть пробел.

    для правильного решения нам нужен атомарный доступ к флагу + счетчику. это и сделать защиту от износа. для флага+счетчика используется одна переменная LONG (32 бита), потому что мы можем сделать к ней атомарный доступ. 31 бит используется для счетчика и 1 бит используется для флага выхода. Решение для Windows использует 0 бит для флага (1 означает выход) и [1..31] бит для счетчика. я использую биты [0..30] для счетчика и 31 бит для флага (0 означает выход). искать

    10.04.2020
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..