Nano Hash - криптовалюты, майнинг, программирование

Выполнение параллельных групп задач блокировки по порядку

Я размышлял над этим в течение некоторого времени и все больше и больше узнаю о потоках, исполнителях и т. Д. По мере того, как я иду. У меня есть приблизительное представление об исполнителях и потоках, но я чувствую себя немного застрявшим.

Вот что я пытаюсь сделать.

Есть команды, а есть действия. Команда имеет имя и может вызываться пользователем произвольно, например !playsong, !cheer и т. д. Действие — это то, что отправляет работу службе; например, запрос клиента веб-сокета на отправку нового сообщения или запрос клиента IRC на отправку нового сообщения и т. д.

Когда команда выполняется, она выполняет свои Действия по порядку одно за другим.

Например, команда !cheer может иметь четыре действия:

  1. Сделайте запрос через веб-сокет и дождитесь успешного ответа (например, покажите элемент сцены в OBS)
  2. Отправить IRC-сообщение (например, отправить сообщение в чат). После отправки тогда,
  3. Подождите 1-3 секунды (например, ожидание окончания воспроизведения видео). Как только ожидание закончилось, затем
  4. Сделайте еще один запрос веб-сокета (например, скройте элемент сцены из шага 1)

Мало того, что они должны выполняться по порядку, НО мы не можем запустить их все одновременно (Действия 1, 2 и 4 завершаются первыми, а затем Действие 3 завершается последним); каждое Действие зависит от того, было ли его предыдущее завершено первым.

Вдобавок ко всему, Команды могут быть отправлены клиентами произвольно в любое время и не должны блокировать друг друга. Например, !longcommand может быть запущен, но не заблокирует запуск !shortcommand (при условии, что базовые службы не заблокированы).

Вот что я думаю сделать:

Я знаю, что могу использовать Future/Callable для блокировки в ожидании результата выполнения в данном потоке, поэтому каждое действие должно возвращать будущее при запуске (будущее исходит из соответствующей службы, которую он использует). Затем я могу просто вызвать действия одно за другим блокирующим образом, как это, для команды, чтобы убедиться, что они выполняются по порядку, и каждый ожидает завершения другого:

class ExecutableCommand implments Runnable {
  // omitted for brevity

  run() {
    for(Action action:command.getActions()) {
    action.run().get();
  } 

}

Но как мне справиться с выполнением команд? Думаю, я бы отправил каждую команду через исполнителя, может быть, ThreadPoolExecutor, подобный этому, при каждой отправке?

class ExecutorServiceWrapper {

  private final ExecutorService executorService = Executors.newThreadPoolExecutor(4);
  
  void submit(ExecutableCommand command) {
    executorService.submit(command)
  }

}

И тогда каждый клиент ofc просто сохранит ссылку на ExecutorServiceWrapper и вызовет его в ответ на события, которые их запускают:

class FromChatHandler() {
  private final ExecutorServiceWrapper masterQueue;

  onMessage(String message) {
    Command command = // parse what command to lookup from message
    masterQueue.submit(command)
  }
}

@RestController // or whatever
class MyController() {
  private final ExecutorServiceWrapper masterQueue;

  @Post
  executeCommandByName(String commandName) {
    Command command = // lookup command
    masterQueue.submit(command)
  }
}

class directHandler() {
  private final ExecutorServiceWrapper masterQueue;

  handle(Command command) {
    Command command = // build the command given the message
    masterQueue.submit(command)
  }
}

Я предполагаю, что, поскольку каждая команда отправляется исполнителю, каждая из них отправляется в свой собственный поток, поэтому он не будет блокировать другие.

Но я не уверен, должен ли я делать то, что я делаю выше, с ExecutableCommand и выполнять каждое действие внутри команды, как я.

Кроме того, я не уверен, справится ли он с этим случаем: пул потоков фиксирован на 5 потоков. Выполнено 5 команд. Они давно работают и используют разные сервисы, но базовые сервисы не блокируются и все еще могут принимать работу. Кто-то пытается выполнить 6-ю команду — его нельзя блокировать, потому что базовые службы все еще могут принимать работу.

Есть ли лучший способ сделать это? Я на правильном пути?


