dbusmock/logind: Mock logind and remove KVM special handling
Part-of: <https://gitlab.gnome.org/GNOME/mutter/-/merge_requests/4022>
This commit is contained in:
@ -105,7 +105,7 @@ variables:
|
||||
- .skip-git-clone
|
||||
variables:
|
||||
FDO_DISTRIBUTION_VERSION: 41
|
||||
BASE_TAG: '2024-12-18.0'
|
||||
BASE_TAG: '2025-01-15.0'
|
||||
MUTTER_USER: 'meta-user'
|
||||
FDO_DISTRIBUTION_PACKAGES:
|
||||
clang
|
||||
|
180
src/tests/dbusmock-templates/logind.py
Normal file
180
src/tests/dbusmock-templates/logind.py
Normal file
@ -0,0 +1,180 @@
|
||||
'''logind proxy mock template
|
||||
'''
|
||||
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU Lesser General Public License as published by the Free
|
||||
# Software Foundation; either version 3 of the License, or (at your option) any
|
||||
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
|
||||
# of the license.
|
||||
|
||||
__author__ = 'Sebastian Wick'
|
||||
__copyright__ = '(c) 2024 Red Hat Inc.'
|
||||
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import dbus
|
||||
from gi.repository import GLib
|
||||
from gi.repository import Gio
|
||||
|
||||
from dbusmock import MOCK_IFACE, mockobject
|
||||
|
||||
BUS_NAME = 'org.freedesktop.login1'
|
||||
MAIN_OBJ = '/org/freedesktop/login1'
|
||||
MAIN_IFACE = 'org.freedesktop.login1.Manager'
|
||||
MANAGER_IFACE = MAIN_IFACE
|
||||
SEAT_IFACE = 'org.freedesktop.login1.Seat'
|
||||
SESSION_IFACE = 'org.freedesktop.login1.Session'
|
||||
PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
||||
SYSTEM_BUS = True
|
||||
|
||||
|
||||
def escape_object_path(path):
|
||||
b = bytearray()
|
||||
b.extend(path.encode())
|
||||
path = Gio.dbus_escape_object_path_bytestring(b)
|
||||
if path[0].isdigit():
|
||||
path = '_{0:02x}{1}'.format(ord(path[0]), path[1:])
|
||||
return os.path.basename(path)
|
||||
|
||||
def find_host_session_id(bus):
|
||||
if 'XDG_SESSION_ID' in os.environ:
|
||||
return escape_object_path(os.environ['XDG_SESSION_ID'])
|
||||
|
||||
auto_session = bus.get_object(BUS_NAME, f'{MAIN_OBJ}/session/auto')
|
||||
session_id = auto_session.Get(SESSION_IFACE, 'Id', dbus_interface=PROP_IFACE)
|
||||
|
||||
manager = bus.get_object(BUS_NAME, MAIN_OBJ)
|
||||
session_path = manager.GetSession(session_id, dbus_interface=MANAGER_IFACE)
|
||||
|
||||
return os.path.basename(session_path)
|
||||
|
||||
def find_host_seat_id(bus, session_id):
|
||||
session = bus.get_object(BUS_NAME, f'{MAIN_OBJ}/session/{session_id}')
|
||||
(seat_id, _) = session.Get(SESSION_IFACE, 'Seat', dbus_interface=PROP_IFACE)
|
||||
return seat_id
|
||||
|
||||
|
||||
class Login1Seat(mockobject.DBusMockObject):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Login1Seat, self).__init__(*args, **kwargs)
|
||||
|
||||
bus = kwargs.get('mock_data')
|
||||
self.host_seat = bus.get_object(self.bus_name.get_name(), self.path) if bus else None
|
||||
|
||||
@staticmethod
|
||||
def add_new(manager, seat_id, host_bus):
|
||||
seat_path = f'{MAIN_OBJ}/seat/{seat_id}'
|
||||
if not seat_path in mockobject.objects:
|
||||
manager.AddObject(seat_path, SEAT_IFACE,
|
||||
{
|
||||
'Id': seat_id,
|
||||
},
|
||||
[],
|
||||
mock_class=Login1Seat,
|
||||
mock_data=host_bus)
|
||||
|
||||
@dbus.service.method(SEAT_IFACE, in_signature='u', out_signature='')
|
||||
def SwitchTo(self, n):
|
||||
if self.host_seat:
|
||||
return self.host_seat.SwitchTo(n, dbus_interface=SEAT_IFACE)
|
||||
# noop
|
||||
|
||||
|
||||
class Login1Session(mockobject.DBusMockObject):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Login1Session, self).__init__(*args, **kwargs)
|
||||
|
||||
bus = kwargs.get('mock_data')
|
||||
self.host_session = bus.get_object(self.bus_name.get_name(), self.path) if bus else None
|
||||
|
||||
@staticmethod
|
||||
def add_new(manager, session_id, seat_id, host_bus):
|
||||
session_path = f'{MAIN_OBJ}/session/{session_id}'
|
||||
seat_path = f'{MAIN_OBJ}/seat/{seat_id}'
|
||||
if not session_path in mockobject.objects:
|
||||
manager.AddObject(session_path, SESSION_IFACE,
|
||||
{
|
||||
'Id': session_id,
|
||||
'Active': True,
|
||||
'Seat': (seat_id, dbus.ObjectPath(seat_path)),
|
||||
},
|
||||
[],
|
||||
mock_class=Login1Session,
|
||||
mock_data=host_bus)
|
||||
|
||||
def open_file_direct(self, major, minor):
|
||||
sysfs_uevent_path = '/sys/dev/char/{}:{}/uevent'.format(major, minor)
|
||||
sysfs_uevent = open(sysfs_uevent_path, 'r')
|
||||
devname = None
|
||||
for line in sysfs_uevent.readlines():
|
||||
match = re.match('DEVNAME=(.*)', line)
|
||||
if match:
|
||||
devname = match[1]
|
||||
break
|
||||
sysfs_uevent.close()
|
||||
if not devname:
|
||||
raise dbus.exceptions.DBusException(f'Device file {major}:{minor} doesn\\\'t exist',
|
||||
major=major, minor=minor)
|
||||
fd = os.open('/dev/' + devname, os.O_RDWR | os.O_CLOEXEC)
|
||||
unix_fd = dbus.types.UnixFd(fd)
|
||||
os.close(fd)
|
||||
return (unix_fd, False)
|
||||
|
||||
@dbus.service.method(SESSION_IFACE, in_signature='uu', out_signature='hb')
|
||||
def TakeDevice(self, major, minor):
|
||||
if self.host_session:
|
||||
return self.host_session.TakeDevice(major, minor,
|
||||
dbus_interface=SESSION_IFACE)
|
||||
return self.open_file_direct (major, minor)
|
||||
|
||||
@dbus.service.method(SESSION_IFACE, in_signature='uu', out_signature='')
|
||||
def ReleaseDevice(self, major, minor):
|
||||
if self.host_session:
|
||||
return self.host_session.ReleaseDevice(major, minor,
|
||||
dbus_interface=SESSION_IFACE)
|
||||
# noop
|
||||
|
||||
@dbus.service.method(SESSION_IFACE, in_signature='b', out_signature='')
|
||||
def TakeControl(self, force):
|
||||
if self.host_session:
|
||||
return self.host_session.TakeControl(force, dbus_interface=SESSION_IFACE)
|
||||
|
||||
@dbus.service.method(SESSION_IFACE, in_signature='', out_signature='')
|
||||
def ReleaseControl(self):
|
||||
if self.host_session:
|
||||
return self.host_session.ReleaseControl(dbus_interface=SESSION_IFACE)
|
||||
# noop
|
||||
|
||||
|
||||
@dbus.service.method(MANAGER_IFACE, in_signature='u', out_signature='o')
|
||||
def GetSessionByPID(self, pid):
|
||||
session_path = f'{MAIN_OBJ}/session/{self.preferred_session_id}'
|
||||
return session_path
|
||||
|
||||
def create_session(self, host_bus):
|
||||
session_id = 'dummy'
|
||||
seat_id = 'seat0'
|
||||
|
||||
if host_bus:
|
||||
session_id = find_host_session_id(host_bus)
|
||||
seat_id = find_host_seat_id(host_bus, session_id)
|
||||
|
||||
if not self.preferred_session_id or host_bus:
|
||||
self.preferred_session_id = session_id
|
||||
|
||||
Login1Seat.add_new(self, seat_id, host_bus)
|
||||
Login1Session.add_new(self, session_id, seat_id, host_bus)
|
||||
|
||||
def load(manager, parameters):
|
||||
try:
|
||||
bus_address = parameters['host_system_bus_address']
|
||||
host_bus = dbus.bus.BusConnection(bus_address)
|
||||
except:
|
||||
host_bus = None
|
||||
|
||||
manager.preferred_session_id = None
|
||||
|
||||
# Try to create a passthrough session
|
||||
create_session(manager, host_bus)
|
@ -50,7 +50,7 @@ foreach test_case: privileged_tests
|
||||
args: [
|
||||
kernel_image_path,
|
||||
meta_dbus_runner.full_path(),
|
||||
'--kvm --',
|
||||
'--',
|
||||
meson.current_build_dir(),
|
||||
vm_env,
|
||||
kvm_wrapper,
|
||||
|
@ -1,47 +0,0 @@
|
||||
import dbus
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
|
||||
host_system_bus_connection = None
|
||||
|
||||
|
||||
def ensure_system_bus(address):
|
||||
global host_system_bus_connection
|
||||
|
||||
if host_system_bus_connection is None:
|
||||
bus = dbus.bus.BusConnection(address)
|
||||
host_system_bus_connection = bus
|
||||
|
||||
return host_system_bus_connection
|
||||
|
||||
|
||||
def open_file_direct(major, minor):
|
||||
sysfs_uevent_path = '/sys/dev/char/{}:{}/uevent'.format(major, minor)
|
||||
sysfs_uevent = open(sysfs_uevent_path, 'r')
|
||||
devname = None
|
||||
for line in sysfs_uevent.readlines():
|
||||
match = re.match('DEVNAME=(.*)', line)
|
||||
if match:
|
||||
devname = match[1]
|
||||
break
|
||||
sysfs_uevent.close()
|
||||
if not devname:
|
||||
raise dbus.exceptions.DBusException(f'Device file {major}:{minor} doesn\\\'t exist',
|
||||
major=major, minor=minor)
|
||||
fd = os.open('/dev/' + devname, os.O_RDWR | os.O_CLOEXEC)
|
||||
unix_fd = dbus.types.UnixFd(fd)
|
||||
os.close(fd)
|
||||
return (unix_fd, False)
|
||||
|
||||
|
||||
def call_host(address, object_path, interface, method, typesig, args):
|
||||
bus = ensure_system_bus(address)
|
||||
|
||||
return bus.call_blocking('org.freedesktop.login1',
|
||||
object_path,
|
||||
interface,
|
||||
method,
|
||||
typesig,
|
||||
args)
|
@ -199,7 +199,6 @@ tests_datadir = pkgdatadir / 'tests'
|
||||
install_data(
|
||||
[
|
||||
'mutter_dbusrunner.py',
|
||||
'logind_helpers.py',
|
||||
'socket-launch.sh',
|
||||
],
|
||||
install_dir: tests_datadir,
|
||||
|
@ -7,7 +7,6 @@ import fcntl
|
||||
import subprocess
|
||||
import getpass
|
||||
import argparse
|
||||
import logind_helpers
|
||||
import tempfile
|
||||
import select
|
||||
import socket
|
||||
@ -27,15 +26,6 @@ class MultiOrderedDict(OrderedDict):
|
||||
else:
|
||||
super(OrderedDict, self).__setitem__(key, value)
|
||||
|
||||
|
||||
def escape_object_path(path):
|
||||
b = bytearray()
|
||||
b.extend(path.encode())
|
||||
path = Gio.dbus_escape_object_path_bytestring(b)
|
||||
if path[0].isdigit():
|
||||
path = "_{0:02x}{1}".format(ord(path[0]), path[1:])
|
||||
return os.path.basename(path)
|
||||
|
||||
def get_subprocess_stdout():
|
||||
if os.getenv('META_DBUS_RUNNER_VERBOSE') == '1':
|
||||
return sys.stderr
|
||||
@ -49,20 +39,14 @@ class MutterDBusRunner(DBusTestCase):
|
||||
return os.path.join(os.path.dirname(__file__), 'dbusmock-templates')
|
||||
|
||||
@classmethod
|
||||
def setUpClass(klass, enable_kvm=False, launch=[], bind_sockets=False):
|
||||
def setUpClass(klass, launch=[], bind_sockets=False):
|
||||
klass.templates_dirs = [klass.__get_templates_dir()]
|
||||
|
||||
klass.mocks = OrderedDict()
|
||||
|
||||
klass.host_system_bus_address = os.getenv('DBUS_SYSTEM_BUS_ADDRESS')
|
||||
if klass.host_system_bus_address is None:
|
||||
klass.host_system_bus_address = 'unix:path=/run/dbus/system_bus_socket'
|
||||
|
||||
try:
|
||||
dbus.bus.BusConnection(klass.host_system_bus_address)
|
||||
klass.has_host_system_bus = True
|
||||
except:
|
||||
klass.has_host_system_bus = False
|
||||
host_system_bus_address = os.getenv('DBUS_SYSTEM_BUS_ADDRESS')
|
||||
if host_system_bus_address is None:
|
||||
host_system_bus_address = 'unix:path=/run/dbus/system_bus_socket'
|
||||
|
||||
print('Starting D-Bus daemons (session & system)...', file=sys.stderr)
|
||||
DBusTestCase.setUpClass()
|
||||
@ -88,12 +72,13 @@ class MutterDBusRunner(DBusTestCase):
|
||||
klass.start_from_local_template('gsd-color')
|
||||
klass.start_from_local_template('rtkit')
|
||||
klass.start_from_local_template('screensaver')
|
||||
klass.start_from_local_template('logind', {
|
||||
'host_system_bus_address': host_system_bus_address,
|
||||
})
|
||||
|
||||
klass.system_bus_con = klass.get_dbus(system_bus=True)
|
||||
klass.session_bus_con = klass.get_dbus(system_bus=False)
|
||||
|
||||
klass.init_logind(enable_kvm)
|
||||
|
||||
if klass.session_bus_con.name_has_owner('org.gnome.Mutter.DisplayConfig'):
|
||||
raise Exception(
|
||||
'org.gnome.Mutter.DisplayConfig already has owner on the session bus, bailing')
|
||||
@ -133,15 +118,6 @@ class MutterDBusRunner(DBusTestCase):
|
||||
template = klass.find_template(template_file_name)
|
||||
return klass.start_from_template(template, params, system_bus=system_bus)
|
||||
|
||||
@classmethod
|
||||
def start_from_template_managed(klass, template):
|
||||
klass.mock_obj.StartFromTemplate(template)
|
||||
|
||||
@classmethod
|
||||
def start_from_local_template_managed(klass, template_file_name):
|
||||
template = klass.find_template(template_file_name)
|
||||
klass.mock_obj.StartFromLocalTemplate(template)
|
||||
|
||||
@classmethod
|
||||
def start_from_class(klass, mock_class, params={}):
|
||||
mock_server = \
|
||||
@ -158,116 +134,6 @@ class MutterDBusRunner(DBusTestCase):
|
||||
mocks = (mock_server, mock_obj)
|
||||
return mocks
|
||||
|
||||
def wrap_logind_call(call):
|
||||
code = \
|
||||
f'''
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, '{os.path.dirname(__file__)}')
|
||||
import logind_helpers
|
||||
|
||||
{call}
|
||||
'''
|
||||
return code
|
||||
|
||||
@classmethod
|
||||
def forward_to_host(klass, object_path, interface, method, in_type, out_type):
|
||||
proxy = klass.system_bus_con.get_object('org.freedesktop.login1',
|
||||
object_path)
|
||||
proxy.AddMethod(interface, method, in_type, out_type,
|
||||
f'''
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, '{os.path.dirname(__file__)}')
|
||||
import logind_helpers
|
||||
|
||||
ret = logind_helpers.call_host('{klass.host_system_bus_address}',
|
||||
'{object_path}',
|
||||
'{interface}',
|
||||
'{method}',
|
||||
'{in_type}',
|
||||
args)
|
||||
''')
|
||||
|
||||
@classmethod
|
||||
def init_logind_forward(klass, session_path, seat_path):
|
||||
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
|
||||
'TakeDevice',
|
||||
'uu', 'hb')
|
||||
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
|
||||
'ReleaseDevice',
|
||||
'uu', '')
|
||||
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
|
||||
'TakeDevice',
|
||||
'uu', 'hb')
|
||||
klass.forward_to_host(session_path, 'org.freedesktop.login1.Session',
|
||||
'TakeControl',
|
||||
'b', '')
|
||||
klass.forward_to_host(seat_path, 'org.freedesktop.login1.Seat',
|
||||
'SwitchTo',
|
||||
'u', '')
|
||||
|
||||
@classmethod
|
||||
def init_logind_kvm(klass, session_path):
|
||||
session_obj = klass.system_bus_con.get_object('org.freedesktop.login1', session_path)
|
||||
session_obj.AddMethod('org.freedesktop.login1.Session',
|
||||
'TakeDevice',
|
||||
'uu', 'hb',
|
||||
klass.wrap_logind_call(
|
||||
f'''
|
||||
major = args[0]
|
||||
minor = args[1]
|
||||
ret = logind_helpers.open_file_direct(major, minor)
|
||||
'''))
|
||||
session_obj.AddMethods('org.freedesktop.login1.Session', [
|
||||
('ReleaseDevice', 'uu', '', ''),
|
||||
('TakeControl', 'b', '', ''),
|
||||
])
|
||||
|
||||
|
||||
@classmethod
|
||||
def find_host_session_name(klass):
|
||||
if 'XDG_SESSION_ID' in os.environ:
|
||||
return escape_object_path(os.environ['XDG_SESSION_ID'])
|
||||
|
||||
bus = dbus.bus.BusConnection(klass.host_system_bus_address)
|
||||
session_auto_proxy = bus.get_object('org.freedesktop.login1',
|
||||
'/org/freedesktop/login1/session/auto')
|
||||
props = dbus.Interface(session_auto_proxy,
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
session_id = props.Get('org.freedesktop.login1.Session', 'Id')
|
||||
manager_proxy = bus.get_object('org.freedesktop.login1',
|
||||
'/org/freedesktop/login1')
|
||||
manager = dbus.Interface(manager_proxy,
|
||||
dbus_interface='org.freedesktop.login1.Manager')
|
||||
session_path = manager.GetSession(session_id)
|
||||
return os.path.basename(session_path)
|
||||
|
||||
@classmethod
|
||||
def init_logind(klass, enable_kvm):
|
||||
logind = klass.start_from_template('logind')
|
||||
|
||||
[p_mock, obj] = logind
|
||||
|
||||
mock_iface = 'org.freedesktop.DBus.Mock'
|
||||
seat_path = obj.AddSeat('seat0', dbus_interface=mock_iface)
|
||||
session_name = 'dummy'
|
||||
if klass.has_host_system_bus:
|
||||
session_name = klass.find_host_session_name()
|
||||
|
||||
session_path = obj.AddSession(session_name, 'seat0',
|
||||
dbus.types.UInt32(os.getuid()),
|
||||
getpass.getuser(),
|
||||
True,
|
||||
dbus_interface=mock_iface)
|
||||
|
||||
if enable_kvm:
|
||||
klass.init_logind_kvm(session_path)
|
||||
elif klass.has_host_system_bus:
|
||||
klass.init_logind_forward(session_path, seat_path)
|
||||
|
||||
@classmethod
|
||||
def add_template_dir(klass, templates_dir):
|
||||
klass.templates_dirs += [templates_dir]
|
||||
@ -373,6 +239,7 @@ def wrap_call(args, wrapper, extra_env):
|
||||
env['GSETTINGS_BACKEND'] = 'memory'
|
||||
env['XDG_CURRENT_DESKTOP'] = ''
|
||||
env['META_DBUS_RUNNER_ACTIVE'] = '1'
|
||||
env.pop('XDG_SESSION_ID', None)
|
||||
|
||||
if extra_env:
|
||||
env |= extra_env
|
||||
@ -393,7 +260,6 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
|
||||
DBusGMainLoop(set_as_default=True)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--kvm', action='store_true', default=False)
|
||||
parser.add_argument('--launch', action='append', default=[])
|
||||
parser.add_argument('--no-isolate-dirs', action='store_true', default=False)
|
||||
if setup_argparse:
|
||||
@ -413,7 +279,6 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
|
||||
|
||||
if args.no_isolate_dirs:
|
||||
return meta_run_klass(klass, rest,
|
||||
enable_kvm=args.kvm,
|
||||
extra_env=extra_env)
|
||||
|
||||
test_root = os.getenv('MUTTER_DBUS_RUNNER_TEST_ROOT')
|
||||
@ -441,17 +306,15 @@ def meta_run(klass, extra_env=None, setup_argparse=None, handle_argparse=None):
|
||||
print('Setup', env_dir, 'as', directory, file=sys.stderr)
|
||||
|
||||
return meta_run_klass(klass, rest,
|
||||
enable_kvm=args.kvm,
|
||||
launch=args.launch,
|
||||
bind_sockets=True,
|
||||
extra_env=extra_env)
|
||||
|
||||
def meta_run_klass(klass, rest, enable_kvm=False, launch=[], bind_sockets=False, extra_env=None):
|
||||
def meta_run_klass(klass, rest, launch=[], bind_sockets=False, extra_env=None):
|
||||
result = 1
|
||||
|
||||
if os.getenv('META_DBUS_RUNNER_ACTIVE') == None:
|
||||
klass.setUpClass(enable_kvm=enable_kvm,
|
||||
launch=launch,
|
||||
klass.setUpClass(launch=launch,
|
||||
bind_sockets=bind_sockets)
|
||||
runner = klass()
|
||||
runner.assertGreater(len(rest), 0)
|
||||
|
Reference in New Issue
Block a user