Nano Hash - криптовалюты, майнинг, программирование

RxJava - напротив оператора switchMap ()?

Мне интересно, есть ли способ составить существующие операторы для выполнения противоположного switchMap().

switchMap() будет преследовать последнее полученное излучение и отменит все Observable, которые выполнялись ранее. Допустим, я перевернул его и хочу игнорировать все выбросы, поступающие к оператору xxxMap(), пока он занят первым полученным выбросом. Он будет игнорировать выбросы до тех пор, пока не закончит излучать текущий Observable внутри себя. Затем он будет обрабатывать следующую полученную эмиссию.

Observable.interval(1, TimeUnit.SECONDS)
        .doOnNext(i -> System.out.println("Source Emitted Value: " + i))
        .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) 
        .subscribe(i -> System.out.println("Subscriber received Value: " + i));

Есть ли способ сделать это? В приведенном выше примере, если intensiveProcess() будет длиться три секунды, ignoreWhileBusyMap() обработает 0, но, скорее всего, проигнорирует выбросы 1 и 2, исходящие от interval(). Затем он обработает 3, но, скорее всего, проигнорирует 4 и 5 и так далее...


Ответы:


1

Конечно, заблокируйте обработку значения логическим значением, которое устанавливается после завершения обработки:

AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
    if (gate.get()) {
        gate.set(false);

        return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
                .doAfterTerminate(() -> gate.set(true));
    } else {
        return Observable.empty();
    }
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

Изменить

Альтернатива:

Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

Вы можете изменить onBackpressureDrop на onBackpressureLatest, чтобы сразу продолжить с последним значением.

11.05.2016
  • Удивительно, я сделал что-то подобное с Semaphore, но надеялся использовать чисто реактивную композицию с существующими операторами. Я полагаю, я мог бы обернуть все это в Transformer. 11.05.2016
  • Используйте отложенный преобразователь, чтобы избежать совместного использования шлюза несколькими конечными подписчиками. 11.05.2016
  • Только что понял, что ваше решение не блочное, как мое, поэтому я переключусь на это. Спасибо! 11.05.2016
  • Ха-ха, да, я буду иметь это в виду на этот раз :) Больше не совершу эту ошибку. 11.05.2016
  • спасибо за ваш вариант. Мне удалось скомпоновать его в функцию расширения для Kotlin: inline fun <T,R> Observable<T>.ignoreWhileBusyMap(crossinline mapper: (T) -> Observable<R>) = onBackpressureDrop() .flatMap({ mapper.invoke(it)} ,1) 11.05.2016

  • 2

    Я знаю, что это старый поток, но в настоящее время есть оператор RxJs, который делает именно это.

    Оператор exhaustMap.

    Согласно документам:

    ExhaustMap проецирует каждое исходное значение в Observable, которое объединяется с выходным Observable, только если завершено предыдущее спроецированное Observable.

    Документы пример:

    import { fromEvent, interval } from 'rxjs';
    import { exhaustMap, take } from 'rxjs/operators';
    
    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      exhaustMap(ev => interval(1000).pipe(take(5)))
    );
    result.subscribe(x => console.log(x));
    
    28.11.2019
  • В настоящее время это лучший ответ, поскольку ExhaustMap не существовало, когда исходный ответ был принят. 30.04.2020
  • ExhaustMap по-прежнему не существует в RxJava, о чем изначально и спрашивали. Я так разочарован, что RxJava не имеет этого, исходя из RxJs. 09.11.2020

  • 3

    Чтобы ответить в стиле Jeopardy: что такое concatMap?

    concatMap подпишется на первого Observable и не подпишется на последующие Observable, пока предыдущий Observable не позвонит onComplete().

    В этом отношении это «противоположность» switchMap, которая охотно отписывается от предыдущих Observable, когда появляется новый.

    concatMap хочет услышать все, что может сказать каждый Observable, в то время как switchMap — социальная бабочка, и уходит, как только становится доступным другой Observable.

    15.07.2016
  • Не совсем так, если вы внимательно прочитаете вопрос, то увидите, что я придерживаюсь поведения, которое имеет мало общего с concatMap или его предполагаемой противоположностью. Следуя вашей аналогии, я искал оператора xxxMap, который сосредоточился бы на разговоре с первым наблюдателем, с которым он столкнулся, и сказал бы всем последующим наблюдаемым не сейчас, я занят разговором с этим парнем. Только когда его разговор будет завершен, он позволит другому Наблюдаемому взаимодействовать с ним. 15.07.2016
  • Это то, что делает concatMap. 16.07.2016
  • Я считаю, что concatMap() совпадает с flatMap(), но не чередуется. Это гарантирует, что все выбросы в конечном итоге будут испущены, даже если он поставит их в очередь. Но оператор, которого я просил, просто игнорирует последующие выбросы, пока он занят. 17.07.2016
  • Ах, хорошо, под игнорированием выбросов я думал, что вы имеете в виду игнорировать выбросы на данный момент, но, как указано в последнем абзаце вашего вопроса: эти выбросы следует исключить. В таком случае мне нравится решение, предложенное akarnokd. 17.07.2016
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..