thread: Add support for posting async task

This uses the queue that was introduced when migrating impl task
management from MetaThread to MetaThreadImpl, with the exception that
it's now fully used as an actual queue. It now has a GSource that sits
on the right GMainContext that is dispatched whenever there are tasks to
execute.

Part-of: <https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/2777>
This commit is contained in:
Jonas Ådahl 2021-06-08 11:51:28 +02:00
parent fda883e859
commit aa723e7207
5 changed files with 204 additions and 9 deletions

View File

@ -37,6 +37,12 @@ enum
static GParamSpec *obj_props[N_PROPS];
typedef struct _MetaThreadImplSource
{
GSource base;
MetaThreadImpl *thread_impl;
} MetaThreadImplSource;
typedef struct _MetaThreadImplPrivate
{
MetaThread *thread;
@ -44,6 +50,7 @@ typedef struct _MetaThreadImplPrivate
gboolean in_impl_task;
GMainContext *thread_context;
GSource *impl_source;
GAsyncQueue *task_queue;
} MetaThreadImplPrivate;
@ -51,8 +58,13 @@ struct _MetaThreadTask
{
MetaThreadTaskFunc func;
gpointer user_data;
MetaThreadTaskFeedbackFunc feedback_func;
gpointer feedback_user_data;
MetaThreadTaskFeedbackType feedback_type;
gpointer retval;
GError *error;
};
G_DEFINE_TYPE_WITH_PRIVATE (MetaThreadImpl, meta_thread_impl, G_TYPE_OBJECT)
@ -105,12 +117,74 @@ meta_thread_impl_set_property (GObject *object,
}
}
static gboolean
impl_source_prepare (GSource *source,
int *timeout)
{
MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source;
MetaThreadImpl *thread_impl = impl_source->thread_impl;
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
g_assert (g_source_get_context (source) == priv->thread_context);
*timeout = -1;
return g_async_queue_length (priv->task_queue) > 0;
}
static gboolean
impl_source_check (GSource *source)
{
MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source;
MetaThreadImpl *thread_impl = impl_source->thread_impl;
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
g_assert (g_source_get_context (source) == priv->thread_context);
return g_async_queue_length (priv->task_queue) > 0;
}
static gboolean
impl_source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source;
MetaThreadImpl *thread_impl = impl_source->thread_impl;
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
g_assert (g_source_get_context (source) == priv->thread_context);
meta_thread_impl_dispatch (thread_impl);
return G_SOURCE_CONTINUE;
}
static GSourceFuncs impl_source_funcs = {
.prepare = impl_source_prepare,
.check = impl_source_check,
.dispatch = impl_source_dispatch,
};
static void
meta_thread_impl_constructed (GObject *object)
{
MetaThreadImpl *thread_impl = META_THREAD_IMPL (object);
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
GSource *source;
MetaThreadImplSource *impl_source;
source = g_source_new (&impl_source_funcs, sizeof (MetaThreadImplSource));
impl_source = (MetaThreadImplSource *) source;
impl_source->thread_impl = thread_impl;
g_source_attach (source, priv->thread_context);
g_source_unref (source);
priv->impl_source = source;
priv->task_queue = g_async_queue_new ();
@ -124,6 +198,7 @@ meta_thread_impl_finalize (GObject *object)
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
g_clear_pointer (&priv->impl_source, g_source_destroy);
g_clear_pointer (&priv->task_queue, g_async_queue_unref);
g_clear_pointer (&priv->thread_context, g_main_context_unref);
@ -186,7 +261,8 @@ MetaThreadTask *
meta_thread_task_new (MetaThreadTaskFunc func,
gpointer user_data,
MetaThreadTaskFeedbackFunc feedback_func,
gpointer feedback_user_data)
gpointer feedback_user_data,
MetaThreadTaskFeedbackType feedback_type)
{
MetaThreadTask *task;
@ -196,6 +272,7 @@ meta_thread_task_new (MetaThreadTaskFunc func,
.user_data = user_data,
.feedback_func = feedback_func,
.feedback_user_data = feedback_user_data,
.feedback_type = feedback_type,
};
return task;
@ -204,6 +281,7 @@ meta_thread_task_new (MetaThreadTaskFunc func,
void
meta_thread_task_free (MetaThreadTask *task)
{
g_clear_error (&task->error);
g_free (task);
}
@ -365,6 +443,17 @@ meta_thread_impl_is_in_impl (MetaThreadImpl *thread_impl)
return priv->in_impl_task;
}
static void
invoke_task_feedback (MetaThread *thread,
gpointer user_data)
{
MetaThreadTask *task = user_data;
meta_assert_not_in_thread_impl (thread);
task->feedback_func (task->retval, task->error, task->feedback_user_data);
}
void
meta_thread_impl_dispatch (MetaThreadImpl *thread_impl)
{
@ -380,9 +469,24 @@ meta_thread_impl_dispatch (MetaThreadImpl *thread_impl)
retval = task->func (thread_impl, task->user_data, &error);
if (task->feedback_func)
task->feedback_func (retval, error, task->feedback_user_data);
{
switch (task->feedback_type)
{
case META_THREAD_TASK_FEEDBACK_TYPE_IMPL:
task->feedback_func (retval, error, task->feedback_user_data);
break;
case META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK:
task->retval = retval;
task->error = g_steal_pointer (&error);
meta_thread_queue_callback (priv->thread,
invoke_task_feedback,
g_steal_pointer (&task),
(GDestroyNotify) meta_thread_task_free);
break;
}
}
meta_thread_task_free (task);
g_clear_pointer (&task, meta_thread_task_free);
priv->in_impl_task = FALSE;
}

View File

@ -37,11 +37,13 @@ struct _MetaThreadImplClass
GObjectClass parent_class;
};
typedef struct _MetaThreadTask MetaThreadTask;
typedef enum _MetaThreadTaskFeedbackType
{
META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK,
META_THREAD_TASK_FEEDBACK_TYPE_IMPL,
} MetaThreadTaskFeedbackType;
typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval,
const GError *error,
gpointer user_data);
typedef struct _MetaThreadTask MetaThreadTask;
META_EXPORT_TEST
MetaThread * meta_thread_impl_get_thread (MetaThreadImpl *thread_impl);
@ -70,7 +72,8 @@ gboolean meta_thread_impl_is_in_impl (MetaThreadImpl *thread_impl);
MetaThreadTask * meta_thread_task_new (MetaThreadTaskFunc func,
gpointer user_data,
MetaThreadTaskFeedbackFunc feedback_func,
gpointer feedback_user_data);
gpointer feedback_user_data,
MetaThreadTaskFeedbackType feedback_type);
void meta_thread_task_free (MetaThreadTask *task);

