Skip to content

Commit bf4437e

Browse files
committed
DecompressMemMapReader: implemented compressed bytes counting, including test. This is required to properly read packs without the use of an index
1 parent 0650892 commit bf4437e

4 files changed

Lines changed: 82 additions & 50 deletions

File tree

ext/async

Submodule async updated from af0040b to 796b5e9

pack.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ def pack_object_at(data, as_stream):
5454
s += 7
5555
# END character loop
5656
if as_stream:
57-
stream = DecompressMemMapReader(buffer(data, i), False)
57+
stream = DecompressMemMapReader(buffer(data, i), False, uncomp_size)
5858
return ODeltaPackStream(type_id, uncomp_size, delta_offset, stream)
5959
else:
6060
return ODeltaPackInfo(type_id, uncomp_size, delta_offset)
6161
# END handle stream
6262
elif type_id == REF_DELTA:
6363
ref_sha = data[:20]
6464
if as_stream:
65-
stream = DecompressMemMapReader(buffer(data, 20), False)
65+
stream = DecompressMemMapReader(buffer(data, 20), False, uncomp_size)
6666
return ODeltaPackStream(type_id, uncomp_size, ref_sha, stream)
6767
else:
6868
return ODeltaPackInfo(type_id, uncomp_size, ref_sha)
@@ -267,7 +267,7 @@ class PackFile(LazyMixin):
267267
__slots__ = ('_packpath', '_data', '_size', '_version')
268268

269269
# offset into our data at which the first object starts
270-
_first_object_offset = 3*4 + 8
270+
_first_object_offset = 3*4
271271

272272
def __init__(self, packpath):
273273
self._packpath = packpath

stream.py

Lines changed: 69 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11

22
from cStringIO import StringIO
33
import errno
4+
import mmap
5+
import os
46

57
from util import (
68
LazyMixin,
@@ -13,10 +15,6 @@
1315
__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer')
1416

1517

16-
# ZLIB configuration
17-
# used when compressing objects - 1 to 9 ( slowest )
18-
Z_BEST_SPEED = 1
19-
2018
#{ RO Streams
2119

2220
class DecompressMemMapReader(LazyMixin):
@@ -36,7 +34,8 @@ class DecompressMemMapReader(LazyMixin):
3634
times we actually allocate. An own zlib implementation would be good here
3735
to better support streamed reading - it would only need to keep the mmap
3836
and decompress it into chunks, that's all ... """
39-
__slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close')
37+
__slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
38+
'_cbr', '_phi')
4039

4140
max_read_size = 512*1024 # currently unused
4241

@@ -52,6 +51,8 @@ def __init__(self, m, close_on_deletion, size=None):
5251
self._br = 0 # num uncompressed bytes read
5352
self._cws = 0 # start byte of compression window
5453
self._cwe = 0 # end byte of compression window
54+
self._cbr = 0 # number of compressed bytes read
55+
self._phi = False # is True if we parsed the header info
5556
self._close = close_on_deletion # close the memmap on deletion ?
5657

5758
def _set_cache_(self, attr):
@@ -85,6 +86,8 @@ def _parse_header_info(self):
8586
self._buf = StringIO(hdr[hdrend:])
8687
self._buflen = len(hdr) - hdrend
8788

89+
self._phi = True
90+
8891
return type, size
8992

9093
@classmethod
@@ -98,7 +101,55 @@ def new(self, m, close_on_deletion=False):
98101
inst = DecompressMemMapReader(m, close_on_deletion, 0)
99102
type, size = inst._parse_header_info()
100103
return type, size, inst
104+
105+
def compressed_bytes_read(self):
106+
""":return: number of compressed bytes read. This includes the bytes it
107+
took to decompress the header ( if there was one )"""
108+
# ABSTRACT: When decompressing a byte stream, it can be that the first
109+
# x bytes which were requested match the first x bytes in the loosely
110+
# compressed datastream. This is the worst-case assumption that the reader
111+
# makes: it assumes that it will get at least X bytes from X compressed bytes
112+
# in all cases.
113+
# The caveat is that the object, according to our known uncompressed size,
114+
# is already complete, but there are still some bytes left in the compressed
115+
# stream that contribute to the amount of compressed bytes.
116+
# How can we know that we are truly done, and have read all bytes we need
117+
# to read ?
118+
# Without help, we cannot know, as we need to obtain the status of the
119+
# decompression. If it is not finished, we need to decompress more data
120+
# until it is finished, to yield the actual number of compressed bytes
121+
# belonging to the decompressed object
122+
# We are using a custom zlib module for this, if it's not present,
123+
# we can only hope it works.
124+
# Only scrub the stream forward if we are officially done with the
125+
# bytes we were to have.
126+
if self._br == self._s and hasattr(self._zip, 'status') and self._zip.status == zlib.Z_OK:
127+
# manipulate the bytes-read to allow our own read method to continue
128+
# but keep the window at its current position
129+
self._br = 0
130+
while self._zip.status == zlib.Z_OK:
131+
self.read(mmap.PAGESIZE)
132+
# END scrub-loop
133+
# reset bytes read, just to be sure
134+
self._br = self._s
135+
# END handle stream scrubbing
136+
137+
return self._cbr - len(self._zip.unused_data)
101138

