Source code
Revision control
Copy as Markdown
Other Tools
# A unix-oriented process dispatcher. Uses a single thread with select and
# waitpid to dispatch tasks. This avoids several deadlocks that are possible
# with fork/exec + threads + Python.
import errno
import os
import select
import signal
import sys
from collections import deque
from datetime import datetime, timedelta
from .progressbar import ProgressBar
from .results import NullTestOutput, TestOutput, escape_cmdline
class Task:
def __init__(self, test, prefix, tempdir, pid, stdout, stderr):
self.test = test
self.cmd = test.get_command(prefix, tempdir)
self.pid = pid
self.stdout = stdout
self.stderr = stderr
self.start = datetime.now()
self.out = []
self.err = []
def spawn_test(test, prefix, tempdir, passthrough, run_skipped, show_cmd):
"""Spawn one child, return a task struct."""
if not test.enable and not run_skipped:
return None
cmd = test.get_command(prefix, tempdir)
if show_cmd:
print(escape_cmdline(cmd))
if passthrough:
os.execvp(cmd[0], cmd)
return
(rout, wout) = os.pipe()
(rerr, werr) = os.pipe()
file_actions = [
(os.POSIX_SPAWN_CLOSE, rout),
(os.POSIX_SPAWN_CLOSE, rerr),
(os.POSIX_SPAWN_DUP2, wout, 1),
(os.POSIX_SPAWN_DUP2, werr, 2),
]
pid = os.posix_spawnp(cmd[0], cmd, os.environ, file_actions=file_actions)
os.close(wout)
os.close(werr)
return Task(test, prefix, tempdir, pid, rout, rerr)
def get_max_wait(tasks, timeout):
"""
Return the maximum time we can wait before any task should time out.
"""
# If we have a progress-meter, we need to wake up to update it frequently.
wait = ProgressBar.update_granularity()
# If a timeout is supplied, we need to wake up for the first task to
# timeout if that is sooner.
if timeout:
now = datetime.now()
timeout_delta = timedelta(seconds=timeout)
for task in tasks:
remaining = task.start + timeout_delta - now
wait = min(wait, remaining)
# Return the wait time in seconds, clamped between zero and max_wait.
return max(wait.total_seconds(), 0)
def flush_input(fd, frags):
"""
Read any pages sitting in the file descriptor 'fd' into the list 'frags'.
"""
rv = os.read(fd, 4096)
frags.append(rv)
while len(rv) == 4096:
# If read() returns a full buffer, it may indicate there was 1 buffer
# worth of data, or that there is more data to read. Poll the socket
# before we read again to ensure that we will not block indefinitly.
readable, _, _ = select.select([fd], [], [], 0)
if not readable:
return
rv = os.read(fd, 4096)
frags.append(rv)
def read_input(tasks, timeout):
"""
Select on input or errors from the given task list for a max of timeout
seconds.
"""
rlist = []
exlist = []
outmap = {} # Fast access to fragment list given fd.
for t in tasks:
rlist.append(t.stdout)
rlist.append(t.stderr)
outmap[t.stdout] = t.out
outmap[t.stderr] = t.err
# This will trigger with a close event when the child dies, allowing
# us to respond immediately and not leave cores idle.
exlist.append(t.stdout)
readable = []
try:
readable, _, _ = select.select(rlist, [], exlist, timeout)
except OverflowError:
print >> sys.stderr, "timeout value", timeout
raise
for fd in readable:
flush_input(fd, outmap[fd])
def remove_task(tasks, pid):
"""
Remove a task from the tasks list and return it.
"""
index = None
for i, t in enumerate(tasks):
if t.pid == pid:
index = i
break
else:
raise KeyError(f"No such pid: {pid}")
out = tasks[index]
tasks.pop(index)
return out
def timed_out(task, timeout):
"""
Return a timedelta with the amount we are overdue, or False if the timeout
has not yet been reached (or timeout is falsy, indicating there is no
timeout.)
"""
if not timeout:
return False
elapsed = datetime.now() - task.start
over = elapsed - timedelta(seconds=timeout)
return over if over.total_seconds() > 0 else False
def reap_zombies(tasks, timeout):
"""
Search for children of this process that have finished. If they are tasks,
then this routine will clean up the child. This method returns a new task
list that has had the ended tasks removed, followed by the list of finished
tasks.
"""
finished = []
while True:
try:
pid, status = os.waitpid(0, os.WNOHANG)
if pid == 0:
break
except OSError as e:
if e.errno == errno.ECHILD:
break
raise e
ended = remove_task(tasks, pid)
flush_input(ended.stdout, ended.out)
flush_input(ended.stderr, ended.err)
os.close(ended.stdout)
os.close(ended.stderr)
returncode = os.WEXITSTATUS(status)
if os.WIFSIGNALED(status):
returncode = -os.WTERMSIG(status)
finished.append(
TestOutput(
ended.test,
ended.cmd,
b"".join(ended.out).decode("utf-8", "replace"),
b"".join(ended.err).decode("utf-8", "replace"),
returncode,
(datetime.now() - ended.start).total_seconds(),
timed_out(ended, timeout),
{"pid": ended.pid},
)
)
return tasks, finished
def kill_undead(tasks, timeout):
"""
Signal all children that are over the given timeout. Use SIGABRT first to
generate a stack dump. If it still doesn't die for another 30 seconds, kill
with SIGKILL.
"""
for task in tasks:
over = timed_out(task, timeout)
if over:
if over.total_seconds() < 30:
os.kill(task.pid, signal.SIGABRT)
else:
os.kill(task.pid, signal.SIGKILL)
def run_all_tests(tests, prefix, tempdir, pb, options):
max_parallel_heavy_tasks = 1
# Updated by the main function, consulted by the `scheduler` generator.
num_tasks = 0
num_heavy_tasks = 0
task = None
def scheduler(tests, max_tasks):
# Deques of tests seen but not yet spawned, indexed by "weight"
# (a boolean, heavy vs light).
pending = (deque(), deque())
pending_heavy = pending[True] # More readable name
tests = iter(tests)
xdr_mode = "off"
def with_xdr(test, mode):
test.selfhosted_xdr_mode = mode
return test
if options.use_xdr:
xdr_mode = "encode"
# Find a light test to spawn. Use it for XDR encoding,
for test in tests:
if test.heavy:
pending_heavy.append(test)
else:
yield ("spawn", with_xdr(test, "encode"))
if task is not None:
xdr_mode = "decode"
break
# Fall back to using a heavy task.
while xdr_mode != "decode":
if not pending_heavy:
return # No runnable tests.
test = pending_heavy.popleft()
yield ("spawn", with_xdr(test, "encode"))
if task is not None:
xdr_mode = "decode"
# Wait for the encoding task to complete.
yield ("wait-all", None)
# Walk through tests and spawn them, but keep a full set of heavy tasks
# running whenever possible (start them eagerly, and scan ahead to find
# more when the pending list runs out). Wait whenever spawning a task
# would exceed `max_tasks`.
while True:
if num_tasks >= max_tasks:
# Task pool is full.
yield ("wait-one", None)
continue
# The index within pending[], which is whether we want a heavy job.
weight = num_heavy_tasks < max_parallel_heavy_tasks
# If a test with the desired weight is pending, spawn it.
if pending[weight]:
test = pending[weight].popleft()
yield ("spawn", with_xdr(test, xdr_mode))
continue
# Scan for a test with the desired weight.
for test in tests:
pending[test.heavy].append(test)
if test.heavy == weight:
break
else:
# No tests of this weight left, so spawn one of the others.
other = not weight
if pending[other]:
if other:
# We wanted a light but only have heavy. But wanting a
# light means too many heavies are running. Wait.
assert num_heavy_tasks >= max_parallel_heavy_tasks
yield ("wait-one", None)
test = pending[other].popleft()
yield ("spawn", with_xdr(test, xdr_mode))
else:
assert not any(pending)
break
# All tests have been spawned.
yield ("wait-all", None)
# The set of currently running tests.
tasks = []
for action, test in scheduler(tests, options.worker_count):
if action == "spawn":
if not test.enable and not options.run_skipped:
task = None
yield NullTestOutput(test)
continue
task = spawn_test(
test,
prefix,
tempdir,
options.passthrough,
options.run_skipped,
options.show_cmd,
)
if task:
tasks.append(task)
if test.heavy:
num_heavy_tasks += 1
num_tasks += 1
else:
yield NullTestOutput(test)
continue
assert action.startswith("wait-")
wait_until_down_to = 0 if action == "wait-all" else num_tasks - 1
while num_tasks > wait_until_down_to:
timeout = get_max_wait(tasks, options.timeout)
read_input(tasks, timeout)
kill_undead(tasks, options.timeout)
tasks, finished = reap_zombies(tasks, options.timeout)
for out in finished:
yield out
if out.test.heavy:
num_heavy_tasks -= 1
num_tasks -= 1
# If we did not finish any tasks, poke the progress bar to show that
# the test harness is at least not frozen.
if len(finished) == 0:
pb.poke()