Skip to content

Commit c64a974

Browse files
committed
Implemented all async methods, including a test which shows how to chain the async methods together
1 parent 10fef8f commit c64a974

7 files changed

Lines changed: 161 additions & 19 deletions

File tree

__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
"""Initialize the object database module"""
22

3+
import sys
4+
import os
5+
6+
#{ Initialization
7+
def _init_externals():
8+
"""Initialize external projects by putting them into the path"""
9+
sys.path.append(os.path.join(os.path.dirname(__file__), 'ext'))
10+
11+
#} END initialization
12+
13+
_init_externals()
14+
315
# default imports
416
from db import *
517
from stream import *

db.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
)
1515

1616
from util import (
17+
pool,
1718
ENOENT,
1819
to_hex_sha,
1920
exists,
@@ -32,6 +33,11 @@
3233
stream_copy
3334
)
3435

36+
37+
from async import (
38+
ChannelThreadTask
39+
)
40+
3541
import tempfile
3642
import mmap
3743
import os
@@ -40,6 +46,7 @@
4046
__all__ = ('ObjectDBR', 'ObjectDBW', 'FileDBBase', 'LooseObjectDB', 'PackedDB',
4147
'CompoundDB', 'ReferenceDB', 'GitObjectDB' )
4248

49+
4350
class ObjectDBR(object):
4451
"""Defines an interface for object database lookup.
4552
Objects are identified either by hex-sha (40 bytes) or
@@ -52,34 +59,48 @@ def __contains__(self, sha):
5259
def has_object(self, sha):
5360
"""
5461
:return: True if the object identified by the given 40 byte hexsha or 20 bytes
55-
binary sha is contained in the database
56-
:raise BadObject:"""
62+
binary sha is contained in the database"""
5763
raise NotImplementedError("To be implemented in subclass")
5864

65+
def has_object_async(self, reader):
66+
"""Return a reader yielding information about the membership of objects
67+
as identified by shas
68+
:param reader: Reader yielding 20 byte or 40 byte shas.
69+
:return: async.Reader yielding tuples of (sha, bool) pairs which indicate
70+
whether the given sha exists in the database or not"""
71+
task = ChannelThreadTask(reader, str(self.has_object_async), lambda sha: (sha, self.has_object(sha)))
72+
return pool.add_task(task)
73+
5974
def info(self, sha):
6075
""" :return: OInfo instance
6176
:param sha: 40 bytes hexsha or 20 bytes binary sha
6277
:raise BadObject:"""
6378
raise NotImplementedError("To be implemented in subclass")
6479

65-
def info_async(self, input_channel):
80+
def info_async(self, reader):
6681
"""Retrieve information of a multitude of objects asynchronously
67-
:param input_channel: Channel yielding the sha's of the objects of interest
68-
:return: Channel yielding OInfo|InvalidOInfo, in any order"""
69-
raise NotImplementedError("To be implemented in subclass")
82+
:param reader: async.Reader yielding the shas of the objects of interest
83+
:return: async.Reader yielding OInfo|InvalidOInfo, in any order"""
84+
task = ChannelThreadTask(reader, str(self.info_async), self.info)
85+
return pool.add_task(task)
7086

7187
def stream(self, sha):
7288
""":return: OStream instance
7389
:param sha: 40 bytes hexsha or 20 bytes binary sha
7490
:raise BadObject:"""
7591
raise NotImplementedError("To be implemented in subclass")
7692

77-
def stream_async(self, input_channel):
93+
def stream_async(self, reader):
7894
"""Retrieve the OStream of multiple objects
79-
:param input_channel: see ``info``
95+
:param reader: see ``info_async``
8096
:param max_threads: see ``ObjectDBW.store``
81-
:return: Channel yielding OStream|InvalidOStream instances in any order"""
82-
raise NotImplementedError("To be implemented in subclass")
97+
:return: async.Reader yielding OStream|InvalidOStream instances in any order
98+
:note: depending on the system configuration, it might not be possible to
99+
read all OStreams at once. Instead, read them individually using reader.read(x)
100+
where x is small enough."""
101+
# base implementation just uses the stream method repeatedly
102+
task = ChannelThreadTask(reader, str(self.stream_async), self.stream)
103+
return pool.add_task(task)
83104

84105
#} END query interface
85106

