Nano Hash - криптовалюты, майнинг, программирование

Запись в файл из общего буфера недостающих данных и сбой программы без cout

Я делаю программу, используя потоки и общий буфер. Два потока бесконечно работают в фоновом режиме, один поток заполняет общий буфер данными, а другой поток записывает содержимое общего буфера в файл.

Пользователь может запустить или остановить заполнение данных, в результате чего поток переходит в состояние ожидания до тех пор, пока пользователь снова не запустит поток. В каждом цикле буфер заполняется 50 поплавками.

Это код:


#include <iostream>
#include <vector>
#include <iterator>
#include <utility>
#include <fstream>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std;

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

void writing_thread()
{
    ofstream myfile;

    bool opn = false;

    while(1)
    {

        while(keep_running)
        {
            // Open the file only once
            if(!opn)
            {
                myfile.open("IQ_Datas.txt");
                opn = true;

            }


            // Wait until main() sends data
            std::unique_lock<std::mutex> lk(m);

            cv.wait(lk, [] {return !datas.empty();});


            auto d = std::move(datas);


            lk.unlock();


            for(auto &entry : d)
            {
                for(auto &e : entry)
                    myfile << e << endl;
            }


        }

        if(opn)
        {
            myfile.close();
            opn = false;
        }

    }
}

void sending_thread()
{

    std::vector<float> m_buffer;
    int cpt=0;
    //Fill the buffer with 50 floats
    for(float i=0; i<50; i++)
        m_buffer.push_back(i);

    while(1)
    {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {return keep_running && start_running;});

        }
        while(keep_running)
        {

            //Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;

            cout << "in3" << endl; //Commenting this line makes the program crash

            {
                std::lock_guard<std::mutex> lk(m);
                if (!keep_running)break;
                datas.push_back(std::move(d));
            }
            cv.notify_one();
            cpt++;
        }

        cout << "Total data: " << cpt*50 << endl;
        cpt = 0;
    }
}
void start()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = true;
    }
    cv.notify_all();
}
void stop()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = false;
    }
    cv.notify_all();
}

int main()
{
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    t1.detach();
    t2.detach();

    while(1)
    {

        std::cin >> go;

        if(go == 1)
        {
            start();
            keep_running = true;
        }
        else if(go == 0)
        {
            stop();
            keep_running = false;
        }


    }

    return 0;
}


У меня есть 2 проблемы с этим кодом:

  • При комментировании строки cout << "in3" << endl; программа вылетает через ~20-40 секунд с сообщением об ошибке: вызывается завершение после создания экземпляра 'std::bad_alloc' what(): std::bad_alloc. Если я позволю cout, программа будет работать без проблем.

  • Когда программа работает, после остановки sending_thread я отображаю общий объем данных, которые были скопированы с помощью cout << "Total data: " << cpt*50 << endl;. При небольшом объеме данных все они правильно записываются в файл, но при большом объеме данные отсутствуют. Отсутствуют/правильные данные (Общее количество строк в файле не соответствует total data)

Почему с cout программа работает правильно? И что является причиной отсутствия данных? Это потому, что sending_thread слишком быстро заполняет буфер, а writing_thread занимает слишком много времени для записи в файл?

EDIT: некоторые уточнения, добавление большего количества cout в sending_threadseems, чтобы исправить все проблемы. Первый поток произвел 21 миллион чисел с плавающей запятой, а второй поток успешно записал в файл 21 миллион чисел с плавающей запятой. Кажется, что без cout потоки-производители работают слишком быстро, чтобы поток-потребитель мог продолжать извлекать данные из общего буфера при записи их в файл.


  • Кажется, что ваш основной поток изменяет общую логическую переменную keep_running без какого-либо взаимного исключения. Таким образом, изменения могут оставаться в кэше ядра процессора, на котором работает основной поток. Однако сам факт записи in3 является системным вызовом, который может вызвать очистку кеша, отсюда и возможное объяснение изменения поведения. Вы также должны использовать правильное взаимное исключение в основном потоке. 21.06.2019
  • Хорошо, могу я изменить значение keep_running в функции startи stop, чтобы избежать такого поведения? Кроме того, тот факт, что keep_running может оставаться в кеше, вызывает ошибку bad_alloc? 21.06.2019
  • Да, перемещение изменений в общую переменную в область, защищенную блокировкой, должно помочь. Кроме того, в вашем исходном коде возможно, что поток-производитель увидит изменение и будет работать без остановок, в то время как поток-потребитель не увидит изменения и останется на месте. Следовательно, большое количество данных буферизуется до тех пор, пока ресурсы не будут исчерпаны. Но, вообще говоря, не очень полезно тратить время людей на размышления о том, почему плохо синхронизированные программы демонстрируют то или иное поведение. 21.06.2019
  • @jpmarinier Запуск программы без изменения значений keep_running и start_running по-прежнему приводит к bad_alloc, возможно, проблема не в общих переменных. РЕДАКТИРОВАТЬ: после печати 1, 2... в writing_thread кажется, что поток застрял на некоторое время в циклах for перед вызовом bad_alloc 21.06.2019
  • Вам нужно пересмотреть этот auto d = std::move(datas);, который оставит ваш datas в неопределенном состоянии, возможно, реконструкция datas после move придания ему нового начального состояния решит ваши недостающие данные. 21.06.2019
  • @muaz Да, я изменил его, как посоветовал мне Тед Лингмо. auto d = std::move(datas); теперь datas.swap(d);. Нет недостающих данных только тогда, когда я ставлю 3 cout (см. Мое редактирование), действительно странно. 21.06.2019
  • cout просто замедляет ваш поток-производитель, который потребляет вашу память, давая возможность потоку-потребителю очистить ваш вектор, но более элегантным решением будет предоставление максимального размера вашему вектору и не позволяйте потоку-производителю передавать данные до тех пор, пока он не получит снова пусто, то вам не нужны эти cout 21.06.2019
  • @muaz Думаю, альтернативой может быть d.swap(datas) вместо datas.swap(d) для очистки и перераспределения данных. 21.06.2019
  • @muaz Я добавил if(datas.empty())datas.push_back(std::move(d)); для отправки данных только тогда, когда вектор пуст, он работает, но медленно, и я не вижу, как указать максимальный размер для вектора, потому что каждый цикл datas содержит только один вектор, а затем он заблокирован потребительским потоком который скопируйте из datas, затем очистите его. 22.06.2019
  • Просто в качестве предложения перед datas.push_back(std::move(d)); установите это условие if(datas.size() * d.size() * sizeof(float) < size ), где size - некоторое целое число, представляющее разумный размер памяти в байтах, выделенный для datas, скажем, 2 МБ. 22.06.2019
  • d.swap(datas) вместо datas.swap(d) делаем то же самое. Вопрос только в том, кто выполняет фактический обмен (обмен указателями). Оба находятся в допустимом и указанном состоянии. Добавление проверки для отправки только в том случае, если вектор пуст, в качестве проверки, если клиентская сторона опустошила вектор, может вызвать состояние гонки. Тогда у вас будет несколько чеков для одного и того же. Ни разу не хороший знак. Я посмотрю на это более внимательно, когда закончится шведская середина лета :-) 22.06.2019
  • @TedLyngmo Я вижу, я пытался поставить переменную условия, чтобы дождаться, пока вектор станет пустым, но мне не удалось заставить его работать. Завтра попробую еще раз, устал ха-ха :) 22.06.2019

