Nano Hash - криптовалюты, майнинг, программирование

Как обрабатывать IObservable из IDisposableValue

Грунтовка; в моей кодовой базе мне часто приходится работать с объединенными блоками памяти. Это сделано из соображений производительности, чтобы уменьшить сборку мусора (создание компонента движка видеоигры в реальном времени). Я справляюсь с этим, предоставляя типы как IDisposableValue, где вы можете получить доступ к T Value только до тех пор, пока оболочка не будет удалена. Вы располагаете оболочку, чтобы вернуть значение в пул для повторного использования.

Я создаю потоки обработки данных, которые работают с этими обернутыми значениями в ответ на события, происходящие с течением времени. Обычно это был бы идеальный кандидат для Observables / Reactive Extensions, за исключением того, что необходимость избавляться от оболочки по своей сути является формой изменчивости, чего вы не хотите при реактивности. Если один подписчик избавляется от оболочки, когда они заканчивают с ней, но второй наблюдатель все еще работает с ней, оболочка выдаст исключение.

Предполагаемая цель: чтобы каждый подписчик получил отдельную оболочку для исходного реального упакованного значения. Базовое значение будет удалено только после того, как каждый подписчик избавится от своей индивидуальной оболочки (вспомните RefCountDisposable). Таким образом, каждый подписчик может работать со значением столько, сколько ему нужно, и они означают, что это сделано путем утилизации. Когда все они завершатся, значение возвращается обратно в пул.

Единственная проблема в том, что я понятия не имею, как это правильно реализовать в RX. Является ли это подходящим способом справиться с моей ситуацией, и если да, то есть ли какие-либо указания о том, как на самом деле его реализовать?

Изменить 1 - грязное решение с использованием ISubject:

Я попытался заставить его работать, используя различные комбинации Observable.Select/Create/Defer, но не смог заставить Intended Goal выше работать при этом. Вместо этого мне пришлось обратиться к предметам, которых, как я знаю, избегают. Вот мой текущий код.

public class SharedDisposableValueSubject<T> : AbstractDisposable, ISubject<IDisposableValue<T>>
{
    private readonly Subject<SharedDisposable> subject;

    private readonly SubscriptionCounter<SharedDisposable> counter;

    private readonly IObservable<IDisposableValue<T>> observable;

    public SharedDisposableValueSubject()
    {
        this.subject = new Subject<SharedDisposable>();
        this.counter = new SubscriptionCounter<SharedDisposable>(this.subject);
        this.observable = this.counter.Source.Select(value => value.GetValue());
    }

    /// <inheritdoc />
    public void OnCompleted() => this.subject.OnCompleted();

    /// <inheritdoc />
    public void OnError(Exception error) => this.subject.OnError(error);

    /// <inheritdoc />
    public void OnNext(IDisposableValue<T> value) =>
        this.subject.OnNext(new SharedDisposable(value, this.counter.Count));

    /// <inheritdoc />
    public IDisposable Subscribe(IObserver<IDisposableValue<T>> observer) => this.observable.Subscribe(observer);

    /// <inheritdoc />
    protected override void ManagedDisposal() => this.subject.Dispose();

    private class SharedDisposable
    {
        private readonly IDisposableValue<T> value;

        private readonly AtomicInt count;

        public SharedDisposable(IDisposableValue<T> value, int count)
        {
            Contracts.Requires.That(count >= 0);

            this.value = value;
            this.count = new AtomicInt(count);

            if (count == 0)
            {
                this.value?.Dispose();
            }
        }

        public IDisposableValue<T> GetValue() => new ValuePin(this);

        private class ValuePin : AbstractDisposable, IDisposableValue<T>
        {
            private readonly SharedDisposable parent;

            public ValuePin(SharedDisposable parent)
            {
                Contracts.Requires.That(parent != null);

                this.parent = parent;
            }

            /// <inheritdoc />
            public T Value => this.parent.value != null ? this.parent.value.Value : default(T);

            /// <inheritdoc />
            protected override void ManagedDisposal()
            {
                if (this.parent.count.Decrement() == 0)
                {
                    this.parent.value?.Dispose();
                }
            }
        }
    }
}

public class SubscriptionCounter<T>
{
    private readonly AtomicInt count = new AtomicInt(0);

    public SubscriptionCounter(IObservable<T> source)
    {
        Contracts.Requires.That(source != null);

        this.Source = Observable.Create<T>(observer =>
        {
            this.count.Increment();
            return new Subscription(source.Subscribe(observer), this.count);
        });
    }

    public int Count => this.count.Read();

    public IObservable<T> Source { get; }

    private class Subscription : AbstractDisposable
    {
        private readonly IDisposable subscription;

        private readonly AtomicInt count;

