Welcome, guest | Sign In | My Account | Store | Cart

This recipe is the core of a project called FDR (Full Duplex RPC). FDR is not yet complete, but the "spots" module already relieves other modules of most of the work involved in network communication. This is the second version of "spots"; it simplifies the ZSP class and removes the dependency on the base255 module.

Python, 232 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
'''Module for Send Python Object Through Socket (SPOTS).

This module implements the Zero SPOTS Protocol, the Query/Reply Protocol,
the Query/Reply Interface, and a shortcut for constructing QRI objects.'''

################################################################################

# Module metadata: revision string, release date, author contact, and
# acknowledgements. Nothing else in this module reads these names.
# NOTE(review): '$Revision: 0 $' looks like a version-control keyword
# placeholder -- confirm whether it is still expanded by any tooling.
__version__ = '$Revision: 0 $'
__date__ = 'March 27, 2007'
__author__ = 'Stephen "Zero" Chappell <my.bios@gmail.com>'
__credits__ = '''\
T. Parker, for testing code that led to this module.
B. Brown, for teaching me some math courses.
G. Rossum, for allowing thread support in Python.'''

################################################################################

import cPickle as _cPickle
import sys as _sys
import thread as _thread
import time as _time

################################################################################

class ZSP:

    '''ZSP(socket) -> ZSP

    Zero SPOTS Protocol: moves whole Python objects across a socket by
    pickling them onto, and unpickling them from, a file object made
    from that socket.

    SECURITY NOTE: recv() unpickles whatever bytes arrive on the socket.
    Unpickling can run arbitrary code, so this class must only be used
    with peers that are fully trusted.'''

    def __init__(self, socket):
        '''Initialize the Zero SPOTS Protocol object.

        socket -- object providing makefile(mode, bufsize); the file
                  object is requested with a buffer size of 0.'''
        self.__file = socket.makefile('b', 0)
        # Serializes writers: QRP in this module calls send() from both
        # send_Q() and send_R(), which may run in different threads.
        self.__send_lock = _thread.allocate_lock()

    def send(self, obj):
        '''Send one object.

        The object is pickled with the highest available protocol.  The
        whole dump runs under a lock so that pickles written by
        different threads cannot interleave on the stream.'''
        self.__send_lock.acquire()
        try:
            _cPickle.dump(obj, self.__file, _cPickle.HIGHEST_PROTOCOL)
        finally:
            self.__send_lock.release()

    def recv(self):
        '''Receive one object.

        Returns the next unpickled object from the stream.  Whatever
        the unpickler raises (for example EOFError when the stream
        ends) is passed through to the caller.  Reads are not
        serialized; QRP in this module reads from one thread only.'''
        return _cPickle.load(self.__file)

################################################################################

