thread: Support queuing callbacks on any thread

Callbacks could be queued to be invoked either on the impl side or the
main thread side of the thread; change this to take a GMainContext,
which effectively means a callback can be queued to be invoked on any
thread that has a GMainLoop running on its own GMainContext.

Flushing is made to handle flushing callbacks synchronously on all
threads. This works by keeping a hash table of queued callbacks per
thread (GMainContext); when flushing (from the main thread), callbacks
on the main thread context is flushed, followed by synchronization with
all the other threads.

meta_thread_flush_callbacks() is changed to no longer return the number
of dispatched callbacks; it becomes much harder when there are N queues
spread across multiple threads. Since it wasn't used for anything, just
drop the counting, making life slightly easier.

Feedback to thread tasks are however always queued on the callers
thread.

Part-of: <https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/2777>
This commit is contained in:
Jonas Ådahl
2021-07-09 19:16:58 +02:00
parent ae48019cce
commit f83c30bae5
6 changed files with 504 additions and 52 deletions

View File

@@ -104,6 +104,7 @@ queue_callback_func (MetaThreadImpl *thread_impl,
*state = 1;
meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl),
NULL,
callback_func,
user_data,
user_data_destroy);
@@ -415,6 +416,215 @@ mixed_sync_func (MetaThreadImpl *thread_impl,
return GINT_TO_POINTER (2);
}
typedef struct
{
MetaThread *thread;
gboolean registered;
GThread *gthread;
GMutex init_mutex;
GCond init_cond;
GMainContext *main_context;
GMainLoop *main_loop;
int sleep_s;
int state;
} FlushData;
typedef struct
{
GMainLoop *loop;
int use_count;
} LoopUser;
static gpointer
blocking_flush_thread_func (gpointer user_data)
{
FlushData *flush_data = user_data;
meta_thread_register_callback_context (flush_data->thread,
flush_data->main_context);
g_mutex_lock (&flush_data->init_mutex);
flush_data->registered = TRUE;
g_cond_signal (&flush_data->init_cond);
g_mutex_unlock (&flush_data->init_mutex);
flush_data->main_loop = g_main_loop_new (flush_data->main_context, FALSE);
g_main_loop_run (flush_data->main_loop);
g_clear_pointer (&flush_data->main_loop, g_main_loop_unref);
meta_thread_unregister_callback_context (flush_data->thread,
flush_data->main_context);
return GINT_TO_POINTER (TRUE);
}
static void
slow_callback (MetaThread *thread,
gpointer user_data)
{
FlushData *flush_data = user_data;
g_assert_cmpint (flush_data->state, ==, 1);
flush_data->state = 2;
sleep (flush_data->sleep_s);
g_assert_cmpint (flush_data->state, ==, 2);
flush_data->state = 3;
g_main_loop_quit (flush_data->main_loop);
}
static gpointer
queue_slow_callback (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error)
{
FlushData *flush_data = user_data;
g_mutex_lock (&flush_data->init_mutex);
g_mutex_unlock (&flush_data->init_mutex);
g_assert_cmpint (flush_data->state, ==, 0);
flush_data->state = 1;
meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl),
flush_data->main_context,
slow_callback,
flush_data,
NULL);
return GINT_TO_POINTER (TRUE);
}
static void
quit_main_loop_feedback_func (gpointer retval,
const GError *error,
gpointer user_data)
{
LoopUser *loop_user = user_data;
g_assert_cmpint (loop_user->use_count, >, 0);
loop_user->use_count--;
if (loop_user->use_count == 0)
g_main_loop_quit (loop_user->loop);
}
typedef struct
{
GThread *gthread;
GMutex init_mutex;
MetaThread *thread;
GMainLoop *main_thread_loop;
GMainContext *thread_main_context;
GMainLoop *thread_loop;
int state;
} CallbackData;
static void
non_default_thread_callback_func (MetaThread *thread,
gpointer user_data)
{
CallbackData *callback_data = user_data;
g_assert (g_thread_self () == callback_data->gthread);
g_assert_cmpint (callback_data->state, ==, 3);
callback_data->state = 4;
}
static void
callback_destroy_cb (gpointer user_data)
{
CallbackData *callback_data = user_data;
g_assert (g_thread_self () == callback_data->gthread);
g_assert_cmpint (callback_data->state, ==, 4);
callback_data->state = 5;
}
static gpointer
queue_non_default_callback_func (MetaThreadImpl *thread_impl,
gpointer user_data,
GError **error)
{
CallbackData *callback_data = user_data;
meta_assert_in_thread_impl (meta_thread_impl_get_thread (thread_impl));
g_assert_cmpint (callback_data->state, ==, 2);
callback_data->state = 3;
meta_thread_queue_callback (meta_thread_impl_get_thread (thread_impl),
callback_data->thread_main_context,
non_default_thread_callback_func,
callback_data,
callback_destroy_cb);
return GINT_TO_POINTER (42);
}
static void
non_default_thread_feedback_func (gpointer retval,
const GError *error,
gpointer user_data)
{
CallbackData *callback_data = user_data;
g_assert (g_thread_self () == callback_data->gthread);
g_assert_cmpint (callback_data->state, ==, 5);
callback_data->state = 6;
g_assert_cmpint (GPOINTER_TO_INT (retval), ==, 42);
g_assert_null (error);
g_main_loop_quit (callback_data->thread_loop);
}
static gpointer
non_default_callback_thread_func (gpointer user_data)
{
CallbackData *callback_data = user_data;
g_mutex_lock (&callback_data->init_mutex);
g_mutex_unlock (&callback_data->init_mutex);
g_assert_cmpint (callback_data->state, ==, 1);
callback_data->state = 2;
callback_data->thread_main_context = g_main_context_new ();
g_main_context_push_thread_default (callback_data->thread_main_context);
callback_data->thread_loop =
g_main_loop_new (callback_data->thread_main_context, FALSE);
meta_thread_register_callback_context (callback_data->thread,
callback_data->thread_main_context);
meta_thread_post_impl_task (callback_data->thread,
queue_non_default_callback_func, callback_data,
non_default_thread_feedback_func,
callback_data);
g_main_loop_run (callback_data->thread_loop);
g_main_loop_unref (callback_data->thread_loop);
g_assert_cmpint (callback_data->state, ==, 6);
callback_data->state = 7;
g_main_loop_quit (callback_data->main_thread_loop);
meta_thread_unregister_callback_context (callback_data->thread,
callback_data->thread_main_context);
g_main_context_pop_thread_default (callback_data->thread_main_context);
g_main_context_unref (callback_data->thread_main_context);
return GINT_TO_POINTER (TRUE);
}
static void
run_thread_tests (MetaThread *thread)
{
@@ -428,6 +638,10 @@ run_thread_tests (MetaThread *thread)
IdleData idle_data;
AsyncData async_data;
MixedData mixed_data;
FlushData flush_data1;
FlushData flush_data2;
LoopUser loop_user;
CallbackData callback_data;
meta_assert_not_in_thread_impl (thread);
@@ -543,6 +757,81 @@ run_thread_tests (MetaThread *thread)
g_assert_cmpint (mixed_data.state, ==, 3);
g_mutex_unlock (&mixed_data.mutex);
g_mutex_clear (&mixed_data.mutex);
/* Blocking flush. */
g_debug ("Test blocking flush");
loop_user = (LoopUser) {
.loop = g_main_loop_new (NULL, FALSE),
.use_count = 2,
};
flush_data1 = (FlushData) {
.thread = thread,
.sleep_s = 3,
};
g_mutex_init (&flush_data1.init_mutex);
g_cond_init (&flush_data1.init_cond);
g_mutex_lock (&flush_data1.init_mutex);
flush_data1.main_context = g_main_context_new ();
flush_data1.gthread = g_thread_new ("blocking-flush-thread #1",
blocking_flush_thread_func,
&flush_data1);
while (!flush_data1.registered)
g_cond_wait (&flush_data1.init_cond, &flush_data1.init_mutex);
g_mutex_unlock (&flush_data1.init_mutex);
meta_thread_post_impl_task (thread,
queue_slow_callback,
&flush_data1,
quit_main_loop_feedback_func,
&loop_user);
flush_data2 = (FlushData) {
.thread = thread,
.sleep_s = 2,
};
g_mutex_init (&flush_data2.init_mutex);
g_mutex_lock (&flush_data2.init_mutex);
flush_data2.main_context = g_main_context_new ();
flush_data2.gthread = g_thread_new ("blocking-flush-thread #2",
blocking_flush_thread_func,
&flush_data2);
while (!flush_data2.registered)
g_cond_wait (&flush_data2.init_cond, &flush_data2.init_mutex);
g_mutex_unlock (&flush_data2.init_mutex);
meta_thread_post_impl_task (thread,
queue_slow_callback,
&flush_data2,
quit_main_loop_feedback_func,
&loop_user);
g_main_loop_run (loop_user.loop);
g_clear_pointer (&loop_user.loop, g_main_loop_unref);
meta_thread_flush_callbacks (thread);
g_assert_cmpint (flush_data1.state, ==, 3);
g_assert_cmpint (flush_data2.state, ==, 3);
g_thread_join (flush_data1.gthread);
g_main_context_unref (flush_data1.main_context);
g_thread_join (flush_data2.gthread);
g_main_context_unref (flush_data2.main_context);
/* Callbacks to non-default thread. */
g_debug ("Test callbacks to non-default thread");
callback_data = (CallbackData) {};
callback_data.thread = thread;
callback_data.main_thread_loop = g_main_loop_new (NULL, FALSE);
g_mutex_init (&callback_data.init_mutex);
g_mutex_lock (&callback_data.init_mutex);
callback_data.gthread =
g_thread_new ("test-non-default-callback-thread",
non_default_callback_thread_func, &callback_data);
callback_data.state = 1;
g_mutex_unlock (&callback_data.init_mutex);
g_main_loop_run (callback_data.main_thread_loop);
g_main_loop_unref (callback_data.main_thread_loop);
g_thread_join (callback_data.gthread);
g_mutex_clear (&callback_data.init_mutex);
g_assert_cmpint (callback_data.state, ==, 7);
}
static void