|
Description:
This shared lock implementation supports timeouts so that an attempt to acquire a lock occasionally times out. It also preserves FIFO ordering for threads waiting for exclusive lock and has other valuable features.
Source: Text Source
__all__ = [ "SharedLock" ]
from threading import Lock, currentThread, Event
from random import randint
from exc_string import trace_string
if not hasattr(__builtins__, "sorted"):
def sorted(seq):
result = [ x for x in seq ]
result.sort()
return result
class SharedLock(object):
def __init__(self, log = None, debug = False):
"""
Takes two optional parameters, (1) log is an external log function the
lock would use to send its messages to, ex: lambda s: xprint(s),
(2) debug is a boolean value, if it's True the lock would be checking
its internal invariant before and after each call.
"""
self.__log, self.__debug, self.lckLock = log, debug, Lock()
self.thrOwner, self.intOwnerDepth, self.dicUsers = None, 0, {}
self.lstOwners, self.lstUsers, self.lstPooledEvents = [], [], []
def _log(self, s):
thrCurrent = currentThread()
self.__log("%s @ %.08x %s %s @ %.08x in %s"
% (thrCurrent.getName(), id(thrCurrent), s,
self._debug_dump(), id(self), trace_string()))
def _debug_dump(self):
return "SharedLock(Ex:[%s] (%s), Sh:[%s] (%s))" \
% (self.thrOwner is not None
and "%s:%d" % (self.thrOwner.getName(),
self.intOwnerDepth)
or "",
", ".join([ "%s:%d" % (th.getName(), dp)
for th, evt, dp in self.lstOwners ]),
", ".join(sorted([ "%s:%d" % (th.getName(), dp)
for th, dp in self.dicUsers.iteritems() ])),
", ".join([ "%s:%d" % (th.getName(), dp)
for th, evt, dp in self.lstUsers ]))
def debug_dump(self):
"""
Returns a printable string describing the current lock state.
"""
self._lock()
try:
return self._debug_dump()
finally:
self._unlock()
def _has_owner(self):
return self.thrOwner is not None
def _has_pending_owners(self):
return len(self.lstOwners) > 0
def _has_users(self):
return len(self.dicUsers) > 0
def _has_pending_users(self):
return len(self.lstUsers) > 0
def _invariant(self):
if self._has_owner() and self._has_users() \
and self.dicUsers.keys() != [self.thrOwner]:
return False
if not self._has_owner() and not self._has_users():
return not self._has_pending_owners() \
and not self._has_pending_users()
if (self._has_owner() and self.intOwnerDepth <= 0) \
or (not self._has_owner() and self.intOwnerDepth > 0):
return False
if len(filter(lambda dp: dp <= 0, self.dicUsers.values())) > 0:
return False
if not self._has_owner() and not self._has_pending_owners() \
and self._has_pending_users():
return False
if not self._has_owner() and not self._has_users() \
and self._has_pending_owners():
return False
lstPendingThreads = sorted(map(lambda t: t[0], self.lstUsers) +
map(lambda t: t[0], self.lstOwners))
for i in range(len(lstPendingThreads) - 1):
if lstPendingThreads[i] is lstPendingThreads[i+1]:
return False
return True
def _lock(self):
self.lckLock.acquire()
def _unlock(self):
self.lckLock.release()
def _pick_event(self):
if len(self.lstPooledEvents):
return self.lstPooledEvents.pop(0)
else:
return Event()
def _unpick_event(self, _evtEvent):
self.lstPooledEvents.append(_evtEvent)
def _acquire_event(self, _evtEvent, timeout):
if timeout is None:
_evtEvent.wait()
result = True
else:
_evtEvent.wait(timeout)
result = _evtEvent.isSet()
thrCurrent = currentThread()
self._lock()
try:
if not result:
result = _evtEvent.isSet()
boolReAcquireShared = False
if not result:
for i, (thrUser, evtEvent, intSharedDepth) in enumerate(self.lstUsers):
if thrUser is thrCurrent and evtEvent is _evtEvent:
assert intSharedDepth == 1
del self.lstUsers[i]
break
else:
for i, (thrOwner, evtEvent, intSharedDepth) in enumerate(self.lstOwners):
if thrOwner is thrCurrent and evtEvent is _evtEvent:
del self.lstOwners[i]
if intSharedDepth > 0:
if not self._has_owner():
self.dicUsers[thrCurrent] = intSharedDepth
else:
self.lstUsers.append((thrCurrent, _evtEvent, intSharedDepth))
boolReAcquireShared = True
break
else:
assert False, "Invalid thread for %s in %s" % \
(self._debug_dump(), trace_string())
self._release_threads()
if not boolReAcquireShared:
_evtEvent.clear()
self._unpick_event(_evtEvent)
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if result:
if self.__log: self._log("acquired")
else:
if self.__log: self._log("timed out in %.02f second(s) waiting for" % timeout)
if boolReAcquireShared:
if self.__log: self._log("acquiring %d previously owned shared lock(s) for" % intSharedDepth)
finally:
self._unlock()
if boolReAcquireShared:
assert self._acquire_event(_evtEvent, None)
return False
return result
def _release_events(self, _lstEvents):
for evtEvent in _lstEvents:
evtEvent.set()
def acquire(self, timeout = None):
"""
Attempts to acquire the lock exclusively within the optional timeout.
If the timeout is not specified, waits for the lock infinitely.
Returns True if the lock has been acquired, False otherwise.
"""
thrCurrent = currentThread()
self._lock()
try:
if self.__log: self._log("acquiring exclusive")
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if thrCurrent is self.thrOwner:
self.intOwnerDepth += 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired exclusive")
return True
elif thrCurrent in self.dicUsers:
if self.dicUsers.keys() == [thrCurrent] \
and not self._has_pending_users() and not self._has_pending_owners():
self.thrOwner = thrCurrent
self.intOwnerDepth = 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired exclusive")
return True
intSharedDepth = self.dicUsers.pop(thrCurrent)
evtEvent = self._pick_event()
self.lstOwners.append((thrCurrent, evtEvent, intSharedDepth))
self._release_threads()
elif not self._has_owner() and not self._has_users():
self.thrOwner = thrCurrent
self.intOwnerDepth = 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired exclusive")
return True
else:
evtEvent = self._pick_event()
self.lstOwners.append((thrCurrent, evtEvent, 0))
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("waiting for exclusive")
finally:
self._unlock()
return self._acquire_event(evtEvent, timeout)
def acquire_shared(self, timeout = None):
"""
Attempts to acquire the lock in shared mode within the optional
timeout. If the timeout is not specified, waits for the lock
infinitely. Returns True if the lock has been acquired, False
otherwise.
"""
thrCurrent = currentThread()
self._lock()
try:
if self.__log: self._log("acquiring shared")
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if thrCurrent in self.dicUsers:
self.dicUsers[thrCurrent] += 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired shared")
return True
elif thrCurrent is self.thrOwner:
if thrCurrent in self.dicUsers:
self.dicUsers[thrCurrent] += 1
else:
self.dicUsers[thrCurrent] = 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired shared")
return True
elif not self._has_owner() and not self._has_pending_owners():
self.dicUsers[thrCurrent] = 1
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("acquired shared")
return True
else:
evtEvent = self._pick_event()
self.lstUsers.append((thrCurrent, evtEvent, 1))
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("waiting for shared")
finally:
self._unlock()
return self._acquire_event(evtEvent, timeout)
def _release_threads(self):
if self._has_owner():
boolWakeUpOwner = False
boolWakeUpUsers = False
elif not self._has_pending_owners():
boolWakeUpOwner = False
boolWakeUpUsers = self._has_pending_users()
elif not self._has_users():
boolWakeUpOwner = not self._has_pending_users() \
or randint(0, 1) == 0
boolWakeUpUsers = self._has_pending_users() and not boolWakeUpOwner
else:
boolWakeUpOwner = False
boolWakeUpUsers = False
lstEvents = []
if boolWakeUpOwner:
self.thrOwner, evtEvent, intSharedDepth = self.lstOwners.pop(0)
self.intOwnerDepth = 1
if intSharedDepth > 0:
self.dicUsers[self.thrOwner] = intSharedDepth
lstEvents.append(evtEvent)
elif boolWakeUpUsers:
for thrUser, evtEvent, intSharedDepth in self.lstUsers:
self.dicUsers[thrUser] = intSharedDepth
lstEvents.append(evtEvent)
del self.lstUsers[:]
self._release_events(lstEvents)
def release(self):
"""
Releases the lock previously locked by a call to acquire().
Returns None.
"""
thrCurrent = currentThread()
self._lock()
try:
if self.__log: self._log("releasing exclusive")
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if thrCurrent is not self.thrOwner:
raise Exception("Current thread has not acquired the lock")
self.intOwnerDepth -= 1
if self.intOwnerDepth > 0:
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("released exclusive")
return
self.thrOwner = None
self._release_threads()
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("released exclusive")
finally:
self._unlock()
def release_shared(self):
"""
Releases the lock previously locked by a call to acquire_shared().
Returns None.
"""
thrCurrent = currentThread()
self._lock()
try:
if self.__log: self._log("releasing shared")
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if thrCurrent not in self.dicUsers:
raise Exception("Current thread has not acquired the lock")
self.dicUsers[thrCurrent] -= 1
if self.dicUsers[thrCurrent] > 0:
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("released shared")
return
else:
del self.dicUsers[thrCurrent]
self._release_threads()
if self.__debug:
assert self._invariant(), "SharedLock invariant failed: %s in %s" % \
(self._debug_dump(), trace_string())
if self.__log: self._log("released shared")
finally:
self._unlock()
Discussion:
There is the only issue that I know and it has to do with timeout semantics:
Suppose the following sequence of events:
1. Thread A acquires the lock in shared mode.
2. Thread B attempts to acquire the lock in exclusive mode and blocks waiting for it.
3. Thread A attempts to acquire the lock in exclusive mode. It cannot obtain it at this moment, since that would have broken the FIFO semantics. Therefore thread A also blocks, but before it's blocked it has to give up its shared lock, otherwise thread B couldn't proceed. So thread A releases it's shared lock and then blocks for exclusive lock.
4. Thread B acquires the exclusive lock.
5. Thread B releases the exclusive lock.
6. Thread A acquires the exclusive lock and is also given its shared lock back - now it holds the exclusive lock as well as the shared one.
So far this is all normal and works fine. The problem would have appeared if thread A's attempt to acquire an exclusive lock at step 3 had a timeout on it, say 1 sec. If thread B holds the lock between steps 4 and 5 for more than 1 sec, thread A should time out, but now it has to somehow obtain its previously owned shared lock back before it can even report locking failure. Thread A hence blocks for indeterminate amount of time until it's able to get a shared lock back. Only then it reports exclusive locking attempt timeout. Therefore, in this case thread A's lock.acquire(1.0) still would return False, but not within a second, but in some longer time.
|