Nano Hash - криптовалюты, майнинг, программирование

Задержка RxJava2 (rx.functions.Func1) не выводит элементы по порядку

Я использую эту подпись delay:

public final <U> Observable<T> delay(Func1<? super T,? extends Observable<U>> itemDelay)

javadoc

Я использую Func1, чтобы вернуть Observable, который действует как своего рода «триггер». Моя цель - отложить элементы до завершения внешней асинхронной операции. После завершения этой операции я хочу передать все элементы, которые были отложены, и все будущие элементы по порядку.

Вот пример кода, который показывает, что я пытаюсь сделать:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;

public class Example {

    private ReplaySubject<Object> delayTrigger = ReplaySubject.create(); // (1)

    public void main() {
        System.out.println("============ MAIN ============");
        SourceThread sourceThread = new SourceThread();
        sourceThread.start();

        sourceThread.stream
                .compose(doOnFirst(integer -> startAsyncOperation())) // (2)
                .delay(integer -> delayTrigger) // (3)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe((Integer integer)
                        -> System.out.println("onNext: " + integer));
    }

    private void startAsyncOperation() {
        System.out.println(">>>>>>> long async operation started");
        SomeOtherThread someOtherThread = new SomeOtherThread();
        someOtherThread.start();
    }

    private void onAsyncOperationComplete() {
        System.out.println("<<<<<<< long async operation completed");
        delayTrigger.onNext(new Object()); // (4)
    }

    /**
     * From https://stackoverflow.com/a/32366794
     */
    private <T> ObservableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return observableTransformer -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return observableTransformer.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }

    /**
     * Some thread to simulate a some time delayed source.
     * This is not really part of the problem,
     * we just need a time delayed source on another thread
     */
    private final class SourceThread extends Thread {
        private ReplaySubject<Integer> stream = ReplaySubject.create();

        @Override
        public void run() {
            super.run();
            for (int i = 0; i < 100; i++) {
                stream.onNext(i);
                System.out.println("Source emits item: " + i);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private final class SomeOtherThread extends Thread {
        @Override
        public void run() {
            super.run();
            try {
                Thread.sleep(1000);
                onAsyncOperationComplete();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

В (1) я создаю ReplaySubject, который будет действовать как мой триггер, в (2) я запускаю асинхронную операцию, в (3) я задерживаюсь, пока триггер что-то не испускает; наконец, в пункте (4) я что-то помещаю в поток триггера, когда асинхронная операция завершается.

По большей части это работает нормально, кроме тех случаев, когда асинхронная операция завершается, поток, возвращаемый из delay, выходит из строя.

I/System.out: Source emits item: 46
I/System.out: Source emits item: 47
I/System.out: <<<<<<< long async operation completed
I/System.out: Source emits item: 48
I/System.out: onNext: 0
I/System.out: onNext: 48 <---- problem here!!!
I/System.out: onNext: 1
I/System.out: onNext: 2
I/System.out: onNext: 3

Элемент 48 передается из delay перед элементами 1 - 47. Пункт 49 также будет передан не по порядку. Это будет продолжаться до тех пор, пока элементы 1-47 не будут отправлены, затем поток очистится. Но есть большой раздел не заказанных товаров. Как я могу это исправить? Правильно ли я использую delay? Это ошибка задержки?

Для справки это всего лишь образец. В моей «настоящей» проблеме у меня нет возможности изменить порядок выдаваемых элементов, если они выходят из строя (т.е. они плохо пронумерованы).

08.09.2017

Ответы:


1

Этот оператор delay не имеет никаких гарантий упорядочения, потому что внутренний источник для элемента №1 может сигнализировать позже, чем другой внутренний источник для элемента №2 в целом. Любой асинхронный сигнал может нарушить порядок, даже если он исходит из такого источника, как завершенный ReplaySubject.

Я предполагаю, что вы хотите предварительно выбрать основной источник, но не пропустить его до внешнего сигнала, верно? В этом случае вы можете использовать concatArrayEager, где завершение первого источника запускает выброс предварительно загруженного второго источника:

PublishSubject<Integer> delayer = PublishSubject.create();

Observable.concatArrayEager(
    delayer,
    sourceThread.stream
)

// somewhere async
delayer.onComplete();
08.09.2017
  • Я полагал, что внутреннее упорядочение не сохранялось, но основывалось на том, когда внутренние источники сигнализировали об излучении. Спасибо за предложенное изменение. 11.09.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..