Welcome, guest | Sign In | My Account | Store | Cart

A framework for easy (2-way) inter-thread communication resembling normal function calling.

Especially useful for non-blocking UI techniques and for load distribution on jerky resources. Can replace stiff Queue.Queue techniques in most cases - making threading code more readable and functional.

CallQueue lets you express function calls directly in the local context but execute them in a target thread. It focuses naturally on 2-way communication (with return value responses) and includes a fluid concept for inter-thread exception (transfer) issues. It also supports multi-producer, multi-consumer communication.

A target thread just has to call callqueue.receive() periodically without worrying about any data passing. Thus CallQueue also naturally supports a high-level bulk threading concept with anonymous "default consumer threads": allocated "thread resources" can be thrown efficiently at bunches of jobs.

Python, 206 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def example_CallQueue():
    # 3 default consumer threads work and collect on 6 I/O jobs
    import urllib,time
    l=['http://www.python.org/',
       'http://www.python.org/about/gettingstarted',
       'http://www.python.org/about/apps/',
       'http://www.python.org/about/quotes',
       'http://www.python.org/about/website',
       'http://www.python.org/about/help', ]
    def work(url):
        return len(urllib.urlopen(url).read())
    #
    cq=CallQueue(max_default_consumer_threads=3)
    for url in l:
        cqitem=cq.call_and_collect(work,(url,))      #schedule
    while not cq.is_done():
        for cqitem in cq.get_next_collected():       #harvest
            print cqitem.args, cqitem.get_return()
        print ".",
        time.sleep(0.001)
    print "example_CallQueue done."

def example2_CallQueue():
    # interacts with my_thread and finally kills it ..
    import sys,thread,time,copy,random
    cq=CallQueue()
    l=[]            # to be filled by my_thread
    def my_thread():
        try: 
            while 1:
                cq.receive()
                l.append(random.random())
        finally:
            print "my_thread terminated."
    thread.start_new(my_thread,())
    time.sleep(0.020)
    l_snapshot=cq.call(lambda:copy.copy(l),wait=1)  #synchronous call
    print "1st snapshot:", len(l_snapshot),"numbers"
    cqitem=cq.call(lambda:copy.copy(l))             #non-blocking call
    while 1:
        if cqitem.is_done():
            l_snapshot2=cqitem.get_return()
            print "2n snapshot:", len(l_snapshot2),"numbers"
            break
        else:
            print ".",
            time.sleep(0.001)
    cq.call(sys.exit, wait=1, raise_exception=2)    #raises SystemExit inside my_thread
    print "end state:",len(l),"numbers"
    print "example2_CallQueue done."


import sys
from time import time as _time, sleep as _sleep

class Full(Exception):
    """Raised by CallQueue.call() when the queue is closed or its maxsize is exceeded."""
class Empty(Exception):
    """Raised by CallQueue.call() when a waited-for return does not arrive within the timeout."""

class CQItem:
    args=None
    kwargs=None
    done=0              # 1=return value; 2=exception
    delivered=0
    raise_exception=1
    def get_return(self,alt_return=None):
        """delivers the return value or (by default) echoes the exception of the call job
        """
        if self.done==2:
            if self.raise_exception & 1:    #by default exception is raised
                exc=self.exc
                del self.exc
                raise exc[0],exc[1],exc[2]
            else:
                return alt_return
        return self.ret
    def get_exception(self):
        return self.exc
    def is_done(self):
        """returns 1, if the call return's a value; 2, if an exception was raised
        """
        return self.done