Ответы:


1

Избегать:

Moved-from object 'datas' of type 'std::vector' is moved:
        auto d = std::move(datas);
                 ^~~~~~~~~~~~~~~~

Замените это:

        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] {return !datas.empty();});
        auto d = std::move(datas);
        lk.unlock();

С этим:

        // Wait until main() sends data            
        std::vector<std::vector<float>> d;
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] { return !datas.empty(); });
            datas.swap(d);
        }

Также замените свои переменные bool, доступ к которым осуществляется из нескольких потоков, на std::atomic_bool или std::atomic_flag.

bad_alloc исходит из того, что sending_thread намного быстрее, чем writing_thread, поэтому ему не хватит памяти. Когда вы замедляете sending_thread достаточно (с печатью), проблема менее заметна, но вам нужна некоторая синхронизация, чтобы сделать это правильно. Вы можете создать класс-оболочку вокруг него и предоставить методы вставки и извлечения, чтобы убедиться, что весь доступ синхронизирован правильно, а также предоставить ему максимальное количество элементов. Пример:

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}

    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements && m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        if(m_shutdown == false) {
            m_current_elements = 0;
            return_value.swap(m_data);
        } else {
            // return an empty vector if we should shutdown
        }
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

Если вы хотите продолжать извлекать данные даже после завершения работы, вы можете изменить extract_all() на это:

   std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

Полный пример может выглядеть так:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements &&
              m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

std::mutex m;
std::condition_variable cv;
atomic2dvector<float> datas(256 * 1024 * 1024 / sizeof(float)); // 0.25 GiB limit
std::atomic_bool start_running = false;

void writing_thread() {
    std::ofstream myfile("IQ_Datas.txt");
    if(myfile) {
        std::cout << "writing_thread waiting\n";

        std::vector<std::vector<float>> d;
        while((d = datas.extract_all()).empty() == false) {
            std::cout << "got " << d.size() << "\n";

            for(auto& entry : d) {
                for(auto& e : entry) myfile << e << "\n";
            }
            std::cout << "wrote " << d.size() << "\n\n";
        }
    }
    std::cout << "writing_thread shutting down\n";
}

void sending_thread() {
    std::vector<float> m_buffer;
    std::uintmax_t cpt = 0;
    // Fill the buffer with 50 floats
    for(float i = 0; i < 50; i++) m_buffer.push_back(i);

    while(true) {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {
                return start_running == true || datas.is_active() == false;
            });
        }
        if(datas.is_active() == false) break;
        std::cout << "sending...\n";
        while(start_running == true) {
            // Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;
            if(datas.insert_one(std::move(d)) == false) break;
            cpt++;
        }
        cout << "Total data: " << cpt * 50 << endl;
        cpt = 0;
    }
    std::cout << "sending_thread shutting down\n";
}

void start() {
    std::unique_lock<std::mutex> lk(m);
    start_running = true;
    cv.notify_all();
}
void stop() {
    std::unique_lock<std::mutex> lk(m);
    start_running = false;
    cv.notify_all();
}
void quit() {
    datas.shutdown();
    cv.notify_all();
}

int main() {
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Enter 1 to make the sending thread send and 0 to make it stop "
                 "sending. Enter a non-integer to shutdown.\n";

    while(std::cin >> go) {
        if(go == 1) {
            start();
        } else if(go == 0) {
            stop();
        }
    }
    std::cout << "--- shutting down ---\n";
    quit();

    std::cout << "joining threads\n";
    t1.join();
    std::cout << "t1 joined\n";
    t2.join();
    std::cout << "t2 joined\n";
}
21.06.2019
  • Комментарии не для расширенного обсуждения; этот разговор был перешел в чат. 25.06.2019
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..