diff --git a/src/borg/archive.py b/src/borg/archive.py index 317ffb04e9..98140eefa3 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1861,9 +1861,11 @@ def verify_data(self): # we must decompress, so it'll call assert_id() in there: self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE) except IntegrityErrorBase: - # failed twice -> get rid of this chunk + # failed twice -> get rid of this chunk. + # N=1: the defect chunk is alone in its pack; drop the pack. N>1 needs compaction. + pack_id = self.chunks[defect_chunk].pack_id del self.chunks[defect_chunk] - self.repository.delete(defect_chunk) + self.repository.store_delete("packs/" + bin_to_hex(pack_id)) logger.debug("chunk %s deleted.", bin_to_hex(defect_chunk)) else: logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk)) diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 0820d803a9..5fd19dc1a4 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -7,11 +7,11 @@ from ..helpers import get_cache_dir from ..helpers.argparsing import ArgumentParser from ..constants import * # NOQA -from ..hashindex import ChunkIndex, ChunkIndexEntry +from ..hashindex import ChunkIndex from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest -from ..repository import Repository, repo_lister +from ..repository import Repository from ..logger import create_logger @@ -49,17 +49,11 @@ def garbage_collect(self): def get_repository_chunks(self) -> ChunkIndex: """return a chunks index""" - if self.stats: # slow method: build a fresh chunks index, with stored chunk sizes. + if self.stats: + # slow but thorough: scan the pack headers for real sizes/locations and to catch objects + # missing from the cached index. Start unused (F_NONE); analyze_archives marks used ones. logger.info("Getting object IDs present in the repository...") - chunks = ChunkIndex() - for pack_id, pack_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT): - # we add this id to the chunks index (as unused chunk), because - # we do not know yet whether it is actually referenced from some archives. - chunk_id = pack_id # N=1: chunk_id == pack_id - obj_size = pack_size # true for N=1 - chunks[chunk_id] = ChunkIndexEntry( - flags=ChunkIndex.F_NONE, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size - ) + chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, init_flags=ChunkIndex.F_NONE) else: # faster: rely on existing chunks index (with flags F_NONE and size 0). logger.info("Getting object IDs from cached chunks index...") chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) @@ -191,7 +185,8 @@ def report_and_delete(self): ) for i, id in enumerate(unused): pi.show(i) - self.repository.delete(id) + # N=1: the chunk is alone in its pack, so dropping the pack frees just it; N>1 needs compaction. + self.repository.store_delete("packs/" + bin_to_hex(self.chunks[id].pack_id)) del self.chunks[id] pi.finish() repo_size_after = self.repository_size diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py index 723d413af5..0ef84804ad 100644 --- a/src/borg/archiver/debug_cmd.py +++ b/src/borg/archiver/debug_cmd.py @@ -13,7 +13,7 @@ from ..helpers.argparsing import ArgumentParser from ..manifest import Manifest from ..platform import get_process_id -from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister +from ..repository import Repository, LIST_SCAN_LIMIT, StoreObjectNotFound, repo_lister from ..repoobj import RepoObj from ._common import with_repository, Highlander @@ -292,11 +292,19 @@ def do_debug_delete_obj(self, args, repository): except ValueError: print("object id %s is invalid." % hex_id) else: - try: - repository.delete(id) - print("object %s deleted." % hex_id) - except Repository.ObjectNotFound: + entry = repository.chunks.get(id) + if entry is None: print("object %s not found." % hex_id) + else: + # N=1: one chunk per pack, so dropping the pack removes just this object; N>1 needs compaction. + try: + repository.store_delete("packs/" + bin_to_hex(entry.pack_id)) + except StoreObjectNotFound: + # index points at an already-gone pack (stale entry) + print("object %s not found." % hex_id) + else: + del repository.chunks[id] + print("object %s deleted." % hex_id) print("Done.") def do_debug_convert_profile(self, args): diff --git a/src/borg/cache.py b/src/borg/cache.py index b06ef4cdff..6f2e7bc30d 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -32,6 +32,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError from .manifest import Manifest from .platform import SaveFile +from .repoobj import RepoObj from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister from .security import SecurityManager, assert_secure # noqa: F401 @@ -619,7 +620,9 @@ def read_chunkindex_from_repo(repository, hash): logger.debug(f"{index_name} is invalid.") -def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False): +def build_chunkindex_from_repo( + repository, *, disable_caches=False, cache_immediately=False, init_flags=ChunkIndex.F_USED +): # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes: if not disable_caches: hashes = list_chunkindex_hashes(repository) @@ -642,26 +645,32 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi chunks.clear_new() return chunks # if we didn't get anything from the cache, compute the ChunkIndex the slow way: - logger.debug("querying the chunk IDs list from the repo...") + logger.debug("rebuilding the chunk index from the repo the slow way...") chunks = ChunkIndex() t0 = perf_counter() num_chunks = 0 - # The repo says it has these chunks, so we assume they are referenced/used chunks. - # We do not know the plaintext size (!= stored_size), thus we set size = 0. - # - # IMPORTANT (N=1 only): listing yields pack_ids, not per-chunk locations. We can only - # reconstruct the index here under the N=1 assumption -- pack_id == chunk_id, one chunk per - # pack at offset 0 spanning the whole pack. At N>1 this is wrong: a cold rebuild would have to - # open each pack and read its header to recover the per-chunk offsets and sizes. Until that - # exists, Repository.get()'s range-load is only correct while a persisted/cached chunk index - # is available; a cold rebuild from a bare repo listing silently falls back to N=1 semantics. - for pack_id, pack_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): - num_chunks += 1 - chunk_id = pack_id # N=1: chunk_id == pack_id - obj_size = pack_size # true for N=1 - chunks[chunk_id] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size - ) + # By default we assume the repo's chunks are used; callers that compute usage themselves + # (e.g. compact) pass init_flags=F_NONE. Plaintext size is unknown here (!= stored size), so size=0. + if isinstance(repository, Repository): + # Read the pack object headers at the store level. Don't call Repository.list() here: it + # iterates this same index we are building, so it would recurse. The headers also give each + # object's real (chunk_id, offset, size), so this is not limited to one object per pack. + for info in repository.store_list("packs"): + pack_id = hex_to_bin(info.name) + pack = repository.store_load("packs/" + info.name) + for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(pack): + num_chunks += 1 + chunks[chunk_id] = ChunkIndexEntry( + flags=init_flags, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size + ) + else: + # Legacy repo: list() reads its own segment index (no recursion). get() routes through that + # index, so the pack_id/offset fields here are just placeholders. + for chunk_id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): + num_chunks += 1 + chunks[chunk_id] = ChunkIndexEntry( + flags=init_flags, size=0, pack_id=chunk_id, obj_offset=0, obj_size=stored_size + ) # Cache does not contain the manifest. if not isinstance(repository, Repository): del chunks[Manifest.MANIFEST_ID] diff --git a/src/borg/repoobj.py b/src/borg/repoobj.py index 65d530ca65..8b5be86386 100644 --- a/src/borg/repoobj.py +++ b/src/borg/repoobj.py @@ -38,6 +38,22 @@ def extract_crypted_data(cls, data: bytes) -> bytes: raise IntegrityError(f"object size inconsistent: expected {overall_expected_size} bytes, got {len(data)}") return data[hdr_size + hdr.meta_size :] # crypted data + @classmethod + def iter_object_headers(cls, pack: bytes): + """Yield (chunk_id, obj_offset, obj_size) for every object stored in a pack. + + Each object's identity and extent come from its on-disk header, so callers do not need to + know the pack file name. Works for one object per pack and for several. + """ + hdr_size = cls.obj_header.size + offset = 0 + total = len(pack) + while offset + hdr_size <= total: + hdr = cls.ObjHeader(*cls.obj_header.unpack(pack[offset : offset + hdr_size])) + obj_size = hdr_size + hdr.meta_size + hdr.data_size + yield hdr.chunk_id, offset, obj_size + offset += obj_size + def __init__(self, key): self.key = key # Some commands write new chunks (e.g. rename) but don't take a --compression argument. This duplicates diff --git a/src/borg/repository.py b/src/borg/repository.py index 1592ff2314..7e9437a621 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -658,13 +658,15 @@ def check_object(obj): # add all existing objects to the index. # borg check: the index may have corrupted objects (we did not delete them) # borg check --repair: the index will only have non-corrupted objects. + # the pack file name is the pack_id (sha256(pack) at N>1 or with the + # BORG_TESTONLY_SHA256_PACK_ID switch), which is not the chunk_id, so recover + # each object's real (chunk_id, offset, size) from its on-disk header rather + # than assuming pack file name == chunk_id. pack_id = hex_to_bin(info.name) - pack_size = info.size - chunk_id = pack_id # N=1: chunk_id == pack_id - obj_size = pack_size # correct for N=1 - chunks[chunk_id] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size - ) + for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(obj): + chunks[chunk_id] = ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size + ) now = time.monotonic() if now > t_last_checkpoint + 300: # checkpoint every 5 mins t_last_checkpoint = now @@ -705,28 +707,22 @@ def list(self, limit=None, marker=None): list infos starting from after id . each info is a tuple (id, storage_size). """ - collect = True if marker is None else False + # Yield chunk_ids from the chunk index. (Listing the packs/ dir would yield pack file names, + # i.e. pack_ids, which are not chunk_ids.) iteritems() has no marker arg, so we skip to + # ourselves; index order is stable unless the index is mutated, which is all the + # marker pagination needs. + self._lock_refresh() + collect = marker is None result = [] - infos = self.store.list("packs") # generator yielding ItemInfos - while True: - self._lock_refresh() - try: - info = next(infos) - except StoreObjectNotFound: - break # can happen e.g. if "packs" does not exist, pointless to continue in that case - except StopIteration: - break - else: - pack_id = hex_to_bin(info.name) - chunk_id = pack_id # N=1: chunk_id == pack_id - if collect: - chunk_size = info.size # only correct for N=1 - result.append((chunk_id, chunk_size)) - if len(result) == limit: - break - elif chunk_id == marker: - collect = True - # note: do not collect the marker id + for chunk_id, entry in self.chunks.iteritems(): + if entry.pack_id == UNKNOWN_BYTES32: + continue # buffered in PackWriter, not flushed to a pack yet + if collect: + result.append((chunk_id, entry.obj_size)) + if len(result) == limit: + break + elif chunk_id == marker: + collect = True # start collecting after the marker; do not include the marker itself return result def get(self, id, read_data=True, raise_missing=True): @@ -809,12 +805,14 @@ def delete(self, id, wait=True): deal with async results / exceptions later. """ self._lock_refresh() - pack_id = id # N=1: pack_id == chunk_id - key = "packs/" + bin_to_hex(pack_id) - try: - self.store.delete(key) - except StoreObjectNotFound: - raise self.ObjectNotFound(id, str(self._location)) from None + # We can not remove one object by dropping its whole pack without losing the pack's other + # objects; real removal is store_delete at the pack level (compact). For now just check the + # object exists (ObjectNotFound contract), log, and do nothing. + # TODO: delete a single object once a pack can hold more than one (N>1). + entry = self.chunks.get(id) + if entry is None: + raise self.ObjectNotFound(id, str(self._location)) + logger.warning("ignoring deletion of %s in %s", bin_to_hex(id), bin_to_hex(entry.pack_id)) def async_response(self, wait=True): """Get one async result (only applies to remote repositories). diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index 64422cb769..90b0d1dfd6 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -20,6 +20,7 @@ from ...constants import * # NOQA from ...helpers import Location, umount from ...helpers import EXIT_SUCCESS +from ...helpers import bin_to_hex from ...helpers import init_ec_warnings from ...logger import flush_logging from ...manifest import Manifest @@ -179,6 +180,17 @@ def open_archive(repo_path, name): return archive, repository +def delete_chunk(repository, id): + """Drop the pack holding chunk `id` (test damage helper). + + Repository.delete is a no-op now, so tests that need a chunk to really vanish drop its whole + pack at the store level. Works at N=1 (one chunk per pack). The pack is resolved through the + chunk index, since the pack file name is the pack_id, which need not equal the chunk_id. + """ + entry = repository.chunks.get(id) + repository.store_delete("packs/" + bin_to_hex(entry.pack_id)) + + def open_repository(archiver): if archiver.get_kind() == "remote": return Repository(Location(archiver.repository_location), exclusive=True) diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index fdd7651afd..880631ea01 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -11,7 +11,7 @@ from ...manifest import Manifest from ...repository import Repository from ..repository_test import fchunk -from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION +from . import cmd, src_file, create_src_archive, open_archive, delete_chunk, generate_archiver_tests, RK_ENCRYPTION pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -162,7 +162,7 @@ def test_missing_file_chunk(archivers, request): if item.path.endswith(src_file): valid_chunks = item.chunks killed_chunk = valid_chunks[-1] - repository.delete(killed_chunk.id) + delete_chunk(repository, killed_chunk.id) break else: pytest.fail("should not happen") # convert 'fail' @@ -198,7 +198,7 @@ def test_missing_archive_item_chunk(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - repository.delete(archive.metadata.items[0]) + delete_chunk(repository, archive.metadata.items[0]) cmd(archiver, "check", exit_code=1) cmd(archiver, "check", "--repair", exit_code=0) cmd(archiver, "check", exit_code=0) @@ -209,7 +209,7 @@ def test_missing_archive_metadata(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - repository.delete(archive.id) + delete_chunk(repository, archive.id) cmd(archiver, "check", exit_code=1) cmd(archiver, "check", "--repair", exit_code=0) cmd(archiver, "check", exit_code=0) @@ -445,6 +445,9 @@ def test_empty_repository(archivers, request): pytest.skip("only works locally") check_cmd_setup(archiver) with Repository(archiver.repository_location, exclusive=True) as repository: - for id, _ in repository.list(): - repository.delete(id) + # empty the repo by dropping every pack file directly via the store. We iterate the actual + # packs/ listing (the file names are the pack_ids), so this does not depend on what list() + # yields or on pack_id == chunk_id. + for info in repository.store_list("packs"): + repository.store_delete("packs/" + info.name) cmd(archiver, "check", exit_code=1) diff --git a/src/borg/testsuite/archiver/extract_cmd_test.py b/src/borg/testsuite/archiver/extract_cmd_test.py index 7a19d46b5a..d1b1683a11 100644 --- a/src/borg/testsuite/archiver/extract_cmd_test.py +++ b/src/borg/testsuite/archiver/extract_cmd_test.py @@ -29,6 +29,7 @@ generate_archiver_tests, create_src_archive, open_archive, + delete_chunk, src_file, ) @@ -800,7 +801,7 @@ def test_extract_file_with_missing_chunk(archivers, request): for item in archive.iter_items(): if item.path.endswith(src_file): chunk = item.chunks[-1] - repository.delete(chunk.id) + delete_chunk(repository, chunk.id) break else: assert False # missed the file diff --git a/src/borg/testsuite/archiver/mount_cmds_test.py b/src/borg/testsuite/archiver/mount_cmds_test.py index c979ba4e3d..f55b1e4bd9 100644 --- a/src/borg/testsuite/archiver/mount_cmds_test.py +++ b/src/borg/testsuite/archiver/mount_cmds_test.py @@ -20,6 +20,7 @@ from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported from ..platform.platform_test import fakeroot_detected from . import RK_ENCRYPTION, cmd, assert_dirs_equal, create_regular_file, create_src_archive, open_archive, src_file +from . import delete_chunk from . import requires_hardlinks, _extract_hardlinks_setup, fuse_mount, create_test_files, generate_archiver_tests pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -234,7 +235,7 @@ def test_fuse_allow_damaged_files(archivers, request): with repository: for item in archive.iter_items(): if item.path.endswith(src_file): - repository.delete(item.chunks[-1].id) + delete_chunk(repository, item.chunks[-1].id) path = item.path # store full path for later break else: diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 5f5c8bdad9..a0f6978555 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -5,7 +5,7 @@ import pytest from ..helpers import IntegrityError, Location, bin_to_hex from ..hashindex import ChunkIndex -from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter +from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter, FORCE_SHA256_PACK_ID from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -54,7 +54,9 @@ def reopen(repository, exclusive: bool | None = True, create=False): def fchunk(data, meta=b"", chunk_id=b"\x00" * 32): - # Format chunk: create a raw chunk that has a valid RepoObj layout, but does not use encryption or compression. + # Build a raw chunk with a valid RepoObj layout but no encryption or compression. Pass a unique + # chunk_id when objects must not share a pack: identical bytes hash to the same sha256 pack id, + # so under BORG_TESTONLY_SHA256_PACK_ID they would otherwise collapse into one pack. hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, chunk_id, len(meta), len(data)) assert isinstance(data, bytes) chunk = hdr + meta + data @@ -79,20 +81,16 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter + repository.put(H(x), fchunk(b"SOMEDATA", chunk_id=H(x))) # put() updates _chunks via PackWriter key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" + # delete is a no-op now (see Repository.delete), so the object stays retrievable. repository.delete(key50) - with pytest.raises(Repository.ObjectNotFound): - repository.get(key50) + assert pdchunk(repository.get(key50)) == b"SOMEDATA" # no manual hand-off of the index across reopen: close() persisted it to the repo cache, # and the freshly opened repo rebuilds .chunks from there (or by listing the repo) on its own. with reopen(repository) as repository: - with pytest.raises(Repository.ObjectNotFound): - repository.get(key50) for x in range(100): - if x == 50: - continue assert pdchunk(repository.get(H(x))) == b"SOMEDATA" @@ -142,15 +140,15 @@ def test_consistency(repo_fixtures, request): assert pdchunk(repository.get(H(0))) == b"foo2" repository.put(H(0), fchunk(b"bar")) assert pdchunk(repository.get(H(0))) == b"bar" + # delete is a no-op for now (see Repository.delete): the latest put still wins. repository.delete(H(0)) - with pytest.raises(Repository.ObjectNotFound): - repository.get(H(0)) + assert pdchunk(repository.get(H(0))) == b"bar" def test_list(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - repository.put(H(x), fchunk(b"SOMEDATA")) + repository.put(H(x), fchunk(b"SOMEDATA", chunk_id=H(x))) # unique bytes -> unique pack id repo_list = repository.list() assert len(repo_list) == 100 first_half = repository.list(limit=50) @@ -227,7 +225,10 @@ def test_pack_writer_n1_flush(): assert len(results) == 1 stored_id, pack_id, obj_offset, obj_size = results[0] assert stored_id == chunk_id - assert pack_id == chunk_id # N=1: pack_id == chunk_id + if FORCE_SHA256_PACK_ID: + assert pack_id == sha256(cdata).digest() # sha256 switch: pack is named by its content + else: + assert pack_id == chunk_id # N=1: pack_id == chunk_id assert obj_offset == 0 assert obj_size == len(cdata) @@ -346,7 +347,11 @@ def test_put_marks_id_in_chunk_index(tmp_path): repository.put(id1, fchunk(b"ZEROS")) entry = repository._chunks.get(id1) assert entry is not None - assert entry.pack_id == id1 # N=1: pack_id == chunk_id, set by update_pack_info in put() + if FORCE_SHA256_PACK_ID: + # sha256 switch: the pack is named by its content, not by the chunk_id. + assert entry.pack_id == sha256(fchunk(b"ZEROS")).digest() + else: + assert entry.pack_id == id1 # N=1: pack_id == chunk_id, set by update_pack_info in put() assert entry.size == 0 # uncompressed size filled in by cache layer