Skip to content

Commit 6a4eee2

Browse files
committed
Initial version of delta-apply, but more pedantic testing is required
1 parent 84c4e5a commit 6a4eee2

4 files changed

Lines changed: 188 additions & 20 deletions

File tree

fun.py

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from util import zlib
1010
decompressobj = zlib.decompressobj
1111

12+
import mmap
1213

1314
# INVARIANTS
1415
OFS_DELTA = 6
@@ -34,7 +35,7 @@
3435
)
3536

3637
# used when dealing with larger streams
37-
chunk_size = 1000*1000
38+
chunk_size = 1000*mmap.PAGESIZE
3839

3940
__all__ = ('is_loose_object', 'loose_object_header_info', 'object_header_info',
4041
'write_object' )
@@ -83,6 +84,26 @@ def pack_object_header_info(data):
8384
raise BadObjectType(type_id)
8485
# END handle exceptions
8586

87+
def msb_size(data, offset=0):
    """Decode a variable-length size whose bytes each carry 7 payload bits.

    The low 7 bits of every byte are combined least-significant group first;
    a set high bit (0x80) signals that another byte follows.

    :param data: random access byte data. Indexing it may yield either a
        one-character string or an integer byte value, both are handled.
    :param offset: index into data at which the encoded size starts
    :return: tuple(next_offset, size), where next_offset is the absolute index
        of the first byte after the encoded size ( offset + consumed bytes )
    :raise AssertionError: if data ends before a byte with a cleared high bit
        was found"""
    size = 0
    shift = 0
    pos = offset
    end = len(data)
    # bound the scan by the absolute position, so that a truncated stream is
    # reported as such even if we start at a non-zero offset
    while pos < end:
        c = data[pos]
        if not isinstance(c, int):
            c = ord(c)
        # END normalize byte value
        pos += 1
        size |= (c & 0x7f) << shift
        shift += 7
        if not c & 0x80:
            return pos, size
        # END check msb bit
    # END while in range
    raise AssertionError("Could not find terminating MSB byte in data stream")
106+
86107
def write_object(type, size, read, write, chunk_size=chunk_size):
87108
"""Write the object as identified by type, size and source_stream into the
88109
target_stream
@@ -111,14 +132,78 @@ def stream_copy(read, write, size, chunk_size):
111132
# WRITE ALL DATA UP TO SIZE
112133
while True:
113134
cs = min(chunk_size, size-dbw)
114-
data_len = write(read(cs))
135+
# NOTE: not all write methods return the amount of written bytes, like
136+
# mmap.write. Its bad, but we just deal with it ... perhaps its not
137+
# even less efficient
138+
# data_len = write(read(cs))
139+
# dbw += data_len
140+
data = read(cs)
141+
data_len = len(data)
115142
dbw += data_len
143+
write(data)
116144
if data_len < cs or dbw == size:
117145
break
118146
# END check for stream end
119147
# END duplicate data
120148
return dbw
121149

122150

151+
def apply_delta_data(src_buf, src_buf_size, delta_buf, delta_buf_size, target_file):
    """Apply data from a delta buffer using a source buffer to the target file,
    which will be written to.

    The delta buffer is a sequence of commands, each introduced by one byte:

    * high bit set - copy from the source buffer. Bits 0x01..0x08 tell which of
      the four offset bytes follow, bits 0x10..0x40 which of the three size
      bytes follow ( least significant byte first ). A size of 0 means 0x10000.
    * 1..127 - insert that many literal bytes which follow in the delta buffer
    * 0 - invalid

    :param src_buf: random access data from which the delta was created
    :param src_buf_size: size of the source buffer in bytes
    :param delta_buf: random access delta data, positioned after the two
        leading size headers. Indexing may yield one-character strings or
        integer byte values.
    :param delta_buf_size: size of the delta buffer in bytes
    :param target_file: file like object to write the result to
    :raise ValueError: if the command byte 0 is encountered
    :raise AssertionError: if a copy command reaches beyond the source buffer
        or if the commands do not end exactly at delta_buf_size
    :note: transcribed to python from the similar routine in patch-delta.c"""
    def byte_at(pos):
        # normalize the two possible results of indexing a byte buffer
        b = delta_buf[pos]
        if isinstance(b, int):
            return b
        return ord(b)
    # END byte accessor

    # (flag in command byte, bit position of the byte that follows if set)
    offset_fields = ((0x01, 0), (0x02, 8), (0x04, 16), (0x08, 24))
    size_fields = ((0x10, 0), (0x20, 8), (0x40, 16))

    twrite = target_file.write
    i = 0
    while i < delta_buf_size:
        c = byte_at(i)
        i += 1
        if c & 0x80:
            cp_off = 0
            for flag, shift in offset_fields:
                if c & flag:
                    cp_off |= byte_at(i) << shift
                    i += 1
                # END if byte is present
            # END for each offset byte

            cp_size = 0
            for flag, shift in size_fields:
                if c & flag:
                    cp_size |= byte_at(i) << shift
                    i += 1
                # END if byte is present
            # END for each size byte

            if not cp_size:
                cp_size = 0x10000
            # END handle implicit size

            # A copy outside of the source can only come from a corrupt delta.
            # Fail right away instead of leaving the loop, as the final
            # position check would not notice it if this was the last command
            if cp_off + cp_size > src_buf_size:
                raise AssertionError("delta replay has gone wild")
            # END verify copy range
            twrite(src_buf[cp_off:cp_off + cp_size])
        elif c:
            twrite(delta_buf[i:i + c])
            i += c
        else:
            raise ValueError("unexpected delta opcode 0")
        # END handle command byte
    # END while processing delta data

    # yes, lets use the exact same error message that git uses :)
    # Raised explicitly so that the check is not removed when running with -O
    if i != delta_buf_size:
        raise AssertionError("delta replay has gone wild")
    # END verify we consumed the delta exactly
207+
123208
#} END routines
124209

stream.py

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@
44
import mmap
55
import os
66

7+
from fun import (
8+
msb_size,
9+
stream_copy,
10+
apply_delta_data
11+
)
12+
713
from util import (
14+
allocate_memory,
815
LazyMixin,
916
make_sha,
1017
write,
@@ -300,9 +307,11 @@ class DeltaApplyReader(LazyMixin):
300307
* cmd == 0 - invalid operation ( or error in delta stream )
301308
"""
302309
__slots__ = (
303-
"_streams", # tuple of our stream objects
304-
"_readers", # list of read methods from our streams
310+
"_bstream", # base stream to which to apply the deltas
311+
"_dstreams", # tuple of delta stream readers
305312
"_mm_target", # memory map of the delta-applied data
313+
"_size", # actual number of bytes in _mm_target
314+
"_br" # number of bytes read
306315
)
307316

