diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index a9d97e47e005dff..7b9845e2197cc2c 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -61,6 +61,8 @@ typedef struct _stack_chunk { PyObject * data[1]; /* Variable sized */ } _PyStackChunk; +struct _timeout_block; + /* Minimum size of data stack chunk */ #define _PY_DATA_STACK_CHUNK_SIZE (16*1024) struct _ts { @@ -253,6 +255,9 @@ struct _ts { /* The interpreter guard owned by PyThreadState_EnsureFromView(), if any. */ PyInterpreterGuard *owned_guard; } ensure; + + uintptr_t cancel_flags; + struct _timeout_block *timeout_block; }; /* other API */ diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h index 06c4ca1619d7ce1..afaf0e28a1072bf 100644 --- a/Include/internal/pycore_ceval.h +++ b/Include/internal/pycore_ceval.h @@ -349,9 +349,10 @@ _PyEval_SpecialMethodCanSuggest(PyObject *self, int oparg); #define _PY_EVAL_PLEASE_STOP_BIT (1U << 5) #define _PY_EVAL_EXPLICIT_MERGE_BIT (1U << 6) #define _PY_EVAL_JIT_INVALIDATE_COLD_BIT (1U << 7) +#define _PY_CANCEL_REQUESTED_BIT (1U << 8) /* Reserve a few bits for future use */ -#define _PY_EVAL_EVENTS_BITS 8 +#define _PY_EVAL_EVENTS_BITS 9 #define _PY_EVAL_EVENTS_MASK ((1 << _PY_EVAL_EVENTS_BITS)-1) static inline void diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index ca6819d2cd44730..1545af29e20cad6 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -171,6 +171,23 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime); extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp); extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp); +#define _PY_CANCEL_GENERIC (1U << 0) +#define _PY_CANCEL_TIMEOUT (1U << 1) + +extern void _PyThreadState_RequestCancel(PyThreadState *tstate, + uintptr_t reason); +extern int _PyThreadState_RequestCancelByThreadId(PyInterpreterState *interp, + unsigned long 
id, + uintptr_t reason); +extern int _PyThreadState_CheckCancellation(PyThreadState *tstate); +extern void _PyThreadState_ClearCancellation(PyThreadState *tstate, + uintptr_t reason); + +extern int _PyTimeout_Push(PyThreadState *tstate, PyTime_t timeout); +extern int _PyTimeout_Pop(PyThreadState *tstate); +extern int _PyTimeout_CheckNow(PyThreadState *tstate); +extern void _PyTimeout_ClearThread(PyThreadState *tstate); + static inline void _Py_EnsureFuncTstateNotNULL(const char *func, PyThreadState *tstate) diff --git a/Include/internal/pycore_runtime_structs.h b/Include/internal/pycore_runtime_structs.h index 145e66de9984ca7..3f990cda407d180 100644 --- a/Include/internal/pycore_runtime_structs.h +++ b/Include/internal/pycore_runtime_structs.h @@ -8,7 +8,9 @@ extern "C" { #endif #include "pycore_interp_structs.h" // _PyGC_Head_UNUSED +#include "pycore_condvar.h" // PyCOND_T #include "pycore_obmalloc.h" // struct _obmalloc_global_state +#include "pycore_pythread.h" // PyThread_handle_t /************ Runtime state ************/ @@ -87,6 +89,20 @@ struct _Py_time_runtime_state { #endif }; +struct _timeout_block; + +struct _timeout_scheduler_state { + PyMUTEX_T mutex; + PyCOND_T cond; + PyThread_ident_t ident; + PyThread_handle_t handle; + int initialized; + int running; + int stopping; + struct _timeout_block *head; + _PyOnceFlag once; +}; + struct _Py_cached_objects { // XXX We could statically allocate the hashtable. 
@@ -270,6 +286,7 @@ struct pyruntimestate { struct _Py_unicode_runtime_state unicode_state; struct _types_runtime_state types; struct _Py_time_runtime_state time; + struct _timeout_scheduler_state timeout_scheduler; #if defined(__EMSCRIPTEN__) && defined(PY_CALL_TRAMPOLINE) // Used in "Python/emscripten_trampoline.c" to choose between wasm-gc diff --git a/Lib/contextlib.py b/Lib/contextlib.py index efc02bfa9243da6..a7db7fedf8f60bc 100644 --- a/Lib/contextlib.py +++ b/Lib/contextlib.py @@ -4,6 +4,7 @@ import os import sys import _collections_abc +import _timeout from collections import deque from functools import wraps lazy from inspect import ( @@ -17,7 +18,7 @@ "AbstractContextManager", "AbstractAsyncContextManager", "AsyncExitStack", "ContextDecorator", "ExitStack", "redirect_stdout", "redirect_stderr", "suppress", "aclosing", - "chdir"] + "chdir", "timeout"] class AbstractContextManager(abc.ABC): @@ -857,3 +858,18 @@ def __enter__(self): def __exit__(self, *excinfo): os.chdir(self._old_cwd.pop()) + + +class timeout(AbstractContextManager): + """Context manager that raises TimeoutError after the given delay.""" + + def __init__(self, seconds): + self._seconds = seconds + + def __enter__(self): + _timeout.enter(self._seconds) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + _timeout.leave() + return False diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py index e291f814edbd930..50c86df98d1c197 100644 --- a/Lib/test/test_contextlib.py +++ b/Lib/test/test_contextlib.py @@ -4,12 +4,19 @@ import os import sys import tempfile +import textwrap import threading import traceback import unittest +import _timeout +try: + import _interpreters +except ModuleNotFoundError: + _interpreters = None from contextlib import * # Tests __all__ from test import support -from test.support import os_helper +from test.support import os_helper, script_helper +from test.support import warnings_helper from test.support.testcase import 
ExceptionIsLikeMixin import weakref @@ -1579,5 +1586,222 @@ def test_exception(self): self.assertEqual(os.getcwd(), old_cwd) +class TestTimeout(unittest.TestCase): + + @classmethod + def setUpClass(cls): + try: + _timeout.enter(support.SHORT_TIMEOUT) + except RuntimeError as exc: + if "failed to start timeout scheduler thread" in str(exc): + raise unittest.SkipTest("timeout scheduler thread unavailable") + raise + else: + _timeout.leave() + + @unittest.skipIf(_interpreters is None, "subinterpreters required") + @unittest.skipIf( + support.is_android or support.is_apple_mobile, + "Subinterpreters are not supported on Android and iOS" + ) + def test_import_contextlib_in_subinterpreter(self): + interp = _interpreters.create() + try: + excsnap = _interpreters.run_string(interp, "import contextlib") + finally: + _interpreters.destroy(interp) + + self.assertIsNone(excsnap, excsnap) + + def test_normal_exit(self): + with timeout(support.SHORT_TIMEOUT) as cm: + self.assertIsInstance(cm, timeout) + + def test_negative_timeout(self): + with self.assertRaises(ValueError): + with timeout(-1): + pass + + def test_direct_check_and_leave_before_enter(self): + code = textwrap.dedent(""" + import _timeout + + assert _timeout.check() is False + try: + _timeout.leave() + except RuntimeError: + pass + else: + raise AssertionError("inactive timeout did not fail") + print("ok") + """) + _, out, err = script_helper.assert_python_ok("-c", code) + self.assertEqual(out.splitlines(), [b"ok"]) + self.assertEqual(err, b"") + + def test_direct_leave_after_scheduler_init(self): + with timeout(support.SHORT_TIMEOUT): + pass + + with self.assertRaises(RuntimeError): + _timeout.leave() + + def test_direct_cancel_current_thread(self): + with self.assertRaisesRegex(RuntimeError, "thread cancelled"): + _timeout.cancel() + _timeout.check() + + def test_direct_cancel_current_thread_at_eval_breaker(self): + with self.assertRaisesRegex(RuntimeError, "thread cancelled"): + _timeout.cancel() + pass + + def 
test_cancel_unknown_thread(self): + self.assertEqual(_timeout.cancel(0), 0) + + def test_cancel_thread_by_ident(self): + ready = threading.Event() + stop = threading.Event() + result = [] + + def worker(): + ready.set() + try: + while not stop.is_set(): + pass + except RuntimeError as exc: + result.append(str(exc)) + + thread = threading.Thread(target=worker) + thread.start() + self.addCleanup(thread.join, support.SHORT_TIMEOUT) + self.addCleanup(stop.set) + + self.assertTrue(ready.wait(support.SHORT_TIMEOUT)) + self.assertEqual(_timeout.cancel(thread.ident), 1) + thread.join(support.SHORT_TIMEOUT) + if thread.is_alive(): + stop.set() + thread.join(support.SHORT_TIMEOUT) + + self.assertFalse(thread.is_alive()) + self.assertEqual(result, ["thread cancelled"]) + + def test_timeout_expires_in_pure_python(self): + with self.assertRaises(TimeoutError): + with timeout(0.01): + while True: + pass + + def test_direct_check_expired_timeout(self): + with self.assertRaises(TimeoutError): + with timeout(0): + _timeout.check() + + with self.assertRaises(RuntimeError): + _timeout.leave() + + def test_finally_runs_after_timeout(self): + state = [] + + with self.assertRaises(TimeoutError): + with timeout(0.01): + try: + while True: + pass + finally: + state.append("finally") + + self.assertEqual(state, ["finally"]) + + def test_nested_timeout_uses_earliest_deadline(self): + with self.assertRaises(TimeoutError): + with timeout(0.01): + with timeout(support.SHORT_TIMEOUT): + while True: + pass + + def test_sre_checks_timeout_cancellation(self): + import re + + with self.assertRaises(TimeoutError): + with timeout(0.01): + re.fullmatch(r"(a+)+\Z", "a" * 100_000 + "!") + + def test_thread_clear_removes_active_timeout(self): + code = textwrap.dedent(""" + import _timeout + import threading + + def worker(): + _timeout.enter(3600) + + thread = threading.Thread(target=worker) + thread.start() + thread.join() + print("ok") + """) + _, out, err = script_helper.assert_python_ok("-c", code) + 
self.assertEqual(out.splitlines(), [b"ok"]) + self.assertEqual(err, b"") + + @support.requires_fork() + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_timeout_works_after_fork_without_active_timeout(self): + with timeout(support.SHORT_TIMEOUT): + pass + + pid = os.fork() + if pid == 0: + try: + with timeout(0.05): + while True: + pass + except TimeoutError: + os._exit(0) + except BaseException: + os._exit(2) + else: + os._exit(1) + + support.wait_process(pid, exitcode=0, timeout=support.SHORT_TIMEOUT) + + @support.requires_fork() + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_active_timeout_works_after_fork(self): + with timeout(0.5): + pid = os.fork() + if pid == 0: + try: + while True: + pass + except TimeoutError: + os._exit(0) + except BaseException: + os._exit(2) + else: + os._exit(1) + + support.wait_process(pid, exitcode=0, timeout=support.SHORT_TIMEOUT) + + @unittest.skipUnless(support.Py_GIL_DISABLED, "requires free-threaded build") + def test_timeout_module_does_not_enable_gil(self): + code = textwrap.dedent(""" + import sys + + assert not sys._is_gil_enabled() + import _timeout + assert not sys._is_gil_enabled() + _timeout.enter(1.0) + _timeout.leave() + assert not sys._is_gil_enabled() + print("ok") + """) + _, out, err = script_helper.assert_python_ok( + "-c", code, PYTHON_GIL="0", __isolated=False) + self.assertEqual(out.splitlines(), [b"ok"]) + self.assertEqual(err, b"") + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_embed.py b/Lib/test/test_embed.py index 2d1533c46b98f33..01ab1742aa6bca8 100644 --- a/Lib/test/test_embed.py +++ b/Lib/test/test_embed.py @@ -429,6 +429,35 @@ def test_finalize_structseq(self): out, err = self.run_embedded_interpreter("test_repeated_init_exec", code) self.assertEqual(out, 'Tests passed\n' * INIT_LOOPS) + def test_timeout_repeated_init(self): + code = textwrap.dedent(""" + from contextlib import timeout + + with timeout(1.0): + pass + try: + 
with timeout(0.01): + while True: + pass + except TimeoutError: + pass + else: + raise AssertionError("timeout did not fire") + print("timeout-cycle") + """) + out, err = self.run_embedded_interpreter("test_repeated_init_exec", code) + self.assertEqual(out, 'timeout-cycle\n' * INIT_LOOPS) + + def test_timeout_active_block_cleared_at_finalize(self): + code = textwrap.dedent(""" + import _timeout + + _timeout.enter(3600) + print("active-timeout-cycle") + """) + out, err = self.run_embedded_interpreter("test_repeated_init_exec", code) + self.assertEqual(out, 'active-timeout-cycle\n' * INIT_LOOPS) + def test_simple_initialization_api(self): # _testembed now uses Py_InitializeFromConfig by default # This case specifically checks Py_Initialize(Ex) still works diff --git a/Makefile.pre.in b/Makefile.pre.in index cf700e9f2340901..424d57cb4364fe1 100644 --- a/Makefile.pre.in +++ b/Makefile.pre.in @@ -354,7 +354,8 @@ COVERAGE_REPORT_OPTIONS=--rc lcov_branch_coverage=1 --branch-coverage --title "C MODULE_OBJS= \ Modules/config.o \ Modules/main.o \ - Modules/gcmodule.o + Modules/gcmodule.o \ + Modules/_timeoutmodule.o IO_H= Modules/_io/_iomodule.h diff --git a/Modules/_sre/sre.c b/Modules/_sre/sre.c index 32aa06bed4a409c..062d0bebbe0486e 100644 --- a/Modules/_sre/sre.c +++ b/Modules/_sre/sre.c @@ -44,6 +44,7 @@ static const char copyright[] = #include "pycore_long.h" // _PyLong_GetZero() #include "pycore_list.h" // _PyList_AppendTakeRef() #include "pycore_moduleobject.h" // _PyModule_GetState() +#include "pycore_pystate.h" // _PyThreadState_CheckCancellation() #include "pycore_tuple.h" // _PyTuple_FromPairSteal #include "pycore_unicodeobject.h" // _PyUnicode_Copy #include "pycore_weakref.h" // FT_CLEAR_WEAKREFS() diff --git a/Modules/_sre/sre_lib.h b/Modules/_sre/sre_lib.h index df377905bfae0d0..5cb3a9d3eef536d 100644 --- a/Modules/_sre/sre_lib.h +++ b/Modules/_sre/sre_lib.h @@ -550,8 +550,13 @@ typedef struct { #define _MAYBE_CHECK_SIGNALS \ do { \ - if ((0 == (++sigcount & 
0xfff)) && PyErr_CheckSignals()) { \ - RETURN_ERROR(SRE_ERROR_INTERRUPTED); \ + if (0 == (++sigcount & 0xfff)) { \ + if (PyErr_CheckSignals()) { \ + RETURN_ERROR(SRE_ERROR_INTERRUPTED); \ + } \ + if (_PyThreadState_CheckCancellation(_PyThreadState_GET())) { \ + RETURN_ERROR(SRE_ERROR_INTERRUPTED); \ + } \ } \ } while (0) diff --git a/Modules/_timeoutmodule.c b/Modules/_timeoutmodule.c new file mode 100644 index 000000000000000..d3d4bf97d3cce62 --- /dev/null +++ b/Modules/_timeoutmodule.c @@ -0,0 +1,101 @@ +#include "Python.h" +#include "pycore_pystate.h" // _PyTimeout_Push() +#include "pycore_time.h" // _PyTime_FromSecondsObject() + + +static PyObject * +timeout_enter(PyObject *Py_UNUSED(module), PyObject *timeout_obj) +{ + PyTime_t timeout; + if (_PyTime_FromSecondsObject(&timeout, timeout_obj, + _PyTime_ROUND_TIMEOUT) < 0) + { + return NULL; + } + + PyThreadState *tstate = _PyThreadState_GET(); + if (_PyTimeout_Push(tstate, timeout) < 0) { + return NULL; + } + + Py_RETURN_NONE; +} + + +static PyObject * +timeout_leave(PyObject *Py_UNUSED(module), PyObject *Py_UNUSED(ignored)) +{ + PyThreadState *tstate = _PyThreadState_GET(); + if (_PyTimeout_Pop(tstate) < 0) { + return NULL; + } + + Py_RETURN_NONE; +} + + +static PyObject * +timeout_check(PyObject *Py_UNUSED(module), PyObject *Py_UNUSED(ignored)) +{ + PyThreadState *tstate = _PyThreadState_GET(); + if (_PyThreadState_CheckCancellation(tstate) < 0) { + return NULL; + } + + Py_RETURN_FALSE; +} + + +static PyObject * +timeout_cancel(PyObject *Py_UNUSED(module), PyObject *args) +{ + PyThreadState *tstate = _PyThreadState_GET(); + if (PyTuple_GET_SIZE(args) == 0) { + _PyThreadState_RequestCancel(tstate, _PY_CANCEL_GENERIC); + Py_RETURN_NONE; + } + + unsigned long id; + if (!PyArg_ParseTuple(args, "k:cancel", &id)) { + return NULL; + } + + int requested = _PyThreadState_RequestCancelByThreadId( + tstate->interp, id, _PY_CANCEL_GENERIC); + return PyLong_FromLong(requested); +} + + +static PyMethodDef timeout_methods[] 
= { + {"enter", timeout_enter, METH_O, PyDoc_STR("Enter a timeout block.")}, + {"leave", timeout_leave, METH_NOARGS, PyDoc_STR("Leave a timeout block.")}, + {"check", timeout_check, METH_NOARGS, PyDoc_STR("Check for cancellation.")}, + {"cancel", timeout_cancel, METH_VARARGS, + PyDoc_STR("Request cooperative cancellation.")}, + {NULL, NULL} +}; + + +static PyModuleDef_Slot timeout_slots[] = { + _Py_ABI_SLOT, + {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED}, + {Py_mod_gil, Py_MOD_GIL_NOT_USED}, + {0, NULL} +}; + + +static struct PyModuleDef timeoutmodule = { + PyModuleDef_HEAD_INIT, + .m_name = "_timeout", + .m_doc = "Prototype synchronous timeout support.", + .m_size = 0, + .m_methods = timeout_methods, + .m_slots = timeout_slots, +}; + + +PyMODINIT_FUNC +PyInit__timeout(void) +{ + return PyModuleDef_Init(&timeoutmodule); +} diff --git a/Modules/config.c.in b/Modules/config.c.in index 704f58506048a3e..96b8086ad553117 100644 --- a/Modules/config.c.in +++ b/Modules/config.c.in @@ -22,6 +22,7 @@ extern PyObject* PyInit__tokenize(void); extern PyObject* PyInit__contextvars(void); extern PyObject* _PyWarnings_Init(void); extern PyObject* PyInit__string(void); +extern PyObject* PyInit__timeout(void); struct _inittab _PyImport_Inittab[] = { @@ -55,6 +56,9 @@ struct _inittab _PyImport_Inittab[] = { /* This lives in Objects/unicodeobject.c */ {"_string", PyInit__string}, + /* This lives in Modules/_timeoutmodule.c */ + {"_timeout", PyInit__timeout}, + /* Sentinel */ {0, 0} }; diff --git a/PC/config.c b/PC/config.c index 51b46c64d99b816..ef73b7f875cfb59 100644 --- a/PC/config.c +++ b/PC/config.c @@ -83,6 +83,7 @@ extern PyObject* PyInit__string(void); extern PyObject* PyInit__stat(void); extern PyObject* PyInit__opcode(void); extern PyObject* PyInit__contextvars(void); +extern PyObject* PyInit__timeout(void); extern PyObject* PyInit__tokenize(void); extern PyObject* PyInit__suggestions(void); @@ -193,6 +194,7 @@ struct _inittab _PyImport_Inittab[] = { 
{"_opcode", PyInit__opcode}, {"_contextvars", PyInit__contextvars}, + {"_timeout", PyInit__timeout}, /* Sentinel */ {0, 0} diff --git a/PCbuild/pythoncore.vcxproj b/PCbuild/pythoncore.vcxproj index e255ed5af19125d..8b6fef2b5665c64 100644 --- a/PCbuild/pythoncore.vcxproj +++ b/PCbuild/pythoncore.vcxproj @@ -507,6 +507,7 @@ + diff --git a/PCbuild/pythoncore.vcxproj.filters b/PCbuild/pythoncore.vcxproj.filters index 649ee1859ff9961..05183e4d1bd0b93 100644 --- a/PCbuild/pythoncore.vcxproj.filters +++ b/PCbuild/pythoncore.vcxproj.filters @@ -1043,6 +1043,9 @@ Modules + + Modules + Modules diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 2425bc1b39f0dcc..d747804b5689aa0 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -5,6 +5,7 @@ #include "pycore_optimizer.h" // _Py_Executors_InvalidateCold() #include "pycore_pyerrors.h" // _PyErr_GetRaisedException() #include "pycore_pylifecycle.h" // _PyErr_Print() +#include "pycore_pystate.h" // _PyThreadState_CheckCancellation() #include "pycore_pystats.h" // _Py_PrintSpecializationStats() #include "pycore_runtime.h" // _PyRuntime @@ -1428,6 +1429,12 @@ _Py_HandlePending(PyThreadState *tstate) } } + if ((breaker & _PY_CANCEL_REQUESTED_BIT) != 0) { + if (_PyThreadState_CheckCancellation(tstate) < 0) { + return -1; + } + } + #if defined(Py_REMOTE_DEBUG) && defined(Py_SUPPORTS_REMOTE_DEBUG) _PyRunRemoteDebugger(tstate); #endif diff --git a/Python/pystate.c b/Python/pystate.c index fed1df0173bacf1..96e50e1d2700ecf 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -7,11 +7,13 @@ #include "pycore_backoff.h" // JUMP_BACKWARD_INITIAL_VALUE, SIDE_EXIT_INITIAL_VALUE #include "pycore_ceval.h" // _PyEval_AcquireLock() #include "pycore_codecs.h" // _PyCodec_Fini() +#include "pycore_condvar.h" // PyCOND_T #include "pycore_critical_section.h" // _PyCriticalSection_Resume() #include "pycore_dtoa.h" // _dtoa_state_INIT() #include "pycore_freelist.h" // _PyObject_ClearFreeLists() #include "pycore_initconfig.h" // 
_PyStatus_OK() #include "pycore_interpframe.h" // _PyThreadState_HasStackSpace() +#include "pycore_lock.h" // _PyOnceFlag_CallOnce() #include "pycore_object.h" // _PyType_InitCache(), _Py_ClearImmortal() #include "pycore_obmalloc.h" // _PyMem_obmalloc_state_on_heap() #include "pycore_opcode_utils.h" // NUM_COMMON_CONSTANTS @@ -20,12 +22,14 @@ #include "pycore_pyerrors.h" // _PyErr_Clear() #include "pycore_pylifecycle.h" // _PyAST_Fini() #include "pycore_pymem.h" // _PyMem_DebugEnabled() +#include "pycore_pythread.h" // PyThread_start_joinable_thread() #include "pycore_runtime.h" // _PyRuntime #include "pycore_runtime_init.h" // _PyRuntimeState_INIT #include "pycore_stackref.h" // PyStackRef_AsPyObjectBorrow() #include "pycore_stats.h" // FT_STAT_WORLD_STOP_INC() #include "pycore_time.h" // _PyTime_Init() #include "pycore_uniqueid.h" // _PyObject_FinalizePerThreadRefcounts() +#include "condvar.h" // PyCOND_WAIT() /* -------------------------------------------------------------------------- @@ -47,6 +51,247 @@ to avoid the expense of doing their own locking). 
# endif #endif +#include <limits.h> + +typedef struct _timeout_block { + PyThreadState *tstate; + PyTime_t deadline; + int notified; + int fired; + struct _timeout_block *prev; + struct _timeout_block *sched_prev; + struct _timeout_block *sched_next; +} _PyTimeoutBlock; + +static void +timeout_mutex_lock(struct _timeout_scheduler_state *scheduler) +{ + if (PyMUTEX_LOCK(&scheduler->mutex)) { + Py_FatalError("PyMUTEX_LOCK(timeout_scheduler.mutex) failed"); + } +} + +static void +timeout_mutex_unlock(struct _timeout_scheduler_state *scheduler) +{ + if (PyMUTEX_UNLOCK(&scheduler->mutex)) { + Py_FatalError("PyMUTEX_UNLOCK(timeout_scheduler.mutex) failed"); + } +} + +static void +timeout_cond_signal(struct _timeout_scheduler_state *scheduler) +{ + if (PyCOND_SIGNAL(&scheduler->cond)) { + Py_FatalError("PyCOND_SIGNAL(timeout_scheduler.cond) failed"); + } +} + +static void +timeout_cond_broadcast(struct _timeout_scheduler_state *scheduler) +{ + if (PyCOND_BROADCAST(&scheduler->cond)) { + Py_FatalError("PyCOND_BROADCAST(timeout_scheduler.cond) failed"); + } +} + +static void +timeout_cond_wait(struct _timeout_scheduler_state *scheduler) +{ + if (PyCOND_WAIT(&scheduler->cond, &scheduler->mutex)) { + Py_FatalError("PyCOND_WAIT(timeout_scheduler.cond) failed"); + } +} + +static int +timeout_find_next_deadline_unlocked(struct _timeout_scheduler_state *scheduler, + PyTime_t *deadline) +{ + int found = 0; + PyTime_t earliest = 0; + + for (_PyTimeoutBlock *block = scheduler->head; + block != NULL; + block = block->sched_next) + { + if (block->notified || block->fired) { + continue; + } + if (!found || block->deadline < earliest) { + earliest = block->deadline; + found = 1; + } + } + + if (found) { + *deadline = earliest; + } + return found; +} + +static long long +timeout_as_microseconds(PyTime_t ns) +{ + PyTime_t us = _PyTime_AsMicroseconds(ns, _PyTime_ROUND_CEILING); + if (us > LLONG_MAX) { + return LLONG_MAX; + } + if (us < 0) { + return 0; + } + return (long long)us; +} + +static void 
+timeout_notify_expired_unlocked(struct _timeout_scheduler_state *scheduler, + PyTime_t now) +{ + for (_PyTimeoutBlock *block = scheduler->head; + block != NULL; + block = block->sched_next) + { + if (!block->notified && !block->fired && block->deadline <= now) { + block->notified = 1; + _PyThreadState_RequestCancel(block->tstate, _PY_CANCEL_TIMEOUT); + } + } +} + +static void +timeout_scheduler_thread(void *arg) +{ + struct _timeout_scheduler_state *scheduler = arg; + + timeout_mutex_lock(scheduler); + for (;;) { + PyTime_t deadline; + while (!scheduler->stopping && + !timeout_find_next_deadline_unlocked(scheduler, &deadline)) + { + timeout_cond_wait(scheduler); + } + if (scheduler->stopping) { + break; + } + + PyTime_t now; + (void)PyTime_MonotonicRaw(&now); + PyTime_t remaining = deadline - now; + if (remaining > 0) { + int r = PyCOND_TIMEDWAIT(&scheduler->cond, + &scheduler->mutex, + timeout_as_microseconds(remaining)); + if (r < 0) { + Py_FatalError("PyCOND_TIMEDWAIT(timeout_scheduler.cond) failed"); + } + continue; + } + + timeout_notify_expired_unlocked(scheduler, now); + } + timeout_mutex_unlock(scheduler); +} + +static int +timeout_scheduler_init(void *arg) +{ + struct _timeout_scheduler_state *scheduler = arg; + + if (PyMUTEX_INIT(&scheduler->mutex)) { + PyErr_SetString(PyExc_RuntimeError, "failed to initialize timeout mutex"); + return -1; + } + if (PyCOND_INIT(&scheduler->cond)) { + (void)PyMUTEX_FINI(&scheduler->mutex); + PyErr_SetString(PyExc_RuntimeError, "failed to initialize timeout condition"); + return -1; + } + scheduler->initialized = 1; + scheduler->stopping = 0; + scheduler->head = NULL; + if (PyThread_start_joinable_thread(timeout_scheduler_thread, scheduler, + &scheduler->ident, + &scheduler->handle)) + { + (void)PyCOND_FINI(&scheduler->cond); + (void)PyMUTEX_FINI(&scheduler->mutex); + scheduler->initialized = 0; + scheduler->handle = 0; + scheduler->ident = 0; + PyErr_SetString(PyExc_RuntimeError, "failed to start timeout scheduler 
thread"); + return -1; + } + scheduler->running = 1; + return 0; +} + +static void +timeout_scheduler_fini(struct _timeout_scheduler_state *scheduler) +{ + if (!scheduler->initialized) { + scheduler->once = (_PyOnceFlag){0}; + return; + } + + if (scheduler->running) { + timeout_mutex_lock(scheduler); + scheduler->stopping = 1; + timeout_cond_broadcast(scheduler); + timeout_mutex_unlock(scheduler); + + if (PyThread_join_thread(scheduler->handle)) { + Py_FatalError("PyThread_join_thread(timeout_scheduler) failed"); + } + scheduler->running = 0; + } + + if (scheduler->head != NULL) { + Py_FatalError("timeout scheduler stopped with active timeouts"); + } + + if (PyCOND_FINI(&scheduler->cond)) { + Py_FatalError("PyCOND_FINI(timeout_scheduler.cond) failed"); + } + if (PyMUTEX_FINI(&scheduler->mutex)) { + Py_FatalError("PyMUTEX_FINI(timeout_scheduler.mutex) failed"); + } + + scheduler->handle = 0; + scheduler->ident = 0; + scheduler->initialized = 0; + scheduler->stopping = 0; + scheduler->once = (_PyOnceFlag){0}; +} + +static void +timeout_schedule_block_unlocked(struct _timeout_scheduler_state *scheduler, + _PyTimeoutBlock *block) +{ + block->sched_prev = NULL; + block->sched_next = scheduler->head; + if (scheduler->head != NULL) { + scheduler->head->sched_prev = block; + } + scheduler->head = block; +} + +static void +timeout_unschedule_block_unlocked(struct _timeout_scheduler_state *scheduler, + _PyTimeoutBlock *block) +{ + if (block->sched_prev != NULL) { + block->sched_prev->sched_next = block->sched_next; + } + else if (scheduler->head == block) { + scheduler->head = block->sched_next; + } + if (block->sched_next != NULL) { + block->sched_next->sched_prev = block->sched_prev; + } + block->sched_prev = NULL; + block->sched_next = NULL; +} + /****************************************/ /* helpers for the current thread state */ @@ -103,6 +348,89 @@ current_fast_clear(_PyRuntimeState *Py_UNUSED(runtime)) _Py_tss_interp = NULL; } +#ifdef HAVE_FORK +static void 
+timeout_clear_thread_after_fork_unlocked(PyThreadState *tstate) +{ + _PyTimeoutBlock *block = (_PyTimeoutBlock *)tstate->timeout_block; + tstate->timeout_block = NULL; + while (block != NULL) { + _PyTimeoutBlock *prev = block->prev; + PyMem_RawFree(block); + block = prev; + } + _PyThreadState_ClearCancellation(tstate, _PY_CANCEL_TIMEOUT); +} + +static PyStatus +timeout_scheduler_after_fork_child(_PyRuntimeState *runtime) +{ + struct _timeout_scheduler_state *scheduler = &runtime->timeout_scheduler; + PyThreadState *current = current_fast_get(); + + for (PyInterpreterState *interp = runtime->interpreters.head; + interp != NULL; + interp = interp->next) + { + for (PyThreadState *tstate = interp->threads.head; + tstate != NULL; + tstate = tstate->next) + { + if (tstate != current) { + timeout_clear_thread_after_fork_unlocked(tstate); + } + } + } + + _PyTimeoutBlock *block = current != NULL + ? (_PyTimeoutBlock *)current->timeout_block + : NULL; + + memset(scheduler, 0, sizeof(*scheduler)); + + if (block == NULL) { + return _PyStatus_OK(); + } + + _PyThreadState_ClearCancellation(current, _PY_CANCEL_TIMEOUT); + + if (PyMUTEX_INIT(&scheduler->mutex)) { + return _PyStatus_ERR("failed to initialize timeout mutex after fork"); + } + if (PyCOND_INIT(&scheduler->cond)) { + (void)PyMUTEX_FINI(&scheduler->mutex); + return _PyStatus_ERR("failed to initialize timeout condition after fork"); + } + + scheduler->initialized = 1; + scheduler->stopping = 0; + scheduler->head = NULL; + for (; block != NULL; block = block->prev) { + block->tstate = current; + block->notified = 0; + block->sched_prev = NULL; + block->sched_next = NULL; + if (!block->fired) { + timeout_schedule_block_unlocked(scheduler, block); + } + } + + if (PyThread_start_joinable_thread(timeout_scheduler_thread, scheduler, + &scheduler->ident, + &scheduler->handle)) + { + (void)PyCOND_FINI(&scheduler->cond); + (void)PyMUTEX_FINI(&scheduler->mutex); + memset(scheduler, 0, sizeof(*scheduler)); + return 
_PyStatus_ERR("failed to start timeout scheduler thread after fork"); + } + + scheduler->running = 1; + _Py_atomic_store_uint8(&scheduler->once.v, _Py_ONCE_INITIALIZED); + return _PyStatus_OK(); +} +#endif + #define tstate_verify_not_active(tstate) \ if (tstate == current_fast_get()) { \ _Py_FatalErrorFormat(__func__, "tstate %p is still current", tstate); \ @@ -389,6 +717,7 @@ _PyRuntimeState_Fini(_PyRuntimeState *runtime) /* The count is cleared by _Py_FinalizeRefTotal(). */ assert(runtime->object_state.interpreter_leaks == 0); #endif + timeout_scheduler_fini(&runtime->timeout_scheduler); gilstate_clear(); } @@ -424,6 +753,11 @@ _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime) _PyThread_AfterFork(&runtime->threads); + PyStatus status = timeout_scheduler_after_fork_child(runtime); + if (_PyStatus_EXCEPTION(status)) { + return status; + } + return _PyStatus_OK(); } #endif @@ -1613,6 +1947,8 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->current_executor = NULL; tstate->jit_exit = NULL; tstate->dict_global_version = 0; + tstate->cancel_flags = 0; + tstate->timeout_block = NULL; _tstate->c_stack_soft_limit = UINTPTR_MAX; _tstate->c_stack_top = 0; @@ -1807,6 +2143,8 @@ PyThreadState_Clear(PyThreadState *tstate) _PyErr_Print(tstate); } + _PyTimeout_ClearThread(tstate); + /* At this point tstate shouldn't be used any more, neither to run Python code nor for other uses. 
@@ -3623,3 +3961,215 @@ PyThreadState_Release(PyThreadStateToken *token) PyInterpreterGuard_Close(owned_guard); } } + +static _PyTimeoutBlock * +timeout_find_expired(PyThreadState *tstate, PyTime_t now) +{ + _PyTimeoutBlock *expired = NULL; + for (_PyTimeoutBlock *block = (_PyTimeoutBlock *)tstate->timeout_block; + block != NULL; + block = block->prev) + { + if (!block->fired && block->deadline <= now) { + if (expired == NULL || block->deadline < expired->deadline) { + expired = block; + } + } + } + return expired; +} + +int +_PyTimeout_Push(PyThreadState *tstate, PyTime_t timeout) +{ + assert(tstate != NULL); + struct _timeout_scheduler_state *scheduler = &tstate->interp->runtime->timeout_scheduler; + + if (timeout < 0) { + PyErr_SetString(PyExc_ValueError, "timeout must be non-negative"); + return -1; + } + if (_PyOnceFlag_CallOnce(&scheduler->once, + timeout_scheduler_init, scheduler) < 0) + { + return -1; + } + + PyTime_t now; + (void)PyTime_MonotonicRaw(&now); + + _PyTimeoutBlock *block = PyMem_RawMalloc(sizeof(_PyTimeoutBlock)); + if (block == NULL) { + PyErr_NoMemory(); + return -1; + } + block->tstate = tstate; + block->deadline = _PyTime_Add(now, timeout); + block->notified = 0; + block->fired = 0; + block->prev = NULL; + block->sched_prev = NULL; + block->sched_next = NULL; + + timeout_mutex_lock(scheduler); + if (scheduler->stopping) { + timeout_mutex_unlock(scheduler); + PyMem_RawFree(block); + PyErr_SetString(PyExc_RuntimeError, "timeout scheduler is shutting down"); + return -1; + } + block->prev = (_PyTimeoutBlock *)tstate->timeout_block; + tstate->timeout_block = (struct _timeout_block *)block; + timeout_schedule_block_unlocked(scheduler, block); + timeout_cond_signal(scheduler); + timeout_mutex_unlock(scheduler); + + return 0; +} + +int +_PyTimeout_Pop(PyThreadState *tstate) +{ + assert(tstate != NULL); + struct _timeout_scheduler_state *scheduler = &tstate->interp->runtime->timeout_scheduler; + + if (!scheduler->initialized) { + 
PyErr_SetString(PyExc_RuntimeError, "cannot exit inactive timeout"); + return -1; + } + + timeout_mutex_lock(scheduler); + _PyTimeoutBlock *block = (_PyTimeoutBlock *)tstate->timeout_block; + if (block == NULL) { + timeout_mutex_unlock(scheduler); + PyErr_SetString(PyExc_RuntimeError, "cannot exit inactive timeout"); + return -1; + } + + tstate->timeout_block = (struct _timeout_block *)block->prev; + timeout_unschedule_block_unlocked(scheduler, block); + timeout_cond_signal(scheduler); + timeout_mutex_unlock(scheduler); + + PyMem_RawFree(block); + return 0; +} + +int +_PyTimeout_CheckNow(PyThreadState *tstate) +{ + assert(tstate != NULL); + struct _timeout_scheduler_state *scheduler = &tstate->interp->runtime->timeout_scheduler; + + if (!scheduler->initialized) { + return 0; + } + + PyTime_t now; + (void)PyTime_MonotonicRaw(&now); + + timeout_mutex_lock(scheduler); + if (tstate->timeout_block == NULL) { + timeout_mutex_unlock(scheduler); + return 0; + } + + _PyTimeoutBlock *expired = timeout_find_expired(tstate, now); + if (expired == NULL) { + timeout_mutex_unlock(scheduler); + return 0; + } + + expired->fired = 1; + expired->notified = 1; + timeout_cond_signal(scheduler); + timeout_mutex_unlock(scheduler); + PyErr_SetString(PyExc_TimeoutError, "timeout expired"); + return -1; +} + +int +_PyThreadState_CheckCancellation(PyThreadState *tstate) +{ + assert(tstate != NULL); + _Py_unset_eval_breaker_bit(tstate, _PY_CANCEL_REQUESTED_BIT); + uintptr_t cancel_flags = _Py_atomic_exchange_uintptr( + &tstate->cancel_flags, 0); + if ((cancel_flags & _PY_CANCEL_TIMEOUT) != 0) { + if (_PyTimeout_CheckNow(tstate) < 0) { + if ((cancel_flags & _PY_CANCEL_GENERIC) != 0) { + _PyThreadState_RequestCancel(tstate, _PY_CANCEL_GENERIC); + } + return -1; + } + } + if ((cancel_flags & _PY_CANCEL_GENERIC) != 0) { + PyErr_SetString(PyExc_RuntimeError, "thread cancelled"); + return -1; + } + return _PyTimeout_CheckNow(tstate); +} + +void +_PyThreadState_RequestCancel(PyThreadState *tstate, 
uintptr_t reason) +{ + assert(tstate != NULL); + _Py_atomic_or_uintptr(&tstate->cancel_flags, reason); + _Py_set_eval_breaker_bit(tstate, _PY_CANCEL_REQUESTED_BIT); +} + +int +_PyThreadState_RequestCancelByThreadId(PyInterpreterState *interp, + unsigned long id, + uintptr_t reason) +{ + int requested = 0; + _Py_FOR_EACH_TSTATE_BEGIN(interp, t) { + if (t->thread_id == id) { + _PyThreadState_RequestCancel(t, reason); + requested = 1; + break; + } + } + _Py_FOR_EACH_TSTATE_END(interp); + + return requested; +} + +void +_PyThreadState_ClearCancellation(PyThreadState *tstate, uintptr_t reason) +{ + assert(tstate != NULL); + _Py_atomic_and_uintptr(&tstate->cancel_flags, ~reason); +} + +void +_PyTimeout_ClearThread(PyThreadState *tstate) +{ + assert(tstate != NULL); + struct _timeout_scheduler_state *scheduler = &tstate->interp->runtime->timeout_scheduler; + + if (!scheduler->initialized) { + _PyThreadState_ClearCancellation(tstate, _PY_CANCEL_TIMEOUT); + return; + } + + timeout_mutex_lock(scheduler); + _PyTimeoutBlock *block = (_PyTimeoutBlock *)tstate->timeout_block; + if (block == NULL) { + timeout_mutex_unlock(scheduler); + _PyThreadState_ClearCancellation(tstate, _PY_CANCEL_TIMEOUT); + return; + } + tstate->timeout_block = NULL; + while (block != NULL) { + _PyTimeoutBlock *prev = block->prev; + timeout_unschedule_block_unlocked(scheduler, block); + PyMem_RawFree(block); + block = prev; + } + timeout_cond_signal(scheduler); + timeout_mutex_unlock(scheduler); + + _PyThreadState_ClearCancellation(tstate, _PY_CANCEL_TIMEOUT); +} diff --git a/Python/stdlib_module_names.h b/Python/stdlib_module_names.h index 8937e666bbbdd5b..2527d532c220049 100644 --- a/Python/stdlib_module_names.h +++ b/Python/stdlib_module_names.h @@ -93,6 +93,7 @@ static const char* _Py_stdlib_module_names[] = { "_sysconfig", "_thread", "_threading_local", +"_timeout", "_tkinter", "_tokenize", "_tracemalloc", diff --git a/Tools/timeoutbench/bench_timeout.py b/Tools/timeoutbench/bench_timeout.py new file 
mode 100644 index 000000000000000..987a07823259eb2 --- /dev/null +++ b/Tools/timeoutbench/bench_timeout.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +"""Benchmark prototype synchronous timeout overhead with pyperf.""" + +from __future__ import annotations + +from contextlib import timeout +import pyperf + + +_sink = 0 +_scheduler_warmed = False + + +def _consume(value: int) -> None: + global _sink + _sink ^= value + + +def _warm_timeout_scheduler(timeout_seconds: float) -> None: + global _scheduler_warmed + if not _scheduler_warmed: + with timeout(timeout_seconds): + pass + _scheduler_warmed = True + + +def _loop_body(inner_loops: int) -> int: + total = 0 + for value in range(inner_loops): + total += value + return total + + +def bench_loop_baseline(loops: int, inner_loops: int) -> float: + t0 = pyperf.perf_counter() + total = 0 + for _ in range(loops): + total += _loop_body(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_loop_timeout_active( + loops: int, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout_scheduler(timeout_seconds) + t0 = pyperf.perf_counter() + total = 0 + with timeout(timeout_seconds): + for _ in range(loops): + total += _loop_body(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_timeout_enter_exit(loops: int, timeout_seconds: float) -> float: + _warm_timeout_scheduler(timeout_seconds) + t0 = pyperf.perf_counter() + for _ in range(loops): + with timeout(timeout_seconds): + pass + return pyperf.perf_counter() - t0 + + +def bench_timeout_expiry_latency(loops: int, timeout_seconds: float) -> float: + _warm_timeout_scheduler(3600.0) + t0 = pyperf.perf_counter() + for _ in range(loops): + try: + with timeout(timeout_seconds): + while True: + pass + except TimeoutError: + pass + return pyperf.perf_counter() - t0 + + +def add_cmdline_args(cmd, args) -> None: + cmd.extend(("--inner-loops", str(args.inner_loops))) + 
cmd.extend(("--timeout-seconds", str(args.timeout_seconds))) + cmd.extend(("--expiry-seconds", str(args.expiry_seconds))) + if args.expiry: + cmd.append("--expiry") + + +def main() -> None: + runner = pyperf.Runner(add_cmdline_args=add_cmdline_args) + runner.argparser.add_argument( + "--inner-loops", + type=int, + default=1000, + help="work per calibrated pyperf loop for loop benchmarks", + ) + runner.argparser.add_argument( + "--timeout-seconds", + type=float, + default=3600.0, + help="non-expiring timeout duration used by overhead benchmarks", + ) + runner.argparser.add_argument( + "--expiry", + action="store_true", + help="also benchmark actual expiry latency", + ) + runner.argparser.add_argument( + "--expiry-seconds", + type=float, + default=1e-6, + help="timeout duration used by --expiry", + ) + args = runner.parse_args() + + runner.metadata["description"] = "CPython synchronous timeout prototype" + runner.metadata["timeout_inner_loops"] = str(args.inner_loops) + runner.metadata["timeout_seconds"] = str(args.timeout_seconds) + + runner.bench_time_func( + "loop_baseline", + bench_loop_baseline, + args.inner_loops, + ) + runner.bench_time_func( + "loop_timeout_active", + bench_loop_timeout_active, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "timeout_enter_exit", + bench_timeout_enter_exit, + args.timeout_seconds, + ) + if args.expiry: + runner.bench_time_func( + "timeout_expiry_latency", + bench_timeout_expiry_latency, + args.expiry_seconds, + ) + + +if __name__ == "__main__": + main() diff --git a/Tools/timeoutbench/bench_timeout_compare.py b/Tools/timeoutbench/bench_timeout_compare.py new file mode 100644 index 000000000000000..ec260610d44ad83 --- /dev/null +++ b/Tools/timeoutbench/bench_timeout_compare.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +"""Compare timeout prototype overhead against a clean baseline.""" + +from __future__ import annotations + +from contextlib import nullcontext +import pyperf + + +_sink = 0 
+_timeout_warmed = False + + +def _consume(value: int) -> None: + global _sink + _sink ^= value + + +def _has_timeout(implementation: str) -> bool: + return implementation != "baseline" + + +def _get_timeout(seconds: float): + if seconds <= 0: + raise ValueError("timeout must be positive for the overhead benchmark") + from contextlib import timeout + return timeout(seconds) + + +def _warm_timeout(implementation: str, timeout_seconds: float) -> None: + global _timeout_warmed + if _has_timeout(implementation) and not _timeout_warmed: + with _get_timeout(timeout_seconds): + pass + _timeout_warmed = True + + +def _pass_loop(inner_loops: int) -> int: + total = 0 + for value in range(inner_loops): + total ^= value + return total + + +def _arithmetic_loop(inner_loops: int) -> int: + total = 0 + for value in range(inner_loops): + total += value + return total + + +def _listcomp_work(inner_loops: int) -> int: + values = [value * value for value in range(inner_loops)] + return values[-1] if values else 0 + + +def bench_pass_loop( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with _get_timeout(timeout_seconds): + for _ in range(loops): + total ^= _pass_loop(inner_loops) + else: + for _ in range(loops): + total ^= _pass_loop(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_arithmetic_loop( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with _get_timeout(timeout_seconds): + for _ in range(loops): + total += _arithmetic_loop(inner_loops) + else: + for _ in range(loops): + total += _arithmetic_loop(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def 
bench_listcomp_work( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with _get_timeout(timeout_seconds): + for _ in range(loops): + total ^= _listcomp_work(inner_loops) + else: + for _ in range(loops): + total ^= _listcomp_work(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_context_enter_exit( + loops: int, + implementation: str, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + if _has_timeout(implementation): + t0 = pyperf.perf_counter() + for _ in range(loops): + with _get_timeout(timeout_seconds): + pass + else: + t0 = pyperf.perf_counter() + for _ in range(loops): + with nullcontext(): + pass + return pyperf.perf_counter() - t0 + + +def add_cmdline_args(cmd, args) -> None: + cmd.extend(("--implementation", args.implementation)) + cmd.extend(("--inner-loops", str(args.inner_loops))) + cmd.extend(("--timeout-seconds", str(args.timeout_seconds))) + + +def main() -> None: + runner = pyperf.Runner(add_cmdline_args=add_cmdline_args) + runner.argparser.add_argument( + "--implementation", + choices=("baseline", "op", "ours"), + required=True, + help="implementation under test", + ) + runner.argparser.add_argument( + "--inner-loops", + type=int, + default=1000, + help="work per calibrated pyperf loop", + ) + runner.argparser.add_argument( + "--timeout-seconds", + type=float, + default=3600.0, + help="non-expiring timeout duration", + ) + args = runner.parse_args() + + runner.metadata["timeout_implementation"] = args.implementation + runner.metadata["timeout_inner_loops"] = str(args.inner_loops) + runner.metadata["timeout_seconds"] = str(args.timeout_seconds) + + runner.bench_time_func( + "pass_loop", + bench_pass_loop, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( 
+ "arithmetic_loop", + bench_arithmetic_loop, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "listcomp_work", + bench_listcomp_work, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "context_enter_exit", + bench_context_enter_exit, + args.implementation, + args.timeout_seconds, + ) + + +if __name__ == "__main__": + main() diff --git a/Tools/timeoutbench/discourse_deadline_scheduler_draft.md b/Tools/timeoutbench/discourse_deadline_scheduler_draft.md new file mode 100644 index 000000000000000..585b7be7ed45e21 --- /dev/null +++ b/Tools/timeoutbench/discourse_deadline_scheduler_draft.md @@ -0,0 +1,263 @@ +I tried a different implementation strategy that avoids keeping the eval breaker hot for the whole duration of an active timeout block. I am calling it the deadline-scheduler approach below, because the main difference is that a scheduler thread watches deadlines and only notifies the eval loop once a deadline has actually expired. + +The current PR/prototype sets a timeout bit while inside `with timeout(...)`, so the eval loop enters pending-handling frequently while the timeout is active. The actual clock read can be amortized with a skip counter, but `_Py_HandlePending()` is still reached repeatedly. That appears to be the source of the active-timeout overhead discussed above. + +The deadline-scheduler prototype keeps the eval loop on its normal fast path until the timeout actually expires: + +* `with timeout(seconds)` pushes a deadline block onto the current `PyThreadState`. +* A runtime scheduler thread tracks active deadline blocks. +* While the timeout has not expired, no timeout eval-breaker bit is set. +* When a deadline expires, the scheduler requests cancellation on the target thread state with the timeout reason. +* The target thread raises `TimeoutError` at the next normal eval-breaker check. 
+ +So pure Python code is still interrupted cooperatively at bytecode/eval-breaker boundaries, but an active, non-expired timeout does not force pending handling on every bytecode/check point. + +Prototype branch: https://github.com/blhsing/cpython/tree/feature/cooperative-timeout + +CI passed here: https://github.com/blhsing/cpython/actions/runs/28073335858 + +Benchmark environment: + +* Linux 6.14, x86_64 +* Intel Xeon Silver 4410Y +* GCC 13.3.0, `-O3` +* CPython 3.16.0a0 +* `pyperf` +* `timeout_seconds=3600.0`, so the timeout never expires during the benchmark +* `inner_loops=1000` + +Mean ± std dev: + +| Benchmark | Baseline | OP approach | Deadline-scheduler approach | +| --- | ---: | ---: | ---: | +| `pass_loop` | 22.9 us ± 0.1 us | 33.3 us ± 0.2 us, 1.45x slower | 22.2 us ± 0.1 us, not meaningfully different | +| `arithmetic_loop` | 28.5 us ± 0.2 us | 37.8 us ± 0.2 us, 1.33x slower | 25.5 us ± 0.2 us, not meaningfully different | +| `listcomp_work` | 35.5 us ± 0.7 us | 46.5 us ± 0.4 us, 1.31x slower | 35.8 us ± 0.7 us, not significant | +| empty timeout context enter/exit | 342 ns ± 2 ns, `nullcontext` baseline | 1.06 us ± 0.01 us | 1.73 us ± 0.05 us | + +The small "faster than baseline" results for the deadline-scheduler approach are just build/run noise, not a claimed speedup. The same-binary no-timeout control for the deadline-scheduler build measured `pass_loop` at 22.1 us ± 0.1 us, `arithmetic_loop` at 25.4 us ± 0.1 us, and `listcomp_work` at 35.9 us ± 0.7 us, so the active timeout overhead is effectively lost in noise for these workloads. + +The tradeoff is visible in the empty enter/exit benchmark: the scheduler-based approach pays more to register and unregister a timeout block. For very tiny timeout scopes, the OP approach is cheaper. For timeout scopes around non-trivial Python work, avoiding repeated pending handling while the timeout is active seems to dominate. 
+ +For `_sre`, I made the engine cooperative by reusing its existing periodic signal-check hook. `_sre` already has a `MAYBE_CHECK_SIGNALS` path that runs every 4096 iterations of the matching engine and calls `PyErr_CheckSignals()`. The prototype adds: + +```c +if (_PyThreadState_CheckCancellation(_PyThreadState_GET())) { + RETURN_ERROR(SRE_ERROR_INTERRUPTED); +} +``` + +at the same point. `_PyThreadState_CheckCancellation()` drains the cooperative cancellation request bit, checks whether an expired timeout is the reason, and sets `TimeoutError` when the current thread has an expired timeout block. If no cancellation is pending, it falls through to `_PyTimeout_CheckNow()`, which returns 0 at once when the timeout scheduler is not initialized; otherwise it reads the monotonic clock and briefly takes the scheduler mutex to look for an expired block on the current thread, returning 0 with no exception set when there is none. If a block has expired, `_sre` returns through its existing interrupted-error path, and the Python-level regex call propagates the timeout exception. + +That means catastrophic backtracking can be interrupted without adding a public `re`-specific timeout parameter and without adding a separate polling mechanism to the regex engine. It also keeps the polling cadence tied to `_sre`'s existing signal-check cadence instead of checking on every regex VM transition. + +This still has the usual async-exception caveat: a timeout in pure Python is delivered as a normal Python exception at an eval-breaker boundary, so `finally` blocks and context-manager exits run, but arbitrary Python code can still be interrupted between bytecodes just like with `KeyboardInterrupt`. + +
+Benchmark script + +```python +#!/usr/bin/env python3 +"""Compare timeout prototype overhead against a clean baseline.""" + +from __future__ import annotations + +from contextlib import nullcontext +import pyperf + + +_sink = 0 +_timeout_warmed = False + + +def _consume(value: int) -> None: + global _sink + _sink ^= value + + +def _has_timeout(implementation: str) -> bool: + return implementation != "baseline" + + +def _get_timeout(seconds: float): + if seconds <= 0: + raise ValueError("timeout must be positive for the overhead benchmark") + from contextlib import timeout + return timeout(seconds) + + +def _warm_timeout(implementation: str, timeout_seconds: float) -> None: + global _timeout_warmed + if _has_timeout(implementation) and not _timeout_warmed: + with _get_timeout(timeout_seconds): + pass + _timeout_warmed = True + + +def _pass_loop(inner_loops: int) -> int: + total = 0 + for value in range(inner_loops): + total ^= value + return total + + +def _arithmetic_loop(inner_loops: int) -> int: + total = 0 + for value in range(inner_loops): + total += value + return total + + +def _listcomp_work(inner_loops: int) -> int: + values = [value * value for value in range(inner_loops)] + return values[-1] if values else 0 + + +def bench_pass_loop( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with _get_timeout(timeout_seconds): + for _ in range(loops): + total ^= _pass_loop(inner_loops) + else: + for _ in range(loops): + total ^= _pass_loop(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_arithmetic_loop( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with 
_get_timeout(timeout_seconds): + for _ in range(loops): + total += _arithmetic_loop(inner_loops) + else: + for _ in range(loops): + total += _arithmetic_loop(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_listcomp_work( + loops: int, + implementation: str, + inner_loops: int, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + total = 0 + t0 = pyperf.perf_counter() + if _has_timeout(implementation): + with _get_timeout(timeout_seconds): + for _ in range(loops): + total ^= _listcomp_work(inner_loops) + else: + for _ in range(loops): + total ^= _listcomp_work(inner_loops) + dt = pyperf.perf_counter() - t0 + _consume(total) + return dt + + +def bench_context_enter_exit( + loops: int, + implementation: str, + timeout_seconds: float, +) -> float: + _warm_timeout(implementation, timeout_seconds) + if _has_timeout(implementation): + t0 = pyperf.perf_counter() + for _ in range(loops): + with _get_timeout(timeout_seconds): + pass + else: + t0 = pyperf.perf_counter() + for _ in range(loops): + with nullcontext(): + pass + return pyperf.perf_counter() - t0 + + +def add_cmdline_args(cmd, args) -> None: + cmd.extend(("--implementation", args.implementation)) + cmd.extend(("--inner-loops", str(args.inner_loops))) + cmd.extend(("--timeout-seconds", str(args.timeout_seconds))) + + +def main() -> None: + runner = pyperf.Runner(add_cmdline_args=add_cmdline_args) + runner.argparser.add_argument( + "--implementation", + choices=("baseline", "op", "ours"), + required=True, + help="'ours' is the deadline-scheduler prototype", + ) + runner.argparser.add_argument( + "--inner-loops", + type=int, + default=1000, + help="work per calibrated pyperf loop", + ) + runner.argparser.add_argument( + "--timeout-seconds", + type=float, + default=3600.0, + help="non-expiring timeout duration", + ) + args = runner.parse_args() + + runner.metadata["timeout_implementation"] = args.implementation + 
runner.metadata["timeout_inner_loops"] = str(args.inner_loops) + runner.metadata["timeout_seconds"] = str(args.timeout_seconds) + + runner.bench_time_func( + "pass_loop", + bench_pass_loop, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "arithmetic_loop", + bench_arithmetic_loop, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "listcomp_work", + bench_listcomp_work, + args.implementation, + args.inner_loops, + args.timeout_seconds, + ) + runner.bench_time_func( + "context_enter_exit", + bench_context_enter_exit, + args.implementation, + args.timeout_seconds, + ) + + +if __name__ == "__main__": + main() +``` + +