502 lines
18 KiB
Python
Executable File
502 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import warnings
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
|
|
from bb import fetch2
|
|
import logging
|
|
import bb
|
|
import select
|
|
import errno
|
|
import signal
|
|
import pickle
|
|
import traceback
|
|
import queue
|
|
from multiprocessing import Lock
|
|
from threading import Thread
|
|
|
|
# Refuse to start unless the filesystem encoding is UTF-8; it cannot be
# changed once the interpreter is running.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly: exactly one argument is
# expected and it must start with the magic prefix "decafbad".
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The longer magic prefix "decafbadbad" additionally switches profiling on.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        # Only a missing C profiler should trigger the pure-Python fallback
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
except Exception:
    # Best effort only: carry on with whatever buffering stdout already has.
    # SystemExit and KeyboardInterrupt are deliberately no longer swallowed.
    pass
|
|
|
|
logger = logging.getLogger("BitBake")

# File descriptor of stdout; the writer thread sends queued data through it
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

# Attach the bb.event log handler to the "BitBake" logger
handler = bb.event.LogHandler()
logger.addHandler(handler)

if False:
    # Disabled debugging aid: code to write out a log file of all events
    # passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Messages waiting to be written to worker_pipe by the writer thread
worker_queue = queue.Queue()
|
|
|
|
def worker_fire(event, d):
    """Pickle *event*, wrap it in <event> tags and queue it for sending.

    The ``d`` argument is accepted but not used here.
    """
    framed = b"".join((b"<event>", pickle.dumps(event), b"</event>"))
    worker_fire_prepickled(framed)
|
|
|
|
def worker_fire_prepickled(event):
    """Put an already framed message on the module-level outgoing queue."""
    # The queue object is only used, never rebound, so the module-level name
    # can be referenced without a ``global`` declaration.
    worker_queue.put(event)
|
|
|
|
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer thread body: drain *worker_queue* into ``worker_pipe``.

    Queued byte strings are appended to a local buffer which is written to
    the pipe with os.write(); whatever a write does not accept stays in the
    buffer for the next attempt.  The function returns once
    ``worker_thread_exit`` is set and both the queue and the buffer are empty.
    """
    pending = b""

    while True:
        # Wait up to one second for new data so the exit flag is polled regularly
        try:
            pending += worker_queue.get(True, 1)
        except queue.Empty:
            pass

        while pending or not worker_queue.empty():
            try:
                # Wait (at most one second) for the pipe to become writable
                select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    pending += worker_queue.get()
                sent = os.write(worker_pipe, pending)
                pending = pending[sent:]
            except (IOError, OSError) as e:
                # EAGAIN and EPIPE are ignored and the write is simply retried
                if e.errno not in (errno.EAGAIN, errno.EPIPE):
                    raise

        if worker_thread_exit and worker_queue.empty() and not pending:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
|
|
|
|
def worker_child_fire(event, d):
    """Send *event* from a forked task process through ``worker_pipe``.

    The event is pickled, wrapped in <event> tags and written while holding
    ``worker_pipe_lock`` so that concurrent writers cannot interleave their
    messages.  The ``d`` argument is accepted but not used.

    Raises:
        IOError: re-raised after sigterm_handler() has been invoked when the
            write fails.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # ``with`` guarantees the lock is released even when the write raises
        with worker_pipe_lock:
            # The pipe is opened unbuffered in fork_off_task(), so one write()
            # may accept only part of the data; keep going until all is sent.
            while data:
                # write() reports None when nothing could be written
                # (non-blocking case); treat that as zero bytes and retry.
                sent = worker_pipe.write(data) or 0
                data = data[sent:]
    except IOError:
        sigterm_handler(None, None)
        raise
|
|
|
|
# Route bb.event's worker_fire hook to the queueing implementation
bb.event.worker_fire = worker_fire

# Optional debug log of handled commands; stays disabled while ``lf`` is None
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Write *msg* to the debug log file and flush it; no-op when ``lf`` is unset."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
|
|
|
|
def sigterm_handler(signum, frame):
    """Signal handler: pass SIGTERM on to process group 0 and exit.

    Neither argument is inspected; worker_child_fire() also calls this
    directly with ``(None, None)``.
    """
    # Put the default SIGTERM action back before signalling the group so this
    # handler is not entered again for the signal sent just below.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    # Same effect as sys.exit() with no argument
    raise SystemExit
