|
| 1 | +"""Contains implementations of database retrieving objects""" |
| 2 | +from git.utils import IndexFileSHA1Writer |
| 3 | +from git.errors import ( |
| 4 | + InvalidDBRoot, |
| 5 | + BadObject, |
| 6 | + BadObjectType |
| 7 | + ) |
| 8 | + |
| 9 | +from stream import ( |
| 10 | + DecompressMemMapReader, |
| 11 | + FDCompressedSha1Writer, |
| 12 | + Sha1Writer, |
| 13 | + OStream, |
| 14 | + OInfo |
| 15 | + ) |
| 16 | + |
| 17 | +from utils import ( |
| 18 | + ENOENT, |
| 19 | + to_hex_sha, |
| 20 | + exists, |
| 21 | + hex_to_bin, |
| 22 | + isdir, |
| 23 | + mkdir, |
| 24 | + rename, |
| 25 | + dirname, |
| 26 | + join |
| 27 | + ) |
| 28 | + |
| 29 | +from fun import ( |
| 30 | + chunk_size, |
| 31 | + loose_object_header_info, |
| 32 | + write_object, |
| 33 | + stream_copy |
| 34 | + ) |
| 35 | + |
| 36 | +import tempfile |
| 37 | +import mmap |
| 38 | +import os |
| 39 | + |
| 40 | + |
# Names exported by a star-import of this module
__all__ = (
    'ObjectDBR',
    'ObjectDBW',
    'FileDBBase',
    'LooseObjectDB',
    'PackedDB',
    'CompoundDB',
    'ReferenceDB',
    'GitObjectDB',
)
| 43 | + |
class ObjectDBR(object):
    """Defines an interface for object database lookup.

    Objects are identified either by hex-sha (40 bytes) or
    by sha (20 bytes)"""

    def __contains__(self, sha):
        """Support the ``sha in db`` syntax by delegating to ``has_object``

        :param sha: 40 byte hexsha or 20 bytes binary sha
        :return: whatever ``has_object`` returns for the given sha"""
        return self.has_object(sha)

    #{ Query Interface
    def has_object(self, sha):
        """
        :return: True if the object identified by the given 40 byte hexsha or 20 bytes
            binary sha is contained in the database
        :raise BadObject:"""
        raise NotImplementedError("To be implemented in subclass")

    def info(self, sha):
        """ :return: OInfo instance
        :param sha: 40 bytes hexsha or 20 bytes binary sha
        :raise BadObject:"""
        raise NotImplementedError("To be implemented in subclass")

    def info_async(self, input_channel):
        """Retrieve information of a multitude of objects asynchronously

        :param input_channel: Channel yielding the sha's of the objects of interest
        :return: Channel yielding OInfo|InvalidOInfo, in any order"""
        raise NotImplementedError("To be implemented in subclass")

    def stream(self, sha):
        """:return: OStream instance
        :param sha: 40 bytes hexsha or 20 bytes binary sha
        :raise BadObject:"""
        raise NotImplementedError("To be implemented in subclass")

    def stream_async(self, input_channel):
        """Retrieve the OStream of multiple objects

        :param input_channel: see ``info_async``
        :return: Channel yielding OStream|InvalidOStream instances in any order"""
        raise NotImplementedError("To be implemented in subclass")

    #} END query interface
| 86 | + |
class ObjectDBW(object):
    """Interface for databases which are able to create new objects"""

    def __init__(self, *args, **kwargs):
        # No override is installed initially; arguments are accepted but unused
        self._ostream = None

    #{ Edit Interface
    def set_ostream(self, stream):
        """Install the stream which receives all data of newly stored objects

        :param stream: the stream to write to, or None to go back to the
            default stream
        :return: the stream that was installed before this call, or None if
            no override was active
        :note: this base implementation does not validate the stream; the
            documented TypeError for unsupported streams is left to subclasses"""
        previous, self._ostream = self._ostream, stream
        return previous

    def ostream(self):
        """:return: the output stream override currently installed, or None
            if the default stream will be written to"""
        return self._ostream

    def store(self, istream):
        """Create a new object in the database

        :param istream: IStream compatible instance. If its sha is already set,
            the object is only stored in our database format, and the input
            stream is expected to be in object format ( header + contents ).
        :return: the input istream object with its sha set to its corresponding value
        :raise IOError: if data could not be written"""
        raise NotImplementedError("To be implemented in subclass")

    def store_async(self, input_channel):
        """Create multiple new objects in the database asynchronously.

        The method returns right away, handing out an output channel which
        receives the results as they are computed.

        :param input_channel: Channel yielding IStream instances.
            As the same instances show up in the output channel, a map of
            id(istream) -> istream can be used to associate them
        :return: Channel yielding the IStreams which served as input, in any order.
            Each IStream either has its sha set to the value it received during
            the process, or its error attribute set to the exception informing
            about the error.
        :note: As some ODB implementations implement this operation as atomic,
            they might abort the whole operation if one item could not be
            processed. Hence check how many items have actually been produced."""
        raise NotImplementedError("To be implemented in subclass")

    #} END edit interface
| 135 | + |
| 136 | + |
class FileDBBase(object):
    """Base for databases whose data lives in files below a common root path"""

    def __init__(self, root_path):
        """Remember the root path all subsequent operations are relative to

        :param root_path: path at which the database files are expected
        :note: no accessibility check is made here, as the root might only
            become accessible some time before the first actual access"""
        super(FileDBBase, self).__init__()
        self._root_path = root_path

    #{ Interface
    def root_path(self):
        """:return: the root path this database was initialized with"""
        return self._root_path

    def db_path(self, rela_path):
        """
        :param rela_path: path relative to the database root
        :return: ``rela_path`` joined onto our database root, suitable for
            accessing data files"""
        base = self._root_path
        return join(base, rela_path)
    #} END interface
| 163 | + |
| 164 | + |
| 165 | + |
class LooseObjectDB(FileDBBase, ObjectDBR, ObjectDBW):
    """A database which operates on loose object files"""

    # CONFIGURATION
    # chunks in which data will be copied between streams
    stream_chunk_size = chunk_size

    def __init__(self, root_path):
        """Initialize the database to operate on loose objects below root_path"""
        super(LooseObjectDB, self).__init__(root_path)
        # cache of hexsha -> path for object files that were found to exist
        self._hexsha_to_file = dict()
        # Additional Flags - might be set to 0 after the first failure
        # Depending on the root, this might work for some mounts, for others not, which
        # is why it is per instance
        self._fd_open_flags = getattr(os, 'O_NOATIME', 0)

    #{ Interface
    def object_path(self, hexsha):
        """
        :return: path at which the object with the given hexsha would be stored,
            relative to the database root"""
        return join(hexsha[:2], hexsha[2:])

    def readable_db_object_path(self, hexsha):
        """
        :return: readable object path to the object identified by hexsha
        :raise BadObject: If the object file does not exist"""
        try:
            return self._hexsha_to_file[hexsha]
        except KeyError:
            pass
        # END ignore cache misses

        # try filesystem
        path = self.db_path(self.object_path(hexsha))
        if exists(path):
            self._hexsha_to_file[hexsha] = path
            return path
        # END handle cache
        raise BadObject(hexsha)

    #} END interface

    def _map_loose_object(self, sha):
        """
        :param sha: 40 byte hexsha or 20 byte binary sha of the object
        :return: memory map of that file to allow random read access
        :raise BadObject: if object could not be located"""
        hexsha = to_hex_sha(sha)
        db_path = self.db_path(self.object_path(hexsha))
        try:
            fd = os.open(db_path, os.O_RDONLY | self._fd_open_flags)
        except OSError as e:
            if e.errno == ENOENT:
                raise BadObject(hexsha)
            # END handle missing file

            # the failure was not a missing file, so try again without noatime
            try:
                fd = os.open(db_path, os.O_RDONLY)
            except OSError:
                raise BadObject(hexsha)
            # didn't work because of our flag, don't try it again
            self._fd_open_flags = 0
        # END exception handling
        try:
            return mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
        finally:
            # the descriptor is closed as soon as the map was created ( or failed )
            os.close(fd)
        # END assure file is closed

    def set_ostream(self, stream):
        """:raise TypeError: if the stream does not support the Sha1Writer interface"""
        if stream is not None and not isinstance(stream, Sha1Writer):
            raise TypeError("Output stream musst support the %s interface" % Sha1Writer.__name__)
        return super(LooseObjectDB, self).set_ostream(stream)

    def info(self, sha):
        """:return: OInfo built from the header of the loose object file
        :param sha: 40 byte hexsha or 20 byte binary sha
        :raise BadObject: if the object file could not be opened"""
        m = self._map_loose_object(sha)
        try:
            typename, size = loose_object_header_info(m)
            return OInfo(sha, typename, size)
        finally:
            m.close()
        # END assure release of system resources

    def stream(self, sha):
        """:return: OStream reading from the memory mapped loose object file
        :param sha: 40 byte hexsha or 20 byte binary sha
        :raise BadObject: if the object file could not be opened"""
        m = self._map_loose_object(sha)
        try:
            typename, size, stream = DecompressMemMapReader.new(m, close_on_deletion=True)
        except BaseException:
            # no reader was handed out, so nobody else would release the map
            m.close()
            raise
        # END assure map is released on error
        return OStream(sha, typename, size, stream)

    def has_object(self, sha):
        """:return: True if a loose object file exists for the given sha
        :param sha: 40 byte hexsha or 20 byte binary sha"""
        try:
            self.readable_db_object_path(to_hex_sha(sha))
            return True
        except BadObject:
            return False
        # END check existance

    def store(self, istream):
        """Write the object provided by istream.

        Without an output stream override, the data goes into a temporary file
        below the database root, which is then renamed to the object's final
        path. With an override, the data is only sent to that stream and no
        file is created.

        :param istream: IStream compatible instance, see ``ObjectDBW.store``
        :return: istream with its sha set
        :note: The sha we produce will be hex by nature"""
        tmp_path = None
        writer = self.ostream()
        if writer is None:
            # open a tmp file to write the data to
            fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path)
            writer = FDCompressedSha1Writer(fd)
        # END handle custom writer

        written = False
        try:
            if istream.sha is not None:
                stream_copy(istream.read, writer.write, istream.size, self.stream_chunk_size)
            else:
                # write object with header, we have to make a new one
                write_object(istream.type, istream.size, istream.read, writer.write,
                             chunk_size=self.stream_chunk_size)
            # END handle direct stream copies
            written = True
        finally:
            if tmp_path:
                # only writers we created ourselves are closed; close before
                # removing, as open files cannot be removed on all platforms
                try:
                    writer.close()
                finally:
                    if not written:
                        os.remove(tmp_path)
                # END assure tmpfile removal on error
        # END assure target stream is closed

        sha = istream.sha or writer.sha(as_hex=True)

        if tmp_path:
            try:
                # a sha preset on the istream may be binary, the path needs hex
                obj_path = self.db_path(self.object_path(to_hex_sha(sha)))
                obj_dir = dirname(obj_path)
                if not isdir(obj_dir):
                    mkdir(obj_dir)
                # END handle destination directory
                rename(tmp_path, obj_path)
            except BaseException:
                # don't leave the temporary file behind if it could not be moved
                if exists(tmp_path):
                    os.remove(tmp_path)
                raise
            # END assure tmpfile removal on error
        # END handle dry_run

        istream.sha = sha
        return istream