class CallQueue:
    """FIFO of function calls: producer threads schedule calls with call(),
    consumer threads execute them by calling receive() periodically.

    The outcome of every call (return value or exception) is stored on its
    CQItem, so the producer can wait for it or pick it up later. With
    max_default_consumer_threads > 0 the queue starts anonymous consumer
    threads on demand, one per call(), up to that limit.
    """
    closed=0
    exc=None                # not used by the code in this class
    max_dthreads=0
    dthreads_count=0

    def __init__(self,maxsize=None,max_default_consumer_threads=0):
        """maxsize: maximum number of waiting calls; None or 0 means unlimited.
        max_default_consumer_threads: upper limit for default consumer
        threads that call() starts on demand.
        """
        self.fifo=[]            # self.fifo=Queue.Queue() not necessary, if .append() and .pop(0) Python atomic
        self.collected=[]
        self.maxsize=maxsize    # approximate guarantee, if Queue.Queue is not used
        self.max_dthreads=max_default_consumer_threads

    def call( self, func, args=(), kwargs=None, wait=0, timeout=None, raise_exception=1, alt_return=None ):
        """Puts a call into the queue and optionally waits for return.

        func, args, kwargs: the call to execute in the receiving thread
               (kwargs=None means no keyword arguments)
        wait:  0=asynchronous call. A call queue item is returned
               1=waits for return value or exception
               callable -> waits and wait()-call's while waiting for return
        timeout: seconds to wait at most; None or 0 waits without limit.
               On timeout Empty is raised if raise_exception is non-zero,
               otherwise alt_return is returned. The call stays queued.
        raise_exception: 1=raise in caller, 2=raise in receiver, 3=raise in both,
                         0=silent replace with alt_return
        alt_return: returned instead of a result when waiting and the call
               failed or timed out without an exception being raised here

        Raises Full if the queue is closed or maxsize is exceeded.
        """
        if self.closed:
            # checked first, so that a closed queue starts no more threads
            raise Full("queue already closed")
        if self.dthreads_count<self.max_dthreads:
            self.add_default_consumer_threads(n=1)
        if kwargs is None:
            kwargs={}           # fresh dict per call instead of a shared default
        cqitem=CQItem()
        cqitem.func=func
        cqitem.args=args
        cqitem.kwargs=kwargs
        cqitem.wait=wait
        cqitem.raise_exception=raise_exception
        if self.maxsize and len(self.fifo)>=self.maxsize:
            raise Full("queue's maxsize exceeded")
        self.fifo.append( cqitem )
        if self.closed:
            # the queue was closed while this call was being appended
            raise Full("queue already closed")
        if not wait:
            return cqitem
        starttime = _time()
        delay=0.0005
        while not cqitem.is_done():
            if timeout:
                remaining = starttime + timeout - _time()
                if remaining <= 0:  #time is over and no element arrived
                    if raise_exception:
                        raise Empty("return timed out")
                    return alt_return
                delay = min(delay * 2, remaining, .05)
            else:
                delay = min(delay * 2, .05)
            if callable(wait): wait()
            _sleep(delay)       #reduce CPU usage by using a sleep
        # pass alt_return on, so that a failed call with raise_exception=0/2
        # yields alt_return as documented
        return cqitem.get_return(alt_return)

    def call_and_collect(self,*args,**kwargs):
        """Like call(), but also remembers the result (the queue item, or the
        return value of a waiting call) for get_next_collected().
        """
        r=self.call(*args,**kwargs)
        self.collected.append(r)
        return r

    def add_default_consumer_threads(self,n=1,maxdelay=0.016):
        """Starts n threads running _default_consumer_thread on this queue.

        The threads only get a weak proxy of the queue, so they do not keep
        it alive. maxdelay is the longest pause (seconds) between two polls.
        """
        import thread, weakref
        weak_self=weakref.proxy(self)
        for i in range(n):
            self.dthreads_count+=1
            thread.start_new(_default_consumer_thread,(weak_self,maxdelay))

    def is_done(self):
        """check if call-queue and collected are flushed"""
        if self.fifo or self.collected:
            return False
        return True

    def get_next_collected(self):
        """Removes and returns (as a list) all collected entries that are
        ready: finished queue items, and entries that are not queue items at
        all (return values of waiting calls).
        """
        ready=[]
        # iterate over a copy; entries are removed in place so that appends
        # made to self.collected in the meantime are not lost
        for cqitem in self.collected[:]:
            if not isinstance(cqitem,CQItem) or cqitem.is_done():
                ready.append(cqitem)
                self.collected.remove(cqitem)
        return ready

    def receive(self):
        """To be called (periodically) by target thread(s). Returns number of calls handled.

        Executes waiting calls in FIFO order until the queue is empty. An
        exception raised by a call is stored on its item (if bit 1 of the
        item's raise_exception is set) and re-raised out of receive() if
        bit 2 is set; remaining calls then stay queued.
        """
        count=0
        while self.fifo:
            try:
                cqitem=self.fifo.pop(0)
            except IndexError:
                break  # multi-consumer race lost
            try:
                cqitem.ret=cqitem.func(*cqitem.args,**cqitem.kwargs)
                cqitem.done=1
            except:
                # deliberately catches everything (also SystemExit etc.), so
                # that any exception can be transferred to the caller
                if cqitem.raise_exception & 1:
                    cqitem.exc=sys.exc_info()
                cqitem.done=2
                if cqitem.raise_exception & 2:
                    raise
            count+=1
        return count

    def qsize(self):
        """Returns current number of unconsumed calls in the queue
        """
        return len(self.fifo)

    def close(self):
        """stops further attempts for calling and terminates default consumer threads
        """
        self.closed=1

    def close_and_receive_last(self):
        """Closes the queue and executes the calls still waiting in it."""
        self.close()
        self.receive()

    def __del__(self):
        self.close()

def _default_consumer_thread(cq,maxdelay=0.016):
    delay=0.001
    try: 
        while not cq.closed:
            count=cq.receive()
            if count: delay=0.001
            _sleep(delay)
            delay=min(delay*2,maxdelay)
    except ReferenceError:
        pass

if __name__=="__main__":
    # executed as a script: run both demos one after the other
    for demo in (example_CallQueue, example2_CallQueue):
        demo()

While bare Queue.Queue techniques require the design of a full-blown client-server-like protocol in the style of inter-process communication (or clumsy Java-ish alter ego classes), the function-oriented CallQueue makes inter-thread communication a snap, and threading code remains readable and cohesive.

See also twin-brother recipe "BackgroundCall" http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/491280 .

I have requested that this kind of layout be added to Python's Queue module. Feedback / Suggestions?