Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1861,9 +1861,11 @@ def verify_data(self):
# we must decompress, so it'll call assert_id() in there:
self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
except IntegrityErrorBase:
# failed twice -> get rid of this chunk
# failed twice -> get rid of this chunk.
# N=1: the defect chunk is alone in its pack; drop the pack. N>1 needs compaction.
pack_id = self.chunks[defect_chunk].pack_id
del self.chunks[defect_chunk]
self.repository.delete(defect_chunk)
self.repository.store_delete("packs/" + bin_to_hex(pack_id))
logger.debug("chunk %s deleted.", bin_to_hex(defect_chunk))
else:
logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk))
Expand Down
21 changes: 8 additions & 13 deletions src/borg/archiver/compact_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from ..helpers import get_cache_dir
from ..helpers.argparsing import ArgumentParser
from ..constants import * # NOQA
from ..hashindex import ChunkIndex, ChunkIndexEntry
from ..hashindex import ChunkIndex
from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex
from ..helpers import ProgressIndicatorPercent
from ..manifest import Manifest
from ..repository import Repository, repo_lister
from ..repository import Repository

from ..logger import create_logger

Expand Down Expand Up @@ -49,17 +49,11 @@ def garbage_collect(self):

def get_repository_chunks(self) -> ChunkIndex:
"""return a chunks index"""
if self.stats: # slow method: build a fresh chunks index, with stored chunk sizes.
if self.stats:
# slow but thorough: scan the pack headers for real sizes/locations and to catch objects
# missing from the cached index. Start unused (F_NONE); analyze_archives marks used ones.
logger.info("Getting object IDs present in the repository...")
chunks = ChunkIndex()
for pack_id, pack_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT):
# we add this id to the chunks index (as unused chunk), because
# we do not know yet whether it is actually referenced from some archives.
chunk_id = pack_id # N=1: chunk_id == pack_id
obj_size = pack_size # true for N=1
chunks[chunk_id] = ChunkIndexEntry(
flags=ChunkIndex.F_NONE, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size
)
chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, init_flags=ChunkIndex.F_NONE)
else: # faster: rely on existing chunks index (with flags F_NONE and size 0).
logger.info("Getting object IDs from cached chunks index...")
chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
Expand Down Expand Up @@ -191,7 +185,8 @@ def report_and_delete(self):
)
for i, id in enumerate(unused):
pi.show(i)
self.repository.delete(id)
# N=1: the chunk is alone in its pack, so dropping the pack frees just it; N>1 needs compaction.
self.repository.store_delete("packs/" + bin_to_hex(self.chunks[id].pack_id))
del self.chunks[id]
pi.finish()
repo_size_after = self.repository_size
Expand Down
18 changes: 13 additions & 5 deletions src/borg/archiver/debug_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ..helpers.argparsing import ArgumentParser
from ..manifest import Manifest
from ..platform import get_process_id
from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister
from ..repository import Repository, LIST_SCAN_LIMIT, StoreObjectNotFound, repo_lister
from ..repoobj import RepoObj

from ._common import with_repository, Highlander
Expand Down Expand Up @@ -292,11 +292,19 @@ def do_debug_delete_obj(self, args, repository):
except ValueError:
print("object id %s is invalid." % hex_id)
else:
try:
repository.delete(id)
print("object %s deleted." % hex_id)
except Repository.ObjectNotFound:
entry = repository.chunks.get(id)
if entry is None:
print("object %s not found." % hex_id)
else:
# N=1: one chunk per pack, so dropping the pack removes just this object; N>1 needs compaction.
try:
repository.store_delete("packs/" + bin_to_hex(entry.pack_id))
except StoreObjectNotFound:
# index points at an already-gone pack (stale entry)
print("object %s not found." % hex_id)
else:
del repository.chunks[id]
print("object %s deleted." % hex_id)
print("Done.")

def do_debug_convert_profile(self, args):
Expand Down
45 changes: 27 additions & 18 deletions src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .manifest import Manifest
from .platform import SaveFile
from .repoobj import RepoObj
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
from .security import SecurityManager, assert_secure # noqa: F401

Expand Down Expand Up @@ -619,7 +620,9 @@ def read_chunkindex_from_repo(repository, hash):
logger.debug(f"{index_name} is invalid.")