        public Subscription(IDisposable subscription, AtomicInt count)
        {
            Contracts.Requires.That(subscription != null);
            Contracts.Requires.That(count != null);

            this.subscription = subscription;
            this.count = count;
        }

        /// <inheritdoc />
        protected override void ManagedDisposal()
        {
            this.subscription.Dispose();
            this.count.Decrement();
        }
    }
}

public interface IDisposableValue<out T> : IDisposable
{
    bool IsDisposed { get; }

    T Value { get; }
}

AbstractDisposable - это просто реализация базового класса одноразового шаблона для типов, которые не поддерживают неуправляемые типы. Это гарантирует, что ManagedDisposal () вызывается только один раз при первом вызове Dispose (). AtomicInt - это оболочка над Interlocked для int, обеспечивающая потокобезопасные атомарные обновления для int.

Мой тестовый код, показывающий, как предполагается использовать SharedDisposableValueSubject;

public static class SharedDisposableValueSubjectTests
{
    [Fact]
    public static void NoSubcribersValueAutoDisposes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var sourceValue = new DisposableWrapper<int>(0);
            sourceValue.IsDisposed.Should().BeFalse();

            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void SingleSurcriber()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);

            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();

            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void ManySubcribers()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            IDisposableValue<int> retrieved1 = null;
            subject.Subscribe(value => retrieved1 = value);
            IDisposableValue<int> retrieved2 = null;
            subject.Subscribe(value => retrieved2 = value);

            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved1.Should().NotBeNull();
            retrieved1.Value.Should().Be(testNumber);
            retrieved1.IsDisposed.Should().BeFalse();
            retrieved2.Should().NotBeNull();
            retrieved2.Value.Should().Be(testNumber);
            retrieved2.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();

            // disposing only 1 retrieved value does not yet dispose the source value
            retrieved1.Dispose();
            retrieved1.IsDisposed.Should().BeTrue();
            retrieved2.IsDisposed.Should().BeFalse();
            retrieved2.Value.Should().Be(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            // disposing both retrieved values disposes the source value
            retrieved2.Dispose();
            retrieved2.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void DisposingManyTimesStillRequiresEachSubscriberToDispose()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            IDisposableValue<int> retrieved1 = null;
            subject.Subscribe(value => retrieved1 = value);
            IDisposableValue<int> retrieved2 = null;
            subject.Subscribe(value => retrieved2 = value);

            subject.OnNext(sourceValue);

            // disposing only 1 retrieved value does not yet dispose the source value
            // even though the retrieved value is disposed many times
            retrieved1.Dispose();
            retrieved1.Dispose();
            retrieved1.Dispose();
            retrieved1.IsDisposed.Should().BeTrue();
            retrieved2.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();

            // disposing both retrieved values disposes the source value
            retrieved2.Dispose();
            retrieved2.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void SingleSubcriberUnsubcribes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            var subscription = subject.Subscribe(value => { });
            subscription.Dispose();

            // source value auto disposes because no subscribers
            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void SubcriberUnsubcribes()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);

            var subscription = subject.Subscribe(value => { });
            subscription.Dispose();

            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue.IsDisposed.Should().BeFalse();

            // disposing retrieved causes source to be disposed
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static async Task DelayedSubcriberAsync()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber = 1;
            var sourceValue = new DisposableWrapper<int>(testNumber);
            sourceValue.IsDisposed.Should().BeFalse();

            // delay countdown event used just to ensure that the value isn't disposed until assertions checked
            var delay = new AsyncCountdownEvent(1);
            var disposed = new AsyncCountdownEvent(2);

            subject.Delay(TimeSpan.FromSeconds(1)).Subscribe(async value =>
            {
                await delay.WaitAsync().DontMarshallContext();
                value.Dispose();
                disposed.Signal(1);
            });

            subject.Subscribe(value =>
            {
                value.Dispose();
                disposed.Signal(1);
            });

            // value is not yet disposed
            subject.OnNext(sourceValue);
            sourceValue.IsDisposed.Should().BeFalse();

            // wait for value to be disposed
            delay.Signal(1);
            await disposed.WaitAsync().DontMarshallContext();
            sourceValue.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }

    [Fact]
    public static void MultipleObservedValues()
    {
        using (var subject = new SharedDisposableValueSubject<int>())
        {
            var testNumber1 = 1;
            var sourceValue1 = new DisposableWrapper<int>(testNumber1);
            sourceValue1.IsDisposed.Should().BeFalse();

            var testNumber2 = 2;
            var sourceValue2 = new DisposableWrapper<int>(testNumber2);
            sourceValue2.IsDisposed.Should().BeFalse();

            IDisposableValue<int> retrieved = null;
            subject.Subscribe(value => retrieved = value);

            // first test value
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue1);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber1);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue1.IsDisposed.Should().BeFalse();

            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue1.IsDisposed.Should().BeTrue();

            // second test value
            // value retrieved from sequence but not disposed yet
            subject.OnNext(sourceValue2);
            retrieved.Should().NotBeNull();
            retrieved.Value.Should().Be(testNumber2);
            retrieved.IsDisposed.Should().BeFalse();
            sourceValue2.IsDisposed.Should().BeFalse();

            // disposing retrieved disposes the source value
            retrieved.Dispose();
            retrieved.IsDisposed.Should().BeTrue();
            sourceValue2.IsDisposed.Should().BeTrue();

            subject.OnCompleted();
        }
    }
}

