Skip to content

Commit 9b53ab0

Browse files
committed
MemoryDB: Implemented direct stream copy, allowing to flush memory db content into any other object db for permanent storage
1 parent 92e6770 commit 9b53ab0

5 files changed

Lines changed: 107 additions & 25 deletions

File tree

db/loose.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from gitdb.stream import (
1414
DecompressMemMapReader,
1515
FDCompressedSha1Writer,
16+
FDStream,
1617
Sha1Writer
1718
)
1819

@@ -43,6 +44,7 @@
4344

4445
import tempfile
4546
import mmap
47+
import sys
4648
import os
4749

4850

@@ -153,13 +155,20 @@ def store(self, istream):
153155
if writer is None:
154156
# open a tmp file to write the data to
155157
fd, tmp_path = tempfile.mkstemp(prefix='obj', dir=self._root_path)
156-
writer = FDCompressedSha1Writer(fd)
158+
159+
if istream.sha is None:
160+
writer = FDCompressedSha1Writer(fd)
161+
else:
162+
writer = FDStream(fd)
163+
# END handle direct stream copies
157164
# END handle custom writer
158165

159166
try:
160167
try:
161168
if istream.sha is not None:
162-
stream_copy(istream.read, writer.write, istream.size, self.stream_chunk_size)
169+
# copy as much as possible, the actual uncompressed item size might
170+
# be smaller than the compressed version
171+
stream_copy(istream.read, writer.write, sys.maxint, self.stream_chunk_size)
163172
else:
164173
# write object with header, we have to make a new one
165174
write_object(istream.type, istream.size, istream.read, writer.write,
@@ -175,18 +184,23 @@ def store(self, istream):
175184
writer.close()
176185
# END assure target stream is closed
177186

178-
sha = istream.sha or writer.sha(as_hex=True)
187+
hexsha = None
188+
if istream.sha:
189+
hexsha = istream.hexsha
190+
else:
191+
hexsha = writer.sha(as_hex=True)
192+
# END handle sha
179193

180194
if tmp_path:
181-
obj_path = self.db_path(self.object_path(sha))
195+
obj_path = self.db_path(self.object_path(hexsha))
182196
obj_dir = dirname(obj_path)
183197
if not isdir(obj_dir):
184198
mkdir(obj_dir)
185199
# END handle destination directory
186200
rename(tmp_path, obj_path)
187201
# END handle dry_run
188202

189-
istream.sha = sha
203+
istream.sha = hexsha
190204
return istream
191205

192206
def sha_iter(self):

db/mem.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
ObjectDBW
66
)
77

8-
from gitdb.base import OStream
8+
from gitdb.base import (
9+
OStream,
10+
IStream,
11+
)
12+
913
from gitdb.util import to_bin_sha
1014
from gitdb.exc import (
1115
BadObject,
@@ -16,6 +20,8 @@
1620
DecompressMemMapReader,
1721
)
1822

23+
from cStringIO import StringIO
24+
1925
__all__ = ("MemoryDB", )
2026

2127
class MemoryDB(ObjectDBR, ObjectDBW):
@@ -78,3 +84,28 @@ def size(self):
7884

7985
def sha_iter(self):
8086
return self._cache.iterkeys()
87+
88+
89+
#{ Interface
90+
def stream_copy(self, sha_iter, odb):
91+
"""Copy the streams as identified by sha's yielded by sha_iter into the given odb
92+
The streams will be copied directly
93+
:note: the object will only be written if it did not exist in the target db
94+
:return: amount of streams actually copied into odb. If smaller than the amount
95+
of input shas, one or more objects did already exist in odb"""
96+
count = 0
97+
for sha in sha_iter:
98+
if odb.has_object(sha):
99+
continue
100+
# END check object existance
101+
102+
ostream = self.stream(sha)
103+
# compressed data including header
104+
sio = StringIO(ostream.stream.data())
105+
istream = IStream(ostream.type, ostream.size, sio, sha)
106+
107+
odb.store(istream)
108+
count += 1
109+
# END for each sha
110+
return count
111+
#} END interface