  • Возможно, вы ищете методы CompletableFuture#then. 29.12.2020
  • Спасибо, я немного почитаю об этом! ... Однако, как мне справиться с описанным выше сценарием, когда пул потоков исчерпан для команд (из-за наличия многих/длительных действий), но базовые службы доступны для выполнения действий? 29.12.2020
  • Hej @chrylis-cautiouslyoptimistic- , я отредактировал свой пример, включив в него что-то, что, по-видимому, примерно выполняет эту работу (хотя ни в коем случае не обеспечивает требуемой мне детализации). У вас есть какие-нибудь мысли? 30.12.2020

Ответы:


1

Потратив немного больше времени на это, я придумал несколько возможных решений, используя Executors или Futures. Пока не уверен, что будет лучше другого, но поскольку я знаю, что могу расширить ThreadPoolExecutor (скажем, добавить функцию паузы), я, вероятно, склонюсь к Executors.

В противном случае, если у кого-то есть комментарии, они всегда приветствуются!

Я пока храню оба решения в своем GH (), но я также помещу их ниже. https://github.com/TinaTiel/concurrency-learning

Реализация фьючерсов

package futures;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CommandActionExample {

    public static void main(String[] args) {

        // Initialize some starting params
        Random random = new Random();
        int maxActions = 20;
        int maxCommands = 5;

        // Generate some commands, with a random number of actions.
        // We'll use the indexes as the command and action names to keep it simple/readable
        List<Command> commands = new ArrayList<>();
        for(Integer c = 0; c < maxCommands; c++) {
            Command command = new Command(String.format("%d", c+1));
            for(Integer a = 0; a < random.nextInt(maxActions); a++) {
                Action action = new Action(random, String.format("%d", a+1));
                command.addAction(action);
            }
            commands.add(command);
        }

        // Print out the commands we'll execute, again to keep the results readable/understandable
        System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");

        // Build a Future that tries to execute all commands (supplied as Futures) in an arbitrary order
        try {
            CompletableFuture.allOf(commands.stream()
                    .map((Function<Command, CompletableFuture<Void>>) CompletableFuture::runAsync)
                    .collect(Collectors.toList())
                    .toArray(CompletableFuture[]::new)
            ).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
//        commands.get(0).run(); // sanity check one of the command's actions run as expected

        // When we execute the results, the actions should be executed in-order within a command at some point in the future
        // (not started all at once), so something like:
        // 0  Command-2:Action-1 scheduled at 34
        // 0  Command-1:Action-1 scheduled at 21
        // 0  Command-3:Action-1 scheduled at 4
        // 4  Command-3:Action2 scheduled at ...
        // 21 Command-1:Action-2 scheduled at ...
        // 34 Command-1-Action-2 scheduled at ...
        // ...
        // Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?

    }

    public static class Action implements Runnable {

        private Command command;
        private final Random random;
        private final String name;

        public Action(Random random, String name) {
            this.random = random;
            this.name = name;
        }

        public void setCommand(Command command) {
            this.command = command;
        }

        @Override
        public void run() {

            // Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
            long msTime = random.nextInt(1000);
            System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
            try {
                Thread.sleep(msTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Action{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static class Command implements Runnable {

        private final String name;
        private final List<Action> actions = new ArrayList<>();

        public Command(String name) {
            this.name = name;
        }

        public void addAction(Action action) {
            action.setCommand(this);
            actions.add(action);
        }

        @Override
        public void run() {
            // If there are no actions, then do nothing
            if(actions.isEmpty()) return;

            // Build up a chain of futures.
            // Looks like we have to build them up in reverse order, so start with the first action...
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(actions.remove(0));

            // ...And then reverse the list and build the rest of the chain
            // (yes we could execute backwards...but it's not common and I/others probably don't like to reason about it)
            Collections.reverse(actions);
            for(int i=0; i< actions.size(); i++) {
                completableFuture.thenRun(actions.get(i));
            }

            // Execute our chain
            try {
                completableFuture.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Command{" +
                    "name='" + name + '\'' +
                    ", actions=" + actions +
                    '}';
        }
    }

}

Полученные результаты

Вывод и расписание соответствуют ожиданиям, но похоже, что фьючерсы используют ForkJoinPool.

Commands to execute: 
Command{name='1', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}]}
Command{name='2', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='3', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}, Action{name='7'}]}
Command{name='4', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='5', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}

2020-12-30 21:17:27.11: Command-2:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 207ms
2020-12-30 21:17:27.11: Command-4:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 930ms
2020-12-30 21:17:27.11: Command-1:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 948ms
2020-12-30 21:17:27.11: Command-3:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 173ms
2020-12-30 21:17:27.11: Command-5:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 348ms
2020-12-30 21:17:27.314: Command-3:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 890ms
2020-12-30 21:17:27.345: Command-2:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 178ms
2020-12-30 21:17:27.485: Command-5:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 702ms
2020-12-30 21:17:27.485: Command-5:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 161ms
2020-12-30 21:17:27.532: Command-2:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 201ms
2020-12-30 21:17:27.657: Command-5:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 257ms
2020-12-30 21:17:27.735: Command-2:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 518ms
2020-12-30 21:17:27.919: Command-5:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 258ms
2020-12-30 21:17:28.06: Command-4:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 926ms
2020-12-30 21:17:28.075: Command-1:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 413ms
2020-12-30 21:17:28.184: Command-5:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 77ms
2020-12-30 21:17:28.216: Command-3:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 487ms
2020-12-30 21:17:28.263: Command-2:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 570ms
2020-12-30 21:17:28.497: Command-1:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 273ms
2020-12-30 21:17:28.716: Command-3:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 302ms
2020-12-30 21:17:28.833: Command-2:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 202ms
2020-12-30 21:17:28.992: Command-4:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 733ms
2020-12-30 21:17:29.024: Command-3:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 756ms
2020-12-30 21:17:29.727: Command-4:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 131ms
2020-12-30 21:17:29.78: Command-3:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 920ms
2020-12-30 21:17:29.858: Command-4:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 305ms
2020-12-30 21:17:30.168: Command-4:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 612ms
2020-12-30 21:17:30.715: Command-3:Action-7 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 330ms

Исполнители Реализация

package executors;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CommandActionExample {

    public static void main(String[] args) {

        // Initialize some starting params
        Random random = new Random();
        int maxActions = 20;
        int maxCommands = 5;

        // Generate some commands, with a random number of actions.
        // We'll use the indexes as the command and action names to keep it simple/readable
        List<Command> commands = new ArrayList<>();
        for(Integer c = 0; c < maxCommands; c++) {
            Command command = new Command(String.format("%d", c+1));
            for(Integer a = 0; a < random.nextInt(maxActions); a++) {
                Action action = new Action(random, String.format("%d", a+1));
                command.addAction(action);
            }
            commands.add(command);
        }

        // Print out the commands we'll execute, again to keep the results readable/understandable
        System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");

        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for(Command command:commands) executorService.submit(command);

        // When we execute the results, the actions should be executed in-order within a command at some point in the future
        // (not started all at once), so something like:
        // 0  Command-2:Action-1 scheduled at 34
        // 0  Command-1:Action-1 scheduled at 21
        // 0  Command-3:Action-1 scheduled at 4
        // 4  Command-3:Action2 scheduled at ...
        // 21 Command-1:Action-2 scheduled at ...
        // 34 Command-1-Action-2 scheduled at ...
        // ...
        // Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?

    }

    public static class Action implements Runnable {

        private Command command;
        private final Random random;
        private final String name;

        public Action(Random random, String name) {
            this.random = random;
            this.name = name;
        }

        public void setCommand(Command command) {
            this.command = command;
        }

        @Override
        public void run() {

            // Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
            long msTime = random.nextInt(1000);
            System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
            try {
                Thread.sleep(msTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Action{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static class Command implements Runnable {

        private final String name;
        private final List<Action> actions = new ArrayList<>();

        public Command(String name) {
            this.name = name;
        }

        public void addAction(Action action) {
            action.setCommand(this);
            actions.add(action);
        }

        @Override
        public void run() {
            // If there are no actions, then do nothing
            if(actions.isEmpty()) return;

            ExecutorService executor = Executors.newSingleThreadExecutor(); // Because there is only one thread, this has the effect of executing sequentially and blocking
            for(Action action:actions) executor.submit(action);

        }

        @Override
        public String toString() {
            return "Command{" +
                    "name='" + name + '\'' +
                    ", actions=" + actions +
                    '}';
        }
    }

}

Результат

Результат и график соответствуют ожиданиям

2020-12-31 09:43:09.952: Command-3:Action-1 executing on Thread 'pool-4-thread-1' executing for 632ms
2020-12-31 09:43:09.952: Command-4:Action-1 executing on Thread 'pool-3-thread-1' executing for 932ms
2020-12-31 09:43:09.952: Command-2:Action-1 executing on Thread 'pool-6-thread-1' executing for 586ms
2020-12-31 09:43:09.952: Command-5:Action-1 executing on Thread 'pool-5-thread-1' executing for 987ms
2020-12-31 09:43:09.952: Command-1:Action-1 executing on Thread 'pool-2-thread-1' executing for 706ms
2020-12-31 09:43:10.562: Command-2:Action-2 executing on Thread 'pool-6-thread-1' executing for 329ms
2020-12-31 09:43:10.608: Command-3:Action-2 executing on Thread 'pool-4-thread-1' executing for 503ms
2020-12-31 09:43:10.891: Command-2:Action-3 executing on Thread 'pool-6-thread-1' executing for 443ms
2020-12-31 09:43:10.9: Command-4:Action-2 executing on Thread 'pool-3-thread-1' executing for 866ms
2020-12-31 09:43:10.955: Command-5:Action-2 executing on Thread 'pool-5-thread-1' executing for 824ms
2020-12-31 09:43:11.346: Command-2:Action-4 executing on Thread 'pool-6-thread-1' executing for 502ms
2020-12-31 09:43:11.766: Command-4:Action-3 executing on Thread 'pool-3-thread-1' executing for 638ms
2020-12-31 09:43:11.779: Command-5:Action-3 executing on Thread 'pool-5-thread-1' executing for 928ms
2020-12-31 09:43:11.848: Command-2:Action-5 executing on Thread 'pool-6-thread-1' executing for 179ms
2020-12-31 09:43:12.037: Command-2:Action-6 executing on Thread 'pool-6-thread-1' executing for 964ms
2020-12-31 09:43:12.412: Command-4:Action-4 executing on Thread 'pool-3-thread-1' executing for 370ms
2020-12-31 09:43:12.709: Command-5:Action-4 executing on Thread 'pool-5-thread-1' executing for 204ms
2020-12-31 09:43:12.783: Command-4:Action-5 executing on Thread 'pool-3-thread-1' executing for 769ms
2020-12-31 09:43:12.913: Command-5:Action-5 executing on Thread 'pool-5-thread-1' executing for 188ms
2020-12-31 09:43:13.102: Command-5:Action-6 executing on Thread 'pool-5-thread-1' executing for 524ms
2020-12-31 09:43:13.555: Command-4:Action-6 executing on Thread 'pool-3-thread-1' executing for 673ms
2020-12-31 09:43:13.634: Command-5:Action-7 executing on Thread 'pool-5-thread-1' executing for 890ms
2020-12-31 09:43:14.23: Command-4:Action-7 executing on Thread 'pool-3-thread-1' executing for 147ms
2020-12-31 09:43:14.527: Command-5:Action-8 executing on Thread 'pool-5-thread-1' executing for 538ms
31.12.2020
  • Чтобы узнать мнение о работающем коде, посетите codereview.stackexchange.com. 31.12.2020
  • Спасибо! Не знал, что такое существует :D 31.12.2020
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..