Все это проходит, но я понимаю, что есть много вещей, которые вы можете сделать с наблюдаемым, поэтому могут быть варианты использования, которые я не рассматривал, которые могут нарушить эту реализацию. Если вы знаете о каких-либо проблемах, дайте мне знать. Также может быть случай, когда я пытаюсь заставить Rx делать то, для чего он изначально не предназначен.

Изменить 2 - Решение с использованием публикации:

Я использую Publish, чтобы обернуть одноразовые значения из исходного наблюдаемого объекта в SharedDisposable, гарантируя, что каждое исходное значение будет обернуто только один раз. Затем опубликованный наблюдаемый объект подсчитывается для подписчиков, и каждый подписчик получает отдельный ValuePin, который при удалении уменьшает счетчик для SharedDisposable. Когда счетчик SharedDisposable достигает 0, он удаляет исходное значение.

Я попытался не производить подсчет подписок и вместо этого иметь его так, чтобы каждый раз, когда выдается ValuePin, он увеличивал счетчик, но я не мог найти способ гарантировать, что он создаст ValuePins для каждого подписчика, прежде чем позволить подписчикам избавиться от них. . Это привело к тому, что подписчик 1 получил свой пин, счетчик переходит от 0 до 1, затем удаляет этот пин до того, как подписчик 2 получит свой пин, счет идет от 1 до 0, инициируя удаление исходного значения, а затем подписчик 2 должен получить булавка, но уже слишком поздно.

public static IObservable<IDisposableValue<T>> ShareDisposable<T>(this IObservable<IDisposableValue<T>> source)
{
    Contracts.Requires.That(source != null);

    var published = source.Select(value => new SharedDisposable<T>(value)).Publish();
    var counter = new SubscriptionCounter<SharedDisposable<T>>(published);
    published.Connect();
    return counter.CountedSource.Select(value => value.GetValue(counter.Count));
}

private class SharedDisposable<T>
{
    private const int Uninitialized = -1;

    private readonly IDisposableValue<T> value;

    private readonly AtomicInt count;

    public SharedDisposable(IDisposableValue<T> value)
    {
        this.value = value;
        this.count = new AtomicInt(Uninitialized);
    }

    public IDisposableValue<T> GetValue(int subscriberCount)
    {
        Contracts.Requires.That(subscriberCount >= 0);

        this.count.CompareExchange(subscriberCount, Uninitialized);
        return new ValuePin(this);
    }

    private class ValuePin : AbstractDisposable, IDisposableValue<T>
    {
        private readonly SharedDisposable<T> parent;

        public ValuePin(SharedDisposable<T> parent)
        {
            Contracts.Requires.That(parent != null);

            this.parent = parent;
        }

        /// <inheritdoc />
        public T Value => this.parent.value != null ? this.parent.value.Value : default(T);

        /// <inheritdoc />
        protected override void ManagedDisposal()
        {
            if (this.parent.count.Decrement() == 0)
            {
                this.parent.value?.Dispose();
            }
        }
    }
}

