thread: Allow switching thread type

This will be necessary in order to default to 'kernel' and then switch
to 'user' if the thread instance can no longer be properly multi
threaded.

To avoid having the same thread impl creating and destroying
GMainContext's, this also means always creating a GMainContext for the
thread-impl. When running in user-thread mode, the GMainContext is
wrapped in a wrapper source and dispatched as part of the real main
thread GMainContext, and when in kernel-thread mode, it runs
independently in the dedicated thread.

This has the consequence that the wrapper source will always have the
priority of the highest impl context GSource, but only after it has
dispatched once. Would we need it earlier than that, we either need a
way to introspect existing sources in a GMainContext and their
priorities, or manually track known sources in MetaThreadImpl.

The wrapper source will never be below 0, as that'd mean it could reach
INT_MAX priority if it had no more sources attached to it, meaning it'd
never be dispatched again.

Part-of: <https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/2777>
This commit is contained in:
Jonas Ådahl 2022-06-15 22:50:52 +02:00
parent 33b33aa370
commit 788ad43e17
4 changed files with 317 additions and 37 deletions

View File

@ -174,10 +174,9 @@ static GSourceFuncs impl_source_funcs = {
.dispatch = impl_source_dispatch,
};
static void
meta_thread_impl_constructed (GObject *object)
static GSource *
create_impl_source (MetaThreadImpl *thread_impl)
{
MetaThreadImpl *thread_impl = META_THREAD_IMPL (object);
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
GSource *source;
@ -193,8 +192,17 @@ meta_thread_impl_constructed (GObject *object)
g_source_attach (source, priv->thread_context);
g_source_unref (source);
priv->impl_source = source;
return source;
}
static void
meta_thread_impl_constructed (GObject *object)
{
MetaThreadImpl *thread_impl = META_THREAD_IMPL (object);
MetaThreadImplPrivate *priv =
meta_thread_impl_get_instance_private (thread_impl);
priv->impl_source = create_impl_source (thread_impl);
priv->task_queue = g_async_queue_new ();
G_OBJECT_CLASS (meta_thread_impl_parent_class)->constructed (object);
@ -239,7 +247,7 @@ meta_thread_impl_class_init (MetaThreadImplClass *klass)
"GMainContext",
G_TYPE_MAIN_CONTEXT,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_CONSTRUCT |
G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (object_class, N_PROPS, obj_props);
}

View File