@@ -114,21 +135,23 @@ def store(self, istream):
114135
:raise IOError: if data could not be written"""
115136
raise NotImplementedError("To be implemented in subclass")
116137

117-
def store_async(self, input_channel):
138+
def store_async(self, reader):
118139
"""Create multiple new objects in the database asynchronously. The method will
119140
return right away, returning an output channel which receives the results as
120141
they are computed.
121142
122143
:return: Channel yielding your IStream which served as input, in any order.
123144
The IStreams sha will be set to the sha it received during the process,
124145
or its error attribute will be set to the exception informing about the error.
125-
:param input_channel: Channel yielding IStream instance.
126-
As the same instances will be used in the output channel, you can create a map
127-
between the id(istream) -> istream
128-
:note:As some ODB implementations implement this operation as atomic, they might
146+
:param reader: async.Reader yielding IStream instances.
147+
The same instances will be used in the output channel as were received
148+
by the Reader.
149+
:note: As some ODB implementations implement this operation atomically, they might
129150
abort the whole operation if one item could not be processed. Hence check how
130151
many items have actually been produced."""
131-
raise NotImplementedError("To be implemented in subclass")
152+
# base implementation uses store to perform the work
153+
task = ChannelThreadTask(reader, str(self.store_async), self.store)
154+
return pool.add_task(task)
132155

133156
#} END edit interface
134157

ext/async

Submodule async updated from 5a13dc5 to 164bb70

test/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,12 @@
11

2+
import gitdb.util
3+
4+
#{ Initialization
5+
def _init_pool():
6+
"""Assure the pool is actually threaded"""
7+
size = 2
8+
print "Setting ThreadPool to %i" % size
9+
gitdb.util.pool.set_size(size)
10+
11+
12+
#} END initialization

test/lib.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def with_rw_directory(func):
3030
"""Create a temporary directory which can be written to, remove it if the
3131
test succeeds, but leave it otherwise to aid additional debugging"""
3232
def wrapper(self):
33-
path = tempfile.mktemp(suffix=func.__name__)
33+
path = tempfile.mktemp(prefix=func.__name__)
3434
os.mkdir(path)
3535
try:
3636
return func(self, path)

test/test_db.py

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from gitdb.exc import BadObject
1111
from gitdb.typ import str_blob_type
1212

13+
from async import IteratorReader
14+
1315
from cStringIO import StringIO
1416
import os
1517

@@ -78,15 +80,99 @@ def _assert_object_writing(self, db):
7880
new_ostream = db.ostream()
7981

8082
# note: only works as long as our store write uses the same compression
81-
# level, which is zip
83+
# level, which is zip_best
8284
assert ostream.getvalue() == new_ostream.getvalue()
8385
# END for each data set
8486
# END for each dry_run mode
87+
88+
def _assert_object_writing_async(self, db):
89+
"""Test generic object writing using asynchronous access"""
90+
ni = 5000
91+
def istream_generator(offset=0, ni=ni):
92+
for data_src in xrange(ni):
93+
data = str(data_src + offset)
94+
yield IStream(str_blob_type, len(data), StringIO(data))
95+
# END for each item
96+
# END generator utility
97+
98+
# for now, we are very trusting here as we expect it to work if it worked
99+
# in the single-stream case
100+
101+
# write objects
102+
reader = IteratorReader(istream_generator())
103+
istream_reader = db.store_async(reader)
104+
istreams = istream_reader.read() # read all
105+
assert istream_reader.task().error() is None
106+
assert len(istreams) == ni
107+
108+
for stream in istreams:
109+
assert stream.error is None
110+
assert len(stream.sha) == 40
111+
assert isinstance(stream, IStream)
112+
# END assert each stream
113+
114+
# test has-object-async - we must have all previously added ones
115+
reader = IteratorReader( istream.sha for istream in istreams )
116+
hasobject_reader = db.has_object_async(reader)
117+
count = 0
118+
for sha, has_object in hasobject_reader:
119+
assert has_object
120+
count += 1
121+
# END for each sha
122+
assert count == ni
123+
124+
# read the objects we have just written
125+
reader = IteratorReader( istream.sha for istream in istreams )
126+
ostream_reader = db.stream_async(reader)
127+
128+
# read items individually to prevent hitting possible sys-limits
129+
count = 0
130+
for ostream in ostream_reader:
131+
assert isinstance(ostream, OStream)
132+
count += 1
133+
# END for each ostream
134+
assert ostream_reader.task().error() is None
135+
assert count == ni
136+
137+
# get info about our items
138+
reader = IteratorReader( istream.sha for istream in istreams )
139+
info_reader = db.info_async(reader)
140+
141+
count = 0
142+
for oinfo in info_reader:
143+
assert isinstance(oinfo, OInfo)
144+
count += 1
145+
# END for each oinfo instance
146+
assert count == ni
147+
148+
149+
# combined read-write using a converter
150+
# add 2500 items, and obtain their output streams
151+
nni = 2500
152+
reader = IteratorReader(istream_generator(offset=ni, ni=nni))
153+
istream_to_sha = lambda istreams: [ istream.sha for istream in istreams ]
154+
155+
istream_reader = db.store_async(reader)
156+
istream_reader.set_post_cb(istream_to_sha)
157+
158+
ostream_reader = db.stream_async(istream_reader)
159+
160+
count = 0
161+
# read it individually, otherwise we might run into the ulimit
162+
for ostream in ostream_reader:
163+
assert isinstance(ostream, OStream)
164+
count += 1
165+
# END for each ostream
166+
assert count == nni
167+
168+
169+
85170

86171
@with_rw_directory
87172
def test_writing(self, path):
88173
ldb = LooseObjectDB(path)
89174

90175
# write data
91176
self._assert_object_writing(ldb)
177+
self._assert_object_writing_async(ldb)
92178

util.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,21 @@
22
import os
33
import errno
44

5+
from async import ThreadPool
6+
57
try:
68
import hashlib
79
except ImportError:
810
import sha
911

12+
#{ Globals
13+
14+
# A pool distributing tasks, initially with zero threads, hence everything
15+
# will be handled in the main thread
16+
pool = ThreadPool(0)
17+
18+
#} END globals
19+
1020

1121
#{ Aliases
1222

0 commit comments

Comments
 (0)