dbusmock/logind: Mock logind and remove KVM special handling

Part-of: <https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/4022>
This commit is contained in:
Sebastian Wick
2024-09-16 20:22:22 +02:00
parent e1aeb86bc9
commit 98b347b204
6 changed files with 192 additions and 197 deletions

View File

@ -105,7 +105,7 @@ variables:
- .skip-git-clone
variables:
FDO_DISTRIBUTION_VERSION: 41
BASE_TAG: '2024-12-18.0'
BASE_TAG: '2025-01-15.0'
MUTTER_USER: 'meta-user'
FDO_DISTRIBUTION_PACKAGES:
clang

View File

@ -0,0 +1,180 @@
'''logind proxy mock template
'''
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.
__author__ = 'Sebastian Wick'
__copyright__ = '(c) 2024 Red Hat Inc.'
import os
import re
from pathlib import Path
import dbus
from gi.repository import GLib
from gi.repository import Gio
from dbusmock import MOCK_IFACE, mockobject
BUS_NAME = 'org.freedesktop.login1'
MAIN_OBJ = '/org/freedesktop/login1'
MAIN_IFACE = 'org.freedesktop.login1.Manager'
MANAGER_IFACE = MAIN_IFACE
SEAT_IFACE = 'org.freedesktop.login1.Seat'
SESSION_IFACE = 'org.freedesktop.login1.Session'
PROP_IFACE = 'org.freedesktop.DBus.Properties'
SYSTEM_BUS = True
def escape_object_path(path):
b = bytearray()
b.extend(path.encode())
path = Gio.dbus_escape_object_path_bytestring(b)
if path[0].isdigit():
path = '_{0:02x}{1}'.format(ord(path[0]), path[1:])
return os.path.basename(path)
def find_host_session_id(bus):
if 'XDG_SESSION_ID' in os.environ:
return escape_object_path(os.environ['XDG_SESSION_ID'])
auto_session = bus.get_object(BUS_NAME, f'{MAIN_OBJ}/session/auto')
session_id = auto_session.Get(SESSION_IFACE, 'Id', dbus_interface=PROP_IFACE)
manager = bus.get_object(BUS_NAME, MAIN_OBJ)
session_path = manager.GetSession(session_id, dbus_interface=MANAGER_IFACE)
return os.path.basename(session_path)
def find_host_seat_id(bus, session_id):
session = bus.get_object(BUS_NAME, f'{MAIN_OBJ}/session/{session_id}')
(seat_id, _) = session.Get(SESSION_IFACE, 'Seat', dbus_interface=PROP_IFACE)
return seat_id
class Login1Seat(mockobject.DBusMockObject):
def __init__(self, *args, **kwargs):
super(Login1Seat, self).__init__(*args, **kwargs)
bus = kwargs.get('mock_data')
self.host_seat = bus.get_object(self.bus_name.get_name(), self.path) if bus else None
@staticmethod
def add_new(manager, seat_id, host_bus):
seat_path = f'{MAIN_OBJ}/seat/{seat_id}'
if not seat_path in mockobject.objects:
manager.AddObject(seat_path, SEAT_IFACE,
{
'Id': seat_id,
},
[],
mock_class=Login1Seat,
mock_data=host_bus)
@dbus.service.method(SEAT_IFACE, in_signature='u', out_signature='')
def SwitchTo(self, n):
if self.host_seat:
return self.host_seat.SwitchTo(n, dbus_interface=SEAT_IFACE)
# noop
class Login1Session(mockobject.DBusMockObject):
def __init__(self, *args, **kwargs):
super(Login1Session, self).__init__(*args, **kwargs)
bus = kwargs.get('mock_data')
self.host_session = bus.get_object(self.bus_name.get_name(), self.path) if bus else None
@staticmethod
def add_new(manager, session_id, seat_id, host_bus):
session_path = f'{MAIN_OBJ}/session/{session_id}'
seat_path = f'{MAIN_OBJ}/seat/{seat_id}'
if not session_path in mockobject.objects:
manager.AddObject(session_path, SESSION_IFACE,
{
'Id': session_id,
'Active': True,
'Seat': (seat_id, dbus.ObjectPath(seat_path)),
},
[],
mock_class=Login1Session,
mock_data=host_bus)
def open_file_direct(self, major, minor):
sysfs_uevent_path = '/sys/dev/char/{}:{}/uevent'.format(major, minor)
sysfs_uevent = open(sysfs_uevent_path, 'r')
devname = None
for line in sysfs_uevent.readlines():
match = re.match('DEVNAME=(.*)', line)
if match:
devname = match[1]
break
sysfs_uevent.close()
if not devname:
raise dbus.exceptions.DBusException(f'Device file {major}:{minor} doesn\\\'t exist',
major=major, minor=minor)
fd = os.open('/dev/' + devname, os.O_RDWR | os.O_CLOEXEC)
unix_fd = dbus.types.UnixFd(fd)
os.close(fd)
return (unix_fd, False)
@dbus.service.method(SESSION_IFACE, in_signature='uu', out_signature='hb')
def TakeDevice(self, major, minor):
if self.host_session:
return self.host_session.TakeDevice(major, minor,
dbus_interface=SESSION_IFACE)
return self.open_file_direct (major, minor)
@dbus.service.method(SESSION_IFACE, in_signature='uu', out_signature='')
def ReleaseDevice(self, major, minor):
if self.host_session:
return self.host_session.ReleaseDevice(major, minor,
dbus_interface=SESSION_IFACE)
# noop
@dbus.service.method(SESSION_IFACE, in_signature='b', out_signature='')
def TakeControl(self, force):
if self.host_session:
return self.host_session.TakeControl(force, dbus_interface=SESSION_IFACE)
@dbus.service.method(SESSION_IFACE, in_signature='', out_signature='')
def ReleaseControl(self):
if self.host_session:
return self.host_session.ReleaseControl(dbus_interface=SESSION_IFACE)
# noop
@dbus.service.method(MANAGER_IFACE, in_signature='u', out_signature='o')
def GetSessionByPID(self, pid):
session_path = f'{MAIN_OBJ}/session/{self.preferred_session_id}'
return session_path
def create_session(self, host_bus):
session_id = 'dummy'
seat_id = 'seat0'
if host_bus:
session_id = find_host_session_id(host_bus)
seat_id = find_host_seat_id(host_bus, session_id)
if not self.preferred_session_id or host_bus:
self.preferred_session_id = session_id
Login1Seat.add_new(self, seat_id, host_bus)
Login1Session.add_new(self, session_id, seat_id, host_bus)
def load(manager, parameters):
try:
bus_address = parameters['host_system_bus_address']
host_bus = dbus.bus.BusConnection(bus_address)
except:
host_bus = None
manager.preferred_session_id = None
# Try to create a passthrough session
create_session(manager, host_bus)

