# Deep Dive (java.util.concurrent) - BlockingQueues

> *"The queue that makes threads wait politely — the backbone of producer-consumer patterns."*

`BlockingQueue` is the interface that powers most concurrent producer-consumer systems on the JVM. This deep dive covers its four major implementations: `ArrayBlockingQueue`, `LinkedBlockingQueue`, `SynchronousQueue`, and `PriorityBlockingQueue`.

* * *

## What is BlockingQueue?

A `BlockingQueue` is a `Queue` that blocks when:

*   **put()** — queue is full → producer thread **waits** until space is available
    
*   **take()** — queue is empty → consumer thread **waits** until an element arrives
    

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/35d9c742-6feb-4b68-9785-f1b3e8c8cac7.png align="center")

* * *

## The BlockingQueue API

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/fbb8c979-7e29-41d0-b50b-8a678f3bf201.png align="center")

|  | Throws Exception | Returns null/false | Blocks | Blocks w/ Timeout |
| --- | --- | --- | --- | --- |
| **Insert** | `add(e)` | `offer(e)` | `put(e)` | `offer(e, t, u)` |
| **Remove** | `remove()` | `poll()` | `take()` | `poll(t, u)` |
| **Examine** | `element()` | `peek()` | — | — |
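
To make the table concrete, here is a small sketch of how each column behaves at the limits of a capacity-1 queue. The class name `ApiFlavorsDemo` and its helper methods are made up for illustration; only the `BlockingQueue` calls themselves come from the JDK. `put()` and `take()` are left out on purpose, since on a full or empty queue they would simply block.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: exercises the non-blocking and timed columns of the table.
class ApiFlavorsDemo {

    // What the insert methods do when the queue is already full.
    static String insertIntoFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                       // succeeds, queue is now full

        boolean offered = queue.offer("second");  // returns false immediately
        boolean timedOffer;
        try {
            // Waits up to 50 ms for space, then gives up and returns false.
            timedOffer = queue.offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        String addOutcome;
        try {
            queue.add("second");
            addOutcome = "added";
        } catch (IllegalStateException full) {
            addOutcome = "IllegalStateException";
        }
        return offered + "," + timedOffer + "," + addOutcome;
    }

    // What the remove and examine methods do when the queue is empty.
    static String removeFromEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        String polled = queue.poll();             // null, no waiting
        String peeked = queue.peek();             // null, no waiting
        String timedPoll;
        try {
            // Waits up to 50 ms for an element, then returns null.
            timedPoll = queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        String removeOutcome;
        try {
            queue.remove();
            removeOutcome = "removed";
        } catch (NoSuchElementException empty) {
            removeOutcome = "NoSuchElementException";
        }
        return polled + "," + peeked + "," + timedPoll + "," + removeOutcome;
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFull());   // false,false,IllegalStateException
        System.out.println(removeFromEmpty());  // null,null,null,NoSuchElementException
    }
}
```

Because `null` is the "nothing there" signal for `poll()` and `peek()`, `BlockingQueue` implementations reject `null` elements.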

* * *

## ArrayBlockingQueue — Bounded Array

A **fixed-capacity** blocking queue backed by a circular array (like ArrayDeque, but thread-safe and bounded).

```java
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    final Object[] items;           // Circular buffer
    int takeIndex;                  // Index for next take/poll
    int putIndex;                   // Index for next put/offer
    int count;                      // Number of elements
    
    final ReentrantLock lock;       // Single lock for all operations
    private final Condition notEmpty; // Signaled when element added
    private final Condition notFull;  // Signaled when element removed
}
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/a8c7b49d-e9f7-48f3-8a47-e036fc8fc914.png align="center")

### The put() and take() Dance

```java
public void put(E e) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();          // BLOCK: queue is full, wait for space
        enqueue(e);                   // Add to circular buffer
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();         // BLOCK: queue is empty, wait for element
        return dequeue();             // Remove from circular buffer
    } finally {
        lock.unlock();
    }
}

private void enqueue(E e) {
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;  // Wrap around
    count++;
    notEmpty.signal();  // Wake up ONE waiting consumer!
}

private E dequeue() {
    E e = (E) items[takeIndex];
    items[takeIndex] = null;  // Help GC
    if (++takeIndex == items.length) takeIndex = 0;  // Wrap around
    count--;
    notFull.signal();   // Wake up ONE waiting producer!
    return e;
}
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/a6ac6a28-0e55-44a2-afcc-172c9afaa215.png align="center")
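
The wrap-around in `enqueue()` and `dequeue()` is easier to see with the locking stripped away. The following is a single-threaded sketch only, not the JDK class: `RingBufferSketch` and its methods are hypothetical names, and it returns `false`/`null` where the real queue would block.

```java
// Hypothetical, single-threaded sketch of the circular-array bookkeeping.
// It is NOT thread-safe; the real ArrayBlockingQueue guards all of this with its lock.
class RingBufferSketch {
    private final Object[] items;
    private int takeIndex; // slot the next poll() reads from
    private int putIndex;  // slot the next offer() writes to
    private int count;     // number of stored elements

    RingBufferSketch(int capacity) {
        items = new Object[capacity];
    }

    boolean offer(Object e) {
        if (count == items.length) return false;         // full (the real put() would wait)
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;    // wrap around
        count++;
        return true;
    }

    Object poll() {
        if (count == 0) return null;                     // empty (the real take() would wait)
        Object e = items[takeIndex];
        items[takeIndex] = null;                         // drop the reference
        if (++takeIndex == items.length) takeIndex = 0;  // wrap around
        count--;
        return e;
    }

    int putIndex() { return putIndex; }
    int takeIndex() { return takeIndex; }

    // Fills a 3-slot buffer, frees one slot, reuses it, then drains in FIFO order.
    static String demo() {
        RingBufferSketch ring = new RingBufferSketch(3);
        ring.offer("A");
        ring.offer("B");
        ring.offer("C");                       // putIndex has wrapped back to 0
        boolean rejected = !ring.offer("D");   // full, so this offer fails
        Object first = ring.poll();            // "A", frees slot 0
        ring.offer("D");                       // lands in slot 0, behind C logically
        return rejected + ":" + first + "," + ring.poll() + "," + ring.poll() + "," + ring.poll();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true:A,B,C,D
    }
}
```

Even though `D` sits physically in slot 0, it still comes out last, because order is defined by `takeIndex` chasing `putIndex` around the ring rather than by array position.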

### Fair vs Unfair

```java
// Default: unfair (higher throughput)
ArrayBlockingQueue<String> unfair = new ArrayBlockingQueue<>(100);

// Fair: strict FIFO among waiting threads (lower throughput)
ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(100, true);
// Uses ReentrantLock(true) — guarantees longest-waiting thread gets lock first
```

* * *

## LinkedBlockingQueue — Optionally Bounded Chain

A **linked-node** blocking queue with separate locks for put and take, which typically allows higher throughput than ArrayBlockingQueue when producers and consumers run at the same time:

```java
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    private final int capacity;       // Max size (Integer.MAX_VALUE if unbounded)
    private final AtomicInteger count; // Atomic counter (shared between two locks)
    
    // TWO separate locks — the key difference from ArrayBlockingQueue!
    private final ReentrantLock takeLock;
    private final Condition notEmpty;
    private final ReentrantLock putLock;
    private final Condition notFull;
    
    transient Node<E> head;  // Sentinel node
    private transient Node<E> last;
}
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/c6885ae7-e9e7-466b-b04e-81d0cdc6dddd.png align="center")

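**Key insight:** Producers lock `putLock` and consumers lock `takeLock`. **They can operate simultaneously!** Because the two sides no longer contend for the same lock, throughput is typically higher than with ArrayBlockingQueue's single lock when both producers and consumers are active. The actual gain depends on the workload, and each element costs a node allocation.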

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/161a96fd-cd70-4168-bfdd-e86c9a74372f.png align="center")

### Bounded vs Unbounded

```java
// Bounded (recommended)
LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

// Unbounded (dangerous!)
LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// capacity = Integer.MAX_VALUE
// put() effectively never blocks, so a fast producer can cause OutOfMemoryError!
```
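
One way to see the difference without filling up the heap is to ask each queue how much room it reports. This is a minimal sketch; `CapacityDemo` and its method names are invented for the example, while `offer()` and `remainingCapacity()` are standard `LinkedBlockingQueue` methods.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class comparing a bounded and an unbounded LinkedBlockingQueue.
class CapacityDemo {

    // A bounded queue starts rejecting offers once its capacity is used up.
    static String boundedBehaviour() {
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        boolean first = bounded.offer("a");   // true
        boolean second = bounded.offer("b");  // true, queue is now full
        boolean third = bounded.offer("c");   // false, no space left
        return first + "," + second + "," + third + "," + bounded.remainingCapacity();
    }

    // The no-argument constructor uses Integer.MAX_VALUE as the capacity.
    static boolean unboundedReportsMaxCapacity() {
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        return unbounded.remainingCapacity() == Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(boundedBehaviour());            // true,true,false,0
        System.out.println(unboundedReportsMaxCapacity()); // true
    }
}
```

With the bounded queue, a full buffer becomes visible to the producer (as `false` from `offer()` or a blocked `put()`), which is what gives you back-pressure.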

* * *

## SynchronousQueue — Zero-Capacity Handoff

The most unusual one. `SynchronousQueue` has **zero internal capacity** — every `put()` must wait for a corresponding `take()`, and vice versa.

```java
// No storage at all! Direct handoff between threads.
SynchronousQueue<String> handoff = new SynchronousQueue<>();
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/8ebd8347-f7b0-46ca-8dd9-0d2c8a0e7c65.png align="center")

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/a5344df5-97db-4352-b550-64d4741a99e7.png align="center")
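
The "zero capacity" part can be observed directly. In the sketch below (class and method names are hypothetical), a non-blocking `offer()` fails when no consumer is waiting, while `put()` completes once a consumer thread calls `take()`.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class showing that a SynchronousQueue never stores anything.
class HandoffDemo {

    // With nobody waiting in take(), there is nowhere to put the element.
    static String withoutConsumer() {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        boolean accepted = handoff.offer("job");  // false: no consumer to hand it to
        return accepted + "," + handoff.size() + "," + handoff.peek();
    }

    // With a consumer thread, put() hands the element over directly.
    static String withConsumer() {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = handoff.take();     // waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            handoff.put("job");                   // waits until the consumer takes it
            consumer.join();                      // makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(withoutConsumer()); // false,0,null
        System.out.println(withConsumer());    // job
    }
}
```

`size()` always reports 0 and `peek()` always returns `null` here, because an element only "exists" in the queue at the moment it is handed over.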

**Use cases:**

*   Thread pool handoff (`Executors.newCachedThreadPool()` uses this)
    
*   Rendezvous synchronization
    
*   When you want zero buffering
    

### Fair vs Unfair

```java
// Unfair (default) — uses stack internally (LIFO)
SynchronousQueue<String> unfair = new SynchronousQueue<>();

// Fair — uses queue internally (FIFO)
SynchronousQueue<String> fair = new SynchronousQueue<>(true);
```

* * *

## PriorityBlockingQueue — Concurrent Priority Heap

A thread-safe, unbounded priority queue backed by a binary heap:

```java
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {
    
    private transient Object[] queue;    // Binary heap
    private transient int size;
    private final ReentrantLock lock;     // Single lock
    private final Condition notEmpty;
    // No notFull — unbounded, so put() NEVER blocks!
}
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/67a449f7-83ed-43a8-acd9-887e8ece8d4c.png align="center")

```java
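// Assumes Task implements Comparable<Task> and orders by its numeric priority
// (lower number = more urgent); otherwise put() throws ClassCastException
PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();

// Producer
taskQueue.put(new Task("Low", 3));
taskQueue.put(new Task("Critical", 1));
taskQueue.put(new Task("Medium", 2));

// Consumer: always receives the smallest element under that ordering
Task task = taskQueue.take(); // blocks if empty
// Returns: Task("Critical", 1), because priority 1 sorts first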
```
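
The snippet above leaves `Task` undefined. Here is one possible self-contained version, as a sketch: the `Task` class, its fields, and the choice of a `Comparator` (instead of implementing `Comparable`) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo: a Task type plus a comparator that treats lower numbers as more urgent.
class PriorityDemo {

    static final class Task {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Inserts tasks out of order and removes them again; poll() follows the comparator.
    static List<String> drainInPriorityOrder() {
        Comparator<Task> byPriority = Comparator.comparingInt(t -> t.priority);
        // 11 is only the initial array size; the queue grows as needed.
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, byPriority);

        queue.put(new Task("Low", 3));
        queue.put(new Task("Critical", 1));
        queue.put(new Task("Medium", 2));

        List<String> names = new ArrayList<>();
        Task next;
        while ((next = queue.poll()) != null) {   // poll() returns null once empty
            names.add(next.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainInPriorityOrder()); // [Critical, Medium, Low]
    }
}
```

Only removal (`take()`, `poll()`) follows priority order. Iterating over the queue is not guaranteed to visit elements in any particular order.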

* * *

## ReentrantLock + Condition — The Locking Mechanism

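`ArrayBlockingQueue`, `LinkedBlockingQueue`, and `PriorityBlockingQueue` all use `ReentrantLock` + `Condition` for thread coordination (`SynchronousQueue` is the exception: it relies on lock-free compare-and-swap operations and thread parking). Here's how these primitives work: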

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/e307a349-90a2-488d-b2fb-b818bb622862.png align="center")

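**Key concept:** `await()` atomically releases the lock and suspends the thread. `signal()` wakes up one waiting thread, which must re-acquire the lock before its `await()` call returns. Because a thread can also wake up spuriously, the condition is always re-checked in a `while` loop, as in `put()` and `take()` above.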
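
Put together, the pattern looks roughly like this. `TinyBoundedBuffer` is a hypothetical teaching class, not JDK code: it wraps a plain `ArrayDeque` with one lock and two conditions, which is the same shape as the `ArrayBlockingQueue` code shown earlier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal bounded buffer built on ReentrantLock + two Conditions.
class TinyBoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                notFull.await();      // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();        // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();     // releases the lock while waiting
            }
            T item = items.removeFirst();
            notFull.signal();         // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a 2-slot buffer from another thread and sums what arrives.
    static int sumThroughBuffer(int n) {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughBuffer(100)); // 5050
    }
}
```

Using two conditions on one lock means a producer's `signal()` only ever wakes a consumer, and the other way round, so no thread is woken just to find it still cannot proceed for the wrong reason.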

* * *

## Producer-Consumer Pattern

The classic concurrent pattern using BlockingQueue:

```java
// Shared queue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// Producer thread
Thread producer = new Thread(() -> {
    try {
        for (int i = 0; i < 1000; i++) {
            String item = "item-" + i;
            queue.put(item);  // Blocks if full
            System.out.println("Produced: " + item);
        }
        queue.put("POISON_PILL"); // Shutdown signal
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

// Consumer thread
Thread consumer = new Thread(() -> {
    try {
        while (true) {
            String item = queue.take();  // Blocks if empty
            if ("POISON_PILL".equals(item)) break;
            System.out.println("Consumed: " + item);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();
```

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/d4bb105d-ae82-4dfa-a330-a18e81e58d82.png align="center")
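
The example above has a single consumer. With several consumers, one common approach is to send one poison pill per consumer, since each pill stops only the thread that takes it. The following is a sketch under that assumption; `MultiConsumerDemo` and `run` are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one producer, several consumers, one poison pill per consumer.
class MultiConsumerDemo {
    private static final String POISON_PILL = "POISON_PILL";

    // Returns how many real items the consumers processed in total.
    static int run(int itemCount, int consumerCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger consumed = new AtomicInteger();

        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++) {
            consumers[c] = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();           // blocks if empty
                        if (POISON_PILL.equals(item)) break;  // this consumer is done
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[c].start();
        }

        try {
            for (int i = 0; i < itemCount; i++) {
                queue.put("item-" + i);                       // blocks if full
            }
            for (int c = 0; c < consumerCount; c++) {
                queue.put(POISON_PILL);                       // one pill per consumer
            }
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4)); // 1000
    }
}
```

This relies on FIFO ordering: the pills are enqueued after every real item, so no consumer can stop while work is still waiting. It would not be safe as-is with a `PriorityBlockingQueue`.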

* * *

## Choosing the Right BlockingQueue

![](https://cdn.hashnode.com/uploads/covers/637f189ed7d9bcd845996b4b/96b94fe4-7b94-4966-93df-14237717b36e.png align="center")

| Feature | ArrayBlockingQueue | LinkedBlockingQueue | SynchronousQueue | PriorityBlockingQueue |
| --- | --- | --- | --- | --- |
| **Bounded** | ✅ Fixed | ✅ Optional | ✅ Zero | ❌ Unbounded |
| **Locks** | 1 | **2** (put and take) | Lock-free (CAS) | 1 |
| **put() blocks** | When full | When full | Until take() matches | **Never** |
| **Ordering** | FIFO | FIFO | FIFO or LIFO | Priority |
| **Memory** | Fixed array | Dynamic nodes | None | Dynamic array |
| **Best for** | Simple bounded buffers | High-throughput pipes | Thread pool handoff | Priority scheduling |

* * *

## Common Pitfalls

### Pitfall 1: Unbounded Queue = Memory Bomb

```java
// ❌ DANGEROUS: No bounds = can grow until OOM!
LinkedBlockingQueue<Task> unboundedQueue = new LinkedBlockingQueue<>(); // capacity = Integer.MAX_VALUE
// If the producer is faster than the consumer, this will eat all memory!

// ✅ Always set a reasonable bound
LinkedBlockingQueue<Task> boundedQueue = new LinkedBlockingQueue<>(10_000);
```

### Pitfall 2: Not Handling InterruptedException

```java
// ❌ WRONG: Swallowing the interrupt
try {
    String item = queue.take();
} catch (InterruptedException e) {
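    // Silently ignoring: the interrupt status was cleared when the exception was thrown,
    // so code further up the stack never learns that the thread was asked to stop!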
}

// ✅ CORRECT: Restore the interrupt flag
try {
    String item = queue.take();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore the flag!
    return; // Or handle gracefully
}
```

### Pitfall 3: Using offer() Without Checking Return Value

```java
// ❌ Silently drops elements!
queue.offer(task);  // Returns false if full — but you never check!

// ✅ Check the return value
if (!queue.offer(task)) {
    log.warn("Queue full! Dropping task: " + task);
    // Or: queue.put(task); // to block instead
}

// ✅ Or use offer with timeout
if (!queue.offer(task, 5, TimeUnit.SECONDS)) {
    throw new TimeoutException("Queue full for 5 seconds");
}
```

* * *

## Summary

**The one-liner:** BlockingQueues make threads wait when the queue is full (put) or empty (take), enabling natural back-pressure in producer-consumer systems — choose ArrayBlockingQueue for simplicity, LinkedBlockingQueue for throughput, SynchronousQueue for handoffs, or PriorityBlockingQueue for scheduling.