class QRP:

    '''QRP(ZSP) -> QRP

    Query/Reply Protocol.  Every message on the wire is a 3-tuple
    (is_reply, ID, obj).  A background thread reads messages from the
    ZSP object and hands them to callers blocked in recv_Q() / recv_R().

    Once the background reader fails, the failure is remembered as
    EOFError (end of stream) or IOError (anything else) and every
    later or pending call raises that exception.

    NOTE: a reply whose ID has no caller currently waiting in recv_R()
    is discarded, so recv_R() must be waiting before the reply arrives.'''

    def __init__(self, ZSP):
        '''Initialize the Query/Reply Protocol object.

        ZSP -- object providing send(obj) and recv().'''
        self.__ZSP = ZSP
        self.__error = None         # exception class set when the reader dies
        self.__Q_anchor = []        # receivers waiting for a query (FIFO)
        self.__Q_packet = []        # queries nobody has asked for yet (FIFO)
        self.__R_anchor = {}        # reply ID -> receiver waiting for it
        self.__Q_lock = _thread.allocate_lock()
        self.__R_lock = _thread.allocate_lock()
        _thread.start_new_thread(self.__thread, ())

    def send_Q(self, ID, obj):
        '''Send one query.

        Raises the stored EOFError/IOError if the reader has failed.'''
        if self.__error:
            raise self.__error
        self.__ZSP.send((False, ID, obj))

    def recv_Q(self, timeout=None):
        '''Receive one query.

        Returns an (ID, obj) tuple.  Queries are handed out in the
        order they arrived.

        timeout -- None, or a non-negative float/int/long number of
                   seconds.  None and 0 both wait indefinitely.

        Raises TypeError or ValueError for an invalid timeout, Warning
        when the timeout elapses first, and the stored EOFError/IOError
        if the reader has failed.'''
        if self.__error:
            raise self.__error
        self.__check_timeout(timeout)
        self.__Q_lock.acquire()
        try:
            # Re-check under the lock: the reader may have failed since
            # the test above, and it wakes waiters under this same lock.
            if self.__error:
                raise self.__error
            if self.__Q_packet:
                return self.__Q_packet.pop(0)
            anchor = [_thread.allocate_lock()]
            anchor[0].acquire()
            self.__Q_anchor.append(anchor)
        finally:
            self.__Q_lock.release()
        if timeout:
            _thread.start_new_thread(self.__Q_thread, (timeout, anchor))
        return self.__wait(anchor)

    def send_R(self, ID, obj):
        '''Send one reply.

        Raises the stored EOFError/IOError if the reader has failed.'''
        if self.__error:
            raise self.__error
        self.__ZSP.send((True, ID, obj))

    def recv_R(self, ID, timeout=None):
        '''Receive one reply.

        Blocks until a reply carrying ID arrives and returns its object.

        ID      -- hashable reply identifier (used as a dictionary key).
        timeout -- None, or a non-negative float/int/long number of
                   seconds.  None and 0 both wait indefinitely.

        Raises TypeError or ValueError for an invalid timeout, Warning
        when the timeout elapses first, and the stored EOFError/IOError
        if the reader has failed.'''
        if self.__error:
            raise self.__error
        self.__check_timeout(timeout)
        anchor = [_thread.allocate_lock()]
        anchor[0].acquire()
        self.__R_lock.acquire()
        try:
            if self.__error:
                raise self.__error
            self.__R_anchor[ID] = anchor
        finally:
            self.__R_lock.release()
        if timeout:
            _thread.start_new_thread(self.__R_thread, (timeout, ID, anchor))
        return self.__wait(anchor)

    def __check_timeout(self, timeout):
        'Private class method: validate a timeout argument.'
        if timeout is not None:
            if not isinstance(timeout, (float, int, long)):
                raise TypeError('timeout must be of type float, int, or long')
            if not timeout >= 0:
                raise ValueError('timeout must be greater than or equal to 0')

    def __wait(self, anchor):
        'Private class method: block on an anchor and return its result.'
        # An anchor is [lock] while pending and [lock, result] once the
        # reader has delivered.  The lock is held until somebody wakes us.
        anchor[0].acquire()
        try:
            return anchor[1]
        except IndexError:
            # Woken without a result: reader failure or timeout.
            if self.__error:
                raise self.__error
            raise Warning

    def __thread(self):
        'Private class method: background reader loop.'
        try:
            while True:
                R, ID, obj = self.__ZSP.recv()
                if R:
                    self.__R_lock.acquire()
                    try:
                        anchor = self.__R_anchor.pop(ID, None)
                        if anchor is not None:
                            anchor.append(obj)
                            anchor[0].release()
                    finally:
                        # Always release, so that __fail() can take the
                        # lock even if the lookup above raised.
                        self.__R_lock.release()
                else:
                    self.__Q_lock.acquire()
                    try:
                        if self.__Q_anchor:
                            anchor = self.__Q_anchor.pop(0)
                            anchor.append((ID, obj))
                            anchor[0].release()
                        else:
                            self.__Q_packet.append((ID, obj))
                    finally:
                        self.__Q_lock.release()
        except EOFError:
            self.__fail(EOFError)
        except Exception:
            self.__fail(IOError)

    def __fail(self, error):
        'Private class method: record the failure and wake every waiter.'
        # Set the error first so woken waiters see it when they resume.
        self.__error = error
        self.__Q_lock.acquire()
        try:
            for anchor in self.__Q_anchor:
                anchor[0].release()
            self.__Q_anchor = []
            self.__Q_packet = []
        finally:
            self.__Q_lock.release()
        self.__R_lock.acquire()
        try:
            for anchor in self.__R_anchor.values():
                anchor[0].release()
            self.__R_anchor = {}
        finally:
            self.__R_lock.release()

    def __Q_thread(self, timeout, anchor):
        'Private class method: time out one recv_Q() waiter.'
        _time.sleep(timeout)
        self.__Q_lock.acquire()
        try:
            # Only wake the waiter if it is still pending; after a
            # failure __fail() has already released it.
            if not self.__error and anchor in self.__Q_anchor:
                self.__Q_anchor.remove(anchor)
                anchor[0].release()
        finally:
            self.__Q_lock.release()

    def __R_thread(self, timeout, ID, anchor):
        'Private class method: time out one recv_R() waiter.'
        _time.sleep(timeout)
        self.__R_lock.acquire()
        try:
            # Compare by identity so a stale timer cannot cancel a newer
            # waiter that was registered later under the same ID.
            if not self.__error and self.__R_anchor.get(ID) is anchor:
                del self.__R_anchor[ID]
                anchor[0].release()
        finally:
            self.__R_lock.release()

################################################################################

class QRI:

    'QRI(QRP) -> QRI'

    def __init__(self, QRP):
        'Set up the Query/Reply Interface on top of a QRP object.'
        self.__lock = _thread.allocate_lock()
        self.__ID = 0
        self.__QRP = QRP

    def call(self, obj, timeout=None):
        'Issue one query, then wait for the reply carrying the same ID.'
        # Claim the next counter value under the lock so that callers in
        # different threads never share an ID; the counter wraps at 2**32.
        self.__lock.acquire()
        number = self.__ID
        self.__ID = (number + 1) % (2 ** 32)
        self.__lock.release()
        # Encode the counter as four characters, most significant first.
        pieces = []
        for shift in (24, 16, 8, 0):
            pieces.append(chr(number >> shift & 0xFF))
        ID = ''.join(pieces)
        self.__QRP.send_Q(ID, obj)
        return self.__QRP.recv_R(ID, timeout)

    def query(self, timeout=None):
        'Wait for one incoming query and return what the QRP delivers.'
        incoming = self.__QRP.recv_Q(timeout)
        return incoming

    def reply(self, ID, obj):
        'Answer the query identified by ID with obj.'
        self.__QRP.send_R(ID, obj)

################################################################################

def qri(socket):
    'Build the full ZSP -> QRP -> QRI stack on a socket and return the QRI.'
    transport = ZSP(socket)
    protocol = QRP(transport)
    return QRI(protocol)

################################################################################

if __name__ == '__main__':
    # Run as a script: print a plain-text content-type header followed by
    # this file's own source (presumably so the module can be served as a
    # CGI program -- confirm against the deployment).
    _sys.stdout.write('Content-Type: text/plain\n\n')
    # open() replaces the deprecated file() builtin, and the handle is
    # closed explicitly instead of being left to garbage collection.
    source = open(_sys.argv[0])
    try:
        _sys.stdout.write(source.read())
    finally:
        source.close()

The ZSP class is demonstrated in "Paint 2.0", which shows one simple use of the "spots" module. This recipe shows how network communication in a Python program can be made easier by using a thin layer that converts data on the fly.