308317
def __init__(self, stream_list):
@@ -311,31 +320,81 @@ def __init__(self, stream_list):
311320
base object onto which to apply the deltas"""
312321
assert len(stream_list) > 1, "Need at least one delta and one base stream"
313322

314-
self._streams = tuple(stream_list)
315-
self._readers = None # TODO
323+
self._bstream = stream_list[-1]
324+
self._dstreams = tuple(stream_list[:-1])
325+
self._br = 0
316326

317327
def _set_cache_(self, attr):
318328
"""If we are here, we apply the actual deltas"""
319329
# fill in delta info structures, providing the source and target buffer
320330
# sizes.
331+
buffer_offset_list = list()
332+
final_target_size = None
333+
max_target_size = 0
334+
for dstream in self._dstreams:
335+
buf = dstream.read(512) # read the header information + X
336+
offset, src_size = msb_size(buf)
337+
offset, target_size = msb_size(buf, offset)
338+
if final_target_size is None:
339+
final_target_size = target_size
340+
# END set final target size
341+
buffer_offset_list.append((buffer(buf, offset), offset))
342+
max_target_size = max(max_target_size, target_size)
343+
# END for each delta stream
344+
345+
# sanity check - the first delta to apply should have the same source
346+
# size as our actual base stream
347+
base_size = self._bstream.size
348+
target_size = max_target_size
349+
350+
# if we have more than 1 delta to apply, we will swap buffers, hence we must
351+
# assure that all buffers we use are large enough to hold all the results
352+
if len(self._dstreams) > 1:
353+
base_size = target_size = max(base_size, max_target_size)
354+
# END adjust buffer sizes
355+
321356

322357
# Allocate private memory map big enough to hold the first base buffer
323-
# It can be swapped out if it is too large. We need random access to it
358+
# We need random access to it
359+
bbuf = allocate_memory(base_size)
324360

325361
# allocate memory map large enough for the largest (intermediate) target
326362
# We will use it as scratch space for all delta ops. If the final
327363
# target buffer is smaller than our allocated space, we just use parts
328-
# of it
364+
# of it upon return.
365+
tbuf = allocate_memory(target_size)
329366

330367
# for each delta to apply, memory map the decompressed delta and
331368
# work on the op-codes to reconstruct everything.
332369
# For the actual copying, we use a seek and write pattern of buffer
333370
# slices.
334-
335-
# NOTE: on py pre 2.5, all memory maps must actually be some kind
336-
# of memory buffer,like StringIO ( ouch ;) )
337-
338-
371+
for (dbuf, offset), dstream in reversed(zip(buffer_offset_list, self._dstreams)):
372+
# allocate a buffer to hold all delta data - fill in the data for
373+
# fast access. We do this as we know that reading individual bytes
374+
# from our stream would be slower than necessary ( although possible )
375+
# The dbuf buffer contains commands after the first two MSB sizes, the
376+
# offset specifies the amount of bytes read to get the sizes.
377+
ddata = allocate_memory(dstream.size - offset)
378+
ddata.write(dbuf)
379+
# read the rest from the stream. The size we give is larger than necessary
380+
stream_copy(dstream.read, ddata.write, dstream.size, 256*mmap.PAGESIZE)
381+
382+
################################################################
383+
apply_delta_data(bbuf, len(bbuf), ddata, len(ddata), tbuf)
384+
################################################################
385+
386+
# finally, swap out source and target buffers. The target is now the
387+
# base for the next delta to apply
388+
bbuf, tbuf = tbuf, bbuf
389+
bbuf.seek(0)
390+
tbuf.seek(0)
391+
# END for each delta to apply
392+
393+
# its already seeked to 0, constrain it to the actual size
394+
# NOTE: in the end of the loop, it swaps buffers, hence our target buffer
395+
# is not tbuf, but bbuf !
396+
self._mm_target = bbuf
397+
self._size = final_target_size
339398

340399
# TODO: Once that works, figure out the ordering of the opcodes. If they
341400
# are always in-order/sequential, an alternate implementation could
@@ -344,10 +403,21 @@ def _set_cache_(self, attr):
344403
# concatenated opcode list which indicates what to copy from which delta
345404
# to which position. This preprocessing would allow true streaming
346405

347-
def read(self, size=0):
348-
# pass the call to our lazy-loaded delta-applied data
349-
return self._mm_target.read(size)
350-
406+
def read(self, count=0):
    """Read up to count bytes of the delta-applied data.

    :param count: amount of bytes to read. Values smaller than 1, or larger
        than the amount of bytes left, read everything that is left
    :return: the data read from the target buffer"""
    remaining = self._size - self._br
    if not (1 <= count <= remaining):
        # never hand out more than the actual target size, the underlying
        # buffer may be larger than that
        count = remaining
    # END clamp count
    chunk = self._mm_target.read(count)
    self._br = self._br + len(chunk)
    return chunk
413+
414+
def seek(self, offset, whence=os.SEEK_SET):
    """Allows to reset the stream to restart reading

    :param offset: must be 0
    :param whence: must be os.SEEK_SET
    :raise ValueError: If offset and whence are not 0"""
    if offset != 0 or whence != os.SEEK_SET:
        raise ValueError("Can only seek to position 0")
    # END handle offset

    # Actually rewind: reset the counter of bytes handed out by read() as well
    # as the position of the underlying target buffer.
    # NOTE(review): accessing _mm_target presumably triggers the lazy delta
    # application, like the previous bare `self._size` access did - confirm
    # against LazyMixin
    self._br = 0
    self._mm_target.seek(0)
351421
#} END RO streams
352422

353423

test/test_pack.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ def _assert_pack_file(self, pack, version, size):
7878
continue
7979
# END get deltastream
8080

81-
# TODO: TestStream._assert_stream_reader does that already, should
82-
# be used instead
8381
# read all
84-
dstream.read()
82+
assert len(dstream.read())
8583

8684
# read chunks
85+
# NOTE: the current implementation is safe, it basically transfers
86+
# all calls to the underlying memory map
8787

8888
# END for each object
8989
assert num_obj == size

util.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,19 @@ def stream_copy(source, destination, chunk_size=512*1024):
9494
# END reading output stream
9595
return br
9696

97+
def allocate_memory(size):
    """Provide a writable memory block that can be used like a file.

    :param size: amount of bytes the block should hold
    :return: a file-protocol accessible memory block of the given size"""
    try:
        # fileno -1 requests a mapping without a backing file, it is
        # read-write by default
        block = mmap.mmap(-1, size)
    except EnvironmentError:
        # Fall back to real memory instead.
        # This of course may fail if the amount of memory is not available in
        # one chunk - would only be the case in python 2.4, being more likely
        # on 32 bit systems.
        block = cStringIO.StringIO("\0" * size)
    # END handle memory allocation
    return block
108+
109+
97110
def file_contents_ro(fd, stream=False, allow_mmap=True):
98111
""":return: read-only contents of the file represented by the file descriptor fd
99112
:param fd: file descriptor opened for reading

0 commit comments

Comments
 (0)