Я запускаю свою собственную (простую) реализацию HLS, и для этого мне нужно загрузить список видеофайлов MPEG Transport Stream:
На данный момент у меня есть это, и это работает:
public async Task<IObservable<(int Current, int Total, Stream Stream)>> FetchVideoSegments()
{
var playlist = await GetPlaylist();
var total = playlist.PlaylistItems.Count();
return playlist.PlaylistItems.ToObservable()
//=> Get the video file path, relative to the current playlist URI
.Select(item => item.Uri)
//=> Convert the relative URI to an absolute one
.Select(MakeRelativeAbsoluteUrl)
//=> Download the video transport file, sequentially
.Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
.Concat()
//=> Return the progress info tuple with the video file stream
.Select((stream, index) => (index, total, stream));
}
Подписчик уведомляется по одному потоку за раз в правильном порядке.
Часто при загрузке файлов параллелизм 2-3 часто является идеальным. Я хотел бы добавить это в свой наблюдаемый конвейер, но я не вижу никакого удобного способа сделать это, сохраняя порядок вставки исходных URI в испускаемых потоках.
Возьми это:
return playlist.PlaylistItems.ToObservable()
//=> Get the video file path, relative to the current playlist URI
.Select(item => item.Uri)
//=> Convert the relative URI to an absolute one
.Select(MakeRelativeAbsoluteUrl)
//=> Download the video transport file
.Select(uri => Observable.Defer(() => DownloadVideoSegment(uri)))
//=> Limit concurrent requests to a reasonable number
.Merge(FetchSegmentsMaxConcurrency)
//=> Return the progress info tuple with the video file stream
.Select((stream, index) => (index, total, stream));
Обратите внимание, что .Concat()
заменено на .Merge(maxConcurrency)
.
Это наивное решение, и, конечно же, оно не работает: видеопотоки генерируются в недетерминированном порядке.
Каков канонический способ добиться этого? Должен ли я поддерживать другое значение «индекса», которое я храню в наблюдаемом конвейере?