Skip to content

Commit 05cee2e

Browse files
committed
Added multi-threading performance tests which show that, during compression and decompression, it is not a tiny bit faster than without, which is due to the GIL and even separately locked zlib module implementations. The only way to make this faster update the resepctive c modules to drop the gil, and their own locks where possible
1 parent c64a974 commit 05cee2e

2 files changed

Lines changed: 101 additions & 3 deletions

File tree

ext/async

Submodule async updated from 164bb70 to 8cfa254

test/performance/test_stream.py

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33
from lib import TestBigRepoR
44
from gitdb.db import *
55
from gitdb.stream import *
6+
from gitdb.util import pool
7+
from gitdb.typ import str_blob_type
8+
from gitdb.fun import chunk_size
9+
10+
from async import (
11+
IteratorReader,
12+
ChannelThreadTask,
13+
)
614

715
from cStringIO import StringIO
816
from time import time
@@ -19,6 +27,30 @@
1927
)
2028

2129

30+
#{ Utilities
31+
def read_chunked_stream(stream):
32+
total = 0
33+
while True:
34+
chunk = stream.read(chunk_size)
35+
total += len(chunk)
36+
if len(chunk) < chunk_size:
37+
break
38+
# END read stream loop
39+
assert total == stream.size
40+
return stream
41+
42+
43+
class TestStreamReader(ChannelThreadTask):
44+
"""Expects input streams and reads them in chunks. It will read one at a time,
45+
requireing a queue chunk of size 1"""
46+
def __init__(self, *args):
47+
super(TestStreamReader, self).__init__(*args)
48+
self.fun = read_chunked_stream
49+
self.max_chunksize = 1
50+
51+
52+
#} END utilities
53+
2254
class TestObjDBPerformance(TestBigRepoR):
2355

2456
large_data_size_bytes = 1000*1000*10 # some MiB should do it
@@ -27,14 +59,17 @@ class TestObjDBPerformance(TestBigRepoR):
2759
@with_rw_directory
2860
def test_large_data_streaming(self, path):
2961
ldb = LooseObjectDB(path)
62+
string_ios = list() # list of streams we previously created
3063

64+
# serial mode
3165
for randomize in range(2):
3266
desc = (randomize and 'random ') or ''
3367
print >> sys.stderr, "Creating %s data ..." % desc
3468
st = time()
3569
size, stream = make_memory_file(self.large_data_size_bytes, randomize)
3670
elapsed = time() - st
3771
print >> sys.stderr, "Done (in %f s)" % elapsed
72+
string_ios.append(stream)
3873

3974
# writing - due to the compression it will seem faster than it is
4075
st = time()
@@ -78,7 +113,70 @@ def test_large_data_streaming(self, path):
78113
cs_kib = cs / 1000
79114
print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
80115

81-
# del db file so git has something to do
116+
# del db file so we keep something to do
82117
os.remove(db_file)
83-
84118
# END for each randomization factor
119+
120+
121+
# multi-threaded mode
122+
# want two, should be supported by most of todays cpus
123+
pool.set_size(2)
124+
total_kib = 0
125+
nsios = len(string_ios)
126+
for stream in string_ios:
127+
stream.seek(0)
128+
total_kib += len(stream.getvalue()) / 1000
129+
# END rewind
130+
131+
def istream_iter():
132+
for stream in string_ios:
133+
stream.seek(0)
134+
yield IStream(str_blob_type, len(stream.getvalue()), stream)
135+
# END for each stream
136+
# END util
137+
138+
# write multiple objects at once, involving concurrent compression
139+
reader = IteratorReader(istream_iter())
140+
istream_reader = ldb.store_async(reader)
141+
istream_reader.task().max_chunksize = 1
142+
143+
st = time()
144+
istreams = istream_reader.read(nsios)
145+
assert len(istreams) == nsios
146+
elapsed = time() - st
147+
148+
print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
149+
150+
151+
# decompress multiple at once, by reading them
152+
istream_reader = IteratorReader(iter([ i.sha for i in istreams ]))
153+
ostream_reader = ldb.stream_async(istream_reader)
154+
155+
chunk_task = TestStreamReader(ostream_reader, "chunker", None)
156+
output_reader = pool.add_task(chunk_task)
157+
158+
st = time()
159+
assert len(output_reader.read(nsios)) == nsios
160+
elapsed = time() - st
161+
162+
print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
163+
164+
# store the files, and read them back. For the reading, we use a task
165+
# as well which is chunked into one item per task. Reading all will
166+
# very quickly result in two threads handling two bytestreams of
167+
# chained compression/decompression streams
168+
reader = IteratorReader(istream_iter())
169+
istream_reader = ldb.store_async(reader)
170+
171+
istream_to_sha = lambda items: [ i.sha for i in items ]
172+
istream_reader.set_post_cb(istream_to_sha)
173+
174+
ostream_reader = ldb.stream_async(istream_reader)
175+
chunk_task = TestStreamReader(ostream_reader, "chunker", None)
176+
output_reader = pool.add_task(chunk_task)
177+
178+
st = time()
179+
assert len(output_reader.read(nsios)) == nsios
180+
elapsed = time() - st
181+
182+
print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)

0 commit comments

Comments
 (0)