From c7bd567e41fb747ec38487256caa18a11166bf98 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Mon, 2 Feb 2026 12:55:31 +0200 Subject: [PATCH 1/2] gh-75572: Speed up test_xpickle Run a long living subprocess which handles multiple requests instead of running a new subprocess for each request. --- Lib/test/test_xpickle.py | 103 ++++++++++++++++++++++++++----------- Lib/test/xpickle_worker.py | 35 ++++++++++--- 2 files changed, 101 insertions(+), 37 deletions(-) diff --git a/Lib/test/test_xpickle.py b/Lib/test/test_xpickle.py index 158f27dce4fdc2..ae2e293ed9dc2c 100644 --- a/Lib/test/test_xpickle.py +++ b/Lib/test/test_xpickle.py @@ -3,6 +3,7 @@ import io import os import pickle +import struct import subprocess import sys import unittest @@ -83,9 +84,19 @@ def have_python_version(py_version): return py_executable_map.get(py_version, None) -@support.requires_resource('cpu') +def read_exact(f, n): + buf = b'' + while len(buf) < n: + chunk = f.read(n - len(buf)) + if not chunk: + raise EOFError + buf += chunk + return buf + + class AbstractCompatTests(pickletester.AbstractPickleTests): py_version = None + worker = None @classmethod def setUpClass(cls): @@ -93,6 +104,7 @@ def setUpClass(cls): if not have_python_version(cls.py_version): py_version_str = ".".join(map(str, cls.py_version)) raise unittest.SkipTest(f'Python {py_version_str} not available') + cls.addClassCleanup(cls.close_worker) # Override the default pickle protocol to match what xpickle worker # will be running. highest_protocol = highest_proto_for_py_version(cls.py_version) @@ -101,8 +113,31 @@ def setUpClass(cls): cls.enterClassContext(support.swap_attr(pickle, 'HIGHEST_PROTOCOL', highest_protocol)) - @staticmethod - def send_to_worker(python, data): + @classmethod + def start_worker(cls): + target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py') + worker = subprocess.Popen([*python, target], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # For windows bpo-17023. + shell=is_windows) + cls.worker = worker + + @classmethod + def close_worker(cls): + worker = cls.worker + if worker is None: + return + cls.worker = None + worker.stdin.close() + worker.stdout.close() + worker.stderr.close() + worker.terminate() + worker.wait() + + @classmethod + def send_to_worker(cls, python, data): """Bounce a pickled object through another version of Python. This will send data to a child process where it will be unpickled, then repickled and sent back to the parent process. @@ -112,33 +147,40 @@ def send_to_worker(python, data): Returns: The pickled data received from the child process. """ - target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py') - worker = subprocess.Popen([*python, target], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - # For windows bpo-17023. - shell=is_windows) - stdout, stderr = worker.communicate(data) - if worker.returncode == 0: - return stdout - # if the worker fails, it will write the exception to stdout + worker = cls.worker + if worker is None: + target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py') + worker = subprocess.Popen([*python, target], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # For windows bpo-17023. + shell=is_windows) + cls.worker = worker + try: - exception = pickle.loads(stdout) - except (pickle.UnpicklingError, EOFError): + worker.stdin.write(struct.pack('!i', len(data)) + data) + worker.stdin.flush() + + size, = struct.unpack('!i', read_exact(worker.stdout, 4)) + if size > 0: + return read_exact(worker.stdout, size) + # if the worker fails, it will write the exception to stdout + if size < 0: + stdout = read_exact(worker.stdout, -size) + try: + exception = pickle.loads(stdout) + except (pickle.UnpicklingError, EOFError): + pass + else: + if isinstance(exception, Exception): + # To allow for tests which test for errors. + raise exception + _, stderr = worker.communicate() raise RuntimeError(stderr) - else: - if support.verbose > 1: - print() - print(f'{data = }') - print(f'{stdout = }') - print(f'{stderr = }') - if isinstance(exception, Exception): - # To allow for tests which test for errors. - raise exception - else: - raise RuntimeError(stderr) - + except: + cls.close_worker() + raise def dumps(self, arg, proto=0, **kwargs): # Skip tests that require buffer_callback arguments since @@ -148,9 +190,8 @@ def dumps(self, arg, proto=0, **kwargs): self.skipTest('Test does not support "buffer_callback" argument.') f = io.BytesIO() p = self.pickler(f, proto, **kwargs) - p.dump((proto, arg)) - f.seek(0) - data = bytes(f.read()) + p.dump(arg) + data = struct.pack('!i', proto) + f.getvalue() python = py_executable_map[self.py_version] return self.send_to_worker(python, data) diff --git a/Lib/test/xpickle_worker.py b/Lib/test/xpickle_worker.py index 3fd957f4a0b939..1b49515123c6ab 100644 --- a/Lib/test/xpickle_worker.py +++ b/Lib/test/xpickle_worker.py @@ -2,6 +2,7 @@ # pickles in a different Python version. import os import pickle +import struct import sys @@ -24,16 +25,38 @@ sources = f.read() exec(sources, vars(test_module)) +def read_exact(f, n): + buf = b'' + while len(buf) < n: + chunk = f.read(n - len(buf)) + if not chunk: + raise EOFError + buf += chunk + return buf in_stream = getattr(sys.stdin, 'buffer', sys.stdin) out_stream = getattr(sys.stdout, 'buffer', sys.stdout) try: - message = pickle.load(in_stream) - protocol, obj = message - pickle.dump(obj, out_stream, protocol) -except Exception as e: + while True: + size, = struct.unpack('!i', read_exact(in_stream, 4)) + if not size: + break + data = read_exact(in_stream, size) + protocol, = struct.unpack('!i', data[:4]) + obj = pickle.loads(data[4:]) + data = pickle.dumps(obj, protocol) + out_stream.write(struct.pack('!i', len(data)) + data) + out_stream.flush() +except Exception as exc: # dump the exception to stdout and write to stderr, then exit - pickle.dump(e, out_stream) - sys.stderr.write(repr(e)) + try: + data = pickle.dumps(exc) + out_stream.write(struct.pack('!i', -len(data)) + data) + out_stream.flush() + except Exception: + out_stream.write(struct.pack('!i', 0)) + out_stream.flush() + sys.stderr.write(repr(exc)) + sys.stderr.flush() sys.exit(1) From 1081bffebce1e1f79f1e780412279647228aefc5 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Mon, 2 Feb 2026 15:40:10 +0200 Subject: [PATCH 2/2] Cleanup. --- Lib/test/test_xpickle.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_xpickle.py b/Lib/test/test_xpickle.py index ae2e293ed9dc2c..d87c671d4f5394 100644 --- a/Lib/test/test_xpickle.py +++ b/Lib/test/test_xpickle.py @@ -104,7 +104,7 @@ def setUpClass(cls): if not have_python_version(cls.py_version): py_version_str = ".".join(map(str, cls.py_version)) raise unittest.SkipTest(f'Python {py_version_str} not available') - cls.addClassCleanup(cls.close_worker) + cls.addClassCleanup(cls.finish_worker) # Override the default pickle protocol to match what xpickle worker # will be running. highest_protocol = highest_proto_for_py_version(cls.py_version) @@ -114,7 +114,7 @@ def setUpClass(cls): highest_protocol)) @classmethod - def start_worker(cls): + def start_worker(cls, python): target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py') worker = subprocess.Popen([*python, target], stdin=subprocess.PIPE, @@ -123,9 +123,10 @@ def start_worker(cls): # For windows bpo-17023. shell=is_windows) cls.worker = worker + return worker @classmethod - def close_worker(cls): + def finish_worker(cls): worker = cls.worker if worker is None: return @@ -149,14 +150,7 @@ def send_to_worker(cls, python, data): """ worker = cls.worker if worker is None: - target = os.path.join(os.path.dirname(__file__), 'xpickle_worker.py') - worker = subprocess.Popen([*python, target], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - # For windows bpo-17023. - shell=is_windows) - cls.worker = worker + worker = cls.start_worker(python) try: worker.stdin.write(struct.pack('!i', len(data)) + data) @@ -179,7 +173,7 @@ def send_to_worker(cls, python, data): _, stderr = worker.communicate() raise RuntimeError(stderr) except: - cls.close_worker() + cls.finish_worker() raise def dumps(self, arg, proto=0, **kwargs):