Java Concurrency — Producer-Consumer & Blocking Queues

"The producer-consumer pattern is the foundation of every message queue, event bus, and pipeline you've ever used."

This article implements a bounded blocking queue from scratch, explores the producer-consumer pattern in depth, and builds a simplified SQS-style message queue.


The Producer-Consumer Pattern

Why this pattern?

  • Decouples producers from consumers — they run at different speeds

  • Buffers bursts — the queue absorbs temporary spikes

  • Throttles — bounded queue prevents memory exhaustion (back-pressure)


Bounded Blocking Queue — From Scratch

A bounded blocking queue has these behaviors:

| Operation | Queue Full        | Queue Empty                |
|-----------|-------------------|----------------------------|
| put(item) | Block until space | Add item                   |
| take()    | Return item       | Block until item available |

Version 1: wait/notify

import java.util.LinkedList;
import java.util.Queue;

public class BoundedBlockingQueue<T> {
    private final Queue<T> queue;
    private final int capacity;
    private final Object lock = new Object();

    public BoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    public void put(T item) throws InterruptedException {
        synchronized (lock) {
            // WAIT if queue is full
            while (queue.size() == capacity) {
                System.out.println(Thread.currentThread().getName()
                    + " blocked — queue full");
                lock.wait();
            }
            queue.add(item);
            System.out.println(Thread.currentThread().getName()
                + " produced: " + item + " [size=" + queue.size() + "]");
            lock.notifyAll();  // Wake consumers waiting for data
        }
    }

    public T take() throws InterruptedException {
        synchronized (lock) {
            // WAIT if queue is empty
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName()
                    + " blocked — queue empty");
                lock.wait();
            }
            T item = queue.poll();
            System.out.println(Thread.currentThread().getName()
                + " consumed: " + item + " [size=" + queue.size() + "]");
            lock.notifyAll();  // Wake producers waiting for space
            return item;
        }
    }

    public int size() {
        synchronized (lock) {
            return queue.size();
        }
    }
}

The Problem with notifyAll() Here

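When a producer calls notifyAll(), it wakes every thread waiting on the lock: blocked consumers and any other blocked producers. Only consumers can make progress after a put(), so each woken producer reacquires the lock, re-checks its condition, and goes back to waiting. The same happens in reverse after a take(). The while loops keep the code correct, but the extra wakeups are wasted work.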


Version 2: Condition Variables

Two separate conditions solve the wasted-wakeup problem:

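import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueueV2<T> {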
    private final Queue<T> queue;
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();  // Producers wait here
    private final Condition notEmpty = lock.newCondition();  // Consumers wait here

    public BoundedBlockingQueueV2(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();  // Producer waits for space
            }
            queue.add(item);
            notEmpty.signal();    // Signal ONE consumer: data available
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();  // Consumer waits for data
            }
            T item = queue.poll();
            notFull.signal();      // Signal ONE producer: space available
            return item;
        } finally {
            lock.unlock();
        }
    }
}

Why This is Better

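| Aspect                      | notifyAll()                         | Condition.signal()  |
|-----------------------------|-------------------------------------|---------------------|
| Wakeups                     | All threads (producers + consumers) | Only the right role |
| Re-checks                   | Many unnecessary                    | Minimal             |
| Threads woken per operation | O(number of waiters)                | O(1)                |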
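
The two-condition design also extends to a timed insert. The sketch below is an illustration, not part of Version 2 above: the class name TimedBoundedQueue and its offer(item, timeout, unit) method are hypothetical, loosely modeled on BlockingQueue.offer(e, time, unit). It assumes Condition.awaitNanos is used to track the remaining wait time.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the two-condition queue with a timed insert.
final class TimedBoundedQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // Producers wait here
    private final Condition notEmpty = lock.newCondition();  // Consumers wait here

    TimedBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Returns false if no space became available within the timeout.
    boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.size() == capacity) {
                if (nanos <= 0L) {
                    return false;  // Timed out while the queue was still full
                }
                // awaitNanos returns the remaining wait time, so a spurious
                // wakeup does not extend the total timeout.
                nanos = notFull.awaitNanos(nanos);
            }
            queue.add(item);
            notEmpty.signal();  // Wake one waiting consumer
            return true;
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.poll();
            notFull.signal();  // Wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller can use the boolean result to shed load or retry instead of blocking indefinitely, which is hard to express cleanly with a single shared monitor.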

Java's Built-in BlockingQueue

Java provides several BlockingQueue implementations out of the box:

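| Implementation        | Backing      | Bounded         | Notes                                                                        |
|-----------------------|--------------|-----------------|------------------------------------------------------------------------------|
| ArrayBlockingQueue    | Array        | ✅ Fixed        | Fair option, compact                                                         |
| LinkedBlockingQueue   | Linked nodes | ✅ Optional     | Default capacity Integer.MAX_VALUE (effectively unbounded), higher throughput |
| SynchronousQueue      | None         | ✅ Capacity = 0 | Direct handoff: put blocks until take                                        |
| PriorityBlockingQueue | Heap         | ❌ Unbounded    | Priority ordering                                                            |
| DelayQueue            | Heap         | ❌ Unbounded    | Elements available after delay                                               |
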
// ArrayBlockingQueue — fixed capacity
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// Producer
queue.put("task");      // Blocks if full
queue.offer("task");    // Returns false if full (non-blocking)

// Consumer
String item = queue.take();   // Blocks if empty
String item2 = queue.poll();  // Returns null if empty (non-blocking)
String item3 = queue.poll(5, TimeUnit.SECONDS);  // Wait up to 5 seconds

BlockingQueue API Summary

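| Operation | Blocks | Throws    | Returns Special Value | Times Out            |
|-----------|--------|-----------|-----------------------|----------------------|
| Insert    | put(e) | add(e)    | offer(e)              | offer(e, time, unit) |
| Remove    | take() | remove()  | poll()                | poll(time, unit)     |
| Examine   | n/a    | element() | peek()                | n/a                  |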
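
The four behaviors in the summary can be seen side by side on a queue of capacity 1. This is a minimal sketch; the class name BlockingQueueApiDemo and the describe() helper are made up for the illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BlockingQueueApiDemo {
    // Exercises the non-blocking and timed variants on a full, then empty, queue.
    static String describe() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();

        queue.put("a");  // Queue is now full
        out.append("offer=").append(queue.offer("b"));  // false: no space
        out.append(" timedOffer=")
           .append(queue.offer("b", 50, TimeUnit.MILLISECONDS));  // false after the wait
        try {
            queue.add("b");
            out.append(" add=ok");
        } catch (IllegalStateException e) {
            out.append(" add=IllegalStateException");  // Full queue: add throws
        }

        out.append(" peek=").append(queue.peek());  // "a", not removed
        out.append(" take=").append(queue.take());  // "a", queue is now empty
        out.append(" poll=").append(queue.poll());  // null: nothing to remove
        try {
            queue.remove();
            out.append(" remove=ok");
        } catch (NoSuchElementException e) {
            out.append(" remove=NoSuchElementException");  // Empty queue: remove throws
        }
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: offer=false timedOffer=false add=IllegalStateException
        //         peek=a take=a poll=null remove=NoSuchElementException (on one line)
        System.out.println(describe());
    }
}
```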

Back-Pressure — Why Bounding Matters

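An unbounded queue can consume all available memory if producers are consistently faster than consumers, because every item the consumers have not yet handled stays on the heap. A bounded queue caps that backlog: once it is full, put() blocks the producer until a consumer frees a slot.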
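
A rough way to see the bound at work is to run a fast producer against a deliberately slow consumer and record the largest queue size observed. The sketch below assumes an ArrayBlockingQueue; the class name BackPressureDemo, the helper maxObservedSize, and the 2 ms processing delay are arbitrary choices for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BackPressureDemo {
    // Runs a fast producer against a slow consumer and returns the largest
    // queue size the consumer observed.
    static int maxObservedSize(int capacity, int messages) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxSize = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    queue.put(i);  // Blocks whenever the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    maxSize.accumulateAndGet(queue.size(), Math::max);
                    queue.take();
                    Thread.sleep(2);  // Simulate slow processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return maxSize.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The reported size never exceeds the capacity passed in (5 here).
        System.out.println("max queue size: " + maxObservedSize(5, 50));
    }
}
```

However far ahead the producer could run, the backlog stays at or below the capacity, so memory use is bounded by the queue size rather than by the producer's speed.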


Problem: SQS-Style Message Queue

Goal: Build a simplified message queue where multiple consumers can poll and process messages concurrently. Each message is consumed by exactly one consumer.

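import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class SimpleMessageQueue<T> {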
    private final BlockingQueue<T> queue;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public SimpleMessageQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer: send a message
    public void send(T message) throws InterruptedException {
        queue.put(message);
        System.out.println("[Producer " + Thread.currentThread().getName()
            + "] Sent: " + message);
    }

    // Consumer: receive a message (blocks until available)
    public T receive() throws InterruptedException {
        T message = queue.take();
        System.out.println("[Consumer " + Thread.currentThread().getName()
            + "] Received: " + message);
        return message;
    }

    // Graceful shutdown
    public void shutdown() {
        running.set(false);
    }

    public boolean isRunning() {
        return running.get();
    }

    // Start consumer workers
    public void startConsumers(int numConsumers, Consumer<T> processor) {
        for (int i = 0; i < numConsumers; i++) {
            Thread consumer = new Thread(() -> {
                while (isRunning() || !queue.isEmpty()) {
                    try {
                        T message = queue.poll(1, TimeUnit.SECONDS);
                        if (message != null) {
                            processor.accept(message);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Consumer-" + i);
            consumer.setDaemon(true);
            consumer.start();
        }
    }
}

// Usage
public static void main(String[] args) throws Exception {
    SimpleMessageQueue<String> mq = new SimpleMessageQueue<>(100);

    // Start 3 consumers
    mq.startConsumers(3, msg -> {
        System.out.println("Processing: " + msg + " by " +
            Thread.currentThread().getName());
    });

    // Produce 10 messages
    for (int i = 0; i < 10; i++) {
        mq.send("Order-" + i);
    }

    Thread.sleep(3000);
    mq.shutdown();
}
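
The claim that each message is consumed by exactly one consumer can be checked by counting deliveries per message. The following is a standalone sketch that uses a plain ArrayBlockingQueue rather than the SimpleMessageQueue class above; the names ExactlyOnceCheck and deliveryCounts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ExactlyOnceCheck {
    // Sends `messages` items through a shared queue read by `consumers`
    // threads and returns how many times each item was delivered.
    static Map<String, Integer> deliveryCounts(int consumers, int messages)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        CountDownLatch processed = new CountDownLatch(messages);
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take();
                        counts.merge(message, 1, Integer::sum);  // Atomic per key
                        processed.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // Exit on interrupt
                }
            }, "Consumer-" + i);
            workers.add(worker);
            worker.start();
        }

        for (int i = 0; i < messages; i++) {
            queue.put("Order-" + i);
        }
        if (!processed.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("consumers did not finish in time");
        }
        workers.forEach(Thread::interrupt);  // Stop the idle consumers
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> counts = deliveryCounts(3, 20);
        boolean exactlyOnce = counts.values().stream().allMatch(c -> c == 1);
        System.out.println(counts.size() + " messages, exactly once: " + exactlyOnce);
    }
}
```

Because take() removes the element atomically, two consumers can never receive the same item. Note that this covers delivery only: unlike SQS, nothing here redelivers a message if the consumer fails while processing it.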

Common Pitfalls

Pitfall 1: Forgetting Back-Pressure

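// ❌ Unbounded: grows until OutOfMemoryError if producers outpace consumers
// (a LinkedBlockingQueue created without a capacity behaves the same way)
Queue<Task> unbounded = new LinkedList<>();

// ✅ Bounded: put() blocks producers when the queue is full
BlockingQueue<Task> bounded = new ArrayBlockingQueue<>(1000);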

Pitfall 2: Using notify() Instead of notifyAll() with Multiple Consumers

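// ❌ With one shared monitor, notify() wakes a single arbitrary waiter.
// A consumer that frees a slot may wake another consumer instead of a
// blocked producer, and every thread can end up waiting.
lock.notify();

// ✅ Option 1: wake every waiter on the shared monitor
lock.notifyAll();

// ✅ Option 2 (better): separate Conditions on a ReentrantLock
notEmpty.signal();  // Wakes one consumer only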

Pitfall 3: Not Handling InterruptedException in Consumer Loops

// ❌ Swallows interrupt — consumer never stops
while (running) {
    try {
        T item = queue.take();
        process(item);
    } catch (InterruptedException e) {
        // Silently ignored — BAD
    }
}

// ✅ Proper shutdown
while (running) {
    try {
        T item = queue.poll(1, TimeUnit.SECONDS);
        if (item != null) process(item);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;  // Exit loop
    }
}
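
The interrupt-aware loop can be tried end to end with a consumer thread that is interrupted from outside. This is a small sketch under assumed names (InterruptShutdownDemo, stopsOnInterrupt); it uses take() and relies on the interrupt alone to stop the thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class InterruptShutdownDemo {
    // Starts a consumer, interrupts it, and reports whether it terminated.
    static boolean stopsOnInterrupt() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String item = queue.take();  // Blocks until an item arrives
                    System.out.println("processed " + item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // Restore the flag
                    break;                               // Exit the loop
                }
            }
        }, "consumer");

        consumer.start();
        queue.put("job-1");
        consumer.interrupt();  // Request shutdown
        consumer.join(2000);   // Wait up to 2 seconds for the thread to exit
        return !consumer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer stopped: " + stopsOnInterrupt());
    }
}
```

Whether job-1 gets processed before the interrupt lands depends on thread timing, but in either case the consumer leaves the loop instead of blocking forever.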

FAQs

Q1: Implement a bounded blocking queue.

A: Use a ReentrantLock with two Condition variables (notFull, notEmpty). put() waits on notFull when full and signals notEmpty after adding. take() waits on notEmpty when empty and signals notFull after removing. See Version 2 above.

Q2: What is the difference between ArrayBlockingQueue and LinkedBlockingQueue?

A: ArrayBlockingQueue uses a fixed-size array (always bounded), has lower memory overhead, and uses a single lock. LinkedBlockingQueue uses linked nodes (optionally bounded), uses two separate locks (for head and tail), giving higher throughput at the cost of more GC pressure from node allocation.
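
The bounding difference is visible through remainingCapacity(). A short sketch, with the class name QueueCapacityDemo chosen only for this example:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueCapacityDemo {
    // Returns the remaining capacity of three freshly created queues.
    static int[] remainingCapacities() {
        // Capacity is mandatory; the second argument requests fair (FIFO) lock ordering.
        BlockingQueue<String> array = new ArrayBlockingQueue<>(10, true);
        // No capacity given: the limit defaults to Integer.MAX_VALUE.
        BlockingQueue<String> linkedDefault = new LinkedBlockingQueue<>();
        // Explicit capacity makes the linked queue bounded as well.
        BlockingQueue<String> linkedBounded = new LinkedBlockingQueue<>(10);

        return new int[] {
            array.remainingCapacity(),
            linkedDefault.remainingCapacity(),
            linkedBounded.remainingCapacity()
        };
    }

    public static void main(String[] args) {
        int[] caps = remainingCapacities();
        // Prints: 10 2147483647 10
        System.out.println(caps[0] + " " + caps[1] + " " + caps[2]);
    }
}
```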

Q3: What is a SynchronousQueue?

A: A blocking queue with zero capacity. Every put() blocks until a corresponding take() is ready, and vice versa. It's used for direct handoff — the producer waits until a consumer is available. Used internally by Executors.newCachedThreadPool().
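
A minimal sketch of the handoff behavior, with the class name HandoffDemo invented for the example: a non-blocking offer() fails while no consumer is waiting, and put() completes only once a consumer takes the item.

```java
import java.util.concurrent.SynchronousQueue;

final class HandoffDemo {
    static String handoff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // No consumer is waiting and there is no buffer, so offer() fails.
        boolean accepted = queue.offer("early");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();  // Waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        queue.put("task");  // Returns only after the consumer has taken the item
        consumer.join();    // join() makes received[0] safely visible here
        return "offer=" + accepted + " received=" + received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: offer=false received=task
        System.out.println(handoff());
    }
}
```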

Q4: How do you gracefully shut down a producer-consumer system?

A: (1) Signal producers to stop (via AtomicBoolean flag). (2) Let consumers drain remaining items. (3) Use poll() with timeout instead of take() so consumers can check the flag. (4) Alternatively, use a poison pill — a special sentinel value that tells consumers to shut down.
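
The poison-pill variant might look like the sketch below. It assumes a String sentinel that no real message uses and enqueues one pill per consumer; the names PoisonPillDemo, POISON_PILL, and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class PoisonPillDemo {
    // Sentinel value; assumed never to appear as a real message.
    private static final String POISON_PILL = "__STOP__";

    // Processes `messages` items with `consumers` threads, then shuts them down.
    static List<String> run(int consumers, int messages) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take();
                        if (POISON_PILL.equals(message)) {
                            return;  // Shut down this consumer
                        }
                        processed.add(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i);
            workers.add(worker);
            worker.start();
        }

        for (int i = 0; i < messages; i++) {
            queue.put("Order-" + i);
        }
        // One pill per consumer: each consumer stops after taking exactly one.
        for (int i = 0; i < consumers; i++) {
            queue.put(POISON_PILL);
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + run(3, 10).size() + " messages");
    }
}
```

Because the queue is FIFO and the pills are enqueued last, every real message is taken before the consumers stop, and take() can be used without a polling timeout.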

Q5: What is back-pressure? Why does it matter?

A: Back-pressure is when a bounded queue throttles fast producers by blocking them when the queue is full. Without it, an unbounded queue grows indefinitely, leading to OutOfMemoryError. Back-pressure naturally matches producer speed to consumer capacity.