Nano Hash - криптовалюты, майнинг, программирование

Почему мои задачи в очереди не обрабатываются всеми потоками пула потоков?

Я написал простую программу на Java, которая помогает играть с потоками и пулами потоков для выполнения определенных задач. В моей программе есть объекты класса TheObject, которые должны иметь какую-то обработку (в данном случае просто задержку сна и распечатку их полей).

TheObject объекты помещаются в очередь, из которой WorkerThreads извлекают их и обрабатывают.

Я создал класс Manager, который инициализирует TheObjects и WorkerThread. Я получаю странный результат, когда запускаю его; Вместо того, чтобы потоки отключали работу, все обрабатывает один поток! Почему это происходит? Как заставить потоки разделять рабочую нагрузку?

Это результат:

--------------------------------------------
alfred   a   0
Thread-0
--------------------------------------------
bob   b   1
Thread-0
--------------------------------------------
carl   c   2
Thread-0
--------------------------------------------
dave   d   3
Thread-0
--------------------------------------------
earl   e   4
Thread-0
--------------------------------------------
fred   f   5
Thread-0
--------------------------------------------
greg   g   6
Thread-0
--------------------------------------------
harry   h   7
Thread-0
--------------------------------------------
izzie   i   8
Thread-0
--------------------------------------------
jim   j   9
Thread-0
--------------------------------------------
kyle   k   0
Thread-0
--------------------------------------------
Larry   L   1
Thread-1
--------------------------------------------
Michael   M   2
Thread-1
--------------------------------------------
Ned   N   3
Thread-0
--------------------------------------------
Olaf   O   4
Thread-0
--------------------------------------------
Peter   P   5
Thread-0
--------------------------------------------
Quincy   Q   6
Thread-0
--------------------------------------------
Raphael   R   7
Thread-0
--------------------------------------------
Sam   S   8
Thread-0
--------------------------------------------
Trixie   T   9
Thread-0

Код:

Классы:

  • Управляющий делами
  • Объект
  • TheObjectQueue
  • WorkerThread

Менеджер

public class Manager {

    public static void main(String[] args) throws InterruptedException {
        TheObjectQueue queue = new TheObjectQueue();
        //Create Objects
        int numberOfInitialObjects = 10;
        String[] arrayOfNames = {"alfred", "bob", "carl", "dave", "earl", "fred", "greg", "harry", "izzie", "jim"};
        //char[] arrayOfChars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'};
        //int[] arrayOfNums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        for (int i = 0; i < numberOfInitialObjects; i++){
                TheObject anObject = new TheObject(arrayOfNames[i], arrayOfNames[i].charAt(0), i);
                queue.addToQueue(anObject);

        }

        int numberOfThreads = 2;
        for (int i = 0; i < numberOfThreads; i++){
            WorkerThread workerThread = new WorkerThread(queue);
            workerThread.start();
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        String[] arrayOfNames2 = {"kyle", "Larry", "Michael", "Ned", "Olaf", "Peter", "Quincy", "Raphael", "Sam", "Trixie"};
        for (int i = 0; i < numberOfInitialObjects; i++){
            TheObject anObject = new TheObject(arrayOfNames2[i], arrayOfNames2[i].charAt(0), i);
            queue.addToQueue(anObject);
        }

    }
}

TheObject

public class TheObject {
    private String someName;
    private char someChar;
    private int someNum;

    public TheObject(String someName, char someChar, int someNum) {
        super();
        this.someName = someName;
        this.someChar = someChar;
        this.someNum = someNum;
    }

    public String getSomeName() {
        return someName;
    }
    public char getSomeChar() {
        return someChar;
    }
    public int getSomeNum() {
        return someNum;
    }
}

TheObjectQueue

import java.util.LinkedList;

public class TheObjectQueue {

    private LinkedList<TheObject> objectQueue = null;

    public TheObjectQueue(){
        objectQueue = new LinkedList<TheObject>();
    }
    public LinkedList<TheObject> getQueue(){
        return objectQueue;
    }
    public void addToQueue(TheObject obj){
        synchronized (this) {
            objectQueue.addFirst(obj);
            this.notify();
        }

    }
    public TheObject removeFromQueue(){
        synchronized (this) {
            TheObject obj = objectQueue.removeLast();
            return obj;
        } 
    }
    public int getSize(){
        return objectQueue.size();
    }
    public boolean isEmpty(){
        return objectQueue.isEmpty();
    }
}

WorkerThread

import java.util.Random;

public class WorkerThread extends Thread{
    private TheObjectQueue queue;

