Nano Hash - криптовалюты, майнинг, программирование

Безопасная отмена таймера дедлайна boost asio

Я пытаюсь безопасно отменить boost::asio::basic_waitable_timer<std::chrono::steady_clock>.

Согласно этому ответу, этот код должен выполнять эту работу:

timer.get_io_service().post([&]{timer.cancel();})

Боюсь, у меня это не работает.
Я что-то не так делаю?
Вот мой код:

#include <iostream>
#include "boost/asio.hpp"
#include <chrono>
#include <thread>
#include <random>

boost::asio::io_service io_service;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer(io_service);
std::atomic<bool> started;

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout\n";
        timer.expires_from_now(std::chrono::milliseconds(10));
        timer.async_wait(&handle_timeout);
    } else if (ec == boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout aborted\n";
    } else {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout another error\n";
    }
}

int main() {

    std::cout << "tid: " << std::this_thread::get_id() << ", Hello, World!" << std::endl;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);

    for (auto i = 0; i < 1000; i++) {

        started = false;
        std::thread t([&](){

            timer.expires_from_now(std::chrono::milliseconds(0));
            timer.async_wait(&handle_timeout);

            io_service.run();
        });

        while (!started) {};
        auto sleep = dis(gen);
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", sleeps for " << sleep << " [ms]" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
        timer.get_io_service().post([](){
            std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
            timer.cancel();
        });
//      timer.cancel();
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", waiting for thread to join()" << std::endl;
        t.join();
        io_service.reset();
    }

    return 0;
}

Это результат:

...
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 2, ожидание объединения потока()
tid: 140737335076608, отмена в сообщении
tid: 140737335076608, handle_timeout aborted
tid: 140737353967488, i: 3, засыпает на 21 [мс]
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 3, ожидание объединения потока BR> TID: 140737335076608, Handle_timeout
TID: 140737335076608, Отмена в Post
TID: 140737335076608, Handle_timeout
TID: 140737335076608, Handle_timeout
TID: 140737335076608, Handle_timeout
TID: 140737335076608, Handle_timeout < br> tid: 140737335076608, handle_timeout
...
продолжаться вечно...

Как видите, timer.cancel() вызывается из соответствующего потока:

tid: 140737335076608, отмена по почте

НО нет

tid: 140737335076608, handle_timeout прерван

После.

Главное ждет вечно.


Ответы:


1

Отмена безопасна.

Это просто не надежно. Вы не учли случай, когда таймер не был отложен. Тогда вы отмените его один раз, но он просто запустит новое асинхронное ожидание после вызова обработчика завершения.

Ниже приведены мои подробные шаги о том, как я отследил проблему.

РЕЗЮМЕ TL;DR

Отмена времени отменяет только асинхронные операции в полете.

Если вы хотите отключить цепочку асинхронных вызовов, вам придется использовать для этого дополнительную логику. Пример приведен ниже.

Отслеживание обработчика

Включение с

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1

В результате получается вывод, который можно визуализировать с помощью boost/libs/asio/tools/handlerviz.pl:

Успешная трассировка

введите описание изображения здесь

Как видите, async_wait находится в полете, когда происходит отмена.

«Плохой» след

(усечено, потому что это будет работать бесконечно)

введите описание изображения здесь

Обратите внимание, как обработчик завершения видит cc=system:0, а не cc=system:125 (для operation_aborted). Это симптом того, что выложенная отмена на самом деле не "взяла". Единственное логическое объяснение (не видно на диаграмме) заключается в том, что таймер уже истек до того, как будет вызвана отмена.

Давайте сравним необработанные трассировки¹

введите описание изображения здесь

¹ удаление шумовой разницы

Обнаружение

Итак, у нас есть зацепка. Можем ли мы это обнаружить?

    timer.get_io_service().post([](){
        std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
        if (timer.expires_from_now() >= std::chrono::steady_clock::duration(0)) {
            timer.cancel();
        } else {
            std::cout << "PANIC\n";
            timer.cancel();
        }
    });

Отпечатки:

tid: 140113177143232, i: 0, waiting for thread to join()
tid: 140113177143232, i: 1, waiting for thread to join()
tid: 140113177143232, i: 2, waiting for thread to join()
tid: 140113177143232, i: 3, waiting for thread to join()
tid: 140113177143232, i: 4, waiting for thread to join()
tid: 140113177143232, i: 5, waiting for thread to join()
tid: 140113177143232, i: 6, waiting for thread to join()
tid: 140113177143232, i: 7, waiting for thread to join()
tid: 140113177143232, i: 8, waiting for thread to join()
tid: 140113177143232, i: 9, waiting for thread to join()
tid: 140113177143232, i: 10, waiting for thread to join()
tid: 140113177143232, i: 11, waiting for thread to join()
tid: 140113177143232, i: 12, waiting for thread to join()
tid: 140113177143232, i: 13, waiting for thread to join()
tid: 140113177143232, i: 14, waiting for thread to join()
tid: 140113177143232, i: 15, waiting for thread to join()
tid: 140113177143232, i: 16, waiting for thread to join()
tid: 140113177143232, i: 17, waiting for thread to join()
tid: 140113177143232, i: 18, waiting for thread to join()
tid: 140113177143232, i: 19, waiting for thread to join()
tid: 140113177143232, i: 20, waiting for thread to join()
tid: 140113177143232, i: 21, waiting for thread to join()
tid: 140113177143232, i: 22, waiting for thread to join()
tid: 140113177143232, i: 23, waiting for thread to join()
tid: 140113177143232, i: 24, waiting for thread to join()
tid: 140113177143232, i: 25, waiting for thread to join()
tid: 140113177143232, i: 26, waiting for thread to join()
PANIC

Можем ли мы сообщить о «суперотмене» другим, более ясным способом? У нас есть... только объект timer для работы, конечно:

Сигнализация отключения

У объекта timer не так много свойств для работы. Там нет close() или чего-то подобного, как в сокете, который можно использовать для перевода таймера в какое-то недопустимое состояние.

Однако есть момент истечения срока действия, и мы можем использовать специальное значение домена, чтобы сигнализировать о «недействительности» для нашего приложения:

timer.get_io_service().post([](){
    std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
    // also cancels:
    timer.expires_at(Timer::clock_type::time_point::min());
});

Это «специальное значение» легко обрабатывать в обработчике завершения:

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        if (timer.expires_at() != Timer::time_point::min()) {
            timer.expires_from_now(std::chrono::milliseconds(10));
            timer.async_wait(&handle_timeout);
        } else {
            std::cerr << "handle_timeout: detected shutdown\n";
        }
    } 
    else if (ec != boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout error " << ec.message() << "\n";
    }
}
02.04.2017
  • Вау, спасибо! Вы написали The cancellation is safe.. Вы имели в виду использование post(), верно? Не обычный timer.cancel() ? 02.04.2017
  • Конечно. Является потокобезопасным в том смысле, что он не вызывает гонку данных, поэтому поведение определяется 02.04.2017
  • Хороший обходной путь, но... не кажется ли вам, что должна быть лучшая функция отмены, которая скрывает весь этот беспорядок в деталях реализации? Вопросы, связанные с отменой, возникают снова и снова... 02.04.2017
  • @ИгорьР. Думаю, мне бы понравился таймер с функцией close(), как я описал. Нетрудно написать один, объединяющий таймеры Asio. На практике у меня обычно есть отдельный флаг отключения/счетчик ссылок, чтобы у меня не было этой проблемы. 02.04.2017
  • @sehe, я хочу иметь эмпирическое правило; Согласно документу dead_timer, общий объект между потоками небезопасен. Вы говорите, что cancel() является потокобезопасным именно из-за его реализации? Когда я пишу функцию stop(), которая будет останавливать такие вещи, как deadline_timer/socket/stream descriptor/signal, должен ли я использовать post() в качестве эмпирического правила, чтобы предотвратить неопределенное поведение, когда разные потоки вызывают этот stop() или все эти вызовы должен быть потокобезопасным, как cancel()? Спасибо 03.04.2017
  • @hudac Я просто подтверждаю, что вы используете его потокобезопасно, на самом деле я больше ничего не сказал. Его использование безопасно, потому что вы отправляете его в службу, и служба работает в одном потоке, что означает, что вы получаете неявное поведение цепочки (никакие два обработчика никогда не запускаются одновременно). в то же время). 03.04.2017
  • @hudac В частности, это не эмпирическое правило, если вы запускаете службу в большем количестве потоков! В этом случае вам нужна цепь для синхронизации доступа к объектам службы (например, deadline_timer). См. stackoverflow.com/questions/12794107/. Я надеюсь, что это доказывает, что cancel() не является потокобезопасным, согласно документации (никто не говорил об этом). 03.04.2017
  • @sehe, у вас есть обходной путь для безопасной отмены boost::asio::signal_set? Или я должен использовать какой-то флаг shutdown? 03.04.2017
  • @hudac Не думаю, что знаю (обычно я просто слушаю INT / TERM один раз). Конечно, вы можете просто signal_set.clear(...); (теперь, когда вы получаете сигнал 0, это означает, что вам, вероятно, следует завершить работу) 03.04.2017
  • Как насчет переноса handle_timeout по цепочке и публикации timer.cancel() в той же цепочке 19.09.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..