def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
def build_chunkindex_from_repo(
repository, *, disable_caches=False, cache_immediately=False, init_flags=ChunkIndex.F_USED
):
# first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes:
if not disable_caches:
hashes = list_chunkindex_hashes(repository)
Expand All @@ -642,26 +645,32 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
chunks.clear_new()
return chunks
# if we didn't get anything from the cache, compute the ChunkIndex the slow way:
logger.debug("querying the chunk IDs list from the repo...")
logger.debug("rebuilding the chunk index from the repo the slow way...")
chunks = ChunkIndex()
t0 = perf_counter()
num_chunks = 0
# The repo says it has these chunks, so we assume they are referenced/used chunks.
# We do not know the plaintext size (!= stored_size), thus we set size = 0.
#
# IMPORTANT (N=1 only): listing yields pack_ids, not per-chunk locations. We can only
# reconstruct the index here under the N=1 assumption -- pack_id == chunk_id, one chunk per
# pack at offset 0 spanning the whole pack. At N>1 this is wrong: a cold rebuild would have to
# open each pack and read its header to recover the per-chunk offsets and sizes. Until that
# exists, Repository.get()'s range-load is only correct while a persisted/cached chunk index
# is available; a cold rebuild from a bare repo listing silently falls back to N=1 semantics.
for pack_id, pack_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
num_chunks += 1
chunk_id = pack_id # N=1: chunk_id == pack_id
obj_size = pack_size # true for N=1
chunks[chunk_id] = ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size
)
# By default we assume the repo's chunks are used; callers that compute usage themselves
# (e.g. compact) pass init_flags=F_NONE. Plaintext size is unknown here (!= stored size), so size=0.
if isinstance(repository, Repository):
# Read the pack object headers at the store level. Don't call Repository.list() here: it
# iterates this same index we are building, so it would recurse. The headers also give each
# object's real (chunk_id, offset, size), so this is not limited to one object per pack.
for info in repository.store_list("packs"):
pack_id = hex_to_bin(info.name)
pack = repository.store_load("packs/" + info.name)
for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(pack):
num_chunks += 1
chunks[chunk_id] = ChunkIndexEntry(
flags=init_flags, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size
)
else:
# Legacy repo: list() reads its own segment index (no recursion). get() routes through that
# index, so the pack_id/offset fields here are just placeholders.
for chunk_id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
num_chunks += 1
chunks[chunk_id] = ChunkIndexEntry(
flags=init_flags, size=0, pack_id=chunk_id, obj_offset=0, obj_size=stored_size
)
# Cache does not contain the manifest.
if not isinstance(repository, Repository):
del chunks[Manifest.MANIFEST_ID]
Expand Down
16 changes: 16 additions & 0 deletions src/borg/repoobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def extract_crypted_data(cls, data: bytes) -> bytes:
raise IntegrityError(f"object size inconsistent: expected {overall_expected_size} bytes, got {len(data)}")
return data[hdr_size + hdr.meta_size :] # crypted data

@classmethod
def iter_object_headers(cls, pack: bytes):
    """Walk a pack and yield one ``(chunk_id, obj_offset, obj_size)`` tuple per stored object.

    The pack is read as a sequence of objects laid out back to back. Each object starts with a
    fixed-size header (``cls.obj_header``) from which the chunk id, meta size and data size are
    taken, so the caller does not have to derive anything from the pack's file name.

    ``obj_offset`` is where the object's header starts inside ``pack``; ``obj_size`` is the
    header size plus the meta and data sizes recorded in that header.

    Iteration ends as soon as fewer bytes than one full header remain. Trailing bytes shorter
    than a header are ignored, and the extent claimed by a header is NOT checked against the
    length of ``pack``.
    """
    header = cls.obj_header
    pack_len = len(pack)
    pos = 0
    while pack_len - pos >= header.size:
        # decode the header in place, no intermediate slice of the pack is created
        fields = cls.ObjHeader(*header.unpack_from(pack, pos))
        extent = header.size + fields.meta_size + fields.data_size
        yield fields.chunk_id, pos, extent
        # the next object starts right after this one
        pos += extent

