diff --git a/README.md b/README.md index 8d393ba8..fc9ae6d5 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,7 @@ One thing you need to pay attention to here is, that the unmount command continu The [`ltfs_ordered_copy`](https://github.com/LinearTapeFileSystem/ltfs/wiki/ltfs_ordered_copy) is a program to copy files from source to destination with LTFS order optimization. It is written in python and it can work with both python2 and python3 (Python 2.7 or later is strongly recommended). You need to install the `pyxattr` module for both python2 and python3. +For nicer-looking progress bars while copying, it is also recommended you install `tqdm`. # Building the LTFS from this GitHub project diff --git a/src/utils/ltfs_ordered_copy b/src/utils/ltfs_ordered_copy index 5537964e..5ea70754 100755 --- a/src/utils/ltfs_ordered_copy +++ b/src/utils/ltfs_ordered_copy @@ -44,6 +44,12 @@ import threading from logging import getLogger, basicConfig, NOTSET, CRITICAL, ERROR, WARNING, INFO, DEBUG from collections import deque +try: + from tqdm import tqdm + USE_TQDM = True +except ImportError: + USE_TQDM = False + class CopyItem: """""" def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger): #initialization @@ -77,15 +83,20 @@ class CopyItem: return (self.vuuid, self.part, self.start) - def run(self): + def _run_copy(self, progress): + with open(self.src, 'rb') as srcf, open(self.dst, 'wb') as dstf: + copyfileobj(srcf, dstf, progress.update) + + def run(self, progress): try: if len(self.vuuid): logger.debug('"{0}" ({2}) -> "{1}"'.format(self.src, self.dst, str(self.start))) else: logger.debug('"{0}" -> "{1}"'.format(self.src, self.dst)) - + progress.update_file(self.src) if self.cp_attr: #Copy data and metadata - shutil.copy2(self.src, self.dst) + self._run_copy(progress) + shutil.copystat(self.src, self.dst) if self.cp_xattr: # Capture EAs of the source file src_attributes = {} @@ -96,7 +107,8 @@ class CopyItem: for key in src_attributes: xattr.set(self.dst, key, src_attributes[key]) else: #Only copy data - shutil.copy(self.src, self.dst) + self._run_copy(progress) + shutil.copymode(self.src, self.dst) except Exception as e: self.logger.error('Failed to copy "{0}" to "{1}": {2}'.format(self.src, self.dst, str(str(e)))) return False @@ -116,9 +128,11 @@ class CopyQueue: self.items = 0 self.logger = logger self.sort_files = sort_files + self.total_bytes = 0 def add_copy_item(self, c): (u, p, s) = c.eval() + self.total_bytes += os.path.getsize(c.src) if u == '': # Source is not on LTFS self.direct.append(c) @@ -206,24 +220,83 @@ class CopyQueue: def get_size(self): return self.items +RESULT_LOCK = threading.Lock() + +BUFSIZE = 256 * 1024 * 1024 # 256MiB + +# Based on shutil's code +def copyfileobj(fsrc, fdst, callback, length=BUFSIZE): + try: + # check for optimisation opportunity + if "b" in fsrc.mode and "b" in fdst.mode and fsrc.readinto: + return _copyfileobj_readinto(fsrc, fdst, callback, length) + except AttributeError: + # one or both file objects do not support a .mode or .readinto attribute + pass + + fsrc_read = fsrc.read + fdst_write = fdst.write + + while True: + buf = fsrc_read(length) + if not buf: + break + fdst_write(buf) + callback(len(buf)) + +def _copyfileobj_readinto(fsrc, fdst, callback, length=BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while True: + n = fsrc_readinto(mv) + if not n: + break + elif n < length: + with mv[:n] as smv: + fdst.write(smv) + else: + fdst_write(mv) + callback(n) + class Progress: - def __init__(self, logger, title, num): #initialization + def __init__(self, logger, title, num_f, num_b): #initialization self.logger = logger self.title = title - self.num = num - self.cur = 0 + self.num_f = num_f + self.num_b = num_b + self.cur_f = 0 + self.tqdm = None + + def update_file(self, name): + # Delay the initialization of tqdm to prevent console spam + if self.logger.getEffectiveLevel() == INFO and USE_TQDM and self.tqdm is None: + self.tqdm = tqdm(total=self.num_b, unit='B', unit_scale=True, unit_divisor=1024) - def update(self, step = 1): + self.cur_f += 1 if self.logger.getEffectiveLevel() == INFO: - self.cur = self.cur + 1 - sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur, self.num)) - sys.stderr.flush() + if self.tqdm is not None: + self.tqdm.set_description(f'{name} [{self.cur_f} / {self.num_f}]') + else: + sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur_f, self.num_f)) + sys.stderr.flush() + + def update(self, bytes_add): + if self.tqdm: + self.tqdm.update(bytes_add) def finish(self): - if self.logger.getEffectiveLevel() == INFO: - logger.info("") + if self.tqdm is not None: + self.tqdm.close() + else: + if self.logger.getEffectiveLevel() == INFO: + logger.info("") -RESULT_LOCK = threading.Lock() def writer(logger, prog, q, r): while True: @@ -236,9 +309,7 @@ def writer(logger, prog, q, r): logger.error('writer thread error: ' + str(e)) exit(1) - prog.update() - - result = ci.run() + result = ci.run(prog) with RESULT_LOCK: if result: @@ -363,7 +434,7 @@ direct_write_threads = 8 try: sig = xattr.get(args.DEST, VEA_PREFIX + LTFS_SIG_VEA) - if sig.startswith("LTFS"): + if sig.startswith(b"LTFS"): logger.info("Destination {0} is LTFS".format(args.DEST)) direct_write_threads = 1 else: @@ -425,7 +496,7 @@ success = 0 fail = 0 direct = copyq.pop_direct() -prog_disk = Progress(logger, 'File copy from disk is on going', len(direct)) +prog_disk = Progress(logger, 'File copy from disk is on going', len(direct), copyq.total_bytes) if len(direct): logger.info("Copying on {} disk files with {} threads".format(len(direct), direct_write_threads)) writers = [] @@ -444,7 +515,7 @@ if len(direct): prog_disk.finish() # Copy files on LTFS -prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size()) +prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size(), copyq.total_bytes) (tape_key, tape) = copyq.pop_tape() while tape != None: logger.log(NOTSET + 1, "Processing {}".format(len(tape))) @@ -461,8 +532,7 @@ while tape != None: for start_block_key in start_block_list: file_ind = partition[start_block_key] for cp in file_ind: - prog_tape.update() - result = cp.run() + result = cp.run(prog_tape) if result: success = success + 1 else: