Discussion 13: Synchronized Buffers

Exercise 1: Explaining Race Conditions
What are race conditions? Why haven't we needed to worry about them until this point in the semester?
Race conditions occur when multiple threads access a shared variable in memory during the same window of time and at least one of those threads writes to it. The behavior of the code during this window (i.e., the final value of the shared variable and possibly other side effects) then depends on the specific interleaving of the instructions executed by these threads.

We have not needed to worry about race conditions before since our code has not been concurrent; it has always executed sequentially on a single thread.
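To make the definition concrete, here is a minimal sketch of a race on a shared counter. The class name `CounterRaceDemo` and its fields are hypothetical and are not part of the discussion code. Two threads each increment an unsynchronized counter and a lock-protected counter. Because `unsafeCount++` is a separate read, add, and write, some increments may be lost on some runs, while the lock-protected total always comes out exact.

```java
class CounterRaceDemo {
    // Shared state touched by both threads (hypothetical names).
    static int unsafeCount = 0;
    static int safeCount = 0;
    static final Object lock = new Object();

    /** Run two threads that each perform `perThread` increments of both counters. */
    static void runBoth(int perThread) {
        unsafeCount = 0;
        safeCount = 0;
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                unsafeCount++; // read-modify-write with no lock: increments can be lost
            }
            for (int i = 0; i < perThread; i++) {
                synchronized (lock) {
                    safeCount++; // only one thread at a time executes this block
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        runBoth(100_000);
        System.out.println("unsafeCount = " + unsafeCount + " (may be less than 200000)");
        System.out.println("safeCount   = " + safeCount);
    }
}
```

The value printed for `unsafeCount` can differ from run to run, which is exactly the interleaving-dependent behavior described above.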
Exercise 2: Race Conditions
Suppose we have a RingBuffer object buffer containing Integers, and the current state of the array backing store for our buffer looks like this:

You can also assume that, at this point, iHead == 0 and iTail == 3. Then, suppose the following enqueue calls are executed on separate threads:
new Thread(() -> buffer.enqueue(3)).start();
new Thread(() -> buffer.enqueue(4)).start();
Draw the possible states of the buffer (specifically, the backing array and iTail) after these calls are executed.
There are two possible outcomes in which the invariants are maintained and the program behaves as intended:

and

For both of these, iTail == 0.

There are several pathological cases:



It is possible that only one update gets written, and iTail only gets incremented once (iTail == 4). This happens when both threads read iTail == 3 and perform their updates without seeing the other thread’s updates, leading to the first update being overwritten.

It is also possible for the array states to be the same as above, but with iTail incremented twice (iTail == 0). This is because in our implementation the array modification is performed first and iTail is incremented afterward, so there exists an interleaving where the second thread still reads the old iTail value when writing to the array but reads the updated value when incrementing iTail.
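The two pathological interleavings can be replayed deterministically on a single thread by writing out each thread's steps in the problematic order. This is only a sketch under assumptions: the class `InterleavingReplay` is hypothetical, the backing array length of 5 is inferred from iTail wrapping to 0 after two increments from 3, and the initial contents `0, 1, 2` are placeholders since the original figure is not reproduced here.

```java
import java.util.Arrays;

class InterleavingReplay {
    // Placeholder contents (assumption); slots 3 and 4 are empty, iTail == 3.
    Integer[] store = {0, 1, 2, null, null};
    int iTail = 3;

    /** Both threads read iTail == 3 up front, so one write is lost and iTail ends at 4. */
    static InterleavingReplay singleIncrement() {
        InterleavingReplay b = new InterleavingReplay();
        int tailA = b.iTail;                      // thread A reads iTail == 3
        int tailB = b.iTail;                      // thread B reads iTail == 3 as well
        b.store[tailA] = 3;                       // A stores its value
        b.store[tailB] = 4;                       // B overwrites A's value
        b.iTail = (tailA + 1) % b.store.length;   // A sets iTail to 4
        b.iTail = (tailB + 1) % b.store.length;   // B sets iTail to 4 again
        return b;
    }

    /** Both writes land in slot 3, but each increment sees the latest iTail. */
    static InterleavingReplay doubleIncrement() {
        InterleavingReplay b = new InterleavingReplay();
        b.store[b.iTail] = 3;                     // A writes at index 3
        b.store[b.iTail] = 4;                     // B writes at index 3 (iTail not yet advanced)
        b.iTail = (b.iTail + 1) % b.store.length; // A advances iTail to 4
        b.iTail = (b.iTail + 1) % b.store.length; // B advances iTail to 0
        return b;
    }

    public static void main(String[] args) {
        InterleavingReplay one = singleIncrement();
        InterleavingReplay two = doubleIncrement();
        System.out.println(Arrays.toString(one.store) + " iTail=" + one.iTail);
        System.out.println(Arrays.toString(two.store) + " iTail=" + two.iTail);
    }
}
```

In both replays the value 3 is lost. In the second, iTail also skips past an empty slot, so the buffer's invariants no longer hold.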



Exercise 3: Concurrency Benefits
What benefits does a concurrent ring buffer offer over the simple data structure from before? Give two examples where we'd prefer the concurrency-friendly implementation.
Concurrency allows multiple threads to process the data in the buffer at the same time, which enables distributed applications and can reduce the execution time for computation-heavy processes.

(There are many possible examples. We give two here.)

As a first example, suppose that the buffer is used to model a queue of concert ticket purchases. These purchase requests will be coming in from multiple computers, and new requests may come in while others are in the process of being added or removed from the buffer. This could cause race conditions, as we saw above, where some tickets could end up unsold or sold to multiple people. A properly synchronized concurrent implementation would avoid these issues.

As a second example, suppose that the buffer is used to queue up subroutines that need to be carried out in a large scientific simulation (for example, simulating weather dynamics in many small subregions of the atmosphere to build up a forecast). If we were constrained to a single thread, we'd need to fully carry out the simulation on one region (which could take multiple minutes) before starting to process the next region. We could use concurrency to separate these computations onto different threads that pull new jobs to execute in parallel across multiple machine cores.
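The second example can be sketched as a small pool of worker threads pulling jobs from a shared bounded queue. To keep the sketch self-contained, it uses the standard library's `java.util.concurrent.ArrayBlockingQueue` in place of the course's RingBuffer. The class `RegionWorkers`, the `simulateRegion` stand-in computation, and the `STOP` sentinel are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class RegionWorkers {
    static final int STOP = -1; // sentinel telling a worker to exit (assumption)

    /** Stand-in for an expensive per-region simulation (assumption). */
    static long simulateRegion(int region) {
        long acc = 0;
        for (int i = 0; i <= region; i++) {
            acc += (long) i * i;
        }
        return acc;
    }

    /** Process regions 0..regions-1 on `workers` threads; return the combined result. */
    static long runAll(int regions, int workers) {
        BlockingQueue<Integer> jobs = new ArrayBlockingQueue<>(4);
        AtomicLong total = new AtomicLong();
        Thread[] pool = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            pool[w] = new Thread(() -> {
                try {
                    while (true) {
                        int region = jobs.take();   // blocks while the queue is empty
                        if (region == STOP) {
                            return;
                        }
                        total.addAndGet(simulateRegion(region));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool[w].start();
        }
        try {
            for (int r = 0; r < regions; r++) {
                jobs.put(r);                        // blocks while the queue is full
            }
            for (int w = 0; w < workers; w++) {
                jobs.put(STOP);                     // one sentinel per worker
            }
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }
}
```

Each worker starts a new region as soon as it finishes its previous one, so on a multi-core machine several regions can be processed at once instead of strictly one after another.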
Exercise 4: Synchronization in Java
Look up each of the following keywords and methods in the Lecture 27 notes. In your own words, give a 1-2 sentence description of what each construct does and why it's useful to achieve proper concurrency.

synchronized:
This allows us to specify that only one thread at a time may execute code in a method or block guarded by the same object's lock, preventing race conditions on the state that this code protects.
wait():
This causes a thread to release the lock it holds in a synchronized block and pause until it is notified, so another thread can enter and handle some prerequisite for the first thread.
notifyAll():
This allows a thread to wake all threads waiting on the same object, signaling that the condition they were waiting for may now hold and that they can try to resume their execution of a synchronized block.
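The three constructs can be seen working together in a small sketch of a one-time gate. The classes `ReadyGate` and `ReadyGateDemo` are hypothetical and do not come from the lecture notes. A waiting thread blocks in `awaitOpen()` until another thread calls `open()`.

```java
class ReadyGate {
    private boolean isOpen = false;

    /** Block the caller until the gate has been opened. */
    synchronized void awaitOpen() throws InterruptedException {
        while (!isOpen) {   // re-check the condition after every wake-up
            wait();         // releases this object's lock while paused
        }
    }

    /** Open the gate and wake every thread waiting on it. */
    synchronized void open() {
        isOpen = true;
        notifyAll();
    }
}

class ReadyGateDemo {
    static String demo() {
        ReadyGate gate = new ReadyGate();
        StringBuffer log = new StringBuffer(); // thread-safe string builder
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
                log.append("waiter resumed;");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        log.append("opening;");
        gate.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "opening;waiter resumed;"
    }
}
```

The `while` loop around `wait()` matters: a woken thread re-acquires the lock and must confirm that its condition really holds before continuing.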
Exercise 5: Synchronized Code
Complete the definitions of the enqueue() and dequeue() methods according to their specifications. Use the wait() and notifyAll() methods to achieve proper synchronization.

/**
 * Append a value `x` to the buffer. Blocks until value is added.
 */
public synchronized void enqueue(T x) throws InterruptedException {
  // Block until buffer is not full
  while (isFull()) {
    wait();
  }

  // Store value and wake any consumers
  assert store[iTail] == null;
  store[iTail] = x;
  iTail = (iTail + 1) % store.length;
  size += 1;
  notifyAll();
  assert invariantSatisfied();
}

/**
 * Remove and return the oldest value in the buffer. Blocks until value is returned.
 */
public synchronized T dequeue() throws InterruptedException {
  // Block until buffer is non-empty
  while (isEmpty()) {
    this.wait();
  }

  // Extract value to be returned
  T ans = store[iHead];

  // Advance head to remove value and wake any producers
  // (set element to null to preserve invariant and avoid memory leak)
  store[iHead] = null;
  iHead = (iHead + 1) % store.length;
  size -= 1;
  this.notifyAll();
  assert invariantSatisfied();
  return ans;
}
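As a usage illustration, the sketch below runs one producer and one consumer against a trimmed-down buffer that follows the same wait/notifyAll pattern. The classes `SketchRingBuffer` and `ProducerConsumerDemo` are hypothetical stand-ins and not the course's RingBuffer class. The assertions and invariant checks are omitted for brevity, and the full and empty checks are inlined as comparisons on `size`.

```java
import java.util.ArrayList;
import java.util.List;

class SketchRingBuffer<T> {
    private final T[] store;
    private int iHead = 0;
    private int iTail = 0;
    private int size = 0;

    @SuppressWarnings("unchecked")
    SketchRingBuffer(int capacity) {
        store = (T[]) new Object[capacity];
    }

    synchronized void enqueue(T x) throws InterruptedException {
        while (size == store.length) {  // block until buffer is not full
            wait();
        }
        store[iTail] = x;
        iTail = (iTail + 1) % store.length;
        size += 1;
        notifyAll();                    // wake any waiting consumers
    }

    synchronized T dequeue() throws InterruptedException {
        while (size == 0) {             // block until buffer is non-empty
            wait();
        }
        T ans = store[iHead];
        store[iHead] = null;
        iHead = (iHead + 1) % store.length;
        size -= 1;
        notifyAll();                    // wake any waiting producers
        return ans;
    }
}

class ProducerConsumerDemo {
    /** Send 0..n-1 through a buffer of the given capacity; return what was received. */
    static List<Integer> transfer(int n, int capacity) {
        SketchRingBuffer<Integer> buffer = new SketchRingBuffer<>(capacity);
        List<Integer> received = new ArrayList<>(); // written only by the consumer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    buffer.enqueue(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(buffer.dequeue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

With a capacity much smaller than `n`, the producer repeatedly blocks on a full buffer and the consumer on an empty one, yet every value arrives exactly once and in order because there is a single producer and a single consumer.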