Welcome, guest | Sign In | My Account | Store | Cart

The MultiThread module provides a simple abstraction to execute a function on many sets of arguments in parallel using a bounded pool of threads.

Python, 58 lines
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import random
import sys
import threading
import time
import types

class Thread( threading.Thread ):
    def  __init__( self, target, args=() ):
         if type( args ) <> types.TupleType:
            args = (args,)
         threading.Thread.__init__( self, target=target, args=args )

class LockedIterator:
    def __init__( self, iterator ):
        self._lock     = threading.Lock()
        self._iterator = iterator

    def __iter__( self ):
        return self

    def next( self ):
        try:
            self._lock.acquire()
            return self._iterator.next()
        finally:
            self._lock.release()

class MultiThread:
    def __init__( self, function, argsVector, maxThreads=5 ):
        self._function     = function
        self._argsIterator = LockedIterator( iter( argsVector ) )
        self._threadPool   = []
        for i in range( maxThreads ):
            self._threadPool.append( Thread( self._tailRecurse ) )

    def _tailRecurse( self ):
        for args in self._argsIterator:
            self._function( args ) 

    def start( self ):
        for thread in self._threadPool:
            time.sleep( 0 ) # necessary to give other threads a chance to run
            thread.start()

    def join( self, timeout=None ):
        for thread in self._threadPool:
            thread.join( timeout )

def recite_n_times_table( n ):
    for i in range( 1, 13 ):
        print "%d * %d = %d" % (n, i, n * i)
        time.sleep( 0.5 + random.random() )

if __name__=="__main__":
   mt = MultiThread( recite_n_times_table, range( 1, 13 ) )
   mt.start()
   mt.join()
   print "Well done kids!" 

The MultiThread module provides a simple abstraction to execute a function on many sets of arguments in parallel using a bounded pool of threads.

The Thread class is a simple wrapper class for threading.Thread that handles single arguments not being passed as a 1-tuple and reorders the arguments to threading.Thread in a more convenient fashion to allow positional parameters to be used for the common case that you don't want to specify a thread group (which has no effect anyway). This is from someone else's recipe but I forget who. :(

The LockedIterator class is a wrapper class that will make any arbitrary iterator thread-safe by guarding access to its next member function with a threading.Lock.

The MultiThread class takes a function, function, a vector of arguments, argVector, and possibly a bound on the number of threads to use, maxThreads. It supplies a thread-like interface; a constructor and two member functions: start, which starts all the threads in the threadPool; and join, which performs a join on all the threads in the thread pool. The key idea is to use the private member function, _tailRecurse, to have each thread in the thread pool tail recurse on the next available set of arguments supplied by the LockedIterator, _argsIterator, until all the work has been completed.

If run as a program, the module runs a demonstration that simulates a class of schoolchildren recited their multiplication tables as fast as they can.

1 comment

Emilio Monti 13 years, 11 months ago  # | flag

Since Python 2.5 it is much easier to write a ThreadPool: http://code.activestate.com/recipes/577187-python-thread-pool/