Solution: Using Locks and Condition Variables
/**
 * FIFO buffer with a fixed capacity, shared by asynchronous producers and
 * consumers. One mutex guards the underlying array; two condition variables
 * let producers wait for free space and consumers wait for an item.
 */
class BoundedBuffer {
  /**
   * @param {number} capacity - Number of items at which producers start waiting.
   */
  constructor(capacity) {
    this.capacity = capacity;
    this.buffer = [];
    this.mutex = new Mutex();
    this.notFull = new ConditionVariable();
    this.notEmpty = new ConditionVariable();
  }

  /**
   * Appends `item` to the buffer, waiting first while the buffer is at capacity.
   * Logs the produced item together with the buffer contents.
   *
   * @param {*} item - Value to enqueue.
   * @returns {Promise<void>} Settles once the item is stored and the mutex released.
   */
  async produce(item) {
    const { mutex } = this;
    await mutex.lock();
    try {
      // The fullness test is repeated after every wake-up: another producer
      // may have taken the free slot before this one re-acquired the mutex.
      while (this.buffer.length >= this.capacity) {
        await this.notFull.wait(mutex);
      }

      this.buffer.push(item);
      console.log(`Produced: ${item}, Buffer: [${this.buffer}]`);

      // At least one item is now available, so wake one waiting consumer.
      this.notEmpty.signal();
    } finally {
      // Release the mutex even if anything above threw.
      mutex.unlock();
    }
  }

  /**
   * Removes and returns the oldest item, waiting first while the buffer is empty.
   * Logs the consumed item together with the remaining buffer contents.
   *
   * @returns {Promise<*>} The item that was at the front of the buffer.
   */
  async consume() {
    const { mutex } = this;
    await mutex.lock();
    try {
      // The emptiness test is repeated after every wake-up: another consumer
      // may have taken the item before this one re-acquired the mutex.
      while (this.buffer.length === 0) {
        await this.notEmpty.wait(mutex);
      }

      const head = this.buffer.shift();
      console.log(`Consumed: ${head}, Buffer: [${this.buffer}]`);

      // A slot has been freed, so wake one waiting producer.
      this.notFull.signal();
      return head;
    } finally {
      // Runs after the return value is computed; the mutex is always released.
      mutex.unlock();
    }
  }
}
// Simplified Mutex and ConditionVariable classes
/**
 * Minimal promise-based mutual-exclusion lock.
 *
 * `locked` records whether the lock is held; `waiting` is a FIFO list of
 * resolver functions, one per caller currently parked inside `lock()`.
 */
class Mutex {
  constructor() {
    this.locked = false;
    this.waiting = [];
  }

  /**
   * Acquires the lock, parking the caller while it is held by someone else.
   *
   * @returns {Promise<void>} Settles once the caller owns the lock.
   */
  async lock() {
    // A woken caller re-tests `locked`: if someone else took the lock between
    // the wake-up and this resumption, the caller queues up again.
    while (this.locked) {
      await new Promise((wake) => {
        this.waiting.push(wake);
      });
    }
    this.locked = true;
  }

  /**
   * Releases the lock and wakes the longest-waiting caller, if there is one.
   */
  unlock() {
    this.locked = false;
    const next = this.waiting.shift();
    if (next !== undefined) {
      next();
    }
  }
}
/**
 * Minimal promise-based condition variable.
 *
 * `waiting` is a FIFO list of resolver functions, one per caller currently
 * parked inside `wait()`.
 */
class ConditionVariable {
  constructor() {
    this.waiting = [];
  }

  /**
   * Releases `mutex`, parks until signalled, then re-acquires `mutex`.
   *
   * @param {{unlock: Function, lock: Function}} mutex - Lock held by the caller.
   * @returns {Promise<void>} Settles once the caller has been woken and has
   *   obtained the mutex again.
   */
  async wait(mutex) {
    // Register as a waiter BEFORE releasing the mutex so that a signal sent
    // right after the release cannot be missed.
    const signalled = new Promise((wake) => {
      this.waiting.push(wake);
    });
    mutex.unlock();
    await signalled;
    await mutex.lock();
  }

  /**
   * Wakes the longest-waiting caller; does nothing when nobody is waiting.
   */
  signal() {
    const next = this.waiting.shift();
    if (next !== undefined) {
      next();
    }
  }

  /**
   * Wakes every caller that is currently waiting, oldest first.
   */
  broadcast() {
    // splice(0) empties the list in place and hands back the removed waiters.
    for (const wake of this.waiting.splice(0)) {
      wake();
    }
  }
}
// Test
/**
 * Demo driver: runs two producers and two consumers, five items each,
 * against a shared BoundedBuffer of capacity 3, pausing for a random
 * interval after every operation.
 *
 * @returns {Promise<void>} Settles once all four workers have finished.
 */
async function test() {
  const buffer = new BoundedBuffer(3);

  // Resolves after `ms` milliseconds.
  const sleep = (ms) =>
    new Promise((resolve) => {
      setTimeout(resolve, ms);
    });

  // Produces `count` items labelled with the producer id and a sequence number.
  const producer = async (id, count) => {
    let seq = 0;
    while (seq < count) {
      await buffer.produce(`P${id}-${seq}`);
      await sleep(Math.random() * 100);
      seq++;
    }
  };

  // Consumes `count` items; `id` only distinguishes the workers at the call site.
  const consumer = async (id, count) => {
    let taken = 0;
    while (taken < count) {
      await buffer.consume();
      await sleep(Math.random() * 150);
      taken++;
    }
  };

  // Run 2 producers and 2 consumers concurrently.
  const workers = [
    producer(1, 5),
    producer(2, 5),
    consumer(1, 5),
    consumer(2, 5),
  ];
  await Promise.all(workers);
}
test();
Alternative: Using Semaphores
/**
 * Bounded FIFO buffer coordinated by three semaphores: `empty` counts free
 * slots, `full` counts stored items, and `mutex` (one permit) guards the
 * underlying array.
 */
class BoundedBufferWithSemaphores {
  /**
   * @param {number} capacity - Initial number of free slots.
   */
  constructor(capacity) {
    this.capacity = capacity;
    this.buffer = [];
    this.mutex = new Semaphore(1); // single permit: acts as a lock
    this.empty = new Semaphore(capacity); // permits = free slots
    this.full = new Semaphore(0); // permits = stored items
  }

  /**
   * Stores `item`, waiting first until a free slot is available.
   *
   * @param {*} item - Value to enqueue.
   * @returns {Promise<void>}
   */
  async produce(item) {
    const { empty, mutex, full } = this;

    // Claim a free slot before taking the buffer lock, so a blocked producer
    // never holds the lock while it waits.
    await empty.acquire();
    await mutex.acquire();

    this.buffer.push(item);
    console.log(`Produced: ${item}`);

    mutex.release();
    full.release(); // one more item is available to consumers
  }

  /**
   * Removes and returns the oldest item, waiting first until one is available.
   *
   * @returns {Promise<*>} The item that was at the front of the buffer.
   */
  async consume() {
    const { empty, mutex, full } = this;

    // Claim an item before taking the buffer lock, so a blocked consumer
    // never holds the lock while it waits.
    await full.acquire();
    await mutex.acquire();

    const head = this.buffer.shift();
    console.log(`Consumed: ${head}`);

    mutex.release();
    empty.release(); // one more slot is available to producers
    return head;
  }
}
/**
 * Minimal promise-based counting semaphore.
 *
 * `count` is the number of permits currently available; `waiting` is a FIFO
 * list of resolver functions, one per caller parked inside `acquire()`.
 */
class Semaphore {
  /**
   * @param {number} count - Initial number of permits.
   */
  constructor(count) {
    this.count = count;
    this.waiting = [];
  }

  /**
   * Takes one permit, parking the caller when none is available.
   *
   * @returns {Promise<void>} Settles once the caller holds a permit.
   */
  async acquire() {
    if (this.count > 0) {
      this.count--;
      return;
    }
    // No permit left: park until release() hands one over directly.
    await new Promise((wake) => {
      this.waiting.push(wake);
    });
  }

  /**
   * Returns one permit. If a caller is parked, the permit goes straight to the
   * longest-waiting one and `count` is left untouched; otherwise `count` grows.
   */
  release() {
    const next = this.waiting.shift();
    if (next === undefined) {
      this.count++;
      return;
    }
    next();
  }
}
Java Implementation
/**
 * Thread-safe bounded FIFO buffer built on an intrinsic lock with
 * {@code wait()} / {@code notifyAll()}.
 *
 * @param <T> type of the items stored in the buffer
 */
class BoundedBuffer<T> {
    private final Queue<T> buffer;
    private final int capacity;
    // Single monitor used both for mutual exclusion and for waiting/notification.
    private final Object lock = new Object();

    /**
     * @param capacity number of items at which producers start waiting
     */
    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
        this.buffer = new LinkedList<>();
    }

    /**
     * Adds {@code item} to the tail of the buffer, blocking while the buffer is full.
     *
     * @param item the item to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void produce(T item) throws InterruptedException {
        synchronized (lock) {
            // while, not if: the condition must be re-checked after every wakeup.
            while (buffer.size() >= capacity) {
                lock.wait(); // Wait until not full
            }
            buffer.add(item);
            System.out.println("Produced: " + item);
            // Producers and consumers wait on the same monitor, so wake them all
            // and let each re-check its own condition.
            lock.notifyAll(); // Wake up consumers
        }
    }

    /**
     * Removes and returns the item at the head of the buffer, blocking while
     * the buffer is empty.
     *
     * @return the oldest item in the buffer
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public T consume() throws InterruptedException {
        synchronized (lock) {
            // while, not if: the condition must be re-checked after every wakeup.
            while (buffer.isEmpty()) {
                lock.wait(); // Wait until not empty
            }
            T item = buffer.remove();
            System.out.println("Consumed: " + item);
            lock.notifyAll(); // Wake up producers
            return item;
        }
    }
}
Why Use While Instead of If?
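Spurious wakeups: A thread can wake up from wait() without having been signaled, so it must re-check the condition before proceeding.
Multiple consumers: Two consumers are woken up; the first takes the item, and the second finds the buffer empty again.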
// WRONG - uses if: the condition is tested only once, before waiting
if (buffer.isEmpty()) {
wait();
}
// After wakeup, might still be empty (spurious wakeup, or another consumer took the item first)!
item = buffer.remove(); // CRASH - removing from an empty buffer
// CORRECT - uses while: the condition is re-tested after every wakeup
while (buffer.isEmpty()) {
wait();
}
// After wakeup, check again; the loop exits only once the buffer is non-empty
item = buffer.remove(); // Safe
Common Mistakes
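- Using if instead of while: Doesn't handle spurious wakeups, or another thread changing the condition before the woken thread runs
- Forgetting to signal: Waiting threads are never woken and wait forever
- Wrong signal: Signaling the wrong condition variable, so the thread that could make progress stays asleep
- Deadlock: Holding a lock while waiting for another thread that needs that same lock to make progress
- Busy waiting: Checking the condition in a tight loop instead of blocking until signaled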
Complexity
Time: O(1) per operation (excluding wait time)
Space: O(capacity) for buffer
Real-World Usage
Thread pools: Tasks are "produced", worker threads "consume"
Message queues: Producers send messages, consumers process
Event systems: Events produced by UI, consumed by handlers
Logging: Multiple threads produce logs, single thread writes to disk
Follow-Up Questions
Q: What if you have multiple types of consumers?
A: Use multiple condition variables, one per type
Q: How to prioritize certain items?
A: Use priority queue instead of regular queue
Q: What about fairness?
A: Use fair locks or FIFO ordering for waiting threads
Related Problems