Here is the API we have been asked to implement:

```python
class TriableLock:
    def __init__(self): pass
    def Lock(self): pass
    def Unlock(self): pass
    def TryLock(self): pass
```

We are going to assume that the API is used correctly; that is, threads that do not hold the TriableLock do not call Unlock. This is easy to enforce if necessary by keeping track of a "lockholder" field, but it would add four extra lines of code to every snippet below and obscure the nuances that we are trying to discuss, so we have decided to elide it.

Even though the question explicitly steers the reader away from a solution that requires a modified semaphore implementation, the temptation to invent new synchronization primitives or to violate existing API boundaries is too great for most people. So, many inexperienced programmers find themselves drawn to an implementation that violates the semaphore abstraction a tiny bit, as shown below:

```python
class TriableLock:
    def __init__(self):
        self.sema = Semaphore(1)
    def Lock(self):
        self.sema.P()
    def Unlock(self):
        self.sema.V()
    def TryLock(self):
        if self.sema.count < 1:   # peeks at the semaphore's internal count!
            return 0
        else:
            self.sema.P()
            return 1
```

This solution is a non-starter, because it is unimplementable. Modifying the system libraries that provide a semaphore implementation is often not possible. And even if it were possible, there are strong arguments for why programmers, even those who are well-versed in concurrent programming, should stick to well-established APIs for synchronization~\cite{commandments}. Semaphores hide the semaphore count behind an API boundary and do not provide a way to read it. Some naive readers may think that this is a deficiency of semaphores, something that escaped the attention of the many people who have implemented semaphores in various runtimes. Nothing could be further from the truth: semaphores deliberately hide their internal count, because code that peeks at the internal count is almost always incorrect!
The astute reader will notice that TryLock in the code above is horrendously broken and may block despite, or in fact because of, peeking at the semaphore count.

OK, since our previous attempt was a dismal failure, it is time to go back to basics and try again. Another approach may involve a flag to indicate whether the lock is held, taking advantage of the fact that loads and stores to booleans are atomic:

Attempt 0

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
    def Lock(self):
        self.inuse = True
    def Unlock(self):
        self.inuse = False
    def TryLock(self):
        if self.inuse:
            return 0
        else:
            return 1
```

This implementation gets the TryLock semantics right, but it fails on Lock! Lock never blocks, and permits multiple threads to enter their critical sections simultaneously. Perhaps adding a semaphore can help:

Attempt 1

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
        self.lock = Semaphore(1)
    def Lock(self):
        self.lock.P()
        self.inuse = True
    def Unlock(self):
        self.inuse = False
        self.lock.V()
    def TryLock(self):
        if self.inuse:
            return 0
        else:
            return 1
```

This implementation gets Lock and Unlock right, but TryLock does not actually acquire the lock. Perhaps we can fix this with an internal call:

Attempt 2

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
        self.lock = Semaphore(1)
    def Lock(self):
        self.lock.P()
        self.inuse = True
    def Unlock(self):
        self.inuse = False
        self.lock.V()
    def TryLock(self):
        if self.inuse:      # line 1
            return 0        # line 2
        else:               # line 3
            self.Lock()     # line 4
            return 1        # line 5
```

This is not a correct implementation. There is a race condition involving lines 3-4 of TryLock. Any number of threads can execute line 1 simultaneously, read inuse, decide that the lock is currently free, and then jump to line 4 at the same time. Naturally, only one of them will enter the critical section, but the rest will block inside Lock() for the duration of the winner's critical section. Recall that the whole point of this exercise was to build a TryLock primitive that does not block for the duration of a critical section.
The error here stemmed from reading inuse without a lock. Clearly, a proper solution will require some kind of mutual exclusion inside the TriableLock implementation, so that internal data values can be read and modified without race conditions. So let us try to protect the internal fields:

Attempt 3

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
        self.reallock = Semaphore(1)
        self.mutex = Semaphore(1)
    def Lock(self):
        with self.mutex:
            self.reallock.P()
            self.inuse = True
    def Unlock(self):
        with self.mutex:
            self.inuse = False
            self.reallock.V()
    def TryLock(self):
        with self.mutex:
            if self.inuse:
                return 0
            else:
                self.reallock.P()
                self.inuse = True
                return 1
```

We are now getting to the point where the code is complex enough that every solution has multiple isomorphic variants. For instance, moving the third line of Unlock (the reallock.V()) out of the with block is a benign change that perhaps reduces critical section size, but does not materially change the solution.

This implementation fixes the race condition in TryLock. But it suffers from the same problem; namely, TryLock can still block! If thread1 calls Lock() and returns, and thread2 then calls Lock(), thread2 will end up blocking on reallock (as intended) while holding the mutex (not at all intended). This will then cause every other thread that calls TryLock() to block on the mutex for the duration of thread1's and thread2's critical sections! Since the problems we face stem from oversynchronization, a very tempting thing to do at this point is to frantically move operations on reallock out of the critical section.
Here is one attempt:

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
        self.reallock = Semaphore(1)
        self.mutex = Semaphore(1)
    def Lock(self):
        with self.mutex:
            self.inuse = True
        self.reallock.P()       # now outside the mutex
    def Unlock(self):
        with self.mutex:
            self.inuse = False
        self.reallock.V()       # now outside the mutex
    def TryLock(self):
        with self.mutex:
            if self.inuse:
                return 0
            else:
                self.reallock.P()
                self.inuse = True
                return 1
```

This fixes the previous problem, but TryLock can still block; not because of interference with Lock like last time, but because of self-interference. Specifically, thread1 calls TryLock and succeeds; thread1 later calls Unlock and is preempted between clearing inuse and performing the V(); thread2 now calls TryLock, decides that the lock is free, and blocks on reallock.P() while holding the mutex. Here is a quick stab at fixing this problem that involves swapping two lines and taking one of them out of the mutex block:

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
        self.reallock = Semaphore(1)
        self.mutex = Semaphore(1)
    def Lock(self):
        with self.mutex:
            self.inuse = True
        self.reallock.P()
    def Unlock(self):
        with self.mutex:
            self.inuse = False
        self.reallock.V()
    def TryLock(self):
        with self.mutex:
            if self.inuse:
                return 0
            else:
                self.inuse = True
        self.reallock.P()       # swapped with the assignment, and outside the mutex
        return 1
```

This takes us back to a variant of our problem from before, where T1 performs a Lock(); T2 performs a Lock() and is blocked on the P(); T1 calls Unlock() and is interrupted before the V(); T3 arrives in TryLock, decides that the lock is free, and is right before its P(). What happens at this point is at the whim of the scheduler. If T1's eventual V() wakes T2 rather than T3, then T3 will block for the duration of T2's critical section, something we were desperately trying to avoid. Evidently, this line of inquiry is not leading up to a successful solution. Let us try another tack:

```python
class TriableLock:
    def __init__(self):
        self.inuse = False
    def Lock(self):
        while self.inuse:   # busy wait!
            pass
        self.inuse = True
    def Unlock(self):
        self.inuse = False
    def TryLock(self):
        if self.inuse:
            return 0
        else:
            self.inuse = True
            return 1
```

Clearly, the Lock() implementation here contains a busy wait; this is a non-solution.
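An aside before we continue: the snippets in this section assume a counting Semaphore exposing P() and V() operations, usable as a context manager via with. Python's standard library spells those operations acquire() and release(), so if you want to actually run these sketches, one possible adapter (our own, not part of the exercise) looks like this:

```python
import threading

class Semaphore:
    """Counting semaphore with the P/V spelling used in this section.

    Wraps Python's threading.Semaphore, whose operations are named
    acquire() and release(). Note that, true to the abstraction
    discussed above, it provides no way to read the internal count.
    """
    def __init__(self, count):
        self._sem = threading.Semaphore(count)

    def P(self):                 # decrement; block while the count is zero
        self._sem.acquire()

    def V(self):                 # increment; wake one blocked thread, if any
        self._sem.release()

    # Support the `with self.mutex:` idiom used by the later attempts.
    def __enter__(self):
        self.P()

    def __exit__(self, *exc_info):
        self.V()
```

With this adapter in scope, the attempts in this section can be pasted in and exercised directly.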
Since we know that solving this problem using a monitor and condition variables is quite trivial, perhaps an alternative strategy would be to implement something like a monitor with a "poor programmer's condition variable." Here is a very commonly used attempt:

```python
class TriableLock:
    def __init__(self):
        self.blocker = Semaphore(0)   # note initialization to 0, not 1
        self.mutex = Semaphore(1)
        self.locked = False
    def Lock(self):
        with self.mutex:
            while self.locked:
                self.mutex.V()        # manually release
                self.blocker.P()      # block until a V on blocker
                self.mutex.P()        # reacquire
            self.locked = True
    def Unlock(self):
        self.locked = False
        self.blocker.V()              # wake up one thread that is blocked!
    def TryLock(self):
        with self.mutex:
            if self.locked:
                return 0
            else:
                self.locked = True
                return 1
```

This looks like it does most things right. If the lock is busy, a thread that calls Lock() will give up the mutex and go to sleep on blocker. Upon an Unlock(), the lock holder will wake up one thread from the blocker semaphore, which will reacquire the mutex on wakeup, recheck the condition, and make forward progress if possible.

There are three immediate causes for concern, but none of them actually impacts correctness. First, the Unlock is occurring without the mutex being held. If this bothers you, go ahead and add it; it makes no material difference. Second, the Unlock() is releasing only one thread, and one should think a little bit about how many threads actually need to be woken up; but since at most one thread can make progress through Lock(), a single V() should be sufficient. Finally, the lock acquisition code inside TryLock() looks nothing like the lock acquisition code in Lock(); in particular, it is nowhere near as complicated, as it lacks the loop and any access to the blocker semaphore.
But this is actually completely benign as well: it is easy to see that the else clause in TryLock can only be executed by at most one thread at a time, and further, that thread is assured of succeeding in its locking attempt; therefore, TryLock need not be as complicated as the full Lock implementation.

There is nevertheless a big problem with this code! Consider what happens during routine use, where the same thread repeatedly acquires and releases the same triable lock. The Unlock routine will blindly V() the blocker semaphore each and every time. So if this pattern is repeated M times, uncontended, the blocker semaphore count will be M. A subsequent Lock() call by T2, while T1 is holding the lock, will then spin M times through the loop until the semaphore count finally reaches 0 and T2 blocks. So this code exhibits a "limited busy loop": a thread does not busy-loop for the entire duration of another thread's critical section, but it can nevertheless spin for an arbitrarily long time (depending on what M has counted up to). One could fix this problem by keeping track of whether anyone is sleeping on the blocker semaphore, as follows:

```python
class TriableLock:
    def __init__(self):
        self.blocker = Semaphore(0)   # note initialization to 0, not 1
        self.mutex = Semaphore(1)
        self.locked = False
        self.waiting = False
    def Lock(self):
        with self.mutex:
            while self.locked:
                self.waiting = True
                self.mutex.V()        # manually release
                self.blocker.P()      # block until a V on blocker
                self.mutex.P()        # reacquire
                self.waiting = False
            self.locked = True
    def Unlock(self):
        with self.mutex:
            self.locked = False
            if self.waiting:
                self.blocker.V()      # wake up one thread that is blocked!
    def TryLock(self):
        with self.mutex:
            if self.locked:
                return 0
            else:
                self.locked = True
                return 1
```

This version of the code is almost, but not quite, correct; under some circumstances, threads trying to acquire the lock will never be woken up.
The problem lies in the Lock() routine when it is contended by more than one thread, and it is easy to locate if we make a habit of manually checking invariants at problem spots where locks are released and reacquired. In this case, if T1 holds the lock, and T2 and T3 are both in the while loop, T2 will blindly set waiting to False even though T3 is still waiting to acquire the same lock. T2's subsequent Unlock will not notice that T3 is waiting, and will not perform the crucial V() to wake up T3. In general, counters are much less error-prone than boolean variables, as increment and decrement operations on counters commute. Recasting the solution above to use a counter yields the following:

```python
class TriableLock:
    def __init__(self):
        self.blocker = Semaphore(0)   # note initialization to 0, not 1
        self.mutex = Semaphore(1)
        self.locked = False
        self.waiting = 0
    def Lock(self):
        with self.mutex:
            while self.locked:
                self.waiting += 1
                self.mutex.V()        # manually release
                self.blocker.P()      # block until a V on blocker
                self.mutex.P()        # reacquire
                self.waiting -= 1
            self.locked = True
    def Unlock(self):
        with self.mutex:
            self.locked = False
            if self.waiting > 0:
                self.blocker.V()      # wake up one thread that is blocked!
    def TryLock(self):
        with self.mutex:
            if self.locked:
                return 0
            else:
                self.locked = True
                return 1
```

The astute reader will note that this is a correct solution.
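The claim is easy to sanity-check. Here is the solution above in directly runnable form, with Semaphore spelled as Python's threading.Semaphore (acquire/release in place of P/V); the small harness at the bottom is our own addition and exercises both the non-blocking TryLock semantics and a blocking Lock handoff between two threads:

```python
import threading

class TriableLock:
    def __init__(self):
        self.blocker = threading.Semaphore(0)   # initialized to 0, not 1
        self.mutex = threading.Semaphore(1)
        self.locked = False
        self.waiting = 0

    def Lock(self):
        self.mutex.acquire()
        while self.locked:
            self.waiting += 1
            self.mutex.release()        # manually release
            self.blocker.acquire()      # block until a V on blocker
            self.mutex.acquire()        # reacquire
            self.waiting -= 1
        self.locked = True
        self.mutex.release()

    def Unlock(self):
        self.mutex.acquire()
        self.locked = False
        if self.waiting > 0:
            self.blocker.release()      # wake up one blocked thread
        self.mutex.release()

    def TryLock(self):
        self.mutex.acquire()
        try:
            if self.locked:
                return 0
            self.locked = True
            return 1
        finally:
            self.mutex.release()

# Harness (ours): TryLock never blocks, and Lock hands off correctly.
tl = TriableLock()
assert tl.TryLock() == 1      # free lock: acquired immediately
assert tl.TryLock() == 0      # held lock: returns immediately, no blocking
tl.Unlock()

tl.Lock()                     # main thread holds the lock
got = []
t = threading.Thread(target=lambda: (tl.Lock(), got.append(1), tl.Unlock()))
t.start()
tl.Unlock()                   # wakes the worker if it is already waiting
t.join()
assert got == [1]             # the worker acquired and released the lock
```

The try/finally in TryLock is just idiomatic Python for "release the mutex on every return path"; it plays the role of the with block in the sketch above.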
There exist other correct solution strategies:

```python
class TriableLock:
    def __init__(self):
        self.available = True
        self.enqueued = 0
        self.available_lock = Semaphore(1)
        self.critical_section = Semaphore(1)
    def Lock(self):
        with self.available_lock:
            if self.available:
                self.critical_section.P()   # will not block: the lock is available
                self.available = False
            else:
                self.enqueued += 1
                self.available_lock.V()     # manually release
                self.critical_section.P()   # block until the holder's V
                self.available_lock.P()     # reacquire
                self.enqueued -= 1
    def Unlock(self):
        with self.available_lock:
            if self.enqueued == 0:
                self.available = True
            self.critical_section.V()
    def TryLock(self):
        with self.available_lock:
            if self.available:
                self.critical_section.P()   # will not block: the lock is available
                self.available = False
                return 1
            else:
                return 0
```

And here is another solution strategy that uses just a single count:

```python
class TriableLock:
    def __init__(self):
        self.actual_lock = Semaphore(1)
        self.count_lock = Semaphore(1)
        self.count = 0
    def Lock(self):
        with self.count_lock:
            self.count += 1
        self.actual_lock.P()
    def Unlock(self):
        with self.count_lock:
            self.actual_lock.V()
            self.count -= 1
    def TryLock(self):
        with self.count_lock:
            if self.count > 0:
                return 0
            else:
                self.count += 1
                self.actual_lock.P()        # will not block: the count was 0
                return 1
```

Contrast these solutions with one that uses monitors and condition variables:

```python
class TriableLock:
    def __init__(self):
        self.lock = Lock()
        self.waiting = Condition(self.lock)
        self.inuse = False
    def Lock(self):
        with self.lock:
            while self.inuse:
                self.waiting.wait()
            self.inuse = True
    def Unlock(self):
        with self.lock:
            self.inuse = False
            self.waiting.notify()
    def TryLock(self):
        with self.lock:
            oldval = self.inuse
            self.inuse = True
            return 0 if oldval else 1
```

This example starkly illustrates the advantages of using monitors and condition variables over semaphores. The resulting code is cleaner, much easier to understand, and most of all, much easier to prove correct.
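As a closing note, the monitor version maps almost verbatim onto Python's standard library: threading.Lock and threading.Condition provide exactly the Lock and Condition primitives used above. Here it is in self-contained, runnable form; the brief check at the bottom is our own addition:

```python
import threading

class TriableLock:
    def __init__(self):
        self.lock = threading.Lock()
        self.waiting = threading.Condition(self.lock)
        self.inuse = False

    def Lock(self):
        with self.lock:
            while self.inuse:
                self.waiting.wait()   # atomically releases lock, sleeps, reacquires
            self.inuse = True

    def Unlock(self):
        with self.lock:
            self.inuse = False
            self.waiting.notify()     # wake one waiter, if any

    def TryLock(self):
        with self.lock:
            oldval = self.inuse
            self.inuse = True
            return 0 if oldval else 1

# Quick check of the TryLock semantics (ours).
tl = TriableLock()
assert tl.TryLock() == 1
assert tl.TryLock() == 0
tl.Unlock()
tl.Lock()
tl.Unlock()
```

Note how the condition variable's wait() performs the release-sleep-reacquire sequence atomically, which is precisely the step we had to hand-build (and repeatedly got wrong) with the blocker semaphore.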