Asynchronous Generators in Python
The Wide Finder exercise is all about simple ways of doing parallel processing. For me this means finding a minimalistic yet powerful and flexible way of writing multi-threaded code. There's lots of room for improvement here, and I couldn't resist having a whack at writing a parallelization library. The language, of course, is Python.
Simple syntax
The problem I'm trying to solve is coarse-grained parallelism, where certain large chunks of code get executed in parallel [1]. The programmer is responsible for marking out the code to be parallelized, so this should be as simple a task as possible, without seriously compromising flexibility.
There are several Python libraries that tackle this coarse-grained parallelization problem; among them, Paul Boddie's pprocess is pretty neat. I've used it for my Wide Finder implementation, but I wanted something less verbose and more intuitive.
Python's generators/iterators seem like a great way to express asynchronous execution: they are a natural fit for getting data into and out of an asynchronous worker. Combining that with Python's built-in list-wrangling functions should make for a nice, clean, intuitive syntax. I've started writing this library a few times and I think I finally got it right, both syntactically and semantically. Here's some sample code:
    # first, an asynchronous worker that expects no input
    @async
    def worker():
        # ... do some processing
        yield value1
        # ... do some more processing
        yield value2

    # another asynchronous worker; this one receives input
    @async_with_input({'the_data': some_generator})
    def worker2(the_data):
        for value in the_data:
            # ... do some processing
            yield out_value
So what happens is that the @async and @async_with_input decorators wrap the function, fork off a child process, and run the function in the child. This approach is inspired by pprocess (in fact, my code uses pprocess behind the scenes). The extra magic is in the input and output: instead of explicitly exchanging messages with the child processes, generators/iterators are used on either side, and the framework moves data using messages behind the scenes.
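To make the mechanism concrete, here is a minimal sketch of how such a decorator could work. It is not the library's actual code: it assumes the standard multiprocessing module instead of pprocess, it relies on the Unix-only "fork" start method, and the name async_gen is hypothetical (async has been a reserved word since Python 3.7, so it can no longer be used as a decorator name).

```python
import multiprocessing as mp


def async_gen(func):
    """Hypothetical stand-in for @async: run a generator function in a child process."""
    ctx = mp.get_context("fork")  # assumption: Unix, where forking is available

    def wrapper(*args, **kwargs):
        recv_end, send_end = ctx.Pipe(duplex=False)

        def run_in_child():
            recv_end.close()  # the child only writes
            for value in func(*args, **kwargs):
                send_end.send(("value", value))  # values must be picklable
            send_end.send(("done", None))
            send_end.close()

        # The child is forked on the first pull, because wrapper is itself a generator.
        proc = ctx.Process(target=run_in_child)
        proc.start()
        send_end.close()  # the parent only reads
        try:
            while True:
                try:
                    kind, value = recv_end.recv()
                except EOFError:
                    raise RuntimeError("worker exited before finishing") from None
                if kind == "done":
                    return
                yield value
        finally:
            recv_end.close()
            proc.join()

    return wrapper


@async_gen
def squares(n):
    for i in range(n):
        yield i * i
```

Iterating over squares(5) in the parent yields 0, 1, 4, 9, 16, with each value computed in the child and shipped back over the pipe. An input-taking variant would follow the same pattern, with the child iterating over the input generator it inherits at fork time.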
Pulling data
Using generators has the interesting side-effect of turning the whole implementation into a pull-based system. Assume we have several stages of computation; we can chain several generator functions to increase parallelism. After defining the functions, the programmer requests the result; this is pulled from the final function, which pulls data from its input (a generator, which can be an asynchronous function), which pulls data from another generator, etc. Computation is done on-demand and, assuming we have sufficient parallelism in the code, the load gets spread out nicely.
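The pull behaviour can be seen with plain generators, before any processes are involved. In this small sketch the stage names and sample data are made up for illustration; the pulled list records which items the source was actually asked for.

```python
from itertools import islice

pulled = []  # items the source has been asked to produce so far


def source(items):
    for item in items:
        pulled.append(item)
        yield item


def parse(lines):
    # second stage: keep only the path part of each line
    for line in lines:
        yield line.split()[-1]


def only_posts(paths):
    # final stage: filter out everything that is not a post
    for path in paths:
        if path.startswith("/posts/"):
            yield path


lines = ["GET /posts/a", "GET /img/x.png", "GET /posts/b", "GET /posts/c"]
pipeline = only_posts(parse(source(lines)))

# Nothing has run yet; asking for two results pulls just enough input.
first_two = list(islice(pipeline, 2))
```

After this runs, first_two holds the first two post paths and pulled holds only the first three input lines: the fourth line was never requested because nobody asked for a third result.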
What doesn't work
The current implementation is pretty rough and has several missing features. One is that each wrapped function launches a single process; the user should be able to specify how many processes are to be launched when invoking the wrapper, and the framework should distribute the load among them. Also, @async_with_input accepts only one argument, so you can't write an asynchronous aggregator of several generators. These two are relatively easy to fix and they're next on my list.
A somewhat trickier problem involves bottlenecks: if the consumer of a generator pulls data unevenly, the provider of that data may sometimes stall (because nobody is pulling data). This can be solved with buffering: the framework requests a few extra results and keeps them buffered until the consumer can use them. This feature is pretty important, and I'll need it if I'm going to max out the processors with my Wide Finder code.
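One possible shape for such a buffer, sketched with a background thread and a bounded queue from the standard library. The function name buffered and its size parameter are hypothetical, and a process-based framework would need the same idea on the parent's side of the message channel.

```python
import queue
import threading


def buffered(source, size):
    """Pull up to `size` items ahead of the consumer and hold them in a queue."""
    slots = queue.Queue(maxsize=size)

    def fill():
        try:
            for item in source:
                slots.put(("value", item))  # blocks while the buffer is full
        except Exception as exc:
            slots.put(("error", exc))  # hand failures over to the consumer
        else:
            slots.put(("done", None))

    # Daemon thread: if the consumer walks away, it will not keep the program alive.
    threading.Thread(target=fill, daemon=True).start()

    def drain():
        while True:
            kind, payload = slots.get()
            if kind == "done":
                return
            if kind == "error":
                raise payload
            yield payload

    return drain()


numbers = list(buffered((n * n for n in range(5)), size=10))
```

The producer keeps working until the queue is full, so a consumer that pulls in bursts finds results already waiting instead of stalling the provider.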
Next steps
- Multiple asynchronous workers: @async(jobs=5) starts a pool of workers and pulls data from all of them
- Multiple input sources for a job: @async_with_input({'i1': g1, 'i2': g2})
- Result buffering: @async(buffer=10) makes the framework pull and buffer a few results before they are needed
- Support for coroutines like in PEP 342
- Pickle/unpickle using temporary files: @async(pickle_in=True, pickle_out=True) is for large datasets, where inter-process communication can be painfully slow; it's much quicker to exchange data via temporary files
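The temporary-file idea from the last item could look roughly like this. It is a sketch under assumptions, not the planned implementation: the helper names are hypothetical, and the only thing that would travel over the inter-process channel is the file path.

```python
import os
import pickle
import tempfile


def dump_to_tempfile(obj):
    """Pickle obj into a fresh temporary file and return the file's path."""
    fd, path = tempfile.mkstemp(suffix=".pickle")
    with os.fdopen(fd, "wb") as handle:
        pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return path


def load_from_tempfile(path):
    """Read the pickled object back and remove the temporary file."""
    try:
        with open(path, "rb") as handle:
            return pickle.load(handle)
    finally:
        os.remove(path)


# The producer writes the data and sends only `path`; the consumer loads it.
path = dump_to_tempfile({"hits": list(range(1000))})
restored = load_from_tempfile(path)
```

Whether this beats sending the data through a pipe depends on the dataset and the platform, so it is worth measuring before enabling it by default.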
There's a project page and a git repository (git clone http://grep.ro/git/asyncgen.git/). I'll have more posts up as work progresses.
1 Contrast this with fine-grained parallelism, which focuses on smaller bits of code like loops and method calls. This is what Erlang does, for example. Stackless Python can also do this, to an extent.