Welcome, guest | Sign In | My Account | Store | Cart

A thread-like interface for those who want to "use" threads in Python with PyGTK. I use it when loading files and display a nice progress bar. The function or method you give to GIdleThread should "yield" every now and then. This makes it simpler to write code, since you do not have to care about nasty locks.

Python, 189 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# vim:sw=4:et:
"""This module contains some helpers that can be used to execute generator
functions in the GObject main loop.

This module provided the following classes:
GIdleThread - Thread like behavior for generators in a main loop
Queue - A simple queue implementation suitable for use with GIdleThread

Exceptions:
QueueEmpty - raised when one tried to get a value of an empty queue
QueueFull - raised when the queue reaches it's max size and the oldest item
            may not be disposed.
"""

from __future__ import generators

import gobject
import time
import traceback

class GIdleThread(object):
    """This is a pseudo-"thread" for use with the GTK+ main loop.

    This class does act a bit like a thread, all code is executed in
    the callers thread though. The provided function should be a generator
    (or iterator).

    It can be started with start(). While the "thread" is running is_alive()
    can be called to see if it's alive. wait([timeout]) will wait till the
    generator is finished, or timeout seconds.
    
    If an exception is raised from within the generator, it is stored in
    the error property. Execution of the generator is finished.

    Note that this routine runs in the current thread, so there is no need
    for nasty locking schemes.

    Example (runs a counter through the GLib main loop routine):
    >>> def counter(max): for x in xrange(max): yield x
    >>> t = GIdleThread(counter(123))
    >>> t.start()
    >>> while gen.is_alive():
    ...     main.iteration(False)
    """

    def __init__(self, generator, queue=None):
        assert hasattr(generator, 'next'), 'The generator should be an iterator'
        self._generator = generator
        self._queue = queue
        self._idle_id = 0
        self._error = None

    def start(self, priority=gobject.PRIORITY_LOW):
        """Start the generator. Default priority is low, so screen updates
        will be allowed to happen.
        """
        idle_id = gobject.idle_add(self.__generator_executer,
                                   priority=priority)
        self._idle_id = idle_id
        return idle_id

    def wait(self, timeout=0):
        """Wait until the corouine is finished or return after timeout seconds.
        This is achieved by running the GTK+ main loop.
        """
        clock = time.clock
        start_time = clock()
        main = gobject.main_context_default()
        while self.is_alive():
            main.iteration(False)
            if timeout and (clock() - start_time >= timeout):
                return

    def interrupt(self):
        """Force the generator to stop running.
        """
        if self.is_alive():
            gobject.source_remove(self._idle_id)
            self._idle_id = 0

    def is_alive(self):
        """Returns True if the generator is still running.
        """
        return self._idle_id != 0

    error = property(lambda self: self._error,
                     doc="Return a possible exception that had occured "\
                         "during execution of the generator")

    def __generator_executer(self):
        try:
            result = self._generator.next()
            if self._queue:
                try:
                    self._queue.put(result)
                except QueueFull:
                    self.wait(0.5)
                    # If this doesn't work...
                    self._queue.put(result)
            return True
        except StopIteration:
            self._idle_id = 0
            return False
        except Exception, e:
            self._error = e
            traceback.print_exc()
            self._idle_id = 0
            return False


class QueueEmpty(Exception):
    """Exception raised whenever the queue is empty and someone tries to fetch
    a value.
    """
    pass


class QueueFull(Exception):
    """Exception raised when the queue is full and the oldest item may not be
    disposed.
    """
    pass


class Queue(object):
    """A FIFO queue. If the queue has a max size, the oldest item on the
    queue is dropped if that size id exceeded.
    """

    def __init__(self, size=0, dispose_oldest=True):
        self._queue = []
        self._size = size
        self._dispose_oldest = dispose_oldest

    def put(self, item):
        """Put item on the queue. If the queue size is limited ...
        """
        if self._size > 0 and len(self._queue) >= self._size:
            if self._dispose_oldest:
                self.get()
            else:
                raise QueueFull

        self._queue.insert(0, item)

    def get(self):
        """Get the oldest item off the queue.
        QueueEmpty is raised if no items are left on the queue.
        """
        try:
            return self._queue.pop()
        except IndexError:
            raise QueueEmpty


if __name__ == '__main__':
    def counter(max):
        for i in range(max):
            yield i

    def shower(queue):
        # Never stop reading the queue:
        while True:
            try:
                cnt = queue.get()
                print 'cnt =', cnt
            except QueueEmpty:
                pass
            yield None

    print 'Test 1: (should print range 0..22)'
    queue = Queue()
    c = GIdleThread(counter(23), queue)
    s = GIdleThread(shower(queue))
    
    main = gobject.main_context_default()
    c.start()
    s.start()
    s.wait(2)

    print 'Test 2: (should only print 22)'
    queue = Queue(size=1)
    c = GIdleThread(counter(23), queue)
    s = GIdleThread(shower(queue))
    
    main = gobject.main_context_default()
    c.start(priority=gobject.PRIORITY_DEFAULT)
    s.start()
    s.wait(3)

I started using Threads, but this seemed not very stable. Using generators was a logical alternative. This "thread" runs in an isolated environment. If an exception occurs it is stored in the thread object.