Java Concurrency — Producer-Consumer & Blocking Queues
"The producer-consumer pattern is the foundation of every message queue, event bus, and pipeline you've ever used."
This article implements the bounded blocking queue from scratch, explores the producer-consumer pattern in depth, and builds a simplified SQS-style message queue.
The Producer-Consumer Pattern
Why this pattern?
Decouples producers from consumers — they run at different speeds
Buffers bursts — the queue absorbs temporary spikes
Throttles — bounded queue prevents memory exhaustion (back-pressure)
Bounded Blocking Queue — From Scratch
A bounded blocking queue has these behaviors:
| Operation | Queue Full | Queue Empty |
|---|---|---|
put(item) |
Block until space | Add item |
take() |
Return item | Block until item available |
Version 1: wait/notify
public class BoundedBlockingQueue<T> {
private final Queue<T> queue;
private final int capacity;
private final Object lock = new Object();
public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
}
public void put(T item) throws InterruptedException {
synchronized (lock) {
// WAIT if queue is full
while (queue.size() == capacity) {
System.out.println(Thread.currentThread().getName()
+ " blocked — queue full");
lock.wait();
}
queue.add(item);
System.out.println(Thread.currentThread().getName()
+ " produced: " + item + " [size=" + queue.size() + "]");
lock.notifyAll(); // Wake consumers waiting for data
}
}
public T take() throws InterruptedException {
synchronized (lock) {
// WAIT if queue is empty
while (queue.isEmpty()) {
System.out.println(Thread.currentThread().getName()
+ " blocked — queue empty");
lock.wait();
}
T item = queue.poll();
System.out.println(Thread.currentThread().getName()
+ " consumed: " + item + " [size=" + queue.size() + "]");
lock.notifyAll(); // Wake producers waiting for space
return item;
}
}
public int size() {
synchronized (lock) {
return queue.size();
}
}
}
The Problem with notifyAll() Here
When a producer calls notifyAll(), it wakes both producers and consumers. Only consumers should wake up (they're the ones waiting for data). This is wasteful.
Version 2: Condition Variables
Two separate conditions solve the wasted-wakeup problem:
public class BoundedBlockingQueueV2<T> {
private final Queue<T> queue;
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // Producers wait here
private final Condition notEmpty = lock.newCondition(); // Consumers wait here
public BoundedBlockingQueueV2(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // Producer waits for space
}
queue.add(item);
notEmpty.signal(); // Signal ONE consumer: data available
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // Consumer waits for data
}
T item = queue.poll();
notFull.signal(); // Signal ONE producer: space available
return item;
} finally {
lock.unlock();
}
}
}
Why This is Better
| Aspect | notifyAll() |
Condition.signal() |
|---|---|---|
| Wakeups | All threads (producers + consumers) | Only the right role |
| Re-checks | Many unnecessary | Minimal |
| Performance | O(number of waiters) | O(1) |
Java's Built-in BlockingQueue
Java provides several BlockingQueue implementations out of the box:
| Implementation | Backing | Bounded | Notes |
|---|---|---|---|
ArrayBlockingQueue |
Array | ✅ Fixed | Fair option, compact |
LinkedBlockingQueue |
Linked nodes | ✅ Optional | Default: unbounded, higher throughput |
SynchronousQueue |
None | ✅ Capacity = 0 | Direct handoff — put blocks until take |
PriorityBlockingQueue |
Heap | ❌ Unbounded | Priority ordering |
DelayQueue |
Heap | ❌ Unbounded | Elements available after delay |
// ArrayBlockingQueue — fixed capacity
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
// Producer
queue.put("task"); // Blocks if full
queue.offer("task"); // Returns false if full (non-blocking)
// Consumer
String item = queue.take(); // Blocks if empty
String item2 = queue.poll(); // Returns null if empty (non-blocking)
String item3 = queue.poll(5, TimeUnit.SECONDS); // Wait up to 5 seconds
BlockingQueue API Summary
| Method | Blocks | Throws | Returns Special Value | Times Out |
|---|---|---|---|---|
| Insert | put(e) |
add(e) |
offer(e) |
offer(e, time, unit) |
| Remove | take() |
remove() |
poll() |
poll(time, unit) |
| Examine | — | element() |
peek() |
— |
Back-Pressure — Why Bounding Matters
An unbounded queue can consume all available memory if producers are faster than consumers:
Problem: SQS-Style Message Queue
Goal: Build a simplified message queue where multiple consumers can poll and process messages concurrently. Each message is consumed by exactly one consumer.
public class SimpleMessageQueue<T> {
private final BlockingQueue<T> queue;
private final AtomicBoolean running = new AtomicBoolean(true);
public SimpleMessageQueue(int capacity) {
this.queue = new ArrayBlockingQueue<>(capacity);
}
// Producer: send a message
public void send(T message) throws InterruptedException {
queue.put(message);
System.out.println("[Producer " + Thread.currentThread().getName()
+ "] Sent: " + message);
}
// Consumer: receive a message (blocks until available)
public T receive() throws InterruptedException {
T message = queue.take();
System.out.println("[Consumer " + Thread.currentThread().getName()
+ "] Received: " + message);
return message;
}
// Graceful shutdown
public void shutdown() {
running.set(false);
}
public boolean isRunning() {
return running.get();
}
// Start consumer workers
public void startConsumers(int numConsumers, Consumer<T> processor) {
for (int i = 0; i < numConsumers; i++) {
Thread consumer = new Thread(() -> {
while (isRunning() || !queue.isEmpty()) {
try {
T message = queue.poll(1, TimeUnit.SECONDS);
if (message != null) {
processor.accept(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Consumer-" + i);
consumer.setDaemon(true);
consumer.start();
}
}
}
// Usage
public static void main(String[] args) throws Exception {
SimpleMessageQueue<String> mq = new SimpleMessageQueue<>(100);
// Start 3 consumers
mq.startConsumers(3, msg -> {
System.out.println("Processing: " + msg + " by " +
Thread.currentThread().getName());
});
// Produce 10 messages
for (int i = 0; i < 10; i++) {
mq.send("Order-" + i);
}
Thread.sleep(3000);
mq.shutdown();
}
Common Pitfalls
Pitfall 1: Forgetting Back-Pressure
// ❌ Unbounded — will eventually OOM under load
Queue<Task> queue = new LinkedList<>();
// ✅ Bounded — producers block when full
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(1000);
Pitfall 2: Using notify() Instead of notifyAll() with Multiple Consumers
// ❌ With N consumers and 1 producer, notify() might wake a producer
lock.notify();
// ✅ Use notifyAll() or separate Conditions
lock.notifyAll();
notEmpty.signal(); // Even better — targeted
Pitfall 3: Not Handling InterruptedException in Consumer Loops
// ❌ Swallows interrupt — consumer never stops
while (running) {
try {
T item = queue.take();
process(item);
} catch (InterruptedException e) {
// Silently ignored — BAD
}
}
// ✅ Proper shutdown
while (running) {
try {
T item = queue.poll(1, TimeUnit.SECONDS);
if (item != null) process(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break; // Exit loop
}
}
FAQs
Q1: Implement a bounded blocking queue.
A: Use a ReentrantLock with two Condition variables (notFull, notEmpty). put() waits on notFull when full and signals notEmpty after adding. take() waits on notEmpty when empty and signals notFull after removing. See Version 2 above.
Q2: What is the difference between ArrayBlockingQueue and LinkedBlockingQueue?
A: ArrayBlockingQueue uses a fixed-size array (always bounded), has lower memory overhead, and uses a single lock. LinkedBlockingQueue uses linked nodes (optionally bounded), uses two separate locks (for head and tail), giving higher throughput at the cost of more GC pressure from node allocation.
Q3: What is a SynchronousQueue?
A: A blocking queue with zero capacity. Every put() blocks until a corresponding take() is ready, and vice versa. It's used for direct handoff — the producer waits until a consumer is available. Used internally by Executors.newCachedThreadPool().
Q4: How do you gracefully shut down a producer-consumer system?
A: (1) Signal producers to stop (via AtomicBoolean flag). (2) Let consumers drain remaining items. (3) Use poll() with timeout instead of take() so consumers can check the flag. (4) Alternatively, use a poison pill — a special sentinel value that tells consumers to shut down.
Q5: What is back-pressure? Why does it matter?
A: Back-pressure is when a bounded queue throttles fast producers by blocking them when the queue is full. Without it, an unbounded queue grows indefinitely, leading to OutOfMemoryError. Back-pressure naturally matches producer speed to consumer capacity.