Грунтовка; в моей кодовой базе мне часто приходится работать с объединенными блоками памяти. Это сделано из соображений производительности, чтобы уменьшить сборку мусора (создание компонента движка видеоигры в реальном времени). Я справляюсь с этим, предоставляя типы как IDisposableValue, где вы можете получить доступ к T Value только до тех пор, пока оболочка не будет удалена. Вы располагаете оболочку, чтобы вернуть значение в пул для повторного использования.
Я создаю потоки обработки данных, которые работают с этими обернутыми значениями в ответ на события, происходящие с течением времени. Обычно это был бы идеальный кандидат для Observables / Reactive Extensions, за исключением того, что необходимость избавляться от оболочки по своей сути является формой изменчивости, чего вы не хотите при реактивности. Если один подписчик избавляется от оболочки, когда они заканчивают с ней, но второй наблюдатель все еще работает с ней, оболочка выдаст исключение.
Предполагаемая цель: чтобы каждый подписчик получил отдельную оболочку для исходного реального упакованного значения. Базовое значение будет удалено только после того, как каждый подписчик избавится от своей индивидуальной оболочки (вспомните RefCountDisposable). Таким образом, каждый подписчик может работать со значением столько, сколько ему нужно, и они означают, что это сделано путем утилизации. Когда все они завершатся, значение возвращается обратно в пул.
Единственная проблема в том, что я понятия не имею, как это правильно реализовать в RX. Является ли это подходящим способом справиться с моей ситуацией, и если да, то есть ли какие-либо указания о том, как на самом деле его реализовать?
Изменить 1 - грязное решение с использованием ISubject:
Я попытался заставить его работать, используя различные комбинации Observable.Select/Create/Defer, но не смог заставить Intended Goal выше работать при этом. Вместо этого мне пришлось обратиться к предметам, которых, как я знаю, избегают. Вот мой текущий код.
public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
private readonly Subject<SharedDisposable> subject;
private readonly SubscriptionCounter<SharedDisposable> counter;
private readonly IObservable<IDisposableValue<T>> observable;
public SharedDisposableValueSubject()
{
this.subject = new Subject<SharedDisposable>();
this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
this.observable = this.counter.Source.Select(value => value.GetValue());
}
/// <inheritdoc />
public void OnCompleted() => this.subject.OnCompleted();
/// <inheritdoc />
public void OnError(Exception error) => this.subject.OnError(error);
/// <inheritdoc />
public void OnNext(IDisposableValue<T> value) =>
this.subject.OnNext(new SharedDisposable(value, this.counter.Count));
/// <inheritdoc />
public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);
/// <inheritdoc />
protected override void ManagedDisposal() => this.subject.Dispose();
private class SharedDisposable
{
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value, int count)
{
Contracts.Requires.That(count >= 0);
this.value = value;
this.count = new AtomicInt(count);
if (count == 0)
{
this.value?.Dispose();
}
}
public IDisposableValue<T> GetValue() => new ValuePin(this);
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable parent;
public ValuePin(SharedDisposable parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
}
public class SubscriptionCounter<T>
{
private readonly AtomicInt count = new AtomicInt(0);
public SubscriptionCounter(IObservable<T> source)
{
Contracts.Requires.That(source != null);
this.Source = Observable.Create<T>(observer =>
{
this.count.Increment();
return new Subscription(source.Subscribe(observer), this.count);
});
}
public int Count => this.count.Read();
public IObservable<T> Source { get; }
private class Subscription : AbstractDisposable
{
private readonly IDisposable subscription;
private readonly AtomicInt count;
public Subscription(IDisposable subscription, AtomicInt count)
{
Contracts.Requires.That(subscription != null);
Contracts.Requires.That(count != null);
this.subscription = subscription;
this.count = count;
}
/// <inheritdoc />
protected override void ManagedDisposal()
{
this.subscription.Dispose();
this.count.Decrement();
}
}
}
public interface IDisposableValue<out T> : IDisposable
{
bool IsDisposed { get; }
T Value { get; }
}
AbstractDisposable - это просто реализация базового класса одноразового шаблона для типов, которые не поддерживают неуправляемые типы. Это гарантирует, что ManagedDisposal () вызывается только один раз при первом вызове Dispose (). AtomicInt - это оболочка над Interlocked для int, обеспечивающая потокобезопасные атомарные обновления для int.
Мой тестовый код, показывающий, как предполагается использовать SharedDisposableValueSubject;
public static class SharedDisposableValueSubjectTests
{
[Fact]
public static void NoSubcribersValueAutoDisposes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var sourceValue = new DisposableWrapper<int>(0);
sourceValue.IsDisposed.Should().BeFalse();
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSurcriber()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void ManySubcribers()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved1.Should().NotBeNull();
retrieved1.Value.Should().Be(testNumber);
retrieved1.IsDisposed.Should().BeFalse();
retrieved2.Should().NotBeNull();
retrieved2.Value.Should().Be(testNumber);
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing only 1 retrieved value does not yet dispose the source value
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
retrieved2.Value.Should().Be(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved1 = null;
subject.Subscribe(value => retrieved1 = value);
IDisposableValue<int> retrieved2 = null;
subject.Subscribe(value => retrieved2 = value);
subject.OnNext(sourceValue);
// disposing only 1 retrieved value does not yet dispose the source value
// even though the retrieved value is disposed many times
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.Dispose();
retrieved1.IsDisposed.Should().BeTrue();
retrieved2.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing both retrieved values disposes the source value
retrieved2.Dispose();
retrieved2.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SingleSubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// source value auto disposes because no subscribers
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void SubcriberUnsubcribes()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
var subscription = subject.Subscribe(value => { });
subscription.Dispose();
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber);
retrieved.IsDisposed.Should().BeFalse();
sourceValue.IsDisposed.Should().BeFalse();
// disposing retrieved causes source to be disposed
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static async Task DelayedSubcriberAsync()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber = 1;
var sourceValue = new DisposableWrapper<int>(testNumber);
sourceValue.IsDisposed.Should().BeFalse();
// delay countdown event used just to ensure that the value isn't disposed until assertions checked
var delay = new AsyncCountdownEvent(1);
var disposed = new AsyncCountdownEvent(2);
subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
{
await delay.WaitAsync().DontMarshallContext();
value.Dispose();
disposed.Signal(1);
});
subject.Subscribe(value =>
{
value.Dispose();
disposed.Signal(1);
});
// value is not yet disposed
subject.OnNext(sourceValue);
sourceValue.IsDisposed.Should().BeFalse();
// wait for value to be disposed
delay.Signal(1);
await disposed.WaitAsync().DontMarshallContext();
sourceValue.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
[Fact]
public static void MultipleObservedValues()
{
using (var subject = new SharedDisposableValueSubject<int>())
{
var testNumber1 = 1;
var sourceValue1 = new DisposableWrapper<int>(testNumber1);
sourceValue1.IsDisposed.Should().BeFalse();
var testNumber2 = 2;
var sourceValue2 = new DisposableWrapper<int>(testNumber2);
sourceValue2.IsDisposed.Should().BeFalse();
IDisposableValue<int> retrieved = null;
subject.Subscribe(value => retrieved = value);
// first test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue1);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber1);
retrieved.IsDisposed.Should().BeFalse();
sourceValue1.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue1.IsDisposed.Should().BeTrue();
// second test value
// value retrieved from sequence but not disposed yet
subject.OnNext(sourceValue2);
retrieved.Should().NotBeNull();
retrieved.Value.Should().Be(testNumber2);
retrieved.IsDisposed.Should().BeFalse();
sourceValue2.IsDisposed.Should().BeFalse();
// disposing retrieved disposes the source value
retrieved.Dispose();
retrieved.IsDisposed.Should().BeTrue();
sourceValue2.IsDisposed.Should().BeTrue();
subject.OnCompleted();
}
}
}
Все это проходит, но я понимаю, что есть много вещей, которые вы можете сделать с наблюдаемым, поэтому могут быть варианты использования, которые я не рассматривал, которые могут нарушить эту реализацию. Если вы знаете о каких-либо проблемах, дайте мне знать. Также может быть случай, когда я пытаюсь заставить Rx делать то, для чего он изначально не предназначен.
Изменить 2 - Решение с использованием публикации:
Я использую Publish, чтобы обернуть одноразовые значения из исходного наблюдаемого объекта в SharedDisposable, гарантируя, что каждое исходное значение будет обернуто только один раз. Затем опубликованный наблюдаемый объект подсчитывается для подписчиков, и каждый подписчик получает отдельный ValuePin, который при удалении уменьшает счетчик для SharedDisposable. Когда счетчик SharedDisposable достигает 0, он удаляет исходное значение.
Я попытался не производить подсчет подписок и вместо этого иметь его так, чтобы каждый раз, когда выдается ValuePin, он увеличивал счетчик, но я не мог найти способ гарантировать, что он создаст ValuePins для каждого подписчика, прежде чем позволить подписчикам избавиться от них. . Это привело к тому, что подписчик 1 получил свой пин, счетчик переходит от 0 до 1, затем удаляет этот пин до того, как подписчик 2 получит свой пин, счет идет от 1 до 0, инициируя удаление исходного значения, а затем подписчик 2 должен получить булавка, но уже слишком поздно.
public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
Contracts.Requires.That(source != null);
var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
published.Connect();
return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}
private class SharedDisposable<T>
{
private const int Uninitialized = -1;
private readonly IDisposableValue<T> value;
private readonly AtomicInt count;
public SharedDisposable(IDisposableValue<T> value)
{
this.value = value;
this.count = new AtomicInt(Uninitialized);
}
public IDisposableValue<T> GetValue(int subscriberCount)
{
Contracts.Requires.That(subscriberCount >= 0);
this.count.CompareExchange(subscriberCount, Uninitialized);
return new ValuePin(this);
}
private class ValuePin : AbstractDisposable, IDisposableValue<T>
{
private readonly SharedDisposable<T> parent;
public ValuePin(SharedDisposable<T> parent)
{
Contracts.Requires.That(parent != null);
this.parent = parent;
}
/// <inheritdoc />
public T Value => this.parent.value != null ? this.parent.value.Value : default(T);
/// <inheritdoc />
protected override void ManagedDisposal()
{
if (this.parent.count.Decrement() == 0)
{
this.parent.value?.Dispose();
}
}
}
}
Это, безусловно, кажется лучше, поскольку мне не нужно каким-либо образом использовать Subject, хотя подсчет подписчиков кажется грязным. Особенно из-за того, что мне нужно, чтобы счетчик не инициализировался, пока не будет выдан первый ValuePin. И для ясности, я пытаюсь справиться с удалением значений, созданных наблюдаемым объектом, который будет использоваться 0 для многих подписчиков, а не с удалением подключений к самому наблюдаемому объекту, поэтому я не использую RefCount вместо Connect .
Subject<T>
, значит, вы, вероятно, делаете что-то не так. Если вы реализуетеISubject<T>
, то почти наверняка будете. Вам действительно стоит опубликовать пример проблемы, с которой вы столкнулись. 11.09.2016