View File

@ -50,7 +50,7 @@ foreach test_case: privileged_tests
args: [
kernel_image_path,
meta_dbus_runner.full_path(),
'--kvm --',
'--',
meson.current_build_dir(),
vm_env,
kvm_wrapper,

View File

@ -1,47 +0,0 @@
import dbus
import os
import re
import sys
host_system_bus_connection = None
def ensure_system_bus(address):
global host_system_bus_connection
if host_system_bus_connection is None:
bus = dbus.bus.BusConnection(address)
host_system_bus_connection = bus
return host_system_bus_connection
def open_file_direct(major, minor):
sysfs_uevent_path = '/sys/dev/char/{}:{}/uevent'.format(major, minor)
sysfs_uevent = open(sysfs_uevent_path, 'r')
devname = None
for line in sysfs_uevent.readlines():
match = re.match('DEVNAME=(.*)', line)
if match:
devname = match[1]
break
sysfs_uevent.close()
if not devname:
raise dbus.exceptions.DBusException(f'Device file {major}:{minor} doesn\\\'t exist',
major=major, minor=minor)
fd = os.open('/dev/' + devname, os.O_RDWR | os.O_CLOEXEC)
unix_fd = dbus.types.UnixFd(fd)
os.close(fd)
return (unix_fd, False)
def call_host(address, object_path, interface, method, typesig, args):
bus = ensure_system_bus(address)
return bus.call_blocking('org.freedesktop.login1',
object_path,
interface,
method,
typesig,
args)

View File

@ -199,7 +199,6 @@ tests_datadir = pkgdatadir / 'tests'
install_data(
[
'mutter_dbusrunner.py',
'logind_helpers.py',
'socket-launch.sh',
],
install_dir: tests_datadir,

View File

@ -7,7 +7,6 @@ import fcntl
import subprocess
import getpass
import argparse
import logind_helpers
import tempfile
import select
import socket
@ -27,15 +26,6 @@ class MultiOrderedDict(OrderedDict):
else:
super(OrderedDict, self).__setitem__(key, value)
def escape_object_path(path):
b = bytearray()
b.extend(path.encode())
path = Gio.dbus_escape_object_path_bytestring(b)
if path[0].isdigit():
path = "_{0:02x}{1}".format(ord(path[0]), path[1:])
return os.path.basename(path)
def get_subprocess_stdout():
if os.getenv('META_DBUS_RUNNER_VERBOSE') == '1':
return sys.stderr
@ -49,20 +39,14 @@ class MutterDBusRunner(DBusTestCase):
return os.path.join(os.path.dirname(__file__), 'dbusmock-templates')
@classmethod
def setUpClass(klass, enable_kvm=False, launch=[], bind_sockets=False):
def setUpClass(klass, launch=[], bind_sockets=False):
klass.templates_dirs = [klass.__get_templates_dir()]
klass.mocks = OrderedDict()
klass.host_system_bus_address = os.getenv('DBUS_SYSTEM_BUS_ADDRESS')
if klass.host_system_bus_address is None:
klass.host_system_bus_address = 'unix:path=/run/dbus/system_bus_socket'
try:
dbus.bus.BusConnection(klass.host_system_bus_address)
klass.has_host_system_bus = True
except:
klass.has_host_system_bus = False
host_system_bus_address = os.getenv('DBUS_SYSTEM_BUS_ADDRESS')
if host_system_bus_address is None:
host_system_bus_address = 'unix:path=/run/dbus/system_bus_socket'
print('Starting D-Bus daemons (session & system)...', file=sys.stderr)
DBusTestCase.setUpClass()
@ -88,12 +72,13 @@ class MutterDBusRunner(DBusTestCase):
klass.start_from_local_template('gsd-color')
klass.start_from_local_template('rtkit')
klass.start_from_local_template('screensaver')
klass.start_from_local_template('logind', {
'host_system_bus_address': host_system_bus_address,
})
klass.system_bus_con = klass.get_dbus(system_bus=True)
klass.session_bus_con = klass.get_dbus(system_bus=False)
klass.init_logind(enable_kvm)
if klass.session_bus_con.name_has_owner('org.gnome.Mutter.DisplayConfig'):
raise Exception(
'org.gnome.Mutter.DisplayConfig already has owner on the session bus, bailing')
@ -133,15 +118,6 @@ class MutterDBusRunner(DBusTestCase):
template = klass.find_template(template_file_name)
return klass.start_from_template(template, params, system_bus=system_bus)
@classmethod
def start_from_template_managed(klass, template):
klass.mock_obj.StartFromTemplate(template)
@classmethod
def start_from_local_template_managed(klass, template_file_name):
template = klass.find_template(template_file_name)
klass.mock_obj.StartFromLocalTemplate(template)
@classmethod
def start_from_class(klass, mock_class, params={}):
mock_server = \
@ -158,116 +134,6 @@ class MutterDBusRunner(DBusTestCase):
mocks = (mock_server, mock_obj)
return mocks
def wrap_logind_call(call):
code = \
f'''
import os
import sys
sys.path.insert(0, '{os.path.dirname(__file__)}')
import logind_helpers
{call}
'''
return code
@classmethod
def forward_to_host(klass, object_path, interface, method, in_type, out_type):
proxy = klass.system_bus_con.get_object('org.freedesktop.login1',
object_path)
proxy.AddMethod(interface, method, in_type, out_type,
f'''
import os
import sys
sys.path.insert(0, '{os.path.dirname(__file__)}')
import logind_helpers
ret = logind_helpers.call_host('{klass.host_system_bus_address}',
'{object_path}',
'{interface}',
'{method}',
'{in_type}',
args)
''')
@classmethod
def init_logind_forward(klass, session_path, seat_path):
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
'TakeDevice',
'uu', 'hb')
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
'ReleaseDevice',
'uu', '')
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
'TakeDevice',
'uu', 'hb')
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
'TakeControl',
'b', '')
klass.forward_to_host(seat_path, 'org.freedesktop.login1.Seat',
'SwitchTo',
'u', '')
@classmethod
def init_logind_kvm(klass, session_path):
session_obj = klass.system_bus_con.get_object('org.freedesktop.login1', session_path)
session_obj.AddMethod('org.freedesktop.login1.Session',
'TakeDevice',
'uu', 'hb',
klass.wrap_logind_call(
f'''
major = args[0]
minor = args[1]
ret = logind_helpers.open_file_direct(major, minor)
'''))
session_obj.AddMethods('org.freedesktop.login1.Session', [
('ReleaseDevice', 'uu', '', ''),
('TakeControl', 'b', '', ''),
])
@classmethod
def find_host_session_name(klass):
if 'XDG_SESSION_ID' in os.environ:
return escape_object_path(os.environ['XDG_SESSION_ID'])
bus = dbus.bus.BusConnection(klass.host_system_bus_address)
session_auto_proxy = bus.get_object('org.freedesktop.login1',
'/org/freedesktop/login1/session/auto')
props = dbus.Interface(session_auto_proxy,
dbus_interface='org.freedesktop.DBus.Properties')
session_id = props.Get('org.freedesktop.login1.Session', 'Id')
manager_proxy = bus.get_object('org.freedesktop.login1',
'/org/freedesktop/login1')
manager = dbus.Interface(manager_proxy,
dbus_interface='org.freedesktop.login1.Manager')
session_path = manager.GetSession(session_id)
return os.path.basename(session_path)
@classmethod
def init_logind(klass, enable_kvm):
logind = klass.start_from_template('logind')
[p_mock, obj] = logind
mock_iface = 'org.freedesktop.DBus.Mock'
seat_path = obj.AddSeat('seat0', dbus_interface=mock_iface)
session_name = 'dummy'
if klass.has_host_system_bus:
session_name = klass.find_host_session_name()
session_path = obj.AddSession(session_name, 'seat0',
dbus.types.UInt32(os.getuid()),
getpass.getuser(),
True,
dbus_interface=mock_iface)
if enable_kvm:
klass.init_logind_kvm(session_path)
elif klass.has_host_system_bus:
klass.init_logind_forward(session_path, seat_path)
@classmethod
def add_template_dir(klass, templates_dir):
klass.templates_dirs += [templates_dir]
@ -373,6 +239,7 @@ def wrap_call(args, wrapper, extra_env):
env['GSETTINGS_BACKEND'] = 'memory'
env['XDG_CURRENT_DESKTOP'] = ''
env['META_DBUS_RUNNER_ACTIVE'] = '1'
env.pop('XDG_SESSION_ID', None)
if extra_env:
env |= extra_env
@ -393,7 +260,6 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
DBusGMainLoop(set_as_default=True)
parser = argparse.ArgumentParser()
parser.add_argument('--kvm', action='store_true', default=False)
parser.add_argument('--launch', action='append', default=[])
parser.add_argument('--no-isolate-dirs', action='store_true', default=False)
if setup_argparse:
@ -413,7 +279,6 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
if args.no_isolate_dirs:
return meta_run_klass(klass, rest,
enable_kvm=args.kvm,
extra_env=extra_env)
test_root = os.getenv('MUTTER_DBUS_RUNNER_TEST_ROOT')
@ -441,17 +306,15 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
print('Setup', env_dir, 'as', directory, file=sys.stderr)
return meta_run_klass(klass, rest,
enable_kvm=args.kvm,
launch=args.launch,
bind_sockets=True,
extra_env=extra_env)
def meta_run_klass(klass, rest, enable_kvm=False, launch=[], bind_sockets=False, extra_env=None):
def meta_run_klass(klass, rest, launch=[], bind_sockets=False, extra_env=None):
result = 1
if os.getenv('META_DBUS_RUNNER_ACTIVE') == None:
klass.setUpClass(enable_kvm=enable_kvm,
launch=launch,
klass.setUpClass(launch=launch,
bind_sockets=bind_sockets)
runner = klass()
runner.assertGreater(len(rest), 0)