Deep Dive (java.util.concurrent) - BlockingQueues
Producer-Consumer Primitives
"The queue that makes threads wait politely — the backbone of producer-consumer patterns."
BlockingQueue is the interface that powers most concurrent producer-consumer systems on the JVM. This deep dive covers its four major implementations: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, and PriorityBlockingQueue.
What is BlockingQueue?
A BlockingQueue is a Queue that blocks when:
put() — queue is full → producer thread waits until space is available
take() — queue is empty → consumer thread waits until an element arrives
The BlockingQueue API
| Throws Exception | Returns null/false | Blocks | Blocks w/ Timeout | |
|---|---|---|---|---|
| Insert | add(e) |
offer(e) |
put(e) |
offer(e, t, u) |
| Remove | remove() |
poll() |
take() |
poll(t, u) |
| Examine | element() |
peek() |
— | — |
ArrayBlockingQueue — Bounded Array
A fixed-capacity blocking queue backed by a circular array (like ArrayDeque, but thread-safe and bounded).
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E> {
final Object[] items; // Circular buffer
int takeIndex; // Index for next take/poll
int putIndex; // Index for next put/offer
int count; // Number of elements
final ReentrantLock lock; // Single lock for all operations
private final Condition notEmpty; // Signaled when element added
private final Condition notFull; // Signaled when element removed
}
The put() and take() Dance
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // BLOCK: queue is full, wait for space
enqueue(e); // Add to circular buffer
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // BLOCK: queue is empty, wait for element
return dequeue(); // Remove from circular buffer
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0; // Wrap around
count++;
notEmpty.signal(); // Wake up ONE waiting consumer!
}
private E dequeue() {
E e = (E) items[takeIndex];
items[takeIndex] = null; // Help GC
if (++takeIndex == items.length) takeIndex = 0; // Wrap around
count--;
notFull.signal(); // Wake up ONE waiting producer!
return e;
}
Fair vs Unfair
// Default: unfair (higher throughput)
ArrayBlockingQueue<String> unfair = new ArrayBlockingQueue<>(100);
// Fair: strict FIFO among waiting threads (lower throughput)
ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(100, true);
// Uses ReentrantLock(true) — guarantees longest-waiting thread gets lock first
LinkedBlockingQueue — Optionally Bounded Chain
A linked-node blocking queue with separate locks for put and take — allowing higher throughput than ArrayBlockingQueue:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final int capacity; // Max size (Integer.MAX_VALUE if unbounded)
private final AtomicInteger count; // Atomic counter (shared between two locks)
// TWO separate locks — the key difference from ArrayBlockingQueue!
private final ReentrantLock takeLock;
private final Condition notEmpty;
private final ReentrantLock putLock;
private final Condition notFull;
transient Node<E> head; // Sentinel node
private transient Node<E> last;
}
Key insight: Producers lock putLock and consumers lock takeLock. They can operate simultaneously! This gives 2x throughput over ArrayBlockingQueue's single lock when both producers and consumers are active.
Bounded vs Unbounded
// Bounded (recommended)
LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
// Unbounded (dangerous!)
LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// capacity = Integer.MAX_VALUE
// put() NEVER blocks — can cause OutOfMemoryError!
SynchronousQueue — Zero-Capacity Handoff
The most unusual one. SynchronousQueue has zero internal capacity — every put() must wait for a corresponding take(), and vice versa.
// No storage at all! Direct handoff between threads.
SynchronousQueue<String> handoff = new SynchronousQueue<>();
Use cases:
Thread pool handoff (
Executors.newCachedThreadPool()uses this)Rendezvous synchronization
When you want zero buffering
Fair vs Unfair
// Unfair (default) — uses stack internally (LIFO)
SynchronousQueue<String> unfair = new SynchronousQueue<>();
// Fair — uses queue internally (FIFO)
SynchronousQueue<String> fair = new SynchronousQueue<>(true);
PriorityBlockingQueue — Concurrent Priority Heap
A thread-safe, unbounded priority queue backed by a binary heap:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E> {
private transient Object[] queue; // Binary heap
private transient int size;
private final ReentrantLock lock; // Single lock
private final Condition notEmpty;
// No notFull — unbounded, so put() NEVER blocks!
}
PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
// Producer
taskQueue.put(new Task("Low", 3));
taskQueue.put(new Task("Critical", 1));
taskQueue.put(new Task("Medium", 2));
// Consumer — always gets highest priority first
Task task = taskQueue.take(); // blocks if empty
// Returns: Task("Critical", 1) — priority 1 comes first
ReentrantLock + Condition — The Locking Mechanism
All BlockingQueue implementations use ReentrantLock + Condition for thread coordination. Here's how these primitives work:
Key concept: await() atomically releases the lock and puts the thread to sleep. signal() wakes up one waiting thread, which will re-acquire the lock before continuing.
Producer-Consumer Pattern
The classic concurrent pattern using BlockingQueue:
// Shared queue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 1000; i++) {
String item = "item-" + i;
queue.put(item); // Blocks if full
System.out.println("Produced: " + item);
}
queue.put("POISON_PILL"); // Shutdown signal
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
while (true) {
String item = queue.take(); // Blocks if empty
if ("POISON_PILL".equals(item)) break;
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
Choosing the Right BlockingQueue
| Feature | ArrayBlockingQueue | LinkedBlockingQueue | SynchronousQueue | PriorityBlockingQueue |
|---|---|---|---|---|
| Bounded | ✅ Fixed | ✅ Optional | ✅ Zero | ❌ Unbounded |
| Locks | 1 | 2 (higher throughput) | Lock-free / 1 | 1 |
| put() blocks | When full | When full | Until take() matches | Never |
| Ordering | FIFO | FIFO | FIFO or LIFO | Priority |
| Memory | Fixed array | Dynamic nodes | None | Dynamic array |
| Best for | Simple bounded buffers | High-throughput pipes | Thread pool handoff | Priority scheduling |
Common Pitfalls
Pitfall 1: Unbounded Queue = Memory Bomb
// ❌ DANGEROUS: No bounds = can grow until OOM!
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(); // capacity = MAX_INT
// If producer is faster than consumer, this will eat all memory!
// ✅ Always set a reasonable bound
LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(10_000);
Pitfall 2: Not Handling InterruptedException
// ❌ WRONG: Swallowing the interrupt
try {
String item = queue.take();
} catch (InterruptedException e) {
// Silently ignoring — Thread.interrupted flag is cleared!
}
// ✅ CORRECT: Restore the interrupt flag
try {
String item = queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the flag!
return; // Or handle gracefully
}
Pitfall 3: Using offer() Without Checking Return Value
// ❌ Silently drops elements!
queue.offer(task); // Returns false if full — but you never check!
// ✅ Check the return value
if (!queue.offer(task)) {
log.warn("Queue full! Dropping task: " + task);
// Or: queue.put(task); // to block instead
}
// ✅ Or use offer with timeout
if (!queue.offer(task, 5, TimeUnit.SECONDS)) {
throw new TimeoutException("Queue full for 5 seconds");
}
Summary
The one-liner: BlockingQueues make threads wait when the queue is full (put) or empty (take), enabling natural back-pressure in producer-consumer systems — choose ArrayBlockingQueue for simplicity, LinkedBlockingQueue for throughput, SynchronousQueue for handoffs, or PriorityBlockingQueue for scheduling.