@ -71,6 +71,7 @@ typedef struct _MetaThreadPrivate
MetaThreadImpl *impl;
gboolean waiting_for_impl_task;
GSource *wrapper_source;
GMutex callbacks_mutex;
GHashTable *callback_sources;
@ -173,6 +174,162 @@ thread_impl_func (gpointer user_data)
return GINT_TO_POINTER (TRUE);
}
typedef struct _WrapperSource
{
GSource base;
GMainContext *thread_main_context;
GPollFD fds[256];
gpointer fd_tags[256];
int n_fds;
int priority;
} WrapperSource;
static gboolean
wrapper_source_prepare (GSource *source,
int *timeout)
{
WrapperSource *wrapper_source = (WrapperSource *) source;
int ret;
int old_n_fds = wrapper_source->n_fds;
GPollFD old_fds[wrapper_source->n_fds];
int i;
ret = g_main_context_prepare (wrapper_source->thread_main_context,
&wrapper_source->priority);
if (old_n_fds > 0)
memcpy (old_fds, wrapper_source->fds, sizeof (GPollFD) * old_n_fds);
wrapper_source->n_fds =
g_main_context_query (wrapper_source->thread_main_context,
INT_MAX,
timeout,
wrapper_source->fds,
G_N_ELEMENTS (wrapper_source->fds));
if (wrapper_source->n_fds == old_n_fds &&
old_n_fds > 0 &&
memcmp (old_fds, wrapper_source->fds, old_n_fds * sizeof (GPollFD)) == 0)
return ret;
for (i = 0; i < old_n_fds; i++)
g_source_remove_unix_fd (source, wrapper_source->fd_tags[i]);
for (i = 0; i < wrapper_source->n_fds; i++)
{
wrapper_source->fd_tags[i] =
g_source_add_unix_fd (source,
wrapper_source->fds[i].fd,
wrapper_source->fds[i].events);
}
return ret;
}
static gboolean
wrapper_source_check (GSource *source)
{
WrapperSource *wrapper_source = (WrapperSource *) source;
GIOCondition all_revents = 0;
int i;
for (i = 0; i < wrapper_source->n_fds; i++)
{
GIOCondition revents;
revents = g_source_query_unix_fd (source, wrapper_source->fd_tags[i]);
wrapper_source->fds[i].revents = revents;
all_revents |= revents;
}
return !!all_revents;
}
static gboolean
wrapper_source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
WrapperSource *wrapper_source = (WrapperSource *) source;
g_source_set_priority (source, MIN (0, wrapper_source->priority));
if (g_main_context_check (wrapper_source->thread_main_context,
wrapper_source->priority,
wrapper_source->fds,
wrapper_source->n_fds))
g_main_context_dispatch (wrapper_source->thread_main_context);
return G_SOURCE_CONTINUE;
}
static void
wrapper_source_finalize (GSource *source)
{
}
static GSourceFuncs wrapper_source_funcs = {
.prepare = wrapper_source_prepare,
.check = wrapper_source_check,
.dispatch = wrapper_source_dispatch,
.finalize = wrapper_source_finalize,
};
static void
wrap_main_context (MetaThread *thread,
GMainContext *thread_main_context)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
g_autoptr (GSource) source = NULL;
WrapperSource *wrapper_source;
if (!g_main_context_acquire (thread_main_context))
g_return_if_reached ();
source = g_source_new (&wrapper_source_funcs,
sizeof (WrapperSource));
wrapper_source = (WrapperSource *) source;
wrapper_source->thread_main_context = thread_main_context;
g_source_set_ready_time (source, -1);
g_source_attach (source, NULL);
priv->wrapper_source = source;
}
static void
unwrap_main_context (MetaThread *thread,
GMainContext *thread_main_context)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
g_main_context_release (thread_main_context);
g_clear_pointer (&priv->wrapper_source, g_source_destroy);
}
static void
start_thread (MetaThread *thread)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
switch (priv->thread_type)
{
case META_THREAD_TYPE_USER:
wrap_main_context (thread,
meta_thread_impl_get_main_context (priv->impl));
break;
case META_THREAD_TYPE_KERNEL:
g_mutex_init (&priv->kernel.init_mutex);
g_mutex_lock (&priv->kernel.init_mutex);
priv->kernel.thread = g_thread_new (priv->name,
thread_impl_func,
thread);
g_mutex_unlock (&priv->kernel.init_mutex);
break;
}
}
static gboolean
meta_thread_initable_init (GInitable *initable,
GCancellable *cancellable,
@ -193,15 +350,7 @@ meta_thread_initable_init (GInitable *initable,
NULL, (GDestroyNotify) g_source_destroy);
meta_thread_register_callback_context (thread, priv->main_context);
switch (priv->thread_type)
{
case META_THREAD_TYPE_USER:
thread_context = g_main_context_ref (priv->main_context);
break;
case META_THREAD_TYPE_KERNEL:
thread_context = g_main_context_new ();
break;
}
g_assert (g_type_is_a (class_priv->impl_type, META_TYPE_THREAD_IMPL));
priv->impl = g_object_new (class_priv->impl_type,
@ -209,19 +358,7 @@ meta_thread_initable_init (GInitable *initable,
"main-context", thread_context,
NULL);
switch (priv->thread_type)
{
case META_THREAD_TYPE_USER:
break;
case META_THREAD_TYPE_KERNEL:
g_mutex_init (&priv->kernel.init_mutex);
g_mutex_lock (&priv->kernel.init_mutex);
priv->kernel.thread = g_thread_new (priv->name,
thread_impl_func,
thread);
g_mutex_unlock (&priv->kernel.init_mutex);
break;
}
start_thread (thread);
return TRUE;
}
@ -238,6 +375,7 @@ finalize_thread_user (MetaThread *thread)
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
while (meta_thread_impl_dispatch (priv->impl) > 0);
unwrap_main_context (thread, meta_thread_impl_get_main_context (priv->impl));
}
static void
@ -251,11 +389,9 @@ finalize_thread_kernel (MetaThread *thread)
g_mutex_clear (&priv->kernel.init_mutex);
}
static void
meta_thread_finalize (GObject *object)
tear_down_thread (MetaThread *thread)
{
MetaThread *thread = META_THREAD (object);
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
switch (priv->thread_type)
@ -269,12 +405,22 @@ meta_thread_finalize (GObject *object)
}
meta_thread_flush_callbacks (thread);
}
static void
meta_thread_finalize (GObject *object)
{
MetaThread *thread = META_THREAD (object);
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
tear_down_thread (thread);
meta_thread_unregister_callback_context (thread, priv->main_context);
g_warn_if_fail (g_hash_table_size (priv->callback_sources) == 0);
g_clear_object (&priv->impl);
g_clear_pointer (&priv->name, g_free);
g_warn_if_fail (g_hash_table_size (priv->callback_sources) == 0);
g_mutex_clear (&priv->callbacks_mutex);
G_OBJECT_CLASS (meta_thread_parent_class)->finalize (object);
@ -341,6 +487,34 @@ meta_thread_class_register_impl_type (MetaThreadClass *thread_class,
class_priv->impl_type = impl_type;
}
void
meta_thread_reset_thread_type (MetaThread *thread,
MetaThreadType thread_type)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
g_autoptr (GMainContext) thread_context = NULL;
if (priv->thread_type == thread_type)
return;
tear_down_thread (thread);
g_assert (!priv->wrapper_source);
priv->thread_type = thread_type;
start_thread (thread);
switch (priv->thread_type)
{
case META_THREAD_TYPE_USER:
g_assert (priv->wrapper_source);
break;
case META_THREAD_TYPE_KERNEL:
g_assert (!priv->wrapper_source);
break;
}
}
static int
dispatch_callbacks (MetaThread *thread,
GList *pending_callbacks)
@ -364,21 +538,45 @@ void
meta_thread_flush_callbacks (MetaThread *thread)
{
MetaThreadPrivate *priv = meta_thread_get_instance_private (thread);
g_autoptr (GList) pending_callbacks = NULL;
MetaThreadCallbackSource *main_callback_source;
GSource *source;
g_autoptr (GPtrArray) main_thread_sources = NULL;
g_autoptr (GList) callback_sources = NULL;
GList *l;
g_assert (!g_main_context_get_thread_default ());
main_thread_sources = g_ptr_array_new ();
source = g_hash_table_lookup (priv->callback_sources,
priv->main_context);
g_ptr_array_add (main_thread_sources, source);
switch (priv->thread_type)
{
case META_THREAD_TYPE_USER:
source =
g_hash_table_lookup (priv->callback_sources,
meta_thread_impl_get_main_context (priv->impl));
g_ptr_array_add (main_thread_sources, source);
break;
case META_THREAD_TYPE_KERNEL:
break;
}
while (TRUE)
{
g_autoptr (GList) pending_callbacks = NULL;
gboolean needs_reflush = FALSE;
int i;
g_mutex_lock (&priv->callbacks_mutex);
main_callback_source = g_hash_table_lookup (priv->callback_sources,
priv->main_context);
pending_callbacks = g_steal_pointer (&main_callback_source->callbacks);
for (i = 0; i < main_thread_sources->len; i++)
{
MetaThreadCallbackSource *source =
g_ptr_array_index (main_thread_sources, i);
pending_callbacks =
g_list_concat (pending_callbacks,
g_steal_pointer (&source->callbacks));
}
callback_sources = g_hash_table_get_values (priv->callback_sources);
g_mutex_unlock (&priv->callbacks_mutex);
@ -390,7 +588,7 @@ meta_thread_flush_callbacks (MetaThread *thread)
{
MetaThreadCallbackSource *callback_source = l->data;
if (callback_source == main_callback_source)
if (g_ptr_array_find (main_thread_sources, callback_source, NULL))
continue;
g_mutex_lock (&callback_source->mutex);

View File

@ -54,6 +54,10 @@ typedef void (* MetaThreadTaskFeedbackFunc) (gpointer retval,
const GError *error,
gpointer user_data);
META_EXPORT_TEST
void meta_thread_reset_thread_type (MetaThread *thread,
MetaThreadType thread_type);
META_EXPORT_TEST
void meta_thread_register_callback_context (MetaThread *thread,
GMainContext *main_context);

View File

@ -1051,6 +1051,74 @@ meta_test_thread_kernel_run_task_off_thread (void)
meta_test_thread_run_task_off_thread_common (META_THREAD_TYPE_KERNEL);
}
static gpointer
assert_not_thread (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error)
{
GThread **thread_to_check = user_data;
g_assert (g_steal_pointer (thread_to_check) != g_thread_self ());
return NULL;
}
static gpointer
assert_thread (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error)
{
GThread **thread_to_check = user_data;
g_assert (g_steal_pointer (thread_to_check) == g_thread_self ());
return NULL;
}
static void
meta_test_thread_change_thread_type (void)
{
MetaBackend *backend = meta_context_get_backend (test_context);
MetaThread *thread;
g_autoptr (GError) error = NULL;
GThread *main_thread;
GThread *test_thread;
thread = g_initable_new (META_TYPE_THREAD_TEST,
NULL, &error,
"backend", backend,
"name", "test late callback",
"thread-type", META_THREAD_TYPE_KERNEL,
NULL);
g_object_add_weak_pointer (G_OBJECT (thread), (gpointer *) &thread);
g_assert_nonnull (thread);
g_assert_null (error);
main_thread = g_thread_self ();
test_thread = main_thread;
meta_thread_post_impl_task (thread, assert_not_thread, &test_thread, NULL,
NULL, NULL);
meta_thread_reset_thread_type (thread, META_THREAD_TYPE_USER);
g_assert_null (test_thread);
test_thread = main_thread;
meta_thread_post_impl_task (thread, assert_thread, &test_thread, NULL,
NULL, NULL);
meta_thread_reset_thread_type (thread, META_THREAD_TYPE_KERNEL);
g_assert_null (test_thread);
test_thread = main_thread;
meta_thread_post_impl_task (thread, assert_not_thread, &test_thread, NULL,
NULL, NULL);
g_object_unref (thread);
g_assert_null (thread);
g_assert_null (test_thread);
}
static void
init_tests (void)
{
@ -1066,6 +1134,8 @@ init_tests (void)
meta_test_thread_user_run_task_off_thread);
g_test_add_func ("/backends/native/thread/kernel/run-task-off-thread",
meta_test_thread_kernel_run_task_off_thread);
g_test_add_func ("/backends/native/thread/change-thread-type",
meta_test_thread_change_thread_type);
}
int