def __init__(self, key):
self.key = key
# Some commands write new chunks (e.g. rename) but don't take a --compression argument. This duplicates
Expand Down
64 changes: 31 additions & 33 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,13 +658,15 @@ def check_object(obj):
# add all existing objects to the index.
# borg check: the index may have corrupted objects (we did not delete them)
# borg check --repair: the index will only have non-corrupted objects.
# the pack file name is the pack_id (sha256(pack) at N>1 or with the
# BORG_TESTONLY_SHA256_PACK_ID switch), which is not the chunk_id, so recover
# each object's real (chunk_id, offset, size) from its on-disk header rather
# than assuming pack file name == chunk_id.
pack_id = hex_to_bin(info.name)
pack_size = info.size
chunk_id = pack_id # N=1: chunk_id == pack_id
obj_size = pack_size # correct for N=1
chunks[chunk_id] = ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size
)
for chunk_id, obj_offset, obj_size in RepoObj.iter_object_headers(obj):
chunks[chunk_id] = ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size
)
now = time.monotonic()
if now > t_last_checkpoint + 300: # checkpoint every 5 mins
t_last_checkpoint = now
Expand Down Expand Up @@ -705,28 +707,22 @@ def list(self, limit=None, marker=None):
list <limit> infos starting from after id <marker>.
each info is a tuple (id, storage_size).
"""
collect = True if marker is None else False
# Yield chunk_ids from the chunk index. (Listing the packs/ dir would yield pack file names,
# i.e. pack_ids, which are not chunk_ids.) iteritems() has no marker arg, so we skip to
# <marker> ourselves; index order is stable unless the index is mutated, which is all the
# marker pagination needs.
self._lock_refresh()
collect = marker is None
result = []
infos = self.store.list("packs") # generator yielding ItemInfos
while True:
self._lock_refresh()
try:
info = next(infos)
except StoreObjectNotFound:
break # can happen e.g. if "packs" does not exist, pointless to continue in that case
except StopIteration:
break
else:
pack_id = hex_to_bin(info.name)
chunk_id = pack_id # N=1: chunk_id == pack_id
if collect:
chunk_size = info.size # only correct for N=1
result.append((chunk_id, chunk_size))
if len(result) == limit:
break
elif chunk_id == marker:
collect = True
# note: do not collect the marker id
for chunk_id, entry in self.chunks.iteritems():
if entry.pack_id == UNKNOWN_BYTES32:
continue # buffered in PackWriter, not flushed to a pack yet
if collect:
result.append((chunk_id, entry.obj_size))
if len(result) == limit:
break
elif chunk_id == marker:
collect = True # start collecting after the marker; do not include the marker itself
return result

def get(self, id, read_data=True, raise_missing=True):
Expand Down Expand Up @@ -809,12 +805,14 @@ def delete(self, id, wait=True):
deal with async results / exceptions later.
"""
self._lock_refresh()
pack_id = id # N=1: pack_id == chunk_id
key = "packs/" + bin_to_hex(pack_id)
try:
self.store.delete(key)
except StoreObjectNotFound:
raise self.ObjectNotFound(id, str(self._location)) from None
# We can not remove one object by dropping its whole pack without losing the pack's other
# objects; real removal is store_delete at the pack level (compact). For now just check the
# object exists (ObjectNotFound contract), log, and do nothing.
# TODO: delete a single object once a pack can hold more than one (N>1).
entry = self.chunks.get(id)
if entry is None:
raise self.ObjectNotFound(id, str(self._location))
logger.warning("ignoring deletion of %s in %s", bin_to_hex(id), bin_to_hex(entry.pack_id))

