
Deep Dive (java.util.concurrent) - BlockingQueues

Producer-Consumer Primitives


"The queue that makes threads wait politely — the backbone of producer-consumer patterns."

BlockingQueue is the interface that powers most concurrent producer-consumer systems on the JVM. This deep dive covers its four major implementations: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, and PriorityBlockingQueue.


What is BlockingQueue?

A BlockingQueue is a Queue that blocks when:

  • put() — queue is full → producer thread waits until space is available

  • take() — queue is empty → consumer thread waits until an element arrives


The BlockingQueue API

              Throws exception    Returns null/false    Blocks    Blocks w/ timeout
Insert        add(e)              offer(e)              put(e)    offer(e, t, u)
Remove        remove()            poll()                take()    poll(t, u)
Examine       element()           peek()                n/a       n/a
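All four insert behaviors are easy to observe on a full queue of capacity 1. A minimal sketch (the class name ApiDemo is ours):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ApiDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("a");                                    // succeeds: queue was empty

        System.out.println(q.offer("b"));              // false: full, no exception
        System.out.println(q.offer("b", 100, TimeUnit.MILLISECONDS)); // false after waiting

        try {
            q.add("b");                                // full: throws
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage());
        }

        System.out.println(q.poll());                  // "a"
        System.out.println(q.poll());                  // null: empty, no blocking
    }
}
```

put(e) is omitted here because on a full queue it would block the main thread forever; the producer-consumer example later in the post shows it in its natural habitat.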

ArrayBlockingQueue — Bounded Array

A fixed-capacity blocking queue backed by a circular array (like ArrayDeque, but thread-safe and bounded).

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    final Object[] items;           // Circular buffer
    int takeIndex;                  // Index for next take/poll
    int putIndex;                   // Index for next put/offer
    int count;                      // Number of elements
    
    final ReentrantLock lock;       // Single lock for all operations
    private final Condition notEmpty; // Signaled when element added
    private final Condition notFull;  // Signaled when element removed
}

The put() and take() Dance

public void put(E e) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();          // BLOCK: queue is full, wait for space
        enqueue(e);                   // Add to circular buffer
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();         // BLOCK: queue is empty, wait for element
        return dequeue();             // Remove from circular buffer
    } finally {
        lock.unlock();
    }
}

private void enqueue(E e) {
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;  // Wrap around
    count++;
    notEmpty.signal();  // Wake up ONE waiting consumer!
}

private E dequeue() {
    E e = (E) items[takeIndex];
    items[takeIndex] = null;  // Help GC
    if (++takeIndex == items.length) takeIndex = 0;  // Wrap around
    count--;
    notFull.signal();   // Wake up ONE waiting producer!
    return e;
}

Fair vs Unfair

// Default: unfair (higher throughput)
ArrayBlockingQueue<String> unfair = new ArrayBlockingQueue<>(100);

// Fair: strict FIFO among waiting threads (lower throughput)
ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(100, true);
// Uses ReentrantLock(true) — guarantees longest-waiting thread gets lock first

LinkedBlockingQueue — Optionally Bounded Chain

A linked-node blocking queue with separate locks for put and take — allowing higher throughput than ArrayBlockingQueue:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    private final int capacity;       // Max size (Integer.MAX_VALUE if unbounded)
    private final AtomicInteger count; // Atomic counter (shared between two locks)
    
    // TWO separate locks — the key difference from ArrayBlockingQueue!
    private final ReentrantLock takeLock;
    private final Condition notEmpty;
    private final ReentrantLock putLock;
    private final Condition notFull;
    
    transient Node<E> head;  // Sentinel node
    private transient Node<E> last;
}

Key insight: producers lock putLock while consumers lock takeLock, so a put and a take can run at the same time. When both producers and consumers are active, this can roughly double throughput compared to ArrayBlockingQueue's single lock.
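A quick way to convince yourself the two locks compose correctly: run a producer and a consumer concurrently and check that every element crosses the queue. A minimal sketch (class and method names are ours):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;

public class TwoLockDemo {
    // Moves n items through the queue with a concurrent producer and consumer;
    // returns how many items the consumer received.
    static long transfer(int n) throws InterruptedException {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1024);
        LongAdder consumed = new LongAdder();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.put(i);   // contends only on putLock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {           // contends only on takeLock
                    q.take();
                    consumed.increment();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100_000)); // 100000
    }
}
```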

Bounded vs Unbounded

// Bounded (recommended)
LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

// Unbounded (dangerous!)
LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// capacity = Integer.MAX_VALUE
// put() NEVER blocks — can cause OutOfMemoryError!

SynchronousQueue — Zero-Capacity Handoff

The most unusual one. SynchronousQueue has zero internal capacity — every put() must wait for a corresponding take(), and vice versa.

// No storage at all! Direct handoff between threads.
SynchronousQueue<String> handoff = new SynchronousQueue<>();

Use cases:

  • Thread pool handoff (Executors.newCachedThreadPool() uses this)

  • Rendezvous synchronization

  • When you want zero buffering
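The handoff behavior is easy to observe: put() parks its thread until some other thread arrives at take(). A small sketch, with the class name HandoffDemo assumed:

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                handoff.put("payload"); // blocks until a consumer calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(50);                 // producer is almost certainly parked in put() now
        String received = handoff.take(); // the handoff: unblocks the producer
        producer.join();
        System.out.println(received);     // payload
    }
}
```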

Fair vs Unfair

// Unfair (default) — uses stack internally (LIFO)
SynchronousQueue<String> unfair = new SynchronousQueue<>();

// Fair — uses queue internally (FIFO)
SynchronousQueue<String> fair = new SynchronousQueue<>(true);

PriorityBlockingQueue — Concurrent Priority Heap

A thread-safe, unbounded priority queue backed by a binary heap:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    private transient Object[] queue;    // Binary heap
    private transient int size;
    private final ReentrantLock lock;     // Single lock
    private final Condition notEmpty;
    // No notFull — unbounded, so put() NEVER blocks!
}

Elements must implement Comparable, or the queue must be constructed with a Comparator. Assuming Task orders by its priority field (lowest first):

PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();

// Producer
taskQueue.put(new Task("Low", 3));
taskQueue.put(new Task("Critical", 1));
taskQueue.put(new Task("Medium", 2));

// Consumer — always gets highest priority first
Task task = taskQueue.take(); // blocks if empty
// Returns: Task("Critical", 1) — priority 1 comes first
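The snippet above presupposes a Task type. A complete runnable version, using a hypothetical Task record that orders by its priority field:

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    // Hypothetical task type: lower priority number = more urgent
    record Task(String name, int priority) implements Comparable<Task> {
        @Override
        public int compareTo(Task other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
        taskQueue.put(new Task("Low", 3));
        taskQueue.put(new Task("Critical", 1));
        taskQueue.put(new Task("Medium", 2));

        // take() drains in priority order, regardless of insertion order
        System.out.println(taskQueue.take().name()); // Critical
        System.out.println(taskQueue.take().name()); // Medium
        System.out.println(taskQueue.take().name()); // Low
    }
}
```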

ReentrantLock + Condition — The Locking Mechanism

The lock-based implementations (ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue) all coordinate threads with ReentrantLock + Condition; SynchronousQueue instead relies on lock-free CAS operations internally.

Key concept: await() atomically releases the lock and puts the thread to sleep. signal() wakes up one waiting thread, which will re-acquire the lock before continuing.
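The await()/signal() dance is easiest to see in a stripped-down bounded buffer built directly on these primitives. A toy sketch, not the real JDK code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy bounded buffer: the same Lock + two-Condition pattern as ArrayBlockingQueue
public class ToyBoundedBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public ToyBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();      // atomically releases the lock and sleeps
            items.addLast(e);
            notEmpty.signal();        // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();     // re-acquires the lock before returning
            E e = items.removeFirst();
            notFull.signal();         // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```

Note the while loops around await(): a woken thread must re-check the condition, because another thread may have raced in and consumed the slot (or a spurious wakeup may have occurred) before it re-acquired the lock.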


Producer-Consumer Pattern

The classic concurrent pattern using BlockingQueue:

// Shared queue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// Producer thread
Thread producer = new Thread(() -> {
    try {
        for (int i = 0; i < 1000; i++) {
            String item = "item-" + i;
            queue.put(item);  // Blocks if full
            System.out.println("Produced: " + item);
        }
        queue.put("POISON_PILL"); // Shutdown signal
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Consumer thread
Thread consumer = new Thread(() -> {
    try {
        while (true) {
            String item = queue.take();  // Blocks if empty
            if ("POISON_PILL".equals(item)) break;
            System.out.println("Consumed: " + item);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
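One subtlety the example above doesn't show: with multiple consumers, a single poison pill stops only one of them. A common convention (sketched here with names of our choosing) is to enqueue one pill per consumer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

public class MultiConsumerShutdown {
    static final String POISON_PILL = "POISON_PILL";

    // Produces `items` elements, then one pill per consumer; returns how many
    // real items the consumers processed.
    static long run(int items, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        LongAdder consumed = new LongAdder();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (POISON_PILL.equals(item)) break; // each worker eats exactly one pill
                        consumed.increment();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (int i = 0; i < items; i++) queue.put("item-" + i);
        // FIFO ordering guarantees the pills dequeue only after every real item
        for (int i = 0; i < consumers; i++) queue.put(POISON_PILL);

        for (Thread w : workers) w.join();
        return consumed.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1000, 3)); // 1000
    }
}
```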

Choosing the Right BlockingQueue

Feature        ArrayBlockingQueue      LinkedBlockingQueue      SynchronousQueue        PriorityBlockingQueue
Bounded        ✅ Fixed                ✅ Optional              ✅ Zero                 ❌ Unbounded
Locks          1                       2 (higher throughput)    Lock-free (CAS)         1
put() blocks   When full               When full                Until take() matches    Never
Ordering       FIFO                    FIFO                     FIFO or LIFO            Priority
Memory         Fixed array             Dynamic nodes            None                    Dynamic array
Best for       Simple bounded buffers  High-throughput pipes    Thread pool handoff     Priority scheduling

Common Pitfalls

Pitfall 1: Unbounded Queue = Memory Bomb

// ❌ DANGEROUS: no bound = can grow until OOM!
LinkedBlockingQueue<Task> unbounded = new LinkedBlockingQueue<>(); // capacity = Integer.MAX_VALUE
// If the producer is faster than the consumer, this will eat all memory!

// ✅ Always set a reasonable bound
LinkedBlockingQueue<Task> bounded = new LinkedBlockingQueue<>(10_000);

Pitfall 2: Not Handling InterruptedException

// ❌ WRONG: Swallowing the interrupt
try {
    String item = queue.take();
} catch (InterruptedException e) {
    // Silently ignoring — the thread's interrupt status was already cleared
    // when the exception was thrown, so the interrupt is now lost!
}

// ✅ CORRECT: Restore the interrupt flag
try {
    String item = queue.take();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore the flag!
    return; // Or handle gracefully
}

Pitfall 3: Using offer() Without Checking Return Value

// ❌ Silently drops elements!
queue.offer(task);  // Returns false if full — but you never check!

// ✅ Check the return value
if (!queue.offer(task)) {
    log.warn("Queue full! Dropping task: " + task);
    // Or: queue.put(task); // to block instead
}

// ✅ Or use offer with timeout
if (!queue.offer(task, 5, TimeUnit.SECONDS)) {
    throw new TimeoutException("Queue full for 5 seconds");
}

Summary

The one-liner: BlockingQueues make threads wait when the queue is full (put) or empty (take), enabling natural back-pressure in producer-consumer systems — choose ArrayBlockingQueue for simplicity, LinkedBlockingQueue for throughput, SynchronousQueue for handoffs, or PriorityBlockingQueue for scheduling.