|
|
|
|
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """Fork a child process that executes one task.

    The environment variables for the task are exported before the fork, and
    the parent restores its own environment afterwards.  The child sets up the
    datastore, runs the task through bb.build.exec_task() (unless this is a
    dry run) and always leaves via os._exit().

    Args:
        cfg: configuration object; ``dry_run``, ``limited_deps`` and
            ``profile`` are read from it.
        data: not used by this function.
        databuilder: passed to bb.cache.NoCache(); its ``mcdata`` mapping
            supplies the per-multiconfig datastore.
        workerdata: dict read for the keys "taskdeps", "fakerootenv",
            "fakerootdirs", "fakerootnoenv", "buildname", "date", "time",
            "sigdata" and "runq_hash".
        fn: recipe file name, used as key into the workerdata tables.
        task: key into ``workerdata["runq_hash"]``.
        taskname: name of the task to execute.
        appends: passed to loadDataFull() together with ``fn``.
        taskdepdata: stored in the BB_TASKDEPDATA variable.
        extraconfigdata: mapping of extra variable names to values.
        quieterrors: when true, set the task's "quieterrors" flag and do not
            log setup failures.
        dry_run_exec: forces a dry run in addition to ``cfg.dry_run``.

    Returns:
        ``(pid, pipein, pipeout)`` in the parent: the child's pid and both
        ends of the pipe created for it (both still open).
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    def export_env(assignments):
        # Export every KEY=VALUE word, remembering the previous value so the
        # parent can restore it.  Split on the first '=' only: the value may
        # legitimately contain '=' itself.
        for key, value in (var.split('=', 1) for var in assignments.split()):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        export_env(workerdata["fakerootenv"][fn] or "")

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        export_env(workerdata["fakerootnoenv"][fn] or "")

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None: a umask of 0 is a valid request and must
            # not be skipped just because it is falsy.
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (_, _, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the variables exported before the fork and mark
                # them as exported in the datastore as well
                for e, v in fakeenv.items():
                    os.environ[e] = v
                    the_data.setVar(e, v)
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except BaseException:
                # Whatever goes wrong, including SystemExit, the forked child
                # must terminate here rather than unwind into the caller
                os._exit(1)

        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Start from a failure status so that os._exit() below always has
            # a value, even when the profiled call raises
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
|
|
|
|
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """Keep *pipein* for non-blocking reads and close *pipeout* when given."""
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Bytes received so far that do not yet form a complete message
        self.queue = b""

    def read(self):
        """Read available data and forward every complete message.

        Everything up to and including each ``</event>`` marker is handed to
        worker_fire_prepickled(); the remainder stays buffered.

        Returns True when this call added data to the buffer.
        """
        size_before = len(self.queue)
        try:
            chunk = self.input.read(102400)
        except (OSError, IOError) as e:
            # EAGAIN just means there is nothing to read right now
            if e.errno != errno.EAGAIN:
                raise
            chunk = None
        self.queue += chunk or b""
        grew = len(self.queue) > size_before

        marker = b"</event>"
        while True:
            head, found, rest = self.queue.partition(marker)
            if not found:
                break
            worker_fire_prepickled(head + found)
            self.queue = rest
        return grew

    def close(self):
        """Drain the pipe, report any incomplete message and close the input."""
        while self.read():
            pass
        if self.queue:
            # NOTE(review): print() goes to stdout, which this file also uses
            # as worker_pipe - confirm that is the intended destination.
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
|
|
|
|
# Set by BitbakeWorker.handle_quit() so a requested quit is not reported as an error
normalexit = False

class BitbakeWorker(object):
    """Command loop of the worker process.

    Commands arrive on ``din`` framed as ``<name>payload</name>``.  The loop
    dispatches them to the handle_* methods, forwards data read from the
    pipes of running tasks and reports the exit status of finished tasks.
    """
    def __init__(self, din):
        """Set up the state and install the SIGTERM/SIGHUP handlers.

        Args:
            din: binary file object the commands are read from; it is
                switched to non-blocking mode.
        """
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # Initialised like the other command payloads; filled in by
        # handle_workerdata()
        self.workerdata = None
        # pid -> task for running children, pid -> runQueueWorkerPipe
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Warn, terminate the running tasks, then die from a default SIGTERM."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Run the command loop; it only ends when the process exits."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if r is None:
                        # A non-blocking read may report "no data" as None
                        # rather than b""; that is not EOF, so do nothing.
                        pass
                    elif len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    else:
                        self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self._dispatch_queue()

            for pipe in self.build_pipes.values():
                if pipe.input in ready:
                    pipe.read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def _dispatch_queue(self):
        """Run the command handlers until no further message can be consumed."""
        handlers = (
            (b"cookerconfig", self.handle_cookercfg),
            (b"extraconfigdata", self.handle_extraconfigdata),
            (b"workerdata", self.handle_workerdata),
            (b"runtask", self.handle_runtask),
            (b"finishnow", self.handle_finishnow),
            (b"ping", self.handle_ping),
            (b"quit", self.handle_quit),
        )
        while self.queue:
            before = len(self.queue)
            for item, func in handlers:
                self.handle_item(item, func)
            if len(self.queue) == before:
                # Nothing was consumed: the queue holds an incomplete message
                break

    def handle_item(self, item, func):
        """Consume the ``<item>...</item>`` messages at the front of the queue.

        *func* is called with the payload of each complete message.  The
        method stops as soon as the queue no longer starts with the opening
        tag, so a message of another type is never mistaken for a payload.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # The closing tag has not arrived yet
                return
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    # NOTE: the handlers below unpickle command payloads; pickle must only
    # ever be fed data from a trusted sender.

    def handle_cookercfg(self, data):
        """Unpickle the configuration and parse the base configuration."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store the unpickled extra configuration data."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Store the worker data and apply its logging and PRSERV_HOST settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level "Pong" message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Record that the exit was requested and leave via sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child for the requested task and track its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe, queue an
        <exitcode> message and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Send SIGTERM to every running task and read their pipes once more."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in list(self.build_pids):
                try:
                    # A negative pid addresses the task's whole process group
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the task may already be gone
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()
|
|
|
|
# Entry point: run the command loop (optionally under the profiler), then
# stop the writer thread and exit.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if profiling:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            prof.runcall(worker.serve)
        finally:
            # Save and post-process the profile however serve() ended
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
    else:
        worker.serve()
except BaseException as e:
    # serve() only ends through an exception (including SystemExit); report
    # it unless the exit was requested through the quit command.
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Let the writer thread flush what is still queued and wait for it to finish
worker_thread_exit = True
worker_thread.join()

workerlog_write("exitting")
sys.exit(0)
|