Предпосылки: я хочу вызвать веб-службу внутри метода вызова OnSubscribe. Существует специальный класс Subscriber, который также является подписчиком этого Observable. (Следующий код является приближением того же)
Проблема: мы видим, что метод OnSubscribe.call вызывается дважды.
Мне не ясна связь между Observable.create (...), subscribe (...) и, наконец, преобразованием наблюдаемого в Blocking. Кажется, что строка ниже добавляет поведение и каким-то образом снова вызывает OnSubscribe.call. Вызов toBlocking, как я предполагаю, в конечном итоге должен быть вызван - непосредственно перед тем, как результат должен быть возвращен веб-сервером (сервлет / контроллер, которые не являются Rx по своей природе).
observable.toBlocking().forEach(i-> System.out.println(i));
Полный код ниже
public static void main(String args[]){
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}
Вывод
In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4
Отвечая на собственный вопрос: я неправильно обрабатывал результаты с помощью forEach. Он сам по себе является подписчиком и, следовательно, снова вызывает метод call. Правильный способ делать то, что я делал, - использовать CountDownLatch.
Это также проясняет мои другие сомнения - в контексте того, что мы, наконец, ждем завершения работы непосредственно перед возвратом ответа в контроллере / сервлете, будет означать использование защелки с тайм-аутом.
Код ниже.
public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
latch.await();
System.out.println(" Wait for Latch Over");
}