139+
def seek(self, offset, whence=os.SEEK_SET):
140+
"""Reset the stream to its beginning so that reading can be restarted
141+
:raise ValueError: If offset is not 0 or whence is not os.SEEK_SET"""
142+
if offset != 0 or whence != os.SEEK_SET:
143+
raise ValueError("Can only seek to position 0")
144+
# END handle offset
145+
146+
self._zip = zlib.decompressobj()
147+
self._br = self._cws = self._cwe = self._cbr = 0
148+
if self._phi:
149+
self._phi = False
150+
del(self._s) # trigger header parsing on first access
151+
# END skip header
152+
102153
def read(self, size=-1):
103154
if size < 1:
104155
size = self._s - self._br
@@ -109,33 +160,8 @@ def read(self, size=-1):
109160
if size == 0:
110161
return str()
111162
# END handle depletion
112-
113-
# protect from memory peaks
114-
# If he tries to read large chunks, our memory patterns get really bad
115-
# as we end up copying a possibly huge chunk from our memory map right into
116-
# memory. This might not even be possible. Nonetheless, try to dampen the
117-
# effect a bit by reading in chunks, returning a huge string in the end.
118-
# Our performance now depends on StringIO. This way we don't need two large
119-
# buffers in peak times, but only one large one in the end which is
120-
# the return buffer
121-
# NO: We don't do it - if the user thinks its best, he is right. If he
122-
# has trouble, he will start reading in chunks. According to our tests
123-
# its still faster if we read 10 Mb at once instead of chunking it.
124-
125-
# if size > self.max_read_size:
126-
# sio = StringIO()
127-
# while size:
128-
# read_size = min(self.max_read_size, size)
129-
# data = self.read(read_size)
130-
# sio.write(data)
131-
# size -= len(data)
132-
# if len(data) < read_size:
133-
# break
134-
# # END data loop
135-
# sio.seek(0)
136-
# return sio.getvalue()
137-
# # END handle maxread
138-
#
163+
164+
139165
# deplete the buffer, then just continue using the decompress object
140166
# which has its own buffer. We just need this to transparently parse the
141167
# header from the zlib stream
@@ -186,8 +212,7 @@ def read(self, size=-1):
186212

187213

188214
# if window is too small, make it larger so zip can decompress something
189-
win_size = self._cwe - self._cws
190-
if win_size < 8:
215+
if self._cwe - self._cws < 8:
191216
self._cwe = self._cws + 8
192217
# END adjust winsize
193218

@@ -196,10 +221,18 @@ def read(self, size=-1):
196221

197222
# get the actual window end to be sure we don't use it for computations
198223
self._cwe = self._cws + len(indata)
199-
224+
200225
dcompdat = self._zip.decompress(indata, size)
201226

227+
# update the amount of compressed bytes read
228+
# We feed possibly overlapping chunks, which is why the unconsumed tail
229+
# has to be taken into consideration, as well as the unused data
230+
# if we hit the end of the stream
231+
self._cbr += len(indata) - len(self._zip.unconsumed_tail)
202232
self._br += len(dcompdat)
233+
234+
print size, self._br, self._cbr, len(indata), self._cws, self._cwe, len(self._zip.unused_data), len(self._zip.unconsumed_tail)
235+
203236
if dat:
204237
dcompdat = dat + dcompdat
205238

@@ -252,7 +285,7 @@ class FDCompressedSha1Writer(Sha1Writer):
252285
def __init__(self, fd):
253286
super(FDCompressedSha1Writer, self).__init__()
254287
self.fd = fd
255-
self.zip = zlib.compressobj(Z_BEST_SPEED)
288+
self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)
256289

257290
#{ Stream Interface
258291

test/test_stream.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,20 @@ def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
4949
assert rest == cdata[-len(rest):]
5050
# END handle rest
5151

52+
if isinstance(stream, DecompressMemMapReader):
53+
assert len(stream._m) == stream.compressed_bytes_read()
54+
# END handle special type
55+
5256
rewind_stream(stream)
5357

5458
# read everything
5559
rdata = stream.read()
5660
assert rdata == cdata
5761

62+
if isinstance(stream, DecompressMemMapReader):
63+
assert len(stream._m) == stream.compressed_bytes_read()
64+
# END handle special type
65+
5866
def test_decompress_reader(self):
5967
for close_on_deletion in range(2):
6068
for with_size in range(2):
@@ -82,15 +90,7 @@ def test_decompress_reader(self):
8290
assert reader._s == len(cdata)
8391
# END get reader
8492

85-
def rewind(r):
86-
r._zip = zlib.decompressobj()
87-
r._br = r._cws = r._cwe = 0
88-
if with_size:
89-
r._parse_header_info()
90-
# END skip header
91-
# END make rewind func
92-
93-
self._assert_stream_reader(reader, cdata, rewind)
93+
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
9494

9595
# put in a dummy stream for closing
9696
dummy = DummyStream()
@@ -99,7 +99,6 @@ def rewind(r):
9999
assert not dummy.closed
100100
del(reader)
101101
assert dummy.closed == close_on_deletion
102-
#zdi#
103102
# END for each datasize
104103
# END whether size should be used
105104
# END whether stream should be closed when deleted

0 commit comments

Comments
 (0)