Nano Hash - криптовалюты, майнинг, программирование

Rx: одновременная загрузка списка файлов, сохранение порядка FIFO

Я запускаю свою собственную (простую) реализацию HLS, и для этого мне нужно загрузить список видеофайлов MPEG Transport Stream:

На данный момент у меня есть это, и это работает:

public async Task<IObservable<(int Current, int Total, Stream Stream)>> FetchVideoSegments()
{
    var playlist = await GetPlaylist();

    var total = playlist.PlaylistItems.Count();

    return playlist.PlaylistItems.ToObservable()
        //=> Get the video file path, relative to the current playlist URI
        .Select(item => item.Uri)
        //=> Convert the relative URI to an absolute one
        .Select(MakeRelativeAbsoluteUrl)
        //=> Download the video transport file, sequentially
        .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
        .Concat()
        //=> Return the progress info tuple with the video file stream
        .Select((stream, index) => (index, total, stream));
}

Подписчик уведомляется по одному потоку за раз в правильном порядке.

Часто при загрузке файлов параллелизм 2-3 часто является идеальным. Я хотел бы добавить это в свой наблюдаемый конвейер, но я не вижу никакого удобного способа сделать это, сохраняя порядок вставки исходных URI в испускаемых потоках.

Возьми это:

return playlist.PlaylistItems.ToObservable()
    //=> Get the video file path, relative to the current playlist URI
    .Select(item => item.Uri)
    //=> Convert the relative URI to an absolute one
    .Select(MakeRelativeAbsoluteUrl)
    //=> Download the video transport file
    .Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
    //=> Limit concurrent requests to a reasonable number
    .Merge(FetchSegmentsMaxConcurrency)
    //=> Return the progress info tuple with the video file stream
    .Select((stream, index) => (index, total, stream));

Обратите внимание, что .Concat() заменено на .Merge(maxConcurrency).

Это наивное решение, и, конечно же, оно не работает: видеопотоки генерируются в недетерминированном порядке.

Каков канонический способ добиться этого? Должен ли я поддерживать другое значение «индекса», которое я храню в наблюдаемом конвейере?


  • То, о чем вы просите, это чтобы наблюдаемое производило значения как можно скорее, если оно находится в том же порядке, что и источник? 19.01.2018
  • @Enigmativity Точно! 19.01.2018
  • В какой-то момент я написал именно этот оператор. Я посмотрю, смогу ли я найти его для вас. 27.01.2018
  • Разве это не должно использовать .Buffer или .Window в списке файлов для достижения параллелизма? 28.01.2018
  • Не знал этих операторов, спасибо! Я не буду вдаваться в подробности прямо сейчас: наконец, загрузка видеофайлов, подобных этому, больше не понадобится, и в настоящее время я рефакторинг кода, чтобы загрузить только большой двоичный объект (весь файл mp4) с одним HTTP-запросом. Менее интересно, но гораздо проще. Но мне все равно было бы очень интересно увидеть решение для этого, поскольку я, вероятно, снова столкнусь с этим вариантом использования, поэтому @anyone не стесняйтесь публиковать (проверенный) ответ, который я могу принять. 28.01.2018

Ответы:


1

Будет ли что-то вроде этого тем, что вы ищете? Конечно, замените char тем, что возвращает ваш DownloadVideoSegment.

var random = new Random();

Observable
    .Range(65, 26)
    .Select(char.ConvertFromUtf32)
    .Select((letter, index) =>
        Observable
            .FromAsync(() =>
                Task.Delay(random.Next(5000))
                    .ContinueWith(t => (letter, index))))
    .Merge()
    .Do(tuple => Console.WriteLine($"{tuple.letter}:{tuple.index}"))
    .Buffer(Observable.Never<Unit>())
    .Subscribe(tuples =>
        Console.WriteLine($"{string.Join(",", tuples.OrderBy(t => t.index).Select(t => t.letter))}"));
30.01.2018
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..