Это, безусловно, кажется лучше, поскольку мне не нужно каким-либо образом использовать Subject, хотя подсчет подписчиков кажется грязным. Особенно из-за того, что мне нужно, чтобы счетчик не инициализировался, пока не будет выдан первый ValuePin. И для ясности, я пытаюсь справиться с удалением значений, созданных наблюдаемым объектом, который будет использоваться 0 для многих подписчиков, а не с удалением подключений к самому наблюдаемому объекту, поэтому я не использую RefCount вместо Connect .


  • Я не уверен, что Rx может вам так сильно помочь, хотя я не совсем понимаю, что именно вам нужно. Некоторые примеры кода / классов помогут проиллюстрировать вашу проблему. 09.09.2016
  • Похоже, вы неправильно понимаете, как работает Rx. Каждая подписка является отдельным экземпляром наблюдаемого конвейера исходного исходного кода. Две подписки на одно и то же наблюдаемое не зависят друг от друга. Можете ли вы показать на примере проблемы, с которой столкнулись? 11.09.2016
  • @Enigmativity - Вы абсолютно правы. Изначально я думал, что конвейер был выполнен только один раз, и каждому подписчику был дан результат. Когда я пытался сделать это правильным Rx-способом, я понял, что это повторное выполнение конвейера для каждой подписки, что мне очень противоречит интуиции. Если хорошо управляемые наблюдаемые объекты содержат неизменяемые данные и не имеют побочных эффектов, зачем нужно переоценивать конвейер для каждой подписки? Если выполняется какой-либо серьезный объем вычислений, это может отрицательно сказаться на производительности. 11.09.2016
  • @StevenAckerman - Вот почему существует набор операторов .Publish(...), чтобы можно было совместно использовать дорогостоящие вычисления. Но в целом операторы Rx работают очень быстро. Они замедляются только при введении параллелизма или при выполнении дорогостоящих вычислений. 12.09.2016
  • @StevenAckerman Не все подписчики получат одинаковые значения. Это фундаментальная концепция, разделяющая горячие и холодные наблюдаемые последовательности. См. introtorx.com/Content/v1.0.10621.0/. 12.09.2016
  • @Enigmativity - Я не понимал публикации, когда впервые наткнулся на нее, но это дает мне совершенно новый способ увидеть это. Спасибо, что указали на это. 12.09.2016
  • @LeeCampbell - Я освежу в памяти эпизоды «горячее против холода». Спасибо за совет. Ставки на большую часть того, что происходит в моих системах, были бы горячими последовательностями. 12.09.2016
  • Трудно узнать. Я все еще думаю, что это звучит именно для того, для чего предназначен Disruptor. Я думаю, что с Rx вы можете столкнуться с незаметными условиями гонки, которые в лучшем случае раздражают, но хуже, когда вы скрываете их за такими вещами, как Rx. Если вы можете гарантировать, что все ваши подписчики работают очень быстро и все они подписываются до начала производства событий, тогда Rx может вам подойти. Если какой-либо из ваших подписчиков действительно выполняет работу / ввод-вывод или может подписаться с опозданием, не имея значения, то я думаю, что Rx может быть ужасной идеей. 12.09.2016
  • @LeeCampbell - Если у вас есть хорошие материалы / ссылки по паттерну Disruptor, мы будем очень признательны. Мне трудно найти на нем много реального. Что касается Rx, подписки будут сделаны до того, как движок запустится, поэтому не следует пропустить никаких значений там, но будут некоторые подписчики, которым необходимо начать серьезную работу процессора и работу ввода-вывода. Я рассматриваю возможность изменения того, как я обрабатываю объединение / обработку данных с учетом этих ограничений. 12.09.2016
  • @Enigmativity - Я снова попытался использовать Publish. Надеюсь, это немного лучше. 13.09.2016
  • В github есть порты Disruptor, это похоже на хорошее начало github.com/disruptor-net/ Disruptor-net. Для документов это хорошее начало lmax-exchange.github.io/disruptor или старый добрый Фаулер martinfowler.com/articles/lmax.html 14.09.2016

Ответы:


1

Думаю, вы могли бы пересчитать одноразовые. Это потребовало бы, чтобы издатель инициировал подсчет ссылок, а затем каждый подписчик увеличивал и уменьшал счетчик. Для этого вы можете использовать RefCountDisposable. Я бы подумал о том, чтобы сделать это только для частного / внутреннего кода, иначе у вас может быть негерметичный потребитель, нарушающий вашу систему. Альтернативным решением для Rx может быть использование паттерна Disruptor.

10.09.2016
  • По сути, это то, чем я закончил. Мне пришлось самому реализовать ISubject ‹T›, чтобы правильно обернуть ресурс одним RefCountDisposable и выдать отдельные результаты RefCountDisposable.GetDisposable каждому подписчику. Еще мне пришлось сделать так, чтобы он подсчитывал подписчиков. К счастью, я избавил себя от проблем, связанных с правильной реализацией наблюдаемого контракта, внутренне используя обычный Subject ‹T›, чтобы делать большую часть работы за меня. По крайней мере, надеюсь, это правильно. Сдал свои тесты. 10.09.2016
  • Никогда раньше не слышал о паттерне Disruptor. Спасибо за совет. Это выглядит очень интересно и, возможно, станет достойным будущим обновлением для моего двигателя. 10.09.2016
  • @StevenAckerman - Если вы используете Subject<T>, значит, вы, вероятно, делаете что-то не так. Если вы реализуете ISubject<T>, то почти наверняка будете. Вам действительно стоит опубликовать пример проблемы, с которой вы столкнулись. 11.09.2016
  • @Enigmativity - Я знаю, что использование Subject, особенно его самостоятельная реализация, очень не рекомендуется. Я просто не мог найти другого способа заставить его работать после 2 дней попыток. Я опубликовал то, что придумал, выше. Надеюсь, это проясняет, чего я пытаюсь достичь. Дайте мне знать, если вы видите что-то не так. 11.09.2016
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..