LibMedia: Keep data providers suspended until they are active

This allows us to avoid waiting on the condition variable for the error
handler to be set when an error occurs.
This commit is contained in:
Zaggy1024
2025-11-28 17:29:49 -06:00
committed by Gregory Bertilson
parent 8bfaae473d
commit 2bd541c70c
Notes: github-actions[bot] 2025-12-03 18:22:07 +00:00
6 changed files with 98 additions and 33 deletions

View File

@@ -173,6 +173,7 @@ NonnullRefPtr<DisplayingVideoSink> PlaybackManager::get_or_create_the_displaying
if (track_data.display == nullptr) { if (track_data.display == nullptr) {
track_data.display = MUST(Media::DisplayingVideoSink::try_create(m_time_provider)); track_data.display = MUST(Media::DisplayingVideoSink::try_create(m_time_provider));
track_data.display->set_provider(track, track_data.provider); track_data.display->set_provider(track, track_data.provider);
track_data.provider->start();
track_data.provider->seek(m_time_provider->current_time(), SeekMode::Accurate); track_data.provider->seek(m_time_provider->current_time(), SeekMode::Accurate);
} }
@@ -202,8 +203,10 @@ void PlaybackManager::enable_an_audio_track(Track const& track)
auto& track_data = get_audio_data_for_track(track); auto& track_data = get_audio_data_for_track(track);
auto had_provider = m_audio_sink->provider(track) != nullptr; auto had_provider = m_audio_sink->provider(track) != nullptr;
m_audio_sink->set_provider(track, track_data.provider); m_audio_sink->set_provider(track, track_data.provider);
if (!had_provider) if (!had_provider) {
track_data.provider->start();
track_data.provider->seek(current_time()); track_data.provider->seek(current_time());
}
} }
void PlaybackManager::disable_an_audio_track(Track const& track) void PlaybackManager::disable_an_audio_track(Track const& track)

View File

