From aa723e7207f54ddd023c56f229b61cf1425a9929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20=C3=85dahl?= Date: Tue, 8 Jun 2021 11:51:28 +0200 Subject: [PATCH] thread: Add support for posting async task This uses the queue that was introduced when migrating impl task management from MetaThread to MetaThreadImpl, with the exception that it's now fully used as an actual queue. It now has a GSource that sits on the right GMainContext that is dispatched whenever there are tasks to execute. Part-of: --- src/backends/native/meta-thread-impl.c | 110 ++++++++++++++++++++++++- src/backends/native/meta-thread-impl.h | 13 +-- src/backends/native/meta-thread.c | 19 ++++- src/backends/native/meta-thread.h | 10 +++ src/tests/native-thread.c | 61 ++++++++++++++ 5 files changed, 204 insertions(+), 9 deletions(-) diff --git a/src/backends/native/meta-thread-impl.c b/src/backends/native/meta-thread-impl.c index 5b2de1bd2..977dc836d 100644 --- a/src/backends/native/meta-thread-impl.c +++ b/src/backends/native/meta-thread-impl.c @@ -37,6 +37,12 @@ enum static GParamSpec *obj_props[N_PROPS]; +typedef struct _MetaThreadImplSource +{ + GSource base; + MetaThreadImpl *thread_impl; +} MetaThreadImplSource; + typedef struct _MetaThreadImplPrivate { MetaThread *thread; @@ -44,6 +50,7 @@ typedef struct _MetaThreadImplPrivate gboolean in_impl_task; GMainContext *thread_context; + GSource *impl_source; GAsyncQueue *task_queue; } MetaThreadImplPrivate; @@ -51,8 +58,13 @@ struct _MetaThreadTask { MetaThreadTaskFunc func; gpointer user_data; + MetaThreadTaskFeedbackFunc feedback_func; gpointer feedback_user_data; + MetaThreadTaskFeedbackType feedback_type; + + gpointer retval; + GError *error; }; G_DEFINE_TYPE_WITH_PRIVATE (MetaThreadImpl, meta_thread_impl, G_TYPE_OBJECT) @@ -105,12 +117,74 @@ meta_thread_impl_set_property (GObject *object, } } +static gboolean +impl_source_prepare (GSource *source, + int *timeout) +{ + MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source; + MetaThreadImpl *thread_impl = impl_source->thread_impl; + MetaThreadImplPrivate *priv = + meta_thread_impl_get_instance_private (thread_impl); + + g_assert (g_source_get_context (source) == priv->thread_context); + + *timeout = -1; + + return g_async_queue_length (priv->task_queue) > 0; +} + +static gboolean +impl_source_check (GSource *source) +{ + MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source; + MetaThreadImpl *thread_impl = impl_source->thread_impl; + MetaThreadImplPrivate *priv = + meta_thread_impl_get_instance_private (thread_impl); + + g_assert (g_source_get_context (source) == priv->thread_context); + + return g_async_queue_length (priv->task_queue) > 0; +} + +static gboolean +impl_source_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + MetaThreadImplSource *impl_source = (MetaThreadImplSource *) source; + MetaThreadImpl *thread_impl = impl_source->thread_impl; + MetaThreadImplPrivate *priv = + meta_thread_impl_get_instance_private (thread_impl); + + g_assert (g_source_get_context (source) == priv->thread_context); + + meta_thread_impl_dispatch (thread_impl); + + return G_SOURCE_CONTINUE; +} + +static GSourceFuncs impl_source_funcs = { + .prepare = impl_source_prepare, + .check = impl_source_check, + .dispatch = impl_source_dispatch, +}; + static void meta_thread_impl_constructed (GObject *object) { MetaThreadImpl *thread_impl = META_THREAD_IMPL (object); MetaThreadImplPrivate *priv = meta_thread_impl_get_instance_private (thread_impl); + GSource *source; + MetaThreadImplSource *impl_source; + + source = g_source_new (&impl_source_funcs, sizeof (MetaThreadImplSource)); + impl_source = (MetaThreadImplSource *) source; + impl_source->thread_impl = thread_impl; + g_source_attach (source, priv->thread_context); + g_source_unref (source); + + priv->impl_source = source; priv->task_queue = g_async_queue_new (); @@ -124,6 +198,7 @@ meta_thread_impl_finalize (GObject *object) MetaThreadImplPrivate *priv = meta_thread_impl_get_instance_private (thread_impl); + g_clear_pointer (&priv->impl_source, g_source_destroy); g_clear_pointer (&priv->task_queue, g_async_queue_unref); g_clear_pointer (&priv->thread_context, g_main_context_unref); @@ -186,7 +261,8 @@ MetaThreadTask * meta_thread_task_new (MetaThreadTaskFunc func, gpointer user_data, MetaThreadTaskFeedbackFunc feedback_func, - gpointer feedback_user_data) + gpointer feedback_user_data, + MetaThreadTaskFeedbackType feedback_type) { MetaThreadTask *task; @@ -196,6 +272,7 @@ meta_thread_task_new (MetaThreadTaskFunc func, .user_data = user_data, .feedback_func = feedback_func, .feedback_user_data = feedback_user_data, + .feedback_type = feedback_type, }; return task; @@ -204,6 +281,7 @@ meta_thread_task_new (MetaThreadTaskFunc func, void meta_thread_task_free (MetaThreadTask *task) { + g_clear_error (&task->error); g_free (task); } @@ -365,6 +443,17 @@ meta_thread_impl_is_in_impl (MetaThreadImpl *thread_impl) return priv->in_impl_task; } +static void +invoke_task_feedback (MetaThread *thread, + gpointer user_data) +{ + MetaThreadTask *task = user_data; + + meta_assert_not_in_thread_impl (thread); + + task->feedback_func (task->retval, task->error, task->feedback_user_data); +} + void meta_thread_impl_dispatch (MetaThreadImpl *thread_impl) { @@ -380,9 +469,24 @@ meta_thread_impl_dispatch (MetaThreadImpl *thread_impl) retval = task->func (thread_impl, task->user_data, &error); if (task->feedback_func) - task->feedback_func (retval, error, task->feedback_user_data); + { + switch (task->feedback_type) + { + case META_THREAD_TASK_FEEDBACK_TYPE_IMPL: + task->feedback_func (retval, error, task->feedback_user_data); + break; + case META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK: + task->retval = retval; + task->error = g_steal_pointer (&error); + meta_thread_queue_callback (priv->thread, + invoke_task_feedback, + g_steal_pointer (&task), + (GDestroyNotify) meta_thread_task_free); + break; + } + } - meta_thread_task_free (task); + g_clear_pointer (&task, meta_thread_task_free); priv->in_impl_task = FALSE; } diff --git a/src/backends/native/meta-thread-impl.h b/src/backends/native/meta-thread-impl.h index 157b6ab95..83fd36493 100644 --- a/src/backends/native/meta-thread-impl.h +++ b/src/backends/native/meta-thread-impl.h @@ -37,11 +37,13 @@ struct _MetaThreadImplClass GObjectClass parent_class; }; -typedef struct _MetaThreadTask MetaThreadTask; +typedef enum _MetaThreadTaskFeedbackType +{ + META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK, + META_THREAD_TASK_FEEDBACK_TYPE_IMPL, +} MetaThreadTaskFeedbackType; -typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval, - const GError *error, - gpointer user_data); +typedef struct _MetaThreadTask MetaThreadTask; META_EXPORT_TEST MetaThread * meta_thread_impl_get_thread (MetaThreadImpl *thread_impl); @@ -70,7 +72,8 @@ gboolean meta_thread_impl_is_in_impl (MetaThreadImpl *thread_impl); MetaThreadTask * meta_thread_task_new (MetaThreadTaskFunc func, gpointer user_data, MetaThreadTaskFeedbackFunc feedback_func, - gpointer feedback_user_data); + gpointer feedback_user_data, + MetaThreadTaskFeedbackType feedback_type); void meta_thread_task_free (MetaThreadTask *task); diff --git a/src/backends/native/meta-thread.c b/src/backends/native/meta-thread.c index 74839765d..a6e7ab1d7 100644 --- a/src/backends/native/meta-thread.c +++ b/src/backends/native/meta-thread.c @@ -298,7 +298,8 @@ meta_thread_run_impl_task_sync (MetaThread *thread, MetaSyncTaskData data = { 0 }; task = meta_thread_task_new (func, user_data, - sync_task_done_in_impl, &data); + sync_task_done_in_impl, &data, + META_THREAD_TASK_FEEDBACK_TYPE_IMPL); meta_thread_impl_queue_task (priv->impl, task); priv->waiting_for_impl_task = TRUE; @@ -314,6 +315,22 @@ meta_thread_run_impl_task_sync (MetaThread *thread, return data.retval; } +void +meta_thread_post_impl_task (MetaThread *thread, + MetaThreadTaskFunc func, + gpointer user_data, + MetaThreadTaskFeedbackFunc feedback_func, + gpointer feedback_user_data) +{ + MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); + MetaThreadTask *task; + + task = meta_thread_task_new (func, user_data, + feedback_func, feedback_user_data, + META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK); + meta_thread_impl_queue_task (priv->impl, task); +} + MetaBackend * meta_thread_get_backend (MetaThread *thread) { diff --git a/src/backends/native/meta-thread.h b/src/backends/native/meta-thread.h index 228c29802..0703115c1 100644 --- a/src/backends/native/meta-thread.h +++ b/src/backends/native/meta-thread.h @@ -44,6 +44,9 @@ typedef void (* MetaThreadCallback) (MetaThread *thread, typedef gpointer (* MetaThreadTaskFunc) (MetaThreadImpl *thread_impl, gpointer user_data, GError **error); +typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval, + const GError *error, + gpointer user_data); META_EXPORT_TEST void meta_thread_queue_callback (MetaThread *thread, @@ -60,6 +63,13 @@ gpointer meta_thread_run_impl_task_sync (MetaThread *thread, gpointer user_data, GError **error); +META_EXPORT_TEST +void meta_thread_post_impl_task (MetaThread *thread, + MetaThreadTaskFunc func, + gpointer user_data, + MetaThreadTaskFeedbackFunc feedback_func, + gpointer feedback_user_data); + META_EXPORT_TEST MetaBackend * meta_thread_get_backend (MetaThread *thread); diff --git a/src/tests/native-thread.c b/src/tests/native-thread.c index 13f814987..9a2294bdc 100644 --- a/src/tests/native-thread.c +++ b/src/tests/native-thread.c @@ -193,6 +193,48 @@ add_idle_func (MetaThreadImpl *thread_impl, return GINT_TO_POINTER (TRUE); } +typedef struct +{ + MetaThread *thread; + GMainLoop *loop; + + GMutex mutex; + int state; +} AsyncData; + +static gpointer +async_func (MetaThreadImpl *thread_impl, + gpointer user_data, + GError **error) +{ + AsyncData *async_data = user_data; + + meta_assert_in_thread_impl (async_data->thread); + + g_mutex_lock (&async_data->mutex); + g_assert_cmpint (async_data->state, ==, 0); + async_data->state = 1; + g_mutex_unlock (&async_data->mutex); + + return GINT_TO_POINTER (TRUE); +} + +static void +async_feedback_func (gpointer retval, + const GError *error, + gpointer user_data) +{ + AsyncData *async_data = user_data; + + meta_assert_not_in_thread_impl (async_data->thread); + + g_mutex_lock (&async_data->mutex); + g_assert_cmpint (async_data->state, ==, 1); + async_data->state = 2; + g_main_loop_quit (async_data->loop); + g_mutex_unlock (&async_data->mutex); +} + static void run_thread_tests (MetaThread *thread) { @@ -204,6 +246,7 @@ run_thread_tests (MetaThread *thread) int buf; PipeData pipe_data; IdleData idle_data; + AsyncData async_data; meta_assert_not_in_thread_impl (thread); @@ -254,6 +297,24 @@ run_thread_tests (MetaThread *thread) g_main_loop_run (idle_data.loop); g_assert_cmpint (idle_data.state, ==, 3); g_main_loop_unref (idle_data.loop); + + /* Test async tasks */ + g_debug ("Test async task"); + async_data = (AsyncData) { 0 }; + g_mutex_init (&async_data.mutex); + async_data.thread = thread; + async_data.loop = g_main_loop_new (NULL, FALSE); + g_mutex_lock (&async_data.mutex); + meta_thread_post_impl_task (thread, async_func, &async_data, + async_feedback_func, &async_data); + g_assert_cmpint (async_data.state, ==, 0); + g_mutex_unlock (&async_data.mutex); + g_main_loop_run (async_data.loop); + g_mutex_lock (&async_data.mutex); + g_assert_cmpint (async_data.state, ==, 2); + g_mutex_unlock (&async_data.mutex); + g_main_loop_unref (async_data.loop); + g_mutex_clear (&async_data.mutex); } static void