Discussion 13: Synchronization

Exercise 1: Race Conditions
Suppose we have a RingBuffer object buffer containing Integers, and the current state of the array backing store for our buffer looks like this:
You can also assume that, at this point, iHead == 0 and iTail == 3. Then, suppose the following enqueue calls are executed on separate threads:
new Thread(() -> buffer.enqueue(3)).start();
new Thread(() -> buffer.enqueue(4)).start();
Draw the possible states of the buffer (specifically, the backing array and iTail) after these calls are executed.
There are two possible outcomes in which the invariants are maintained and the program behaves as intended:

and

For both of these, iTail == 0.
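The two well-behaved outcomes correspond to the two enqueue calls running one after the other, in either order. A minimal sketch of that, assuming a backing array of length 5 (inferred from iTail wrapping from 3 to 0 after two increments) and placeholder values 0, 1, 2 in the occupied slots; the class SerialEnqueueDemo is hypothetical and is not the RingBuffer from the release code:

```java
import java.util.Arrays;

/** Hypothetical simplified buffer with only the fields this exercise mentions. */
class SerialEnqueueDemo {
    // Assumption: capacity 5, slots 0..2 occupied by placeholder values.
    final Integer[] store = {0, 1, 2, null, null};
    int iTail = 3;

    /** Unsynchronized enqueue: write the slot first, then advance iTail. */
    void enqueue(Integer x) {
        store[iTail] = x;
        iTail = (iTail + 1) % store.length;
    }

    /** Runs the two enqueue calls back to back, with no interleaving. */
    static SerialEnqueueDemo runInOrder(int first, int second) {
        SerialEnqueueDemo b = new SerialEnqueueDemo();
        b.enqueue(first);
        b.enqueue(second);
        return b;
    }

    public static void main(String[] args) {
        for (int[] order : new int[][] {{3, 4}, {4, 3}}) {
            SerialEnqueueDemo b = runInOrder(order[0], order[1]);
            // prints "[0, 1, 2, 3, 4] iTail=0" then "[0, 1, 2, 4, 3] iTail=0"
            System.out.println(Arrays.toString(b.store) + " iTail=" + b.iTail);
        }
    }
}
```

In both orders the two new values occupy slots 3 and 4 and iTail wraps around to 0; only the order of the two values differs.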

There are several pathological cases:



It is possible that only one update gets written and iTail is incremented only once (iTail == 4). This happens when both threads read iTail == 3 and perform their updates without seeing the other thread’s writes: both store into slot 3, so the second write overwrites the first, and both compute the same new value for iTail.

It is also possible for the array states to be the same as above, but with iTail incremented twice (iTail == 0). In our implementation the array modification is performed first and iTail is incremented afterwards, so there exists an interleaving in which the second thread uses the stale iTail value when writing to the array but reads the updated value when incrementing iTail.
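Because real thread interleavings are not reproducible on demand, the two pathological cases can be replayed by hand on a single thread. The sketch below does that under the same assumptions as the exercise (length-5 array, iTail == 3); the placeholder values 0, 1, 2 and the class RaceReplay are hypothetical, and each local variable stands for the iTail value one thread has read:

```java
import java.util.Arrays;

/** Hypothetical single-threaded replay of two bad interleavings of unsynchronized enqueue. */
class RaceReplay {
    // Assumption: capacity 5, slots 0..2 occupied by placeholder values.
    final Integer[] store = {0, 1, 2, null, null};
    int iTail = 3;

    /** Both threads read iTail before either one writes it back. */
    static RaceReplay lostUpdate() {
        RaceReplay b = new RaceReplay();
        int tailSeenByA = b.iTail;                     // thread A reads 3
        int tailSeenByB = b.iTail;                     // thread B reads 3 as well
        b.store[tailSeenByA] = 3;                      // A writes slot 3
        b.store[tailSeenByB] = 4;                      // B overwrites slot 3
        b.iTail = (tailSeenByA + 1) % b.store.length;  // A stores 4
        b.iTail = (tailSeenByB + 1) % b.store.length;  // B stores 4 again
        return b;
    }

    /** B writes with a stale index but increments the fresh iTail. */
    static RaceReplay doubleIncrement() {
        RaceReplay b = new RaceReplay();
        int tailSeenByB = b.iTail;                     // B reads 3 for its array write
        b.store[b.iTail] = 3;                          // A writes slot 3
        b.iTail = (b.iTail + 1) % b.store.length;      // A advances iTail to 4
        b.store[tailSeenByB] = 4;                      // B overwrites slot 3 (stale index)
        b.iTail = (b.iTail + 1) % b.store.length;      // B re-reads 4 and wraps to 0
        return b;
    }

    public static void main(String[] args) {
        for (RaceReplay b : new RaceReplay[] {lostUpdate(), doubleIncrement()}) {
            // prints "[0, 1, 2, 4, null] iTail=4" then "[0, 1, 2, 4, null] iTail=0"
            System.out.println(Arrays.toString(b.store) + " iTail=" + b.iTail);
        }
    }
}
```

In both replays the value 3 is lost. In the second one iTail has also moved past slot 4, which was never written.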



Exercise 2: Concurrent Ring Buffer
Now look at the ConcurrentRingBuffer.java file in the release code. Your task is to implement thread-safe enqueue() and dequeue() functions (TODOs 1-2). Your implementations should block if there is nothing to be done; that is, if there is no space in the queue to add a new element, or if there is no element in the queue to be consumed.
/**
 * Append a value `x` to the buffer. Blocks until value is added.
 */
public synchronized void enqueue(T x) throws InterruptedException {
  // Block until buffer is not full
  while (isFull()) {
    wait();
  }

  // Store value and wake any consumers
  assert store[iTail] == null;
  store[iTail] = x;
  iTail = (iTail + 1) % store.length;
  size += 1;
  notifyAll();
  assertInv();
}

/**
 * Remove and return the oldest value in the buffer. Blocks until value is returned.
 */
public synchronized T dequeue() throws InterruptedException {
  // Block until buffer is non-empty
  while (isEmpty()) {
    this.wait();
  }

  // Extract value to be returned
  T ans = store[iHead];

  // Advance head to remove value and wake any producers
  // (set element to null to preserve invariant and avoid memory leak)
  store[iHead] = null;
  iHead = (iHead + 1) % store.length;
  size -= 1;
  this.notifyAll();
  assertInv();
  return ans;
}
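To see the blocking behavior in action, one producer and one consumer can share a buffer that is much smaller than the number of items transferred. The sketch below is an illustration only: DemoRingBuffer is a hypothetical, trimmed-down stand-in that mirrors the enqueue() and dequeue() logic above (without the assertions), not the ConcurrentRingBuffer from the release code, and ProducerConsumerDemo is likewise an invented name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal buffer mirroring the solution's wait/notifyAll pattern. */
class DemoRingBuffer<T> {
    private final T[] store;
    private int iHead = 0, iTail = 0, size = 0;

    @SuppressWarnings("unchecked")
    DemoRingBuffer(int capacity) { store = (T[]) new Object[capacity]; }

    synchronized void enqueue(T x) throws InterruptedException {
        while (size == store.length) { wait(); }  // block while full
        store[iTail] = x;
        iTail = (iTail + 1) % store.length;
        size += 1;
        notifyAll();                              // wake any waiting consumers
    }

    synchronized T dequeue() throws InterruptedException {
        while (size == 0) { wait(); }             // block while empty
        T ans = store[iHead];
        store[iHead] = null;
        iHead = (iHead + 1) % store.length;
        size -= 1;
        notifyAll();                              // wake any waiting producers
        return ans;
    }
}

class ProducerConsumerDemo {
    /** Sends 0..n-1 through the buffer and returns what the consumer received. */
    static List<Integer> run(int n, int capacity) {
        DemoRingBuffer<Integer> buffer = new DemoRingBuffer<>(capacity);
        List<Integer> received = new ArrayList<>();  // used only by the consumer until join()
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) buffer.enqueue(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) received.add(buffer.dequeue());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();  // join() makes the consumer's writes visible here
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 2));  // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a single producer and a single consumer the values arrive in FIFO order regardless of scheduling. The while loops around wait() matter because notifyAll() wakes every waiting thread, so each one must recheck its condition after reacquiring the lock.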