| 305 | + |
| 306 | + |
class PackedDB(FileDBBase, ObjectDBR):
    """Database working on a collection of object packs.

    :note: stub - it declares no members of its own yet"""
| 309 | + |
| 310 | + |
class CompoundDB(ObjectDBR):
    """Database forwarding its calls to a number of sub-databases.

    :note: stub - it declares no members of its own yet"""
| 313 | + |
| 314 | + |
class ReferenceDB(CompoundDB):
    """Database made up of the databases which are referred to in a file.

    :note: stub - it declares no members of its own yet"""
| 317 | + |
| 318 | + |
| 319 | +#class GitObjectDB(CompoundDB, ObjectDBW): |
class GitObjectDB(LooseObjectDB):
    """The default git object store, which includes loose objects, pack files
    and an alternates file.

    New objects are created in the loose object database only.

    :note: for now, all lookups are handed to the git command, just until we
        have packs and the other implementations
    """

    def __init__(self, root_path, git):
        """Set up the database with its root path and the git command object
        used for lookups"""
        super(GitObjectDB, self).__init__(root_path)
        self._git = git

    def info(self, sha):
        """:return: OInfo created from the fields of the git command's object header"""
        return OInfo(*self._git.get_object_header(sha))

    def stream(self, sha):
        """For now, all lookup is done by git itself

        :return: OStream created from the fields the git command returns"""
        return OStream(*self._git.stream_object_data(sha))
| 341 | + |
0 commit comments