Nano Hash - криптовалюты, майнинг, программирование

повысить asio асинхронное чтение и запись в сокет, используя очередь

Я работаю над простым TCP-сервером, который читает и записывает сообщения в потокобезопасную очередь. Затем приложение может использовать эту очередь для безопасного чтения и записи в сокет даже из разных потоков.

Проблема, с которой я столкнулся, заключается в том, что я не могу async_read. В моей очереди есть операция pop, которая возвращает следующий элемент для обработки, но блокируется, если ничего не доступно. Поэтому, как только я вызываю pop, обратный вызов async_read, конечно, больше не запускается. Есть ли способ интегрировать такую ​​очередь в boost asio или нужно полностью переписывать?

Ниже приведен краткий пример, который я сделал, чтобы показать проблему, с которой я столкнулся. Как только TCP-соединение установлено, я создаю новый поток, который будет запускать приложение под этим tcp_connection. После этого я хочу начать async_read и async_write. Я ломал голову над этим пару часов, и я действительно не знаю, как это решить.

class tcp_connection : public std::enable_shared_from_this<tcp_connection>
{
public:
    static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) {
        return std::shared_ptr<tcp_connection>(new tcp_connection(io_service));
    }

    boost::asio::ip::tcp::socket& get_socket()
    {
        return this->socket;
    }

    void app_start()
    {
        while(1)
        {
            // Pop is a blocking call.
            auto inbound_message = this->inbound_messages.pop();
            std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl;
            this->outbound_messages.push(inbound_message);
        }
    }

    void start() {
        this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this());

        boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
            strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));

        // Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks
        // empty() is also available to check whether the queue is empty.
        // So how can I async write without blocking the read.
        // block...
        auto message = this->outbound_messages.pop();
        boost::asio::async_write(this->socket, boost::asio::buffer(message),
            strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }

    void handle_read(const boost::system::error_code& e, size_t bytes_read)
    {
        std::cout << "handle_read called" << std::endl;
        if (e)
        {
            std::cout << "Error handle_read: " << e.message() << std::endl;
            return;
        }
        if (bytes_read != 0)
        {
            std::istream istream(&this->input_stream);
            std::string message;
            message.resize(bytes_read);
            istream.read(&message[0], bytes_read);
            std::cout << "Got message: " << message << std::endl;
            this->inbound_messages.push(message);
        }
        boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
            strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }

    void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/)
    {
        if (e)
        {
            std::cout << "Error handle_write: " << e.message() << std::endl;
            return;
        }

        // block...
        auto message = this->outbound_messages.pop();
        boost::asio::async_write(this->socket, boost::asio::buffer(message),
            strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
    }



private:
    tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service)
    {
    }

    boost::asio::ip::tcp::socket socket;
    boost::asio::strand strand;
    boost::asio::streambuf input_stream;

    std::thread app_thread;

    concurrent_queue<std::string> inbound_messages;
    concurrent_queue<std::string> outbound_messages;
};

class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service)
        : acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001))
    {
        start_accept();
    }

private:
    void start_accept()
    {
        std::shared_ptr<tcp_connection> new_connection =
            tcp_connection::create(acceptor.get_io_service());

        acceptor.async_accept(new_connection->get_socket(),
            boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error));
    }

    void handle_accept(std::shared_ptr<tcp_connection> new_connection,
                       const boost::system::error_code& error)
    {
        if (!error)
        {
            new_connection->start();
        }

        start_accept();
    }

    boost::asio::ip::tcp::acceptor acceptor;
};
07.05.2017

Ответы:


1

Мне кажется, что вам нужен метод async_pop, который принимает заполнитель сообщения об ошибке и обработчик обратного вызова. Когда вы получаете сообщение, проверьте, есть ли незавершенный обработчик, и если да, извлеките сообщение, отмените регистрацию обработчика и вызовите его. Точно так же при регистрации async_pop, если уже есть ожидающее сообщение, извлеките сообщение и отправьте вызов обработчику, не регистрируя его.

Возможно, вы захотите получить класс async_pop из полиморфной базовой базы типа pop_operation или аналогичной.

07.05.2017
  • Спасибо! Мне не приходило в голову, что я могу создать такой обработчик сам. Поскольку я использую strand.wrap, это должно работать нормально. У меня есть один вопрос: зачем мне нужен заполнитель сообщения об ошибке? 07.05.2017
  • @JohnSmith моя ошибка. вам понадобится заполнитель сообщения (std::string& ?) вы захотите передать любой код ошибки из последней неудачной операции чтения ввода-вывода обратно в асинхронный обработчик в случае, когда очередь сообщений исчерпана и есть чтение ошибка. 07.05.2017
  • Да, я реализовал это сейчас. В коде это не строка, но продемонстрировать на примере строки было проще всего. Завтра я опубликую код в другом ответе, но я оставлю ваш принятым. Просто для людей, которые задаются тем же вопросом. 08.05.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..