Nano Hash - криптовалюты, майнинг, программирование

rxjs 5 publishReplay refCount

Я не могу понять, как работает publishReplay().refCount().

Например (https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer =>  {
  console.log("call"); 
  // expensive http request
  observer.next(5);
}).publishReplay().refCount();

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log(""); 

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log(""); 

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log(""); 

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();

дает следующий результат:

вызов наблюдателяA: 5

наблюдательB: 5 вызвать наблюдателяB: 5

ObserverC: 5 ObserverC: 5 вызов ObserverC: 5

наблюдательD: 5 наблюдательD: 5 наблюдательD: 5 вызов наблюдателяD: 5

1) Почему наблюдатели B, C и D вызываются несколько раз?

2) Почему «звонок» печатается на каждой строке, а не в начале строки?

Кроме того, если я вызываю publishReplay(1).refCount(), он вызывает ObserverB, C и D по 2 раза каждый.

Я ожидаю, что каждый новый наблюдатель получит значение 5 ровно один раз, а «вызов» будет напечатан только один раз.

12.02.2017

Ответы:


1

publishReplay(x).refCount() в сочетании выполняет следующие действия:

  • Он создает ReplaySubject, который воспроизводит до x выбросов. Если x не определен, он воспроизводит весь поток.
  • Это делает эту ReplaySubject многоадресную передачу совместимой с помощью оператора refCount (). Это приводит к тому, что одновременные подписки получают одинаковые выбросы.

В вашем примере есть несколько проблем, которые затрудняют совместную работу всего этого. См. Следующий измененный фрагмент:

var state = 5
var realSource = Rx.Observable.create(observer =>  {
  console.log("creating expensive HTTP-based emission"); 
  observer.next(state++);
//  observer.complete();
  
  return () => {
    console.log('unsubscribing from source')
  }
});


var source = Rx.Observable.of('')
  .do(() => console.log('stream subscribed'))
  .ignoreElements()
  .concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
    
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
    
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
    
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
 
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

При запуске этого фрагмента мы ясно видим, что он не генерирует повторяющиеся значения для Observer D, а фактически создает новые выбросы для каждой подписки. Почему?

Подписка на каждую подписку отменяется до того, как состоится следующая подписка. Это фактически заставляет refCount уменьшаться до нуля, многоадресная рассылка не выполняется.

Проблема заключается в том, что поток realSource не завершается. Поскольку мы не осуществляем многоадресную рассылку, следующий подписчик получает новый экземпляр realSource через ReplaySubject, а новые выбросы добавляются к предыдущим уже отправленным сообщениям.

Таким образом, чтобы избавить ваш поток от многократного вызова дорогостоящего HTTP-запроса, вам нужно завершить поток, чтобы publishReplay знал, что ему не нужно повторно подписываться.

13.02.2017

2

Обычно: refCount означает, что поток является горячим / общим, пока есть хотя бы 1 подписчик, однако он сбрасывается / холодный, когда подписчиков нет.

Это означает, что если вы хотите быть абсолютно уверены в том, что ничего не выполняется более одного раза, вы не должны использовать refCount(), а просто connect поток, чтобы активировать его.

В качестве дополнительного примечания: если вы добавите observer.complete() после observer.next(5);, вы также получите ожидаемый результат.


Примечание: Вам действительно нужно создать здесь свой собственный Obervable? В 95% случаев существующих операторов достаточно для данного варианта использования.

13.02.2017

3

Это происходит потому, что вы используете publishReplay(). Он внутренне создает экземпляр ReplaySubject, в котором хранятся все проходящие значения.

Поскольку вы используете Observable.create, где вы испускаете одно значение, то каждый раз, когда вы вызываете source.subscribe(...), вы добавляете одно значение в буфер в ReplaySubject.

Вы не получаете call в начале каждой строки, потому что это ReplaySubject, который сначала излучает свой буфер, когда вы подписываетесь, а затем он подписывается на свой источник:

Подробнее о реализации см .:

То же самое и при использовании publishReplay(1). Сначала он излучает буферизованный элемент из ReplaySubject, а затем еще один элемент из observer.next(5);

13.02.2017
  • Единственный относящийся к делу ответ 01.07.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..