Мне интересно, есть ли способ составить существующие операторы для выполнения противоположного switchMap()
.
switchMap()
будет преследовать последнее полученное излучение и отменит все Observable
, которые выполнялись ранее. Допустим, я перевернул его и хочу игнорировать все выбросы, поступающие к оператору xxxMap()
, пока он занят первым полученным выбросом. Он будет игнорировать выбросы до тех пор, пока не закончит излучать текущий Observable
внутри себя. Затем он будет обрабатывать следующую полученную эмиссию.
Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(i -> System.out.println("Source Emitted Value: " + i))
.ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation()))
.subscribe(i -> System.out.println("Subscriber received Value: " + i));
Есть ли способ сделать это? В приведенном выше примере, если intensiveProcess()
будет длиться три секунды, ignoreWhileBusyMap()
обработает 0
, но, скорее всего, проигнорирует выбросы 1
и 2
, исходящие от interval()
. Затем он обработает 3
, но, скорее всего, проигнорирует 4
и 5
и так далее...
Semaphore
, но надеялся использовать чисто реактивную композицию с существующими операторами. Я полагаю, я мог бы обернуть все это вTransformer
. 11.05.2016inline fun <T,R> Observable<T>.ignoreWhileBusyMap(crossinline mapper: (T) -> Observable<R>) = onBackpressureDrop() .flatMap({ mapper.invoke(it)} ,1)
11.05.2016