View File

@ -298,7 +298,8 @@ meta_thread_run_impl_task_sync (MetaThread *thread,
MetaSyncTaskData data = { 0 };
task = meta_thread_task_new (func, user_data,
sync_task_done_in_impl, &data);
sync_task_done_in_impl, &data,
META_THREAD_TASK_FEEDBACK_TYPE_IMPL);
meta_thread_impl_queue_task (priv->impl, task);
priv->waiting_for_impl_task = TRUE;
@ -314,6 +315,22 @@ meta_thread_run_impl_task_sync (MetaThread *thread,
return data.retval;
}
void
meta_thread_post_impl_task (MetaThread *thread,
MetaThreadTaskFunc func,
gpointer user_data,
MetaThreadTaskFeedbackFunc feedback_func,
gpointer feedback_user_data)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
MetaThreadTask *task;
task = meta_thread_task_new (func, user_data,
feedback_func, feedback_user_data,
META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK);
meta_thread_impl_queue_task (priv->impl, task);
}
MetaBackend *
meta_thread_get_backend (MetaThread *thread)
{

View File

@ -44,6 +44,9 @@ typedef void (* MetaThreadCallback) (MetaThread *thread,
typedef gpointer (* MetaThreadTaskFunc) (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error);
typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval,
const GError *error,
gpointer user_data);
META_EXPORT_TEST
void meta_thread_queue_callback (MetaThread *thread,
@ -60,6 +63,13 @@ gpointer meta_thread_run_impl_task_sync (MetaThread *thread,
gpointer user_data,
GError **error);
META_EXPORT_TEST
void meta_thread_post_impl_task (MetaThread *thread,
MetaThreadTaskFunc func,
gpointer user_data,
MetaThreadTaskFeedbackFunc feedback_func,
gpointer feedback_user_data);
META_EXPORT_TEST
MetaBackend * meta_thread_get_backend (MetaThread *thread);

View File

@ -193,6 +193,48 @@ add_idle_func (MetaThreadImpl *thread_impl,
return GINT_TO_POINTER (TRUE);
}
typedef struct
{
MetaThread *thread;
GMainLoop *loop;
GMutex mutex;
int state;
} AsyncData;
static gpointer
async_func (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error)
{
AsyncData *async_data = user_data;
meta_assert_in_thread_impl (async_data->thread);
g_mutex_lock (&async_data->mutex);
g_assert_cmpint (async_data->state, ==, 0);
async_data->state = 1;
g_mutex_unlock (&async_data->mutex);
return GINT_TO_POINTER (TRUE);
}
static void
async_feedback_func (gpointer retval,
const GError *error,
gpointer user_data)
{
AsyncData *async_data = user_data;
meta_assert_not_in_thread_impl (async_data->thread);
g_mutex_lock (&async_data->mutex);
g_assert_cmpint (async_data->state, ==, 1);
async_data->state = 2;
g_main_loop_quit (async_data->loop);
g_mutex_unlock (&async_data->mutex);
}
static void
run_thread_tests (MetaThread *thread)
{
@ -204,6 +246,7 @@ run_thread_tests (MetaThread *thread)
int buf;
PipeData pipe_data;
IdleData idle_data;
AsyncData async_data;
meta_assert_not_in_thread_impl (thread);
@ -254,6 +297,24 @@ run_thread_tests (MetaThread *thread)
g_main_loop_run (idle_data.loop);
g_assert_cmpint (idle_data.state, ==, 3);
g_main_loop_unref (idle_data.loop);
/* Test async tasks */
g_debug ("Test async task");
async_data = (AsyncData) { 0 };
g_mutex_init (&async_data.mutex);
async_data.thread = thread;
async_data.loop = g_main_loop_new (NULL, FALSE);
g_mutex_lock (&async_data.mutex);
meta_thread_post_impl_task (thread, async_func, &async_data,
async_feedback_func, &async_data);
g_assert_cmpint (async_data.state, ==, 0);
g_mutex_unlock (&async_data.mutex);
g_main_loop_run (async_data.loop);
g_mutex_lock (&async_data.mutex);
g_assert_cmpint (async_data.state, ==, 2);
g_mutex_unlock (&async_data.mutex);
g_main_loop_unref (async_data.loop);
g_mutex_clear (&async_data.mutex);
}
static void