Nano Hash - криптовалюты, майнинг, программирование

Метод OnSubscribe.call вызывается дважды

Предпосылки: я хочу вызвать веб-службу внутри метода вызова OnSubscribe. Существует специальный класс Subscriber, который также является подписчиком этого Observable. (Следующий код является приближением того же)

Проблема: мы видим, что метод OnSubscribe.call вызывается дважды.

Мне не ясна связь между Observable.create (...), subscribe (...) и, наконец, преобразованием наблюдаемого в Blocking. Кажется, что строка ниже добавляет поведение и каким-то образом снова вызывает OnSubscribe.call. Вызов toBlocking, как я предполагаю, в конечном итоге должен быть вызван - непосредственно перед тем, как результат должен быть возвращен веб-сервером (сервлет / контроллер, которые не являются Rx по своей природе).

observable.toBlocking().forEach(i-> System.out.println(i));

Полный код ниже

 public static void main(String args[]){
   Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
       @Override
       public void call(Subscriber<? super Integer> observer) {
           System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
           try {
               if (!observer.isUnsubscribed()) {
                   for (int i = 1; i < 5; i++) {
                      observer.onNext(i);
                   }
                   observer.onCompleted();
               }
           } catch (Exception e) {
               observer.onError(e);
           }
       }
});

 observable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});
    observable.toBlocking().forEach(i-> System.out.println(i));
}

Вывод

In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4

Отвечая на собственный вопрос: я неправильно обрабатывал результаты с помощью forEach. Он сам по себе является подписчиком и, следовательно, снова вызывает метод call. Правильный способ делать то, что я делал, - использовать CountDownLatch.

Это также проясняет мои другие сомнения - в контексте того, что мы, наконец, ждем завершения работы непосредственно перед возвратом ответа в контроллере / сервлете, будет означать использование защелки с тайм-аутом.

Код ниже.

public static void main(String args[]) throws Exception {
    CountDownLatch latch = new CountDownLatch(4);
    Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                        latch.countDown();

                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    });

    observable.subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

    latch.await();
    System.out.println(" Wait for Latch Over");
}
04.01.2016

  • Возможно, я ошибаюсь, но ваше решение как бы побеждает цель RxJava: если вы собираетесь использовать его блокирующим способом, вы могли бы с тем же успехом уйти с помощью простого цикла for и не нести накладные расходы на RxJava. 04.01.2016
  • @LordRaydenMK - это единственный способ ответить на ваш собственный вопрос (или через комментарии), если он является новым для StackOverflow. 05.01.2016
  • @AndroidEx - Извините, я тоже новичок в RX и разбираюсь в этом. Но предположим, что у нас есть гигантское монолитное приложение, работающее более десяти лет, и я полагаю, мы будем постепенно менять его для работы с RX. Кроме того, поскольку все еще не является реактивным в мире веб-приложений (приложений и веб-серверов), у нас все еще будет родительский поток, который в конечном итоге будет ожидать завершения множества действий, прежде чем ответить. 05.01.2016

Ответы:


1

toBlocking можно упростить до создания подписки с защелкой, отсюда и две ваши подписки.

Но я думаю, что вы были правы, использовав toBlocking().forEach(), и должны оставить эту часть как решение.

Проблема тогда в вашей второй подписке, сделанной путем звонка на subscribe. Есть ли в этом реальная необходимость subscribe? Если речь идет только о ведении журнала, а не о фактическом изменении потока данных, вы можете вместо этого использовать doOnNext, doOnError и doOnCompleted.

Если ваш вариант использования немного сложнее, чем отражено в вашем вопросе, и вам действительно нужно дождаться двух «параллельных» асинхронных обработок, тогда вам может потребоваться присоединиться и подождать, используя CountDownLatch (или аналогичный) ... Имейте в виду что если вам нужен параллелизм, RxJava в этом отношении не зависит, и вам нужно управлять им, используя Schedulers, _10 _ / _ 11_.

04.01.2016
  • Только регистрировал данные. Знание «Операторы» теперь помогает. 05.01.2016

  • 2

    Это может быть не так уж важно, поскольку вы уже решили проблему в своем случае, но создание наблюдаемых с использованием необработанного OnSubscribe в целом подвержено ошибкам. Лучше использовать один из существующих вспомогательных классов: SyncOnSubscribe / AsyncOnSubscribe (начиная с RxJava 1.0.15) или AbstractOnSubscribe (до RxJava v1.1.0). Они предоставляют вам готовую структуру, которая управляет состоянием наблюдаемого и противодавлением, оставляя вам (в основном) заниматься собственной логикой.

    04.01.2016
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..