Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 137 additions & 17 deletions src/pytest_forked/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
import os
import sys
import warnings

import py
import tempfile
import marshal
import pytest
from _pytest import runner
import multiprocessing

# we know this bit is bad, but we cant help it with the current pytest setup


# copied from xdist remote
def serialize_report(rep):
import py

d = rep.__dict__.copy()
if hasattr(rep.longrepr, "toterminal"):
d["longrepr"] = str(rep.longrepr)
else:
d["longrepr"] = rep.longrepr
for name in d:
if isinstance(d[name], py.path.local):
d[name] = str(d[name])
if isinstance(d[name], os.PathLike):
d[name] = os.fspath(d[name])
elif name == "result":
d[name] = None # for now
return d
Expand Down Expand Up @@ -55,13 +54,61 @@ def pytest_runtest_protocol(item):
return True


class _ForkedResult:
"""Mimics py.process.ForkedFunc result object."""
def __init__(self):
self.retval = None
self.exitstatus = 0
self.signal = 0
self.out = ""
self.err = ""


def _worker(runforked_fn, stdout_path, stderr_path, retval_path):
"""
Child process entry point.
Redirects OS-level fds 1 and 2 to files before running the test,
so output is captured even if the process is killed by a signal.
"""
EXITSTATUS_EXCEPTION = 3

# Redirect stdout/stderr at the OS fd level (survives hard crashes)
stdout_fd = os.open(stdout_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
stderr_fd = os.open(stderr_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.dup2(stdout_fd, 1)
os.dup2(stderr_fd, 2)
os.close(stdout_fd)
os.close(stderr_fd)

# redirect Python-level streams so print() etc. work
sys.stdout = open(stdout_path, "w", buffering=1)
sys.stderr = open(stderr_path, "w", buffering=1)

try:
retval = runforked_fn()
with open(retval_path, "wb") as f:
f.write(retval)
except KeyboardInterrupt:
os._exit(4) # EXITSTATUS_TESTEXIT
except SystemExit as e:
code = e.code if e.code is not None else 0
os._exit(int(code))
except Exception:
os._exit(EXITSTATUS_EXCEPTION)
finally:
try:
sys.stdout.flush()
sys.stderr.flush()
except Exception:
pass

os._exit(0)


def forked_run_report(item):
# for now, we run setup/teardown in the subprocess
# XXX optionally allow sharing of setup/teardown
from _pytest.runner import runtestprotocol

EXITSTATUS_TESTEXIT = 4
import marshal

def runforked():
try:
Expand All @@ -70,8 +117,55 @@ def runforked():
os._exit(EXITSTATUS_TESTEXIT)
return marshal.dumps([serialize_report(x) for x in reports])

ff = py.process.ForkedFunc(runforked)
result = ff.waitfinish()
# Use temp files for stdout/stderr — captured at OS fd level, so they
# survive a SIGKILL/SIGTERM just like the original ForkedFunc did.
with tempfile.TemporaryDirectory() as tmpdir:
stdout_path = os.path.join(tmpdir, "stdout")
stderr_path = os.path.join(tmpdir, "stderr")
retval_path = os.path.join(tmpdir, "retval")

# Pre-create files so reads don't fail if child never writes
open(stdout_path, "w").close()
open(stderr_path, "w").close()

proc = multiprocessing.Process(
target=_worker,
args=(runforked, stdout_path, stderr_path, retval_path),
)
proc.start()
proc.join()

result = _ForkedResult()
result.exitstatus = proc.exitcode if proc.exitcode is not None else 0

# Decode signal number from exit code the same way waitpid does:
# multiprocessing sets exitcode = -signum for signal-killed children
if proc.exitcode is not None and proc.exitcode < 0:
result.signal = -proc.exitcode

# Read captured output — available even after a crash
try:
with open(stdout_path, "r") as f:
result.out = f.read()
except OSError:
result.out = ""

try:
with open(stderr_path, "r") as f:
result.err = f.read()
except OSError:
result.err = ""

# Read return value only if child exited cleanly (no signal, no error)
if result.signal == 0 and result.exitstatus == 0:
try:
with open(retval_path, "rb") as f:
retval_data = f.read()
if retval_data:
result.retval = retval_data
except OSError:
result.retval = None

if result.retval is not None:
report_dumps = marshal.loads(result.retval)
return [runner.TestReport(**x) for x in report_dumps]
Expand All @@ -82,24 +176,45 @@ def runforked():


def report_process_crash(item, result):
from _pytest._code import getfslineno
import signal as signal_module

path, lineno = getfslineno(item)
# getfslineno returns -1 when called from the parent process on an item
# whose source is only resolvable in the child. Use the item's own
# location (nodeid path + fspath) which is always populated by pytest.
try:
from _pytest._code import getfslineno
path, lineno = getfslineno(item)
if lineno == -1:
raise ValueError("unresolvable")
except Exception:
path = getattr(item, "fspath", None) or item.nodeid.split("::")[0]
lineno = item.location[1] if item.location[1] is not None else 0

if result.signal:
sig_name = signal_module.Signals(result.signal).name
try:
sig_name = signal_module.Signals(result.signal).name
except ValueError:
sig_name = "UNKNOWN"
info = "%s:%s: running the test CRASHED with signal %d (%s)" % (
path,
lineno,
result.signal,
sig_name,
)

info_bare = "%s:%s: running the test CRASHED with signal %d" % (
path,
lineno,
result.signal,
)
else:
info = "%s:%s: running the test EXITED with status %d" % (
path,
lineno,
result.exitstatus,
)
info_bare = info

from _pytest import runner

# pytest >= 4.1
Expand All @@ -120,11 +235,16 @@ def report_process_crash(item, result):
return rep

rep.outcome = "skipped"

xfail_reason = xfail_marker.kwargs.get(
"reason",
xfail_marker.args[0] if xfail_marker.args else "",
)
rep.wasxfail = (
"reason: {xfail_reason}; "
"pytest-forked reason: {crash_info}".format(
xfail_reason=xfail_marker.kwargs["reason"],
crash_info=info,
xfail_reason=xfail_reason,
crash_info=info_bare,
)
)
warnings.warn(
Expand Down