shell-recorder-src: rework queue handling
Use our own locking and queue instead of async_queue. Implement unlock and unlock_stop to make the create function return FLUSHING. This is important to be able to pause the pipeline after some error occured in the pipeline. Implement start/stop to clear the queue and its state.
This commit is contained in:
parent
35889a0f7d
commit
2015fc97dc
@ -11,12 +11,15 @@ struct _ShellRecorderSrc
|
|||||||
{
|
{
|
||||||
GstPushSrc parent;
|
GstPushSrc parent;
|
||||||
|
|
||||||
GMutex mutex_data;
|
GMutex mutex;
|
||||||
GMutex *mutex;
|
|
||||||
|
|
||||||
GstCaps *caps;
|
GstCaps *caps;
|
||||||
GAsyncQueue *queue;
|
GMutex queue_lock;
|
||||||
gboolean closed;
|
GCond queue_cond;
|
||||||
|
GQueue *queue;
|
||||||
|
|
||||||
|
gboolean eos;
|
||||||
|
gboolean flushing;
|
||||||
guint memory_used;
|
guint memory_used;
|
||||||
guint memory_used_update_idle;
|
guint memory_used_update_idle;
|
||||||
};
|
};
|
||||||
@ -32,9 +35,6 @@ enum {
|
|||||||
PROP_MEMORY_USED
|
PROP_MEMORY_USED
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Special marker value once the source is closed */
|
|
||||||
#define RECORDER_QUEUE_END ((GstBuffer *)1)
|
|
||||||
|
|
||||||
#define shell_recorder_src_parent_class parent_class
|
#define shell_recorder_src_parent_class parent_class
|
||||||
G_DEFINE_TYPE(ShellRecorderSrc, shell_recorder_src, GST_TYPE_PUSH_SRC);
|
G_DEFINE_TYPE(ShellRecorderSrc, shell_recorder_src, GST_TYPE_PUSH_SRC);
|
||||||
|
|
||||||
@ -44,9 +44,10 @@ shell_recorder_src_init (ShellRecorderSrc *src)
|
|||||||
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
|
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
|
||||||
gst_base_src_set_live (GST_BASE_SRC (src), TRUE);
|
gst_base_src_set_live (GST_BASE_SRC (src), TRUE);
|
||||||
|
|
||||||
src->queue = g_async_queue_new ();
|
src->queue = g_queue_new ();
|
||||||
src->mutex = &src->mutex_data;
|
g_mutex_init (&src->mutex);
|
||||||
g_mutex_init (src->mutex);
|
g_mutex_init (&src->queue_lock);
|
||||||
|
g_cond_init (&src->queue_cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
@ -54,9 +55,9 @@ shell_recorder_src_memory_used_update_idle (gpointer data)
|
|||||||
{
|
{
|
||||||
ShellRecorderSrc *src = data;
|
ShellRecorderSrc *src = data;
|
||||||
|
|
||||||
g_mutex_lock (src->mutex);
|
g_mutex_lock (&src->mutex);
|
||||||
src->memory_used_update_idle = 0;
|
src->memory_used_update_idle = 0;
|
||||||
g_mutex_unlock (src->mutex);
|
g_mutex_unlock (&src->mutex);
|
||||||
|
|
||||||
g_object_notify (G_OBJECT (src), "memory-used");
|
g_object_notify (G_OBJECT (src), "memory-used");
|
||||||
|
|
||||||
@ -70,14 +71,14 @@ static void
|
|||||||
shell_recorder_src_update_memory_used (ShellRecorderSrc *src,
|
shell_recorder_src_update_memory_used (ShellRecorderSrc *src,
|
||||||
int delta)
|
int delta)
|
||||||
{
|
{
|
||||||
g_mutex_lock (src->mutex);
|
g_mutex_lock (&src->mutex);
|
||||||
src->memory_used += delta;
|
src->memory_used += delta;
|
||||||
if (src->memory_used_update_idle == 0)
|
if (src->memory_used_update_idle == 0)
|
||||||
{
|
{
|
||||||
src->memory_used_update_idle = g_idle_add (shell_recorder_src_memory_used_update_idle, src);
|
src->memory_used_update_idle = g_idle_add (shell_recorder_src_memory_used_update_idle, src);
|
||||||
g_source_set_name_by_id (src->memory_used_update_idle, "[gnome-shell] shell_recorder_src_memory_used_update_idle");
|
g_source_set_name_by_id (src->memory_used_update_idle, "[gnome-shell] shell_recorder_src_memory_used_update_idle");
|
||||||
}
|
}
|
||||||
g_mutex_unlock (src->mutex);
|
g_mutex_unlock (&src->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _negotiate() is called when we have to decide on a format. We
|
/* _negotiate() is called when we have to decide on a format. We
|
||||||
@ -93,6 +94,62 @@ shell_recorder_src_negotiate (GstBaseSrc * base_src)
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
shell_recorder_src_unlock (GstBaseSrc * base_src)
|
||||||
|
{
|
||||||
|
ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
|
||||||
|
|
||||||
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
src->flushing = TRUE;
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
shell_recorder_src_unlock_stop (GstBaseSrc * base_src)
|
||||||
|
{
|
||||||
|
ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
|
||||||
|
|
||||||
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
src->flushing = FALSE;
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
shell_recorder_src_start (GstBaseSrc * base_src)
|
||||||
|
{
|
||||||
|
ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
|
||||||
|
|
||||||
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
src->flushing = FALSE;
|
||||||
|
src->eos = FALSE;
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
shell_recorder_src_stop (GstBaseSrc * base_src)
|
||||||
|
{
|
||||||
|
ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
|
||||||
|
|
||||||
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
src->flushing = TRUE;
|
||||||
|
src->eos = FALSE;
|
||||||
|
g_queue_foreach (src->queue, (GFunc) gst_buffer_unref, NULL);
|
||||||
|
g_queue_clear (src->queue);
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
shell_recorder_src_send_event (GstElement * element, GstEvent * event)
|
shell_recorder_src_send_event (GstElement * element, GstEvent * event)
|
||||||
{
|
{
|
||||||
@ -123,17 +180,29 @@ shell_recorder_src_create (GstPushSrc *push_src,
|
|||||||
ShellRecorderSrc *src = SHELL_RECORDER_SRC (push_src);
|
ShellRecorderSrc *src = SHELL_RECORDER_SRC (push_src);
|
||||||
GstBuffer *buffer;
|
GstBuffer *buffer;
|
||||||
|
|
||||||
if (src->closed)
|
g_mutex_lock (&src->queue_lock);
|
||||||
return GST_FLOW_EOS;
|
while (TRUE) {
|
||||||
|
/* int the flushing state we just return FLUSHING */
|
||||||
|
if (src->flushing) {
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
return GST_FLOW_FLUSHING;
|
||||||
|
}
|
||||||
|
|
||||||
buffer = g_async_queue_pop (src->queue);
|
buffer = g_queue_pop_head (src->queue);
|
||||||
|
|
||||||
if (buffer == RECORDER_QUEUE_END)
|
/* we have a buffer, exit the loop to handle it */
|
||||||
{
|
if (buffer != NULL)
|
||||||
/* Returning UNEXPECTED here will cause a EOS message to be sent */
|
break;
|
||||||
src->closed = TRUE;
|
|
||||||
|
/* no buffer, check EOS */
|
||||||
|
if (src->eos) {
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
return GST_FLOW_EOS;
|
return GST_FLOW_EOS;
|
||||||
}
|
}
|
||||||
|
/* wait for something to happen and try again */
|
||||||
|
g_cond_wait (&src->queue_cond, &src->queue_lock);
|
||||||
|
}
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
|
|
||||||
shell_recorder_src_update_memory_used (src,
|
shell_recorder_src_update_memory_used (src,
|
||||||
- (int)(gst_buffer_get_size(buffer) / 1024));
|
- (int)(gst_buffer_get_size(buffer) / 1024));
|
||||||
@ -176,9 +245,11 @@ shell_recorder_src_finalize (GObject *object)
|
|||||||
g_source_remove (src->memory_used_update_idle);
|
g_source_remove (src->memory_used_update_idle);
|
||||||
|
|
||||||
shell_recorder_src_set_caps (src, NULL);
|
shell_recorder_src_set_caps (src, NULL);
|
||||||
g_async_queue_unref (src->queue);
|
g_queue_free_full (src->queue, (GDestroyNotify) gst_buffer_unref);
|
||||||
|
|
||||||
g_mutex_clear (src->mutex);
|
g_mutex_clear (&src->mutex);
|
||||||
|
g_mutex_clear (&src->queue_lock);
|
||||||
|
g_cond_clear (&src->queue_cond);
|
||||||
|
|
||||||
G_OBJECT_CLASS (shell_recorder_src_parent_class)->finalize (object);
|
G_OBJECT_CLASS (shell_recorder_src_parent_class)->finalize (object);
|
||||||
}
|
}
|
||||||
@ -216,9 +287,9 @@ shell_recorder_src_get_property (GObject *object,
|
|||||||
gst_value_set_caps (value, src->caps);
|
gst_value_set_caps (value, src->caps);
|
||||||
break;
|
break;
|
||||||
case PROP_MEMORY_USED:
|
case PROP_MEMORY_USED:
|
||||||
g_mutex_lock (src->mutex);
|
g_mutex_lock (&src->mutex);
|
||||||
g_value_set_uint (value, src->memory_used);
|
g_value_set_uint (value, src->memory_used);
|
||||||
g_mutex_unlock (src->mutex);
|
g_mutex_unlock (&src->mutex);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||||
@ -270,6 +341,10 @@ shell_recorder_src_class_init (ShellRecorderSrcClass *klass)
|
|||||||
element_class->send_event = shell_recorder_src_send_event;
|
element_class->send_event = shell_recorder_src_send_event;
|
||||||
|
|
||||||
base_src_class->negotiate = shell_recorder_src_negotiate;
|
base_src_class->negotiate = shell_recorder_src_negotiate;
|
||||||
|
base_src_class->unlock = shell_recorder_src_unlock;
|
||||||
|
base_src_class->unlock_stop = shell_recorder_src_unlock_stop;
|
||||||
|
base_src_class->start = shell_recorder_src_start;
|
||||||
|
base_src_class->stop = shell_recorder_src_stop;
|
||||||
|
|
||||||
push_src_class->create = shell_recorder_src_create;
|
push_src_class->create = shell_recorder_src_create;
|
||||||
}
|
}
|
||||||
@ -292,7 +367,10 @@ shell_recorder_src_add_buffer (ShellRecorderSrc *src,
|
|||||||
shell_recorder_src_update_memory_used (src,
|
shell_recorder_src_update_memory_used (src,
|
||||||
(int)(gst_buffer_get_size(buffer) / 1024));
|
(int)(gst_buffer_get_size(buffer) / 1024));
|
||||||
|
|
||||||
g_async_queue_push (src->queue, gst_buffer_ref (buffer));
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
g_queue_push_tail (src->queue, gst_buffer_ref (buffer));
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -305,10 +383,13 @@ void
|
|||||||
shell_recorder_src_close (ShellRecorderSrc *src)
|
shell_recorder_src_close (ShellRecorderSrc *src)
|
||||||
{
|
{
|
||||||
/* We can't send a message to the source immediately or buffers that haven't
|
/* We can't send a message to the source immediately or buffers that haven't
|
||||||
* been pushed yet will be discarded. Instead stick a marker onto our own
|
* been pushed yet will be discarded. Instead mark ourselves EOS, which will
|
||||||
* queue to send an event once everything has been pushed.
|
* make us send an event once everything has been pushed.
|
||||||
*/
|
*/
|
||||||
g_async_queue_push (src->queue, RECORDER_QUEUE_END);
|
g_mutex_lock (&src->queue_lock);
|
||||||
|
src->eos = TRUE;
|
||||||
|
g_cond_signal (&src->queue_cond);
|
||||||
|
g_mutex_unlock (&src->queue_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
Loading…
x
Reference in New Issue
Block a user