diff --git a/src/backends/native/meta-kms.c b/src/backends/native/meta-kms.c index 8872fb4b6..80b50e7b5 100644 --- a/src/backends/native/meta-kms.c +++ b/src/backends/native/meta-kms.c @@ -225,7 +225,11 @@ meta_kms_queue_callback (MetaKms *kms, { MetaThread *thread = META_THREAD (kms); - meta_thread_queue_callback (thread, callback, user_data, user_data_destroy); + meta_thread_queue_callback (thread, + NULL, + callback, + user_data, + user_data_destroy); } gpointer diff --git a/src/backends/native/meta-thread-impl.c b/src/backends/native/meta-thread-impl.c index 086e9268b..f0145dd2b 100644 --- a/src/backends/native/meta-thread-impl.c +++ b/src/backends/native/meta-thread-impl.c @@ -65,7 +65,7 @@ struct _MetaThreadTask MetaThreadTaskFeedbackFunc feedback_func; gpointer feedback_user_data; - MetaThreadTaskFeedbackType feedback_type; + GMainContext *feedback_main_context; gpointer retval; GError *error; @@ -267,11 +267,11 @@ meta_thread_impl_get_main_context (MetaThreadImpl *thread_impl) } MetaThreadTask * -meta_thread_task_new (MetaThreadTaskFunc func, - gpointer user_data, - MetaThreadTaskFeedbackFunc feedback_func, - gpointer feedback_user_data, - MetaThreadTaskFeedbackType feedback_type) +meta_thread_task_new (MetaThreadTaskFunc func, + gpointer user_data, + MetaThreadTaskFeedbackFunc feedback_func, + gpointer feedback_user_data, + GMainContext *feedback_main_context) { MetaThreadTask *task; @@ -281,7 +281,7 @@ meta_thread_task_new (MetaThreadTaskFunc func, .user_data = user_data, .feedback_func = feedback_func, .feedback_user_data = feedback_user_data, - .feedback_type = feedback_type, + .feedback_main_context = feedback_main_context, }; return task; @@ -512,19 +512,21 @@ meta_thread_impl_dispatch (MetaThreadImpl *thread_impl) if (task->feedback_func) { - switch (task->feedback_type) + if (task->feedback_main_context == priv->thread_context) { - case META_THREAD_TASK_FEEDBACK_TYPE_IMPL: task->feedback_func (retval, error, task->feedback_user_data); - break; - case META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK: + } + else + { + GMainContext *feedback_main_context = task->feedback_main_context; + task->retval = retval; task->error = g_steal_pointer (&error); meta_thread_queue_callback (priv->thread, + feedback_main_context, invoke_task_feedback, g_steal_pointer (&task), (GDestroyNotify) meta_thread_task_free); - break; } } diff --git a/src/backends/native/meta-thread-impl.h b/src/backends/native/meta-thread-impl.h index 2dd880210..164506f7f 100644 --- a/src/backends/native/meta-thread-impl.h +++ b/src/backends/native/meta-thread-impl.h @@ -73,11 +73,11 @@ int meta_thread_impl_dispatch (MetaThreadImpl *thread_impl); gboolean meta_thread_impl_is_in_impl (MetaThreadImpl *thread_impl); -MetaThreadTask * meta_thread_task_new (MetaThreadTaskFunc func, - gpointer user_data, - MetaThreadTaskFeedbackFunc feedback_func, - gpointer feedback_user_data, - MetaThreadTaskFeedbackType feedback_type); +MetaThreadTask * meta_thread_task_new (MetaThreadTaskFunc func, + gpointer user_data, + MetaThreadTaskFeedbackFunc feedback_func, + gpointer feedback_user_data, + GMainContext *feedback_main_context); void meta_thread_task_free (MetaThreadTask *task); diff --git a/src/backends/native/meta-thread.c b/src/backends/native/meta-thread.c index f0de9d18f..ae3cb00a8 100644 --- a/src/backends/native/meta-thread.c +++ b/src/backends/native/meta-thread.c @@ -21,6 +21,8 @@ #include "backends/native/meta-thread-private.h" +#include + #include "backends/meta-backend-private.h" #include "backends/meta-backend-types.h" #include "backends/native/meta-thread-impl.h" @@ -47,6 +49,19 @@ typedef struct _MetaThreadCallbackData GDestroyNotify user_data_destroy; } MetaThreadCallbackData; +typedef struct _MetaThreadCallbackSource +{ + GSource base; + + GMutex mutex; + GCond cond; + + MetaThread *thread; + GMainContext *main_context; + GList *callbacks; + gboolean needs_flush; +} MetaThreadCallbackSource; + typedef struct _MetaThreadPrivate { MetaBackend *backend; @@ -58,8 +73,7 @@ typedef struct _MetaThreadPrivate gboolean waiting_for_impl_task; GMutex callbacks_mutex; - GList *pending_callbacks; - guint callbacks_source_id; + GHashTable *callback_sources; MetaThreadType thread_type; @@ -172,6 +186,11 @@ meta_thread_initable_init (GInitable *initable, priv->main_context = g_main_context_default (); + priv->callback_sources = + g_hash_table_new_full (NULL, NULL, + NULL, (GDestroyNotify) g_source_destroy); + meta_thread_register_callback_context (thread, priv->main_context); + switch (priv->thread_type) { case META_THREAD_TYPE_USER: @@ -248,7 +267,9 @@ meta_thread_finalize (GObject *object) } meta_thread_flush_callbacks (thread); + meta_thread_unregister_callback_context (thread, priv->main_context); + g_warn_if_fail (g_hash_table_size (priv->callback_sources) == 0); g_clear_object (&priv->impl); g_clear_pointer (&priv->name, g_free); @@ -336,50 +357,185 @@ dispatch_callbacks (MetaThread *thread, return callback_count; } -int +void meta_thread_flush_callbacks (MetaThread *thread) { MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); g_autoptr (GList) pending_callbacks = NULL; + MetaThreadCallbackSource *main_callback_source; + g_autoptr (GList) callback_sources = NULL; + GList *l; - meta_assert_not_in_thread_impl (thread); + g_assert (!g_main_context_get_thread_default ()); - g_mutex_lock (&priv->callbacks_mutex); - pending_callbacks = g_steal_pointer (&priv->pending_callbacks); - g_clear_handle_id (&priv->callbacks_source_id, g_source_remove); - g_mutex_unlock (&priv->callbacks_mutex); + while (TRUE) + { + gboolean needs_reflush = FALSE; - return dispatch_callbacks (thread, pending_callbacks); + g_mutex_lock (&priv->callbacks_mutex); + main_callback_source = g_hash_table_lookup (priv->callback_sources, + priv->main_context); + pending_callbacks = g_steal_pointer (&main_callback_source->callbacks); + callback_sources = g_hash_table_get_values (priv->callback_sources); + g_mutex_unlock (&priv->callbacks_mutex); + + if (dispatch_callbacks (thread, pending_callbacks) > 0) + needs_reflush = TRUE; + + g_list_foreach (callback_sources, (GFunc) g_source_ref, NULL); + for (l = callback_sources; l; l = l->next) + { + MetaThreadCallbackSource *callback_source = l->data; + + if (callback_source == main_callback_source) + continue; + + g_mutex_lock (&callback_source->mutex); + while (callback_source->needs_flush) + { + needs_reflush = TRUE; + g_cond_wait (&callback_source->cond, &callback_source->mutex); + } + g_mutex_unlock (&callback_source->mutex); + } + g_list_foreach (callback_sources, (GFunc) g_source_unref, NULL); + + if (!needs_reflush) + break; + } } static gboolean -callback_idle (gpointer user_data) +callback_source_prepare (GSource *source, + int *timeout) { - MetaThread *thread = user_data; + MetaThreadCallbackSource *callback_source = + (MetaThreadCallbackSource *) source; + MetaThread *thread = callback_source->thread; + MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); + gboolean retval; + + *timeout = -1; + + g_mutex_lock (&priv->callbacks_mutex); + retval = !!callback_source->callbacks; + g_mutex_unlock (&priv->callbacks_mutex); + + return retval; +} + +static gboolean +callback_source_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + MetaThreadCallbackSource *callback_source = + (MetaThreadCallbackSource *) source; + MetaThread *thread = callback_source->thread; MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); g_autoptr (GList) pending_callbacks = NULL; - meta_assert_not_in_thread_impl (thread); - g_mutex_lock (&priv->callbacks_mutex); - pending_callbacks = g_steal_pointer (&priv->pending_callbacks); - priv->callbacks_source_id = 0; + pending_callbacks = g_steal_pointer (&callback_source->callbacks); g_mutex_unlock (&priv->callbacks_mutex); dispatch_callbacks (thread, pending_callbacks); - return G_SOURCE_REMOVE; + g_mutex_lock (&priv->callbacks_mutex); + + if (callback_source->callbacks) + { + g_source_set_ready_time (source, 0); + } + else + { + g_source_set_ready_time (source, -1); + + g_mutex_lock (&callback_source->mutex); + callback_source->needs_flush = FALSE; + g_cond_signal (&callback_source->cond); + g_mutex_unlock (&callback_source->mutex); + } + + g_mutex_unlock (&priv->callbacks_mutex); + + return G_SOURCE_CONTINUE; +} + +static void +callback_source_finalize (GSource *source) +{ + MetaThreadCallbackSource *callback_source = + (MetaThreadCallbackSource *) source; + + g_list_free_full (callback_source->callbacks, + (GDestroyNotify) meta_thread_callback_data_free); + + g_cond_clear (&callback_source->cond); + g_mutex_clear (&callback_source->mutex); +} + +static GSourceFuncs callback_source_funcs = { + .prepare = callback_source_prepare, + .dispatch = callback_source_dispatch, + .finalize = callback_source_finalize, +}; + +void +meta_thread_register_callback_context (MetaThread *thread, + GMainContext *main_context) +{ + MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); + GSource *source; + MetaThreadCallbackSource *callback_source; + + source = g_source_new (&callback_source_funcs, + sizeof (MetaThreadCallbackSource)); + callback_source = (MetaThreadCallbackSource *) source; + g_mutex_init (&callback_source->mutex); + g_cond_init (&callback_source->cond); + callback_source->thread = thread; + callback_source->main_context = main_context; + + g_source_set_ready_time (&callback_source->base, -1); + g_source_set_priority (source, G_PRIORITY_HIGH + 1); + g_source_attach (source, main_context); + g_source_unref (source); + + g_hash_table_insert (priv->callback_sources, + main_context, + callback_source); +} + +void +meta_thread_unregister_callback_context (MetaThread *thread, + GMainContext *main_context) +{ + MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); + + g_hash_table_remove (priv->callback_sources, main_context); } void meta_thread_queue_callback (MetaThread *thread, + GMainContext *main_context, MetaThreadCallback callback, gpointer user_data, GDestroyNotify user_data_destroy) { MetaThreadPrivate *priv = meta_thread_get_instance_private (thread); + g_autoptr (GMutexLocker) locker; + MetaThreadCallbackSource *callback_source; MetaThreadCallbackData *callback_data; + if (!main_context) + main_context = g_main_context_default (); + + locker = g_mutex_locker_new (&priv->callbacks_mutex); + + callback_source = g_hash_table_lookup (priv->callback_sources, main_context); + g_return_if_fail (callback_source); + callback_data = g_new0 (MetaThreadCallbackData, 1); *callback_data = (MetaThreadCallbackData) { .callback = callback, @@ -387,20 +543,12 @@ meta_thread_queue_callback (MetaThread *thread, .user_data_destroy = user_data_destroy, }; - g_mutex_lock (&priv->callbacks_mutex); - priv->pending_callbacks = g_list_append (priv->pending_callbacks, - callback_data); - if (!priv->callbacks_source_id) - { - GSource *idle_source; - - idle_source = g_idle_source_new (); - g_source_set_callback (idle_source, callback_idle, thread, NULL); - priv->callbacks_source_id = g_source_attach (idle_source, - priv->main_context); - g_source_unref (idle_source); - } - g_mutex_unlock (&priv->callbacks_mutex); + g_mutex_lock (&callback_source->mutex); + callback_source->needs_flush = TRUE; + callback_source->callbacks = g_list_append (callback_source->callbacks, + callback_data); + g_source_set_ready_time (&callback_source->base, 0); + g_mutex_unlock (&callback_source->mutex); } typedef struct _MetaSyncTaskData @@ -438,7 +586,7 @@ run_impl_task_sync_user (MetaThread *thread, task = meta_thread_task_new (func, user_data, sync_task_done_user_in_impl, &data, - META_THREAD_TASK_FEEDBACK_TYPE_IMPL); + meta_thread_impl_get_main_context (priv->impl)); meta_thread_impl_queue_task (priv->impl, task); priv->waiting_for_impl_task = TRUE; @@ -487,7 +635,7 @@ run_impl_task_sync_kernel (MetaThread *thread, task = meta_thread_task_new (func, user_data, sync_task_done_kernel_in_impl, &data, - META_THREAD_TASK_FEEDBACK_TYPE_IMPL); + meta_thread_impl_get_main_context (priv->impl)); meta_thread_impl_queue_task (priv->impl, task); while (!data.done) @@ -537,7 +685,7 @@ meta_thread_post_impl_task (MetaThread *thread, task = meta_thread_task_new (func, user_data, feedback_func, feedback_user_data, - META_THREAD_TASK_FEEDBACK_TYPE_CALLBACK); + g_main_context_get_thread_default ()); meta_thread_impl_queue_task (priv->impl, task); } diff --git a/src/backends/native/meta-thread.h b/src/backends/native/meta-thread.h index a754e891f..142c8c204 100644 --- a/src/backends/native/meta-thread.h +++ b/src/backends/native/meta-thread.h @@ -54,14 +54,23 @@ typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval, const GError *error, gpointer user_data); +META_EXPORT_TEST +void meta_thread_register_callback_context (MetaThread *thread, + GMainContext *main_context); + +META_EXPORT_TEST +void meta_thread_unregister_callback_context (MetaThread *thread, + GMainContext *main_context); + META_EXPORT_TEST void meta_thread_queue_callback (MetaThread *thread, + GMainContext *main_context, MetaThreadCallback callback, gpointer user_data, GDestroyNotify user_data_destroy); META_EXPORT_TEST -int meta_thread_flush_callbacks (MetaThread *thread); +void meta_thread_flush_callbacks (MetaThread *thread); META_EXPORT_TEST gpointer meta_thread_run_impl_task_sync (MetaThread *thread, diff --git a/src/tests/native-thread.c b/src/tests/native-thread.c index 41f51ce6d..d90617adc 100644 --- a/src/tests/native-thread.c +++ b/src/tests/native-thread.c @@ -104,6 +104,7 @@ queue_callback_func (MetaThreadImpl *thread_impl, *state = 1; meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl), + NULL, callback_func, user_data, user_data_destroy); @@ -415,6 +416,215 @@ mixed_sync_func (MetaThreadImpl *thread_impl, return GINT_TO_POINTER (2); } +typedef struct +{ + MetaThread *thread; + gboolean registered; + + GThread *gthread; + GMutex init_mutex; + GCond init_cond; + + GMainContext *main_context; + GMainLoop *main_loop; + int sleep_s; + + int state; +} FlushData; + +typedef struct +{ + GMainLoop *loop; + int use_count; +} LoopUser; + +static gpointer +blocking_flush_thread_func (gpointer user_data) +{ + FlushData *flush_data = user_data; + + meta_thread_register_callback_context (flush_data->thread, + flush_data->main_context); + g_mutex_lock (&flush_data->init_mutex); + flush_data->registered = TRUE; + g_cond_signal (&flush_data->init_cond); + g_mutex_unlock (&flush_data->init_mutex); + + flush_data->main_loop = g_main_loop_new (flush_data->main_context, FALSE); + g_main_loop_run (flush_data->main_loop); + g_clear_pointer (&flush_data->main_loop, g_main_loop_unref); + meta_thread_unregister_callback_context (flush_data->thread, + flush_data->main_context); + + return GINT_TO_POINTER (TRUE); +} + +static void +slow_callback (MetaThread *thread, + gpointer user_data) +{ + FlushData *flush_data = user_data; + + g_assert_cmpint (flush_data->state, ==, 1); + flush_data->state = 2; + + sleep (flush_data->sleep_s); + + g_assert_cmpint (flush_data->state, ==, 2); + flush_data->state = 3; + + g_main_loop_quit (flush_data->main_loop); +} + +static gpointer +queue_slow_callback (MetaThreadImpl *thread_impl, + gpointer user_data, + GError **error) +{ + FlushData *flush_data = user_data; + + g_mutex_lock (&flush_data->init_mutex); + g_mutex_unlock (&flush_data->init_mutex); + + g_assert_cmpint (flush_data->state, ==, 0); + flush_data->state = 1; + + meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl), + flush_data->main_context, + slow_callback, + flush_data, + NULL); + return GINT_TO_POINTER (TRUE); +} + +static void +quit_main_loop_feedback_func (gpointer retval, + const GError *error, + gpointer user_data) +{ + LoopUser *loop_user = user_data; + + g_assert_cmpint (loop_user->use_count, >, 0); + + loop_user->use_count--; + if (loop_user->use_count == 0) + g_main_loop_quit (loop_user->loop); +} + +typedef struct +{ + GThread *gthread; + GMutex init_mutex; + MetaThread *thread; + + GMainLoop *main_thread_loop; + + GMainContext *thread_main_context; + GMainLoop *thread_loop; + + int state; +} CallbackData; + +static void +non_default_thread_callback_func (MetaThread *thread, + gpointer user_data) +{ + CallbackData *callback_data = user_data; + + g_assert (g_thread_self () == callback_data->gthread); + + g_assert_cmpint (callback_data->state, ==, 3); + callback_data->state = 4; +} + +static void +callback_destroy_cb (gpointer user_data) +{ + CallbackData *callback_data = user_data; + + g_assert (g_thread_self () == callback_data->gthread); + + g_assert_cmpint (callback_data->state, ==, 4); + callback_data->state = 5; +} + +static gpointer +queue_non_default_callback_func (MetaThreadImpl *thread_impl, + gpointer user_data, + GError **error) +{ + CallbackData *callback_data = user_data; + + meta_assert_in_thread_impl (meta_thread_impl_get_thread (thread_impl)); + + g_assert_cmpint (callback_data->state, ==, 2); + callback_data->state = 3; + + meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl), + callback_data->thread_main_context, + non_default_thread_callback_func, + callback_data, + callback_destroy_cb); + + return GINT_TO_POINTER (42); +} + +static void +non_default_thread_feedback_func (gpointer retval, + const GError *error, + gpointer user_data) +{ + CallbackData *callback_data = user_data; + + g_assert (g_thread_self () == callback_data->gthread); + + g_assert_cmpint (callback_data->state, ==, 5); + callback_data->state = 6; + + g_assert_cmpint (GPOINTER_TO_INT (retval), ==, 42); + g_assert_null (error); + + g_main_loop_quit (callback_data->thread_loop); +} + +static gpointer +non_default_callback_thread_func (gpointer user_data) +{ + CallbackData *callback_data = user_data; + + g_mutex_lock (&callback_data->init_mutex); + g_mutex_unlock (&callback_data->init_mutex); + + g_assert_cmpint (callback_data->state, ==, 1); + callback_data->state = 2; + + callback_data->thread_main_context = g_main_context_new (); + g_main_context_push_thread_default (callback_data->thread_main_context); + callback_data->thread_loop = + g_main_loop_new (callback_data->thread_main_context, FALSE); + meta_thread_register_callback_context (callback_data->thread, + callback_data->thread_main_context); + + meta_thread_post_impl_task (callback_data->thread, + queue_non_default_callback_func, callback_data, + non_default_thread_feedback_func, + callback_data); + + g_main_loop_run (callback_data->thread_loop); + g_main_loop_unref (callback_data->thread_loop); + + g_assert_cmpint (callback_data->state, ==, 6); + callback_data->state = 7; + + g_main_loop_quit (callback_data->main_thread_loop); + meta_thread_unregister_callback_context (callback_data->thread, + callback_data->thread_main_context); + g_main_context_pop_thread_default (callback_data->thread_main_context); + g_main_context_unref (callback_data->thread_main_context); + + return GINT_TO_POINTER (TRUE); +} + static void run_thread_tests (MetaThread *thread) { @@ -428,6 +638,10 @@ run_thread_tests (MetaThread *thread) IdleData idle_data; AsyncData async_data; MixedData mixed_data; + FlushData flush_data1; + FlushData flush_data2; + LoopUser loop_user; + CallbackData callback_data; meta_assert_not_in_thread_impl (thread); @@ -543,6 +757,81 @@ run_thread_tests (MetaThread *thread) g_assert_cmpint (mixed_data.state, ==, 3); g_mutex_unlock (&mixed_data.mutex); g_mutex_clear (&mixed_data.mutex); + + /* Blocking flush. */ + g_debug ("Test blocking flush"); + loop_user = (LoopUser) { + .loop = g_main_loop_new (NULL, FALSE), + .use_count = 2, + }; + flush_data1 = (FlushData) { + .thread = thread, + .sleep_s = 3, + }; + g_mutex_init (&flush_data1.init_mutex); + g_cond_init (&flush_data1.init_cond); + g_mutex_lock (&flush_data1.init_mutex); + flush_data1.main_context = g_main_context_new (); + flush_data1.gthread = g_thread_new ("blocking-flush-thread #1", + blocking_flush_thread_func, + &flush_data1); + while (!flush_data1.registered) + g_cond_wait (&flush_data1.init_cond, &flush_data1.init_mutex); + g_mutex_unlock (&flush_data1.init_mutex); + meta_thread_post_impl_task (thread, + queue_slow_callback, + &flush_data1, + quit_main_loop_feedback_func, + &loop_user); + flush_data2 = (FlushData) { + .thread = thread, + .sleep_s = 2, + }; + g_mutex_init (&flush_data2.init_mutex); + g_mutex_lock (&flush_data2.init_mutex); + flush_data2.main_context = g_main_context_new (); + flush_data2.gthread = g_thread_new ("blocking-flush-thread #2", + blocking_flush_thread_func, + &flush_data2); + while (!flush_data2.registered) + g_cond_wait (&flush_data2.init_cond, &flush_data2.init_mutex); + g_mutex_unlock (&flush_data2.init_mutex); + meta_thread_post_impl_task (thread, + queue_slow_callback, + &flush_data2, + quit_main_loop_feedback_func, + &loop_user); + + g_main_loop_run (loop_user.loop); + g_clear_pointer (&loop_user.loop, g_main_loop_unref); + + meta_thread_flush_callbacks (thread); + + g_assert_cmpint (flush_data1.state, ==, 3); + g_assert_cmpint (flush_data2.state, ==, 3); + + g_thread_join (flush_data1.gthread); + g_main_context_unref (flush_data1.main_context); + g_thread_join (flush_data2.gthread); + g_main_context_unref (flush_data2.main_context); + + /* Callbacks to non-default thread. */ + g_debug ("Test callbacks to non-default thread"); + callback_data = (CallbackData) {}; + callback_data.thread = thread; + callback_data.main_thread_loop = g_main_loop_new (NULL, FALSE); + g_mutex_init (&callback_data.init_mutex); + g_mutex_lock (&callback_data.init_mutex); + callback_data.gthread = + g_thread_new ("test-non-default-callback-thread", + non_default_callback_thread_func, &callback_data); + callback_data.state = 1; + g_mutex_unlock (&callback_data.init_mutex); + g_main_loop_run (callback_data.main_thread_loop); + g_main_loop_unref (callback_data.main_thread_loop); + g_thread_join (callback_data.gthread); + g_mutex_clear (&callback_data.init_mutex); + g_assert_cmpint (callback_data.state, ==, 7); } static void