From 06b10043ecb67df7b3af739e5044e69a15193d23 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 16 Jun 2026 11:38:35 +0200 Subject: [PATCH] repository: drop the dead "wait" arg, make put/delete always synchronous The new borgstore-based Repository inherited put(..., wait=) / delete(..., wait=) and async_response() from the legacy RemoteRepository's pipelined RPC protocol. In the new architecture these are dead: borgstore's API is strictly synchronous, so put/delete ignored wait and ran synchronously, and async_response() was an empty stub always returning None. Remove wait and async_response() so the synchronous behavior is explicit, and clean up every caller that still threaded wait=False / drained async_response() (cache.add_chunk, archive.py, transfer_cmd.py, and the archive_test mock). The legacy repository/remote keep their real wait/async implementation. Co-Authored-By: Claude Opus 4.8 --- src/borg/archive.py | 13 +++---------- src/borg/archiver/transfer_cmd.py | 9 ++------- src/borg/cache.py | 15 ++------------- src/borg/repository.py | 24 +++--------------------- src/borg/testsuite/archive_test.py | 5 ++--- 5 files changed, 12 insertions(+), 54 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 317ffb04e9..fd7d84d8e6 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -382,11 +382,8 @@ def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS): self.stats = stats def write_chunk(self, chunk): - id_, _ = self.cache.add_chunk( - self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM - ) + id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, ro_type=ROBJ_ARCHIVE_STREAM) logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}") - self.cache.repository.async_response(wait=False) return id_ @@ -688,8 +685,6 @@ def save(self, name=None, comment=None, timestamp=None, stats=None, additional_m raise Error("%s - archive too big (issue #1473)!" % err_msg) else: raise - while self.repository.async_response(wait=True) is not None: - pass self.manifest.archives.create(name, self.id, metadata.time) self.manifest.write() return metadata @@ -1176,8 +1171,7 @@ def chunk_processor(chunk): started_hashing = time.monotonic() chunk_id, data = cached_hash(chunk, self.key.id_hash) stats.hashing_time += time.monotonic() - started_hashing - chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM) - self.cache.repository.async_response(wait=False) + chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, ro_type=ROBJ_FILE_STREAM) return chunk_entry item.chunks = [] @@ -2271,8 +2265,7 @@ def chunk_processor(self, target, chunk): size = len(data) if chunk_id in self.seen_chunks: return self.cache.reuse_chunk(chunk_id, size, target.stats) - chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM) - self.cache.repository.async_response(wait=False) + chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, ro_type=ROBJ_FILE_STREAM) self.seen_chunks.add(chunk_entry.id) return chunk_entry diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py index d9af6803d5..d51e6261f0 100644 --- a/src/borg/archiver/transfer_cmd.py +++ b/src/borg/archiver/transfer_cmd.py @@ -62,10 +62,7 @@ def transfer_chunks( present += size else: # Add the new chunk to the repository - chunk_entry = cache.add_chunk( - chunk_id, {}, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM - ) - cache.repository.async_response(wait=False) + chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=archive.stats, ro_type=ROBJ_FILE_STREAM) transfer += size chunks.append(chunk_entry) else: @@ -101,7 +98,6 @@ def transfer_chunks( meta, data, stats=archive.stats, - wait=False, compress=False, size=size, ctype=meta["ctype"], @@ -112,11 +108,10 @@ def transfer_chunks( # always decompress and re-compress file data chunks meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM) chunk_entry = cache.add_chunk( - chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM + chunk_id, meta, data, stats=archive.stats, ro_type=ROBJ_FILE_STREAM ) else: raise ValueError(f"unsupported recompress mode: {recompress}") - cache.repository.async_response(wait=False) chunks.append(chunk_entry) transfer += size else: diff --git a/src/borg/cache.py b/src/borg/cache.py index b06ef4cdff..f6713b0f54 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -722,18 +722,7 @@ def reuse_chunk(self, id, size, stats): return ChunkListEntry(id, size) def add_chunk( - self, - id, - meta, - data, - *, - stats, - wait=True, - compress=True, - size=None, - ctype=None, - clevel=None, - ro_type=ROBJ_FILE_STREAM, + self, id, meta, data, *, stats, compress=True, size=None, ctype=None, clevel=None, ro_type=ROBJ_FILE_STREAM ): assert ro_type is not None if size is None: @@ -752,7 +741,7 @@ def add_chunk( cdata = self.repo_objs.format( id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type ) - pack_results = self.repository.put(id, cdata, wait=wait) + pack_results = self.repository.put(id, cdata) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) self.chunks.update_pack_info(pack_results) diff --git a/src/borg/repository.py b/src/borg/repository.py index 1592ff2314..2190de6225 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -785,12 +785,9 @@ def get_many(self, ids, read_data=True, raise_missing=True): for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) - def put(self, id, data, wait=True): + def put(self, id, data): """put a repo object - Note: when doing calls with wait=False this gets async and caller must - deal with async results / exceptions later. - Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for every chunk written to disk this call. At max_count=1 this is always one entry. @@ -802,12 +799,8 @@ def put(self, id, data, wait=True): # PackWriter shares this repository's index, so add() triggers the lazy build itself. return self._pack_writer.add(id, data) - def delete(self, id, wait=True): - """delete a repo object - - Note: when doing calls with wait=False this gets async and caller must - deal with async results / exceptions later. - """ + def delete(self, id): + """delete a repo object""" self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id key = "packs/" + bin_to_hex(pack_id) @@ -816,17 +809,6 @@ def delete(self, id, wait=True): except StoreObjectNotFound: raise self.ObjectNotFound(id, str(self._location)) from None - def async_response(self, wait=True): - """Get one async result (only applies to remote repositories). - - async commands (== calls with wait=False, e.g. delete and put) have no results, - but may raise exceptions. These async exceptions must get collected later via - async_response() calls. Repeat the call until it returns None. - The previous calls might either return one (non-None) result or raise an exception. - If wait=True is given and there are outstanding responses, it will wait for them - to arrive. With wait=False, it will only return already received responses. - """ - def break_lock(self): Lock(self.store).break_lock() diff --git a/src/borg/testsuite/archive_test.py b/src/borg/testsuite/archive_test.py index 3c37f31795..d7cb7caa9c 100644 --- a/src/borg/testsuite/archive_test.py +++ b/src/borg/testsuite/archive_test.py @@ -145,14 +145,13 @@ def test_timestamp_parsing(monkeypatch, isoformat, expected): class MockCache: class MockRepo: - def async_response(self, wait=True): - pass + pass def __init__(self): self.objects = {} self.repository = self.MockRepo() - def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None): + def add_chunk(self, id, meta, data, stats=None, ro_type=None): assert ro_type is not None self.objects[id] = data return id, len(data)