ASPN ActiveState Programmer Network  
ActiveState, a division of Sophos

Title: EventQueue class to queue function calls
Submitter: Not specified
Last Updated: 2006/07/18
Version no: 1.0
Category: Threads


Description:

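EventQueue serializes the sensitive parts of an asynchronous, event-driven program: function calls pushed from any thread are executed one at a time, in the order pushed, on a single worker thread.
For example, it can serialize changes to a document buffer that arrive from multiple threads.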
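
To make the document-buffer case concrete, here is a minimal sketch of the general idea using only the standard library's `queue.Queue` and one worker thread. It is a stand-in for illustration, not the recipe's EventQueue class, and the names `buffer`, `edits`, `apply_edits`, and `typist` are hypothetical.

```python
import queue
import threading

buffer = []              # hypothetical document buffer: a list of text lines
edits = queue.Queue()    # pending edits, each a (function, args) pair
STOP = object()          # sentinel telling the worker to exit

def apply_edits():
    # The only thread that ever touches `buffer`, so edits are applied one at a time.
    while True:
        item = edits.get()
        if item is STOP:
            break
        function, args = item
        function(*args)

def typist(name, count):
    # Simulates one editing thread: it queues edits instead of mutating the buffer directly.
    for i in range(count):
        edits.put((buffer.append, ('%s wrote line %d' % (name, i),)))

worker = threading.Thread(target=apply_edits)
worker.start()

typists = [threading.Thread(target=typist, args=(name, 100))
           for name in ('alice', 'bob', 'carol')]
for t in typists:
    t.start()
for t in typists:
    t.join()

edits.put(STOP)    # queued after every edit, so all 300 edits are applied first
worker.join()
```

Edits from different threads may interleave in the buffer, but each thread's own edits stay in the order it queued them, and no two edits ever run at the same time.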

Source:

import inspect
import sys
import threading
import time

class EventQueue(threading.Thread):
    ''' queues function calls, saves their return values in self.return_values.
        >>> eq = EventQueue()    # start a thread watching self.queue
        >>> eq.push(slow_func); eq.push(slow_func, args); eq.push(slow_func, args, kwargs)
        # push some slow [but terminating!] function calls, which will be executed in the order in which they were pushed
        >>> eq.stop()    # stop the thread as soon as the current call returns, possibly preventing some calls from being executed.
    '''
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue = []
        self.lock = threading.Lock()    # keeps call numbers in step with queue order
        self.nap_time = .1
        self.running = True    # set here, so a stop() issued before run() starts is not lost
        self.pushed = 0    # number of calls pushed so far
        self.n = 0    # number of calls completed so far
        self.serving = None
        self.keep_history = True
        self.return_values = {}    # maps numbers of calls to their return values; unused if not self.keep_history
        self.start()    # start last: run() needs the attributes set above
    
    def error(self, error, function, a=(), kw=None):
        ''' called when function raises error.
        '''
        print('EventQueue event raised exception (', function, a, kw or {}, '):', error, file=sys.stderr)
    
    def push(self, function, a=(), kw=None):
        ''' queue the call to function, return the number of the call [the first call is number 1].
        '''
        with self.lock:    # number the call and append it in one step
            self.pushed += 1
            self.queue.append( (function, a, kw or {}) )
            return self.pushed
    
    def stop(self):
        self.running = False
    
    def get(self, n):
        ''' Block until self.n >= n; return whatever the n-th call returned [assuming self.keep_history].
            Returns None if the n-th call raised an exception.
        '''
        while self.n < n:
            time.sleep(self.nap_time)
        return self.return_values.get(n, None)
    
    def run(self):
        ''' a blocking loop which continually calls functions as specified in self.queue.
        '''
        while self.running:
            if len(self.queue) == 0:
                time.sleep(self.nap_time)
            else:
                function, a, kw = self.queue.pop(0)
                self.serving = (function, a, kw)
                try:
                    result = function(*a, **kw)
                    if self.keep_history:
                        # calls run in push order, so this one is call number self.n + 1
                        self.return_values[self.n + 1] = result
                except Exception as error:
                    self.error(error, function, a, kw)
                self.n += 1



def queue_event(f):
    ''' decorator which queues method/function calls in
        self.eventqueue [if f is a method whose first argument is 'self'],
        otherwise in f.eventqueue. The decorated callable returns the number
        of the queued call, not the result of f.
    '''
    args = list(inspect.signature(f).parameters)
    if args and (args[0] == 'self'):
        def decorated(self, *a, **kw):
            return self.eventqueue.push(f, (self,) + a, kw)
    else:
        f.eventqueue = EventQueue()
        def decorated(*a, **kw):
            return f.eventqueue.push(f, a, kw)
        decorated.eventqueue = f.eventqueue    # also reachable through the decorated name
    decorated.__name__ = f.__name__
    decorated.__doc__ = f.__doc__
    return decorated

Discussion:

I'm building a pure-Python implementation of a generalization of libobby.
http://gobby.0x539.de/index.html
Google around for pysynob (Python Synchronized Objects) in a few weeks. ;-)

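I know that list.append is atomic in CPython, so appending to the queue from several threads should be safe by itself. Working out the call number is a separate step, though, and needs its own synchronization to stay in step with the queue. I have not tested EventQueue.push extensively.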
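
On the atomicity point, a push that also hands back a call number performs more than one step, so the steps have to be grouped. A minimal sketch, assuming a hypothetical `NumberedQueue` helper (not part of the recipe), which takes a lock so that each call's number always matches its position in the queue:

```python
import threading

class NumberedQueue:
    ''' hypothetical helper: hands out call numbers that match queue order. '''
    def __init__(self):
        self.items = []
        self.count = 0
        self.lock = threading.Lock()

    def push(self, item):
        # Incrementing and appending under one lock keeps number and position in step.
        with self.lock:
            self.count += 1
            self.items.append((self.count, item))
            return self.count

nq = NumberedQueue()
numbers = []    # a single list.append is atomic in CPython, so collecting results needs no lock

def producer(label):
    for i in range(1000):
        numbers.append(nq.push((label, i)))

threads = [threading.Thread(target=producer, args=(label,)) for label in 'abcd']
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the four producers finish, every number from 1 to 4000 has been handed out exactly once, and the items sit in the queue in numeric order.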

It is likely that you will want to queue all calls to particular methods or functions, hence the decorator.
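
For comparison, a similar decorator can be sketched on top of the standard library's `concurrent.futures.ThreadPoolExecutor` with `max_workers=1`, which runs submitted calls one at a time in submission order. This is an alternative under stated assumptions, not the recipe's implementation; `serialized`, `Document`, and the `executor` attribute are hypothetical names.

```python
import functools
from concurrent.futures import ThreadPoolExecutor

def serialized(method):
    ''' hypothetical decorator: run the method on self.executor and return a Future. '''
    @functools.wraps(method)
    def decorated(self, *a, **kw):
        return self.executor.submit(method, self, *a, **kw)
    return decorated

class Document:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=1)    # one worker: calls cannot overlap
        self.lines = []

    @serialized
    def insert(self, index, text):
        self.lines.insert(index, text)
        return len(self.lines)

doc = Document()
futures = [doc.insert(0, 'line %d' % i) for i in range(5)]
lengths = [f.result() for f in futures]    # result() blocks until that call has run
doc.executor.shutdown()
```

Here the returned Future plays the role of the call number plus `get()`: the caller keeps it and asks for the result when needed.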



© 2006 ActiveState Software Inc. All rights reserved.