def async_response(self, wait=True):
"""Get one async result (only applies to remote repositories).
Expand Down
12 changes: 12 additions & 0 deletions src/borg/testsuite/archiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ...constants import * # NOQA
from ...helpers import Location, umount
from ...helpers import EXIT_SUCCESS
from ...helpers import bin_to_hex
from ...helpers import init_ec_warnings
from ...logger import flush_logging
from ...manifest import Manifest
Expand Down Expand Up @@ -179,6 +180,17 @@ def open_archive(repo_path, name):
return archive, repository


def delete_chunk(repository, id):
    """Drop the pack holding chunk `id` (test damage helper).

    Repository.delete is a no-op now, so tests that need a chunk to really vanish drop its whole
    pack at the store level. Works at N=1 (one chunk per pack). The pack is resolved through the
    chunk index, since the pack file name is the pack_id, which need not equal the chunk_id.

    Only the pack is removed from the store; the chunk index entry for `id` is left untouched.

    Raises LookupError if `id` has no entry in the repository's chunk index, because the pack
    to drop can not be determined then.
    """
    entry = repository.chunks.get(id)
    if entry is None:
        # fail with a clear message instead of an AttributeError on `None.pack_id`
        raise LookupError(f"chunk id {id!r} is not in the chunk index, can not locate its pack")
    repository.store_delete("packs/" + bin_to_hex(entry.pack_id))


def open_repository(archiver):
if archiver.get_kind() == "remote":
return Repository(Location(archiver.repository_location), exclusive=True)
Expand Down
15 changes: 9 additions & 6 deletions src/borg/testsuite/archiver/check_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ...manifest import Manifest
from ...repository import Repository
from ..repository_test import fchunk
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
from . import cmd, src_file, create_src_archive, open_archive, delete_chunk, generate_archiver_tests, RK_ENCRYPTION

pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA

Expand Down Expand Up @@ -162,7 +162,7 @@ def test_missing_file_chunk(archivers, request):
if item.path.endswith(src_file):
valid_chunks = item.chunks
killed_chunk = valid_chunks[-1]
repository.delete(killed_chunk.id)
delete_chunk(repository, killed_chunk.id)
break
else:
pytest.fail("should not happen") # convert 'fail'
Expand Down Expand Up @@ -198,7 +198,7 @@ def test_missing_archive_item_chunk(archivers, request):
check_cmd_setup(archiver)
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
repository.delete(archive.metadata.items[0])
delete_chunk(repository, archive.metadata.items[0])
cmd(archiver, "check", exit_code=1)
cmd(archiver, "check", "--repair", exit_code=0)
cmd(archiver, "check", exit_code=0)
Expand All @@ -209,7 +209,7 @@ def test_missing_archive_metadata(archivers, request):
check_cmd_setup(archiver)
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
repository.delete(archive.id)
delete_chunk(repository, archive.id)
cmd(archiver, "check", exit_code=1)
cmd(archiver, "check", "--repair", exit_code=0)
cmd(archiver, "check", exit_code=0)
Expand Down Expand Up @@ -445,6 +445,9 @@ def test_empty_repository(archivers, request):
pytest.skip("only works locally")
check_cmd_setup(archiver)
with Repository(archiver.repository_location, exclusive=True) as repository:
for id, _ in repository.list():
repository.delete(id)
# empty the repo by dropping every pack file directly via the store. We iterate the actual
# packs/ listing (the file names are the pack_ids), so this does not depend on what list()
# yields or on pack_id == chunk_id.
for info in repository.store_list("packs"):
repository.store_delete("packs/" + info.name)
cmd(archiver, "check", exit_code=1)
3 changes: 2 additions & 1 deletion src/borg/testsuite/archiver/extract_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
generate_archiver_tests,
create_src_archive,
open_archive,
delete_chunk,
src_file,
)

Expand Down Expand Up @@ -800,7 +801,7 @@ def test_extract_file_with_missing_chunk(archivers, request):
for item in archive.iter_items():
if item.path.endswith(src_file):
chunk = item.chunks[-1]
repository.delete(chunk.id)
delete_chunk(repository, chunk.id)
break
else:
assert False # missed the file
Expand Down
3 changes: 2 additions & 1 deletion src/borg/testsuite/archiver/mount_cmds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported
from ..platform.platform_test import fakeroot_detected
from . import RK_ENCRYPTION, cmd, assert_dirs_equal, create_regular_file, create_src_archive, open_archive, src_file
from . import delete_chunk
from . import requires_hardlinks, _extract_hardlinks_setup, fuse_mount, create_test_files, generate_archiver_tests

pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA
Expand Down Expand Up @@ -234,7 +235,7 @@ def test_fuse_allow_damaged_files(archivers, request):
with repository:
for item in archive.iter_items():
if item.path.endswith(src_file):
repository.delete(item.chunks[-1].id)
delete_chunk(repository, item.chunks[-1].id)
path = item.path # store full path for later
break
else:
Expand Down
Loading
Loading