@@ -26,6 +26,7 @@ DecoderErrorOr<NonnullRefPtr<AudioDataProvider>> AudioDataProvider::try_create(N
auto provider = DECODER_TRY_ALLOC(try_make_ref_counted<AudioDataProvider>(thread_data)); auto provider = DECODER_TRY_ALLOC(try_make_ref_counted<AudioDataProvider>(thread_data));
auto thread = DECODER_TRY_ALLOC(Threading::Thread::try_create([thread_data]() -> int { auto thread = DECODER_TRY_ALLOC(Threading::Thread::try_create([thread_data]() -> int {
thread_data->wait_for_start();
while (!thread_data->should_thread_exit()) { while (!thread_data->should_thread_exit()) {
thread_data->handle_seek(); thread_data->handle_seek();
thread_data->push_data_and_decode_a_block(); thread_data->push_data_and_decode_a_block();
@@ -53,6 +54,11 @@ void AudioDataProvider::set_error_handler(ErrorHandler&& handler)
m_thread_data->set_error_handler(move(handler)); m_thread_data->set_error_handler(move(handler));
} }
void AudioDataProvider::start()
{
m_thread_data->start();
}
void AudioDataProvider::seek(AK::Duration timestamp, SeekCompletionHandler&& completion_handler) void AudioDataProvider::seek(AK::Duration timestamp, SeekCompletionHandler&& completion_handler)
{ {
m_thread_data->seek(timestamp, move(completion_handler)); m_thread_data->seek(timestamp, move(completion_handler));
@@ -70,8 +76,21 @@ AudioDataProvider::ThreadData::~ThreadData() = default;
void AudioDataProvider::ThreadData::set_error_handler(ErrorHandler&& handler) void AudioDataProvider::ThreadData::set_error_handler(ErrorHandler&& handler)
{ {
auto locker = take_lock();
m_error_handler = move(handler); m_error_handler = move(handler);
}
void AudioDataProvider::ThreadData::start()
{
auto locker = take_lock();
VERIFY(m_requested_state == RequestedState::None);
m_requested_state = RequestedState::Running;
wake();
}
void AudioDataProvider::ThreadData::exit()
{
auto locker = take_lock();
m_requested_state = RequestedState::Exit;
wake(); wake();
} }
@@ -85,12 +104,6 @@ AudioBlock AudioDataProvider::retrieve_block()
return result; return result;
} }
void AudioDataProvider::ThreadData::exit()
{
m_exit = true;
wake();
}
void AudioDataProvider::ThreadData::seek(AK::Duration timestamp, SeekCompletionHandler&& completion_handler) void AudioDataProvider::ThreadData::seek(AK::Duration timestamp, SeekCompletionHandler&& completion_handler)
{ {
auto locker = take_lock(); auto locker = take_lock();
@@ -100,9 +113,17 @@ void AudioDataProvider::ThreadData::seek(AK::Duration timestamp, SeekCompletionH
wake(); wake();
} }
void AudioDataProvider::ThreadData::wait_for_start()
{
auto locker = take_lock();
while (m_requested_state == RequestedState::None)
m_wait_condition.wait();
}
bool AudioDataProvider::ThreadData::should_thread_exit() const bool AudioDataProvider::ThreadData::should_thread_exit() const
{ {
return m_exit; auto locker = take_lock();
return m_requested_state == RequestedState::Exit;
} }
void AudioDataProvider::ThreadData::flush_decoder() void AudioDataProvider::ThreadData::flush_decoder()
@@ -159,7 +180,8 @@ bool AudioDataProvider::ThreadData::handle_seek()
process_seek_on_main_thread(seek_id, process_seek_on_main_thread(seek_id,
[self = NonnullRefPtr(*this), error = move(error)] mutable { [self = NonnullRefPtr(*this), error = move(error)] mutable {
self->m_error_handler(move(error)); if (self->m_error_handler)
self->m_error_handler(move(error));
self->m_seek_completion_handler = nullptr; self->m_seek_completion_handler = nullptr;
}); });
}; };
@@ -249,10 +271,9 @@ void AudioDataProvider::ThreadData::push_data_and_decode_a_block()
{ {
auto locker = take_lock(); auto locker = take_lock();
m_is_in_error_state = true; m_is_in_error_state = true;
while (!m_error_handler)
m_wait_condition.wait();
m_main_thread_event_loop.deferred_invoke([self = NonnullRefPtr(*this), error = move(error)] mutable { m_main_thread_event_loop.deferred_invoke([self = NonnullRefPtr(*this), error = move(error)] mutable {
self->m_error_handler(move(error)); if (self->m_error_handler)
self->m_error_handler(move(error));
}); });
} }

View File

@@ -40,6 +40,8 @@ public:
void set_error_handler(ErrorHandler&&); void set_error_handler(ErrorHandler&&);
void start();
AudioBlock retrieve_block(); AudioBlock retrieve_block();
void seek(AK::Duration timestamp, SeekCompletionHandler&& = nullptr); void seek(AK::Duration timestamp, SeekCompletionHandler&& = nullptr);
@@ -52,6 +54,10 @@ private:
void set_error_handler(ErrorHandler&&); void set_error_handler(ErrorHandler&&);
void start();
void exit();
void wait_for_start();
bool should_thread_exit() const; bool should_thread_exit() const;
void flush_decoder(); void flush_decoder();
DecoderErrorOr<void> retrieve_next_block(AudioBlock&); DecoderErrorOr<void> retrieve_next_block(AudioBlock&);
@@ -61,23 +67,26 @@ private:
void resolve_seek(u32 seek_id); void resolve_seek(u32 seek_id);
void push_data_and_decode_a_block(); void push_data_and_decode_a_block();
void exit();
void set_stopped(bool);
bool is_stopped() const;
void seek(AK::Duration timestamp, SeekCompletionHandler&&); void seek(AK::Duration timestamp, SeekCompletionHandler&&);
[[nodiscard]] Threading::MutexLocker take_lock() { return Threading::MutexLocker(m_mutex); } [[nodiscard]] Threading::MutexLocker take_lock() const { return Threading::MutexLocker(m_mutex); }
void wake() { m_wait_condition.broadcast(); } void wake() const { m_wait_condition.broadcast(); }
AudioDecoder const& decoder() const { return *m_decoder; } AudioDecoder const& decoder() const { return *m_decoder; }
AudioQueue& queue() { return m_queue; } AudioQueue& queue() { return m_queue; }
private: private:
enum class RequestedState : u8 {
None,
Running,
Exit,
};
Core::EventLoop& m_main_thread_event_loop; Core::EventLoop& m_main_thread_event_loop;
Threading::Mutex m_mutex; mutable Threading::Mutex m_mutex;
Threading::ConditionVariable m_wait_condition { m_mutex }; mutable Threading::ConditionVariable m_wait_condition { m_mutex };
Atomic<bool> m_exit { false }; RequestedState m_requested_state { RequestedState::None };
NonnullRefPtr<MutexedDemuxer> m_demuxer; NonnullRefPtr<MutexedDemuxer> m_demuxer;
Track m_track; Track m_track;

View File

@@ -27,6 +27,7 @@ DecoderErrorOr<NonnullRefPtr<VideoDataProvider>> VideoDataProvider::try_create(N
auto provider = DECODER_TRY_ALLOC(try_make_ref_counted<VideoDataProvider>(thread_data)); auto provider = DECODER_TRY_ALLOC(try_make_ref_counted<VideoDataProvider>(thread_data));
auto thread = DECODER_TRY_ALLOC(Threading::Thread::try_create([thread_data]() -> int { auto thread = DECODER_TRY_ALLOC(Threading::Thread::try_create([thread_data]() -> int {
thread_data->wait_for_start();
while (!thread_data->should_thread_exit()) { while (!thread_data->should_thread_exit()) {
thread_data->handle_seek(); thread_data->handle_seek();
thread_data->push_data_and_decode_some_frames(); thread_data->push_data_and_decode_some_frames();
@@ -60,6 +61,11 @@ void VideoDataProvider::set_error_handler(ErrorHandler&& handler)
m_thread_data->set_error_handler(move(handler)); m_thread_data->set_error_handler(move(handler));
} }
void VideoDataProvider::start()
{
m_thread_data->start();
}
TimedImage VideoDataProvider::retrieve_frame() TimedImage VideoDataProvider::retrieve_frame()
{ {
auto locker = m_thread_data->take_lock(); auto locker = m_thread_data->take_lock();
@@ -88,14 +94,21 @@ VideoDataProvider::ThreadData::~ThreadData() = default;
void VideoDataProvider::ThreadData::set_error_handler(ErrorHandler&& handler) void VideoDataProvider::ThreadData::set_error_handler(ErrorHandler&& handler)
{ {
auto locker = take_lock();
m_error_handler = move(handler); m_error_handler = move(handler);
}
void VideoDataProvider::ThreadData::start()
{
auto locker = take_lock();
VERIFY(m_requested_state == RequestedState::None);
m_requested_state = RequestedState::Running;
wake(); wake();
} }
void VideoDataProvider::ThreadData::exit() void VideoDataProvider::ThreadData::exit()
{ {
m_exit = true; auto locker = take_lock();
m_requested_state = RequestedState::Exit;
wake(); wake();
} }
@@ -119,9 +132,17 @@ void VideoDataProvider::ThreadData::seek(AK::Duration timestamp, SeekMode seek_m
wake(); wake();
} }
void VideoDataProvider::ThreadData::wait_for_start()
{
auto locker = take_lock();
while (m_requested_state == RequestedState::None)
m_wait_condition.wait();
}
bool VideoDataProvider::ThreadData::should_thread_exit() const bool VideoDataProvider::ThreadData::should_thread_exit() const
{ {
return m_exit; auto locker = take_lock();
return m_requested_state == RequestedState::Exit;
} }
void VideoDataProvider::ThreadData::set_cicp_values(VideoFrame& frame) void VideoDataProvider::ThreadData::set_cicp_values(VideoFrame& frame)
@@ -199,7 +220,8 @@ bool VideoDataProvider::ThreadData::handle_seek()
} }
process_seek_on_main_thread(seek_id, process_seek_on_main_thread(seek_id,
[self = NonnullRefPtr(*this), error = move(error)] mutable { [self = NonnullRefPtr(*this), error = move(error)] mutable {
self->m_error_handler(move(error)); if (self->m_error_handler)
self->m_error_handler(move(error));
self->m_seek_completion_handler = nullptr; self->m_seek_completion_handler = nullptr;
}); });
}; };
@@ -344,10 +366,9 @@ void VideoDataProvider::ThreadData::push_data_and_decode_some_frames()
{ {
auto locker = take_lock(); auto locker = take_lock();
m_is_in_error_state = true; m_is_in_error_state = true;
while (!m_error_handler)
m_wait_condition.wait();
m_main_thread_event_loop.deferred_invoke([self = NonnullRefPtr(*this), error = move(error)] mutable { m_main_thread_event_loop.deferred_invoke([self = NonnullRefPtr(*this), error = move(error)] mutable {
self->m_error_handler(move(error)); if (self->m_error_handler)
self->m_error_handler(move(error));
}); });
} }

View File

@@ -43,6 +43,8 @@ public:
void set_error_handler(ErrorHandler&&); void set_error_handler(ErrorHandler&&);
void start();
TimedImage retrieve_frame(); TimedImage retrieve_frame();
void seek(AK::Duration timestamp, SeekMode, SeekCompletionHandler&& = nullptr); void seek(AK::Duration timestamp, SeekMode, SeekCompletionHandler&& = nullptr);
@@ -55,6 +57,7 @@ private:
void set_error_handler(ErrorHandler&&); void set_error_handler(ErrorHandler&&);
void start();
void exit(); void exit();
ImageQueue& queue(); ImageQueue& queue();
@@ -62,6 +65,7 @@ private:
void seek(AK::Duration timestamp, SeekMode, SeekCompletionHandler&&); void seek(AK::Duration timestamp, SeekMode, SeekCompletionHandler&&);
void wait_for_start();
bool should_thread_exit() const; bool should_thread_exit() const;
void set_cicp_values(VideoFrame&); void set_cicp_values(VideoFrame&);
void queue_frame(TimedImage&&); void queue_frame(TimedImage&&);
@@ -71,15 +75,21 @@ private:
void resolve_seek(u32 seek_id, AK::Duration const& timestamp); void resolve_seek(u32 seek_id, AK::Duration const& timestamp);
void push_data_and_decode_some_frames(); void push_data_and_decode_some_frames();
[[nodiscard]] Threading::MutexLocker take_lock() { return Threading::MutexLocker(m_mutex); } [[nodiscard]] Threading::MutexLocker take_lock() const { return Threading::MutexLocker(m_mutex); }
void wake() { m_wait_condition.broadcast(); } void wake() const { m_wait_condition.broadcast(); }
private: private:
enum class RequestedState : u8 {
None,
Running,
Exit,
};
Core::EventLoop& m_main_thread_event_loop; Core::EventLoop& m_main_thread_event_loop;
Threading::Mutex m_mutex; mutable Threading::Mutex m_mutex;
Threading::ConditionVariable m_wait_condition { m_mutex }; mutable Threading::ConditionVariable m_wait_condition { m_mutex };
Atomic<bool> m_exit { false }; RequestedState m_requested_state { RequestedState::None };
NonnullRefPtr<MutexedDemuxer> m_demuxer; NonnullRefPtr<MutexedDemuxer> m_demuxer;
Track m_track; Track m_track;

View File

@@ -86,6 +86,7 @@ static inline void decode_audio(StringView path, u32 sample_rate, u8 channel_cou
} }
FAIL("An error occurred while decoding."); FAIL("An error occurred while decoding.");
}); });
provider->start();
auto time_limit = AK::Duration::from_seconds(1); auto time_limit = AK::Duration::from_seconds(1);
auto start_time = MonotonicTime::now_coarse(); auto start_time = MonotonicTime::now_coarse();