stream.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,6 @@
2525

2626
#{ RO Streams
2727

28-
class NullStream(object):
29-
"""A stream that does nothing but providing a stream interface.
30-
Use it like /dev/null"""
31-
__slots__ = tuple()
32-
33-
def read(self, size=0):
34-
return ''
35-
36-
def close(self):
37-
pass
38-
39-
def write(self, data):
40-
return len(data)
41-
42-
4328
class DecompressMemMapReader(LazyMixin):
4429
"""Reads data in chunks from a memory map and decompresses it. The client sees
4530
only the uncompressed data, respective file-like read calls are handling on-demand
@@ -113,6 +98,8 @@ def _parse_header_info(self):
11398

11499
return type, size
115100

101+
#{ Interface
102+
116103
@classmethod
117104
def new(self, m, close_on_deletion=False):
118105
"""Create a new DecompressMemMapReader instance for acting as a read-only stream
@@ -125,6 +112,10 @@ def new(self, m, close_on_deletion=False):
125112
type, size = inst._parse_header_info()
126113
return type, size, inst
127114

115+
def data(self):
116+
""":return: random access compatible data we are working on"""
117+
return self._m
118+
128119
def compressed_bytes_read(self):
129120
""":return: number of compressed bytes read. This includes the bytes it
130121
took to decompress the header ( if there was one )"""
@@ -171,6 +162,8 @@ def compressed_bytes_read(self):
171162
# from the count already
172163
return self._cbr
173164

165+
#} END interface
166+
174167
def seek(self, offset, whence=os.SEEK_SET):
175168
"""Allows to reset the stream to restart reading
176169
:raise ValueError: If offset and whence are not 0"""
@@ -567,4 +560,32 @@ def close(self):
567560

568561
#} END stream interface
569562

563+
class FDStream(object):
564+
"""Simple wrapper around a file descriptor"""
565+
__slots__ = "_fd"
566+
def __init__(self, fd):
567+
self._fd = fd
568+
569+
def write(self, data):
570+
return write(self._fd, data)
571+
572+
def close(self):
573+
close(self._fd)
574+
575+
576+
577+
class NullStream(object):
578+
"""A stream that does nothing but providing a stream interface.
579+
Use it like /dev/null"""
580+
__slots__ = tuple()
581+
582+
def read(self, size=0):
583+
return ''
584+
585+
def close(self):
586+
pass
587+
588+
def write(self, data):
589+
return len(data)
590+
570591
#} END W streams

test/db/test_mem.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
11
from lib import *
2-
from gitdb.db import MemoryDB
2+
from gitdb.db import (
3+
MemoryDB,
4+
LooseObjectDB
5+
)
36

47
class TestMemoryDB(TestDBBase):
58

6-
def test_writing(self):
9+
@with_rw_directory
10+
def test_writing(self, path):
711
mdb = MemoryDB()
812

913
# write data
1014
self._assert_object_writing_simple(mdb)
15+
16+
# test stream copy
17+
ldb = LooseObjectDB(path)
18+
assert ldb.size() == 0
19+
num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
20+
assert num_streams_copied == mdb.size()
21+
22+
assert ldb.size() == mdb.size()
23+
for sha in mdb.sha_iter():
24+
assert ldb.has_object(sha)
25+
assert ldb.stream(sha).read() == mdb.stream(sha).read()
26+
# END verify objects where copied and are equal

test/test_stream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
5050
# END handle rest
5151

5252
if isinstance(stream, DecompressMemMapReader):
53-
assert len(stream._m) == stream.compressed_bytes_read()
53+
assert len(stream.data()) == stream.compressed_bytes_read()
5454
# END handle special type
5555

5656
rewind_stream(stream)
@@ -60,7 +60,7 @@ def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
6060
assert rdata == cdata
6161

6262
if isinstance(stream, DecompressMemMapReader):
63-
assert len(stream._m) == stream.compressed_bytes_read()
63+
assert len(stream.data()) == stream.compressed_bytes_read()
6464
# END handle special type
6565

6666
def test_decompress_reader(self):

0 commit comments

Comments
 (0)