    public WorkerThread(TheObjectQueue queue){
        this.queue = queue;
    }
    public void process(TheObject obj){
        //Stall for a random amount of time
        Random random = new Random();
        int randomInt = random.nextInt(3)*1000;
        try {
            Thread.sleep(randomInt);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        //Begin Printing
        System.out.println("--------------------------------------------");
        System.out.print(obj.getSomeName());
        System.out.print("   ");
        System.out.print(obj.getSomeChar());
        System.out.print("   ");
        System.out.println(obj.getSomeNum());
        System.out.println(this.getName());
    }

    @Override
    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                TheObject objToDealWith = queue.removeFromQueue();
                process(objToDealWith);
                super.run();
            }
        }
    }
}

  • Почему бы не использовать ExecutorService? 22.06.2014
  • @Jarrod Я не понимаю, почему это дубликат. Даже если существуют инструменты, позволяющие сделать это иначе, вопрос не в их использовании. Я заново открываюсь. 22.06.2014
  • ‹S› Одной из возможных причин такого поведения является isEmpty() несинхронизация, поэтому все другие рабочие потоки, кроме thread-0, всегда видят очередь как пустую. ‹/S› Ой, нет, вы вызываете isEmpty блокировку блокировки на queue, поэтому isEmpty() должен работать как ожидал. 23.06.2014

Ответы:


1

Синхронизация по (общей) очереди в WorkerThread.run позволяет только одному потоку обрабатывать задачу за раз - эффект фактически представляет собой пул с одним рабочим! В этом случае поток 0 большую часть времени «выигрывает» при получении блокировки; синхронизированное получение блокировки - нет гарантированно будет честным.

Простое исправление - получить задачу из очереди, используя требуемую поточно-ориентированную конструкцию, а затем обработать задачу вне синхронизированного раздела. Это позволяет работникам одновременно обрабатывать задачи, которые считаются независимыми.

// e.g.
@Override
public void run() {
    while(true){
        TheObject objToDealWith; 
        synchronized (queue) {
            // Only synchronized on queue when fetching task
            objToDealWith = getTask(queue);
        }

        // Process task; independent of queue
        process(objToDealWith);
    }
}

Поскольку один из потоков будет занят обработкой задачи, в то время как другой получает (или получил) блокировку очереди, распределение работы будет «справедливым».

22.06.2014

2

Почему так сложно? Просто используйте ExecutorService, предоставляемый java. .

Если вы пишете это от руки, вы делаете это неправильно.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample {

    public static void main(String[] args) {
//      configure here how many threads and other properties, the queue here is actually build in.
        ExecutorService executor = Executors.newCachedThreadPool();
        String[] arrayOfNames = { "alfred", "bob", "carl", "dave", "earl",
                "fred", "greg", "harry", "izzie", "jim" };
        for (int i = 0; i < arrayOfNames.length; i++) {
            TheObject anObject = new TheObject(arrayOfNames[i], arrayOfNames[i].charAt(0), i);
            MyRunnable runnalbe = new MyRunnable(anObject);
            executor.execute(runnalbe);
        }
        executor.shutdown()
    }

    static class MyRunnable implements Runnable {

        final TheObject anObject;

        MyRunnable(TheObject theObject) {
            this.anObject = theObject;
        }

        @Override
        public void run() {
            //TODO do work with anObject
        }

    }

    static class TheObject {
        private String someName;
        private char someChar;
        private int someNum;

        public TheObject(String someName, char someChar, int someNum) {
            this.someName = someName;
            this.someChar = someChar;
            this.someNum = someNum;
        }

        public String getSomeName() {
            return someName;
        }

        public char getSomeChar() {
            return someChar;
        }

        public int getSomeNum() {
            return someNum;
        }
    }
}
22.06.2014
  • На какую часть вопроса вы отвечаете? 22.06.2014
  • В лучшем случае это должен быть комментарий. 22.06.2014
  • все. Изобретая очередь? Ни за что. Изобретая заново ExecutorService / ThreadPool? Нет. Изобретая колесо заново? Нет. Изобретая ThreadFactory заново? Нет. Мой ответ: используйте инструменты, предоставляемые java. = ›Если вы воспользуетесь этими классами, проблем не возникнет. 22.06.2014
  • Затем покажите нам, как вы могли бы использовать ExecutorService для решения проблем OP. Ответы только по ссылкам являются чрезвычайно плохими < / i>. 22.06.2014
  • @Zarathustra В соответствии с кодом и с включением ответа user2864740, не хватает ли чего-нибудь, что было бы включено при использовании ExecutorService? 22.06.2014
  • @Imray Я обновил свой ответ и добавил пример кода. 22.06.2014
  • Одной из важных причин описанного поведения OP является код в методе run() и способ использования TheObjectQueue. Вы не решили проблему. 22.06.2014
  • @Zarathustra Спасибо. Итак, я предполагаю, что когда вызывается executor.execute(runnalbe);, объект добавляется в какую-то структуру Queue ... и Executor заботится об обработке. Правильно? 22.06.2014
  • @Imray правильно, можно даже решить, что за очередь использовать, есть много фабричных методов из класса Executors. 22.06.2014
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..


    © 2024 nano-hash.ru, Nano Hash - криптовалюты, майнинг, программирование