From 2015fc97dc3938bceb90feace193c012f9bb2e9d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 16 Jun 2015 09:03:30 +0200 Subject: [PATCH] shell-recorder-src: rework queue handling Use our own locking and queue instead of async_queue. Implement unlock and unlock_stop to make the create function return FLUSHING. This is important to be able to pause the pipeline after some error occured in the pipeline. Implement start/stop to clear the queue and its state. --- src/shell-recorder-src.c | 139 +++++++++++++++++++++++++++++++-------- 1 file changed, 110 insertions(+), 29 deletions(-) diff --git a/src/shell-recorder-src.c b/src/shell-recorder-src.c index b2fb41b2d..58b836cd4 100644 --- a/src/shell-recorder-src.c +++ b/src/shell-recorder-src.c @@ -11,12 +11,15 @@ struct _ShellRecorderSrc { GstPushSrc parent; - GMutex mutex_data; - GMutex *mutex; + GMutex mutex; GstCaps *caps; - GAsyncQueue *queue; - gboolean closed; + GMutex queue_lock; + GCond queue_cond; + GQueue *queue; + + gboolean eos; + gboolean flushing; guint memory_used; guint memory_used_update_idle; }; @@ -32,9 +35,6 @@ enum { PROP_MEMORY_USED }; -/* Special marker value once the source is closed */ -#define RECORDER_QUEUE_END ((GstBuffer *)1) - #define shell_recorder_src_parent_class parent_class G_DEFINE_TYPE(ShellRecorderSrc, shell_recorder_src, GST_TYPE_PUSH_SRC); @@ -44,9 +44,10 @@ shell_recorder_src_init (ShellRecorderSrc *src) gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); gst_base_src_set_live (GST_BASE_SRC (src), TRUE); - src->queue = g_async_queue_new (); - src->mutex = &src->mutex_data; - g_mutex_init (src->mutex); + src->queue = g_queue_new (); + g_mutex_init (&src->mutex); + g_mutex_init (&src->queue_lock); + g_cond_init (&src->queue_cond); } static gboolean @@ -54,9 +55,9 @@ shell_recorder_src_memory_used_update_idle (gpointer data) { ShellRecorderSrc *src = data; - g_mutex_lock (src->mutex); + g_mutex_lock (&src->mutex); src->memory_used_update_idle = 0; - g_mutex_unlock (src->mutex); + g_mutex_unlock (&src->mutex); g_object_notify (G_OBJECT (src), "memory-used"); @@ -70,14 +71,14 @@ static void shell_recorder_src_update_memory_used (ShellRecorderSrc *src, int delta) { - g_mutex_lock (src->mutex); + g_mutex_lock (&src->mutex); src->memory_used += delta; if (src->memory_used_update_idle == 0) { src->memory_used_update_idle = g_idle_add (shell_recorder_src_memory_used_update_idle, src); g_source_set_name_by_id (src->memory_used_update_idle, "[gnome-shell] shell_recorder_src_memory_used_update_idle"); } - g_mutex_unlock (src->mutex); + g_mutex_unlock (&src->mutex); } /* _negotiate() is called when we have to decide on a format. We @@ -93,6 +94,62 @@ shell_recorder_src_negotiate (GstBaseSrc * base_src) return result; } +static gboolean +shell_recorder_src_unlock (GstBaseSrc * base_src) +{ + ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src); + + g_mutex_lock (&src->queue_lock); + src->flushing = TRUE; + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); + + return TRUE; +} + +static gboolean +shell_recorder_src_unlock_stop (GstBaseSrc * base_src) +{ + ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src); + + g_mutex_lock (&src->queue_lock); + src->flushing = FALSE; + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); + + return TRUE; +} + +static gboolean +shell_recorder_src_start (GstBaseSrc * base_src) +{ + ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src); + + g_mutex_lock (&src->queue_lock); + src->flushing = FALSE; + src->eos = FALSE; + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); + + return TRUE; +} + +static gboolean +shell_recorder_src_stop (GstBaseSrc * base_src) +{ + ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src); + + g_mutex_lock (&src->queue_lock); + src->flushing = TRUE; + src->eos = FALSE; + g_queue_foreach (src->queue, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (src->queue); + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); + + return TRUE; +} + static gboolean shell_recorder_src_send_event (GstElement * element, GstEvent * event) { @@ -123,17 +180,29 @@ shell_recorder_src_create (GstPushSrc *push_src, ShellRecorderSrc *src = SHELL_RECORDER_SRC (push_src); GstBuffer *buffer; - if (src->closed) - return GST_FLOW_EOS; + g_mutex_lock (&src->queue_lock); + while (TRUE) { + /* int the flushing state we just return FLUSHING */ + if (src->flushing) { + g_mutex_unlock (&src->queue_lock); + return GST_FLOW_FLUSHING; + } - buffer = g_async_queue_pop (src->queue); + buffer = g_queue_pop_head (src->queue); - if (buffer == RECORDER_QUEUE_END) - { - /* Returning UNEXPECTED here will cause a EOS message to be sent */ - src->closed = TRUE; + /* we have a buffer, exit the loop to handle it */ + if (buffer != NULL) + break; + + /* no buffer, check EOS */ + if (src->eos) { + g_mutex_unlock (&src->queue_lock); return GST_FLOW_EOS; } + /* wait for something to happen and try again */ + g_cond_wait (&src->queue_cond, &src->queue_lock); + } + g_mutex_unlock (&src->queue_lock); shell_recorder_src_update_memory_used (src, - (int)(gst_buffer_get_size(buffer) / 1024)); @@ -176,9 +245,11 @@ shell_recorder_src_finalize (GObject *object) g_source_remove (src->memory_used_update_idle); shell_recorder_src_set_caps (src, NULL); - g_async_queue_unref (src->queue); + g_queue_free_full (src->queue, (GDestroyNotify) gst_buffer_unref); - g_mutex_clear (src->mutex); + g_mutex_clear (&src->mutex); + g_mutex_clear (&src->queue_lock); + g_cond_clear (&src->queue_cond); G_OBJECT_CLASS (shell_recorder_src_parent_class)->finalize (object); } @@ -216,9 +287,9 @@ shell_recorder_src_get_property (GObject *object, gst_value_set_caps (value, src->caps); break; case PROP_MEMORY_USED: - g_mutex_lock (src->mutex); + g_mutex_lock (&src->mutex); g_value_set_uint (value, src->memory_used); - g_mutex_unlock (src->mutex); + g_mutex_unlock (&src->mutex); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -270,6 +341,10 @@ shell_recorder_src_class_init (ShellRecorderSrcClass *klass) element_class->send_event = shell_recorder_src_send_event; base_src_class->negotiate = shell_recorder_src_negotiate; + base_src_class->unlock = shell_recorder_src_unlock; + base_src_class->unlock_stop = shell_recorder_src_unlock_stop; + base_src_class->start = shell_recorder_src_start; + base_src_class->stop = shell_recorder_src_stop; push_src_class->create = shell_recorder_src_create; } @@ -292,7 +367,10 @@ shell_recorder_src_add_buffer (ShellRecorderSrc *src, shell_recorder_src_update_memory_used (src, (int)(gst_buffer_get_size(buffer) / 1024)); - g_async_queue_push (src->queue, gst_buffer_ref (buffer)); + g_mutex_lock (&src->queue_lock); + g_queue_push_tail (src->queue, gst_buffer_ref (buffer)); + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); } /** @@ -305,10 +383,13 @@ void shell_recorder_src_close (ShellRecorderSrc *src) { /* We can't send a message to the source immediately or buffers that haven't - * been pushed yet will be discarded. Instead stick a marker onto our own - * queue to send an event once everything has been pushed. + * been pushed yet will be discarded. Instead mark ourselves EOS, which will + * make us send an event once everything has been pushed. */ - g_async_queue_push (src->queue, RECORDER_QUEUE_END); + g_mutex_lock (&src->queue_lock); + src->eos = TRUE; + g_cond_signal (&src->queue_cond); + g_mutex_unlock (&src->queue_lock); } static gboolean