JVM Low-Level I/O - Part 5
Ring Buffers & Memory Bus Patterns
Ring buffers are the backbone of high-performance messaging. They power financial trading systems, game engines, logging frameworks, and inter-process communication. In this article, we'll build one from the ground up using ByteBuffer, understand why they're so fast, and explore the memory bus patterns that make them work.
1. What is a Ring Buffer?
A ring buffer (also called circular buffer) is a fixed-size data structure that wraps around. When you reach the end, the next write goes back to the beginning — like a clock hand going from 12 back to 1.
Key Properties
| Property | Description |
|---|---|
| Fixed size | No allocations during operation |
| FIFO ordering | First in, first out |
| Wrap-around | Modular arithmetic: index % capacity |
| Lock-free capable | Single producer / single consumer needs no locks |
| Pre-allocated | Memory is allocated once at startup |
Array vs Ring Buffer
Unlike an ever-growing array or list, a ring buffer maps monotonically increasing write and read sequences onto a fixed set of slots, so the same memory is reused forever and indices never overflow into new allocations.
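A minimal sketch of the wrap-around idea using a plain int array (illustrative and single-threaded; the class and field names here are ours, not part of the buffer we build below):

```java
/** Illustrative fixed-capacity ring of ints; not thread-safe. */
public class TinyRing {
    private final int[] slots = new int[4];
    private long writePos, readPos; // monotonically increasing sequences

    boolean offer(int v) {
        if (writePos - readPos == slots.length) return false;  // full
        slots[(int) (writePos++ % slots.length)] = v;          // wrap-around
        return true;
    }

    Integer poll() {
        if (readPos == writePos) return null;                  // empty
        return slots[(int) (readPos++ % slots.length)];
    }

    public static void main(String[] args) {
        TinyRing r = new TinyRing();
        for (int i = 0; i < 6; i++) r.offer(i);        // 4 and 5 rejected: full
        System.out.println(r.poll() + "," + r.poll()); // prints 0,1
        r.offer(6); r.offer(7);                        // reuses slots 0 and 1
        System.out.println(r.poll() + "," + r.poll() + "," + r.poll() + "," + r.poll()); // prints 2,3,6,7
    }
}
```

Note how the logical positions (`writePos`, `readPos`) grow forever while the physical index wraps; that separation is what makes full/empty detection a simple subtraction.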
2. Why Ring Buffers are Fast
Ring buffers exploit three principles of modern CPU architecture:
Principle 1: Cache Locality
Ring buffers access memory sequentially (slot 0, slot 1, slot 2, ...). CPUs love sequential access because the hardware prefetcher predicts that you'll want the next cache line and loads it before you ask.
Principle 2: No Allocation
// ❌ LinkedBlockingQueue: allocates a Node object for every message
queue.add(new Message(data)); // new Node<>(message) internally
// ✅ Ring buffer: slot is pre-allocated, just copy data into it
buffer.putLong(sequence * SLOT_SIZE, data); // zero allocation
Principle 3: Mechanical Sympathy
The term "mechanical sympathy" means writing software that works with the hardware, not against it. Ring buffers achieve this through sequential access patterns, zero allocation in steady state, and a memory layout that respects cache-line boundaries.
3. Building a Simple Ring Buffer
Let's start with a basic single-threaded ring buffer using ByteBuffer:
import java.nio.ByteBuffer;
/**
* A simple ring buffer backed by a ByteBuffer.
* Fixed-size entries of entrySize bytes each.
*/
public class SimpleRingBuffer {
private final ByteBuffer buffer;
private final int capacity; // number of slots
private final int entrySize; // bytes per slot
private long writePos = 0; // monotonically increasing
private long readPos = 0; // monotonically increasing
public SimpleRingBuffer(int capacity, int entrySize) {
// Capacity must be a power of 2 for fast modulo
if (Integer.bitCount(capacity) != 1) {
throw new IllegalArgumentException("Capacity must be a power of 2");
}
this.capacity = capacity;
this.entrySize = entrySize;
this.buffer = ByteBuffer.allocateDirect(capacity * entrySize);
}
/**
* Write an entry. Returns false if buffer is full.
*/
public boolean write(byte[] data) {
if (data.length != entrySize) {
throw new IllegalArgumentException("Data must be " + entrySize + " bytes");
}
// Check if full
if (writePos - readPos >= capacity) {
return false; // buffer full
}
// Calculate physical index using bitwise AND (fast modulo for power of 2)
int index = (int)(writePos & (capacity - 1));
int offset = index * entrySize;
// Write data
for (int i = 0; i < entrySize; i++) {
buffer.put(offset + i, data[i]);
}
writePos++;
return true;
}
/**
* Read an entry. Returns null if buffer is empty.
*/
public byte[] read() {
// Check if empty
if (readPos >= writePos) {
return null; // buffer empty
}
// Calculate physical index
int index = (int)(readPos & (capacity - 1));
int offset = index * entrySize;
// Read data
byte[] data = new byte[entrySize];
for (int i = 0; i < entrySize; i++) {
data[i] = buffer.get(offset + i);
}
readPos++;
return data;
}
public int size() {
return (int)(writePos - readPos);
}
public boolean isEmpty() { return readPos >= writePos; }
public boolean isFull() { return writePos - readPos >= capacity; }
}
Why Power of 2?
// Modulo with arbitrary number: SLOW (division hardware)
int index = (int)(sequence % capacity); // ❌ uses CPU division
// Modulo with power of 2: FAST (single bitwise AND)
int index = (int)(sequence & (capacity - 1)); // ✅ ~1 CPU cycle
// Example: capacity = 8 (binary: 1000), mask = 7 (binary: 0111)
// sequence=0: 0000 & 0111 = 0
// sequence=1: 0001 & 0111 = 1
// ...
// sequence=7: 0111 & 0111 = 7
// sequence=8: 1000 & 0111 = 0 ← wraps!
// sequence=9: 1001 & 0111 = 1
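A quick check that the mask really is equivalent to modulo for every power-of-2 capacity:

```java
public class PowerOfTwoMask {
    public static void main(String[] args) {
        for (int capacity : new int[]{2, 8, 1024}) {
            int mask = capacity - 1;
            for (long seq = 0; seq < 3L * capacity; seq++) {
                // bitwise AND must agree with modulo at every sequence,
                // including the wrap points
                if ((seq & mask) != (seq % capacity)) throw new AssertionError();
            }
        }
        System.out.println("mask == modulo for all tested capacities");
    }
}
```

The equivalence only holds for powers of 2 — `capacity - 1` must be an unbroken run of low-order 1 bits — which is exactly why the constructor enforces `Integer.bitCount(capacity) == 1`.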
4. The Memory Bus and Cache Lines
To understand why ring buffers must be carefully designed, we need to understand the CPU's memory system.
Cache Line Architecture
Cache line = The smallest unit of data transferred between caches and main memory. On most modern x86 and ARM processors it is 64 bytes (Apple Silicon performance cores use 128-byte lines).
Cache Line (64 bytes):
┌──────────────────────────────────────────────────────────────────┐
│ byte 0 │ byte 1 │ byte 2 │ ... │ byte 62 │ byte 63 │
└──────────────────────────────────────────────────────────────────┘
If you read byte 0, the CPU loads ALL 64 bytes into cache.
If you write byte 0, the ENTIRE 64-byte line is marked dirty.
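A rough way to feel this granularity: traverse the same array once sequentially and once in 8 strided passes. Both loops perform the same number of additions, but sequential access uses all 8 longs of every line it loads, while each strided pass loads a fresh line per step and uses only one long from it — roughly 8x the memory traffic. Timings are machine-dependent; this is a sketch, not a rigorous benchmark:

```java
public class CacheLineSketch {
    public static void main(String[] args) {
        long[] data = new long[1 << 22]; // 32 MB, far larger than typical L2 cache
        long sum = 0;

        long t0 = System.nanoTime();
        for (int i = 0; i < data.length; i++) sum += data[i]; // prefetcher-friendly
        long sequential = System.nanoTime() - t0;

        t0 = System.nanoTime();
        for (int pass = 0; pass < 8; pass++)
            for (int i = pass; i < data.length; i += 8) sum += data[i]; // 8 longs = one 64-byte line per step
        long strided = System.nanoTime() - t0;

        System.out.printf("sequential: %,d ns, strided: %,d ns (sum=%d)%n",
                sequential, strided, sum);
    }
}
```

Printing `sum` keeps the JIT from eliminating the loops as dead code.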
MESI Protocol (Cache Coherence)
When multiple cores share data, the CPU uses the MESI protocol to keep caches consistent. Each cache line is in one of four states:
| State | Meaning |
|---|---|
| Modified | This core holds the only copy, and it is dirty (differs from memory) |
| Exclusive | This core holds the only copy, and it is clean |
| Shared | Clean copies exist in multiple cores' caches |
| Invalid | The line is stale and must be refetched before use |
Key insight: When Core 0 writes to a cache line that Core 1 also has cached, Core 1's copy is invalidated. Core 1 must reload the line from a higher cache level or main memory on its next access. This costs ~30-100ns each time!
5. False Sharing — The Silent Killer
False sharing occurs when two independent variables happen to share the same cache line. Writes to one variable invalidate the other core's cache line, even though the other core is accessing a completely different variable.
The Fix: Cache Line Padding
// ❌ BAD: writePos and readPos share a cache line
public class BadRingBuffer {
private volatile long writePos; // 8 bytes
private volatile long readPos; // 8 bytes — same cache line!
}
// ✅ GOOD (usually): Pad each variable to its own cache line.
// Caveat: HotSpot may reorder fields, so manual padding is best-effort;
// @Contended below is the reliable approach.
public class GoodRingBuffer {
// --- writePos occupies its own cache line ---
private long p1, p2, p3, p4, p5, p6, p7; // 56 bytes padding
private volatile long writePos; // 8 bytes
private long p8, p9, p10, p11, p12, p13, p14; // 56 bytes padding
// --- readPos occupies its own cache line ---
private volatile long readPos; // 8 bytes
private long p15, p16, p17, p18, p19, p20, p21; // 56 bytes padding
}
// ✅ BEST: the @Contended annotation (sun.misc.Contended on Java 8,
// jdk.internal.vm.annotation.Contended on Java 9+)
// Requires -XX:-RestrictContended, plus on Java 9+:
// --add-exports java.base/jdk.internal.vm.annotation=ALL-UNNAMED
import jdk.internal.vm.annotation.Contended;
public class BestRingBuffer {
@Contended
private volatile long writePos;
@Contended
private volatile long readPos;
}
Measuring False Sharing Impact
public class FalseSharingBenchmark {
// Without padding — suffer false sharing
static class SharedCounters {
volatile long counter1;
volatile long counter2;
}
// With padding — each counter on its own cache line
static class PaddedCounters {
volatile long counter1;
long p1, p2, p3, p4, p5, p6, p7; // 56 bytes padding
volatile long counter2;
}
public static void main(String[] args) throws InterruptedException {
// Test without padding
SharedCounters shared = new SharedCounters();
long sharedTime = benchmark(
() -> { for (int i = 0; i < 100_000_000; i++) shared.counter1++; },
() -> { for (int i = 0; i < 100_000_000; i++) shared.counter2++; }
);
// Test with padding
PaddedCounters padded = new PaddedCounters();
long paddedTime = benchmark(
() -> { for (int i = 0; i < 100_000_000; i++) padded.counter1++; },
() -> { for (int i = 0; i < 100_000_000; i++) padded.counter2++; }
);
System.out.printf("Without padding: %,d ms%n", sharedTime);
System.out.printf("With padding: %,d ms%n", paddedTime);
System.out.printf("Speedup: %.1fx%n",
(double) sharedTime / paddedTime);
}
static long benchmark(Runnable task1, Runnable task2)
throws InterruptedException {
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
long start = System.nanoTime();
t1.start();
t2.start();
t1.join();
t2.join();
return (System.nanoTime() - start) / 1_000_000;
}
}
Typical results:
Without padding: 2,100 ms
With padding: 580 ms
Speedup: 3.6x
6. Lock-Free Ring Buffer with ByteBuffer
Now let's build a proper lock-free, cache-line-padded ring buffer for single-producer/single-consumer (SPSC) use:
import java.nio.ByteBuffer;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
/**
* High-performance SPSC (Single Producer, Single Consumer) ring buffer.
*
* Features:
* - Lock-free: CAS-free, uses memory ordering only
* - Cache-line padded: no false sharing
* - ByteBuffer backed: direct memory, fixed-size entries
* - Power-of-2 capacity: fast index calculation
*/
public class SPSCRingBuffer {
private static final int CACHE_LINE = 64;
// --- Producer fields (own cache line) ---
private volatile long producerSequence;
@SuppressWarnings("unused")
private long p1, p2, p3, p4, p5, p6, p7; // padding
// --- Consumer fields (own cache line) ---
private volatile long consumerSequence;
@SuppressWarnings("unused")
private long c1, c2, c3, c4, c5, c6, c7; // padding
// --- Immutable fields (read-only after construction) ---
private final ByteBuffer buffer;
private final int capacity;
private final int mask;
private final int entrySize;
// VarHandles for ordered memory access
private static final VarHandle PRODUCER_SEQ;
private static final VarHandle CONSUMER_SEQ;
static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
PRODUCER_SEQ = lookup.findVarHandle(
SPSCRingBuffer.class, "producerSequence", long.class);
CONSUMER_SEQ = lookup.findVarHandle(
SPSCRingBuffer.class, "consumerSequence", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* @param capacity Number of entries (must be power of 2)
* @param entrySize Bytes per entry
*/
public SPSCRingBuffer(int capacity, int entrySize) {
if (Integer.bitCount(capacity) != 1) {
throw new IllegalArgumentException("Capacity must be power of 2");
}
this.capacity = capacity;
this.mask = capacity - 1;
this.entrySize = entrySize;
this.buffer = ByteBuffer.allocateDirect(capacity * entrySize);
this.producerSequence = 0;
this.consumerSequence = 0;
}
// ==== PRODUCER API ====
/**
* Claim a slot for writing. Returns the byte offset of the slot within
* the backing buffer, or -1 if the buffer is full.
* MUST be followed by publish() after writing.
*/
public int claim() {
long current = producerSequence;
long consumerPos = (long) CONSUMER_SEQ.getAcquire(this);
// Check if full
if (current - consumerPos >= capacity) {
return -1; // FULL
}
int index = (int)(current & mask);
return index * entrySize;
}
/**
* Publish the claimed slot, making it visible to the consumer.
*/
public void publish() {
PRODUCER_SEQ.setRelease(this, producerSequence + 1);
}
/**
* Write data and publish in one call.
*/
public boolean offer(byte[] data) {
int offset = claim();
if (offset < 0) return false;
for (int i = 0; i < Math.min(data.length, entrySize); i++) {
buffer.put(offset + i, data[i]);
}
publish();
return true;
}
/**
* Write a long value and publish.
*/
public boolean offerLong(long value) {
int offset = claim();
if (offset < 0) return false;
buffer.putLong(offset, value);
publish();
return true;
}
// ==== CONSUMER API ====
/**
* Poll for a new entry. Returns the offset to read from, or -1 if empty.
* MUST be followed by consume() after reading.
*/
public int poll() {
long current = consumerSequence;
long producerPos = (long) PRODUCER_SEQ.getAcquire(this);
if (current >= producerPos) {
return -1; // EMPTY
}
int index = (int)(current & mask);
return index * entrySize;
}
/**
* Consume the polled entry, advancing the consumer position.
*/
public void consume() {
CONSUMER_SEQ.setRelease(this, consumerSequence + 1);
}
/**
* Read data and consume in one call.
*/
public byte[] take() {
int offset = poll();
if (offset < 0) return null;
byte[] data = new byte[entrySize];
for (int i = 0; i < entrySize; i++) {
data[i] = buffer.get(offset + i);
}
consume();
return data;
}
/**
* Read a long value and consume.
*/
public long takeLong() {
int offset = poll();
if (offset < 0) return Long.MIN_VALUE; // sentinel
long value = buffer.getLong(offset);
consume();
return value;
}
// ==== QUERIES ====
/** Get the ByteBuffer for direct access. */
public ByteBuffer buffer() { return buffer; }
public int size() {
return (int)(producerSequence - consumerSequence);
}
public boolean isEmpty() { return size() == 0; }
public boolean isFull() { return size() >= capacity; }
public int capacity() { return capacity; }
}
Usage Example
public class RingBufferDemo {
public static void main(String[] args) throws Exception {
// 1M slots, 8 bytes each = 8MB buffer
SPSCRingBuffer ring = new SPSCRingBuffer(1 << 20, 8);
final long MESSAGE_COUNT = 100_000_000L;
// Consumer thread
Thread consumer = new Thread(() -> {
long count = 0;
long sum = 0;
while (count < MESSAGE_COUNT) {
long value = ring.takeLong();
if (value != Long.MIN_VALUE) {
sum += value;
count++;
} else {
Thread.onSpinWait();
}
}
System.out.printf("Consumer done. Sum: %,d%n", sum);
});
consumer.start();
// Producer (main thread)
long start = System.nanoTime();
for (long i = 0; i < MESSAGE_COUNT; i++) {
while (!ring.offerLong(i)) {
Thread.onSpinWait(); // back-pressure
}
}
consumer.join();
long elapsed = System.nanoTime() - start;
System.out.printf("Throughput: %,.0f messages/sec%n",
MESSAGE_COUNT / (elapsed / 1_000_000_000.0));
System.out.printf("Latency: %.0f ns/message%n",
(double) elapsed / MESSAGE_COUNT);
}
}
Typical output:
Consumer done. Sum: 4,999,999,950,000,000
Throughput: 150,000,000 messages/sec
Latency: 7 ns/message
7. Ring Buffer over Shared Memory (IPC)
Now let's combine the ring buffer with memory-mapped files for IPC:
Shared Memory Ring Buffer
import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
/**
* Ring buffer backed by a memory-mapped file for IPC.
*
* Memory layout:
* ┌─────────────────────────────────────────────────────────┐
* │ Offset 0: Magic number (4 bytes) │
* │ Offset 4: Version (4 bytes) │
* │ Offset 8: Capacity (4 bytes) │
* │ Offset 12: Entry size (4 bytes) │
* │ Offset 16: Reserved (48 bytes) │
* │ Offset 64: Producer sequence (8 bytes) [cache line] │
* │ Offset 128: Consumer sequence (8 bytes) [cache line] │
* │ Offset 192: Ring buffer data starts │
* └─────────────────────────────────────────────────────────┘
*/
public class SharedMemoryRingBuffer implements AutoCloseable {
private static final int MAGIC = 0x52494E47; // "RING" in ASCII
private static final int VERSION = 1;
// Header offsets (cache-line aligned)
private static final int OFF_MAGIC = 0;
private static final int OFF_VERSION = 4;
private static final int OFF_CAPACITY = 8;
private static final int OFF_ENTRY_SIZE = 12;
private static final int OFF_PRODUCER_SEQ = 64; // own cache line
private static final int OFF_CONSUMER_SEQ = 128; // own cache line
private static final int DATA_OFFSET = 192; // data starts here
private final MappedByteBuffer shm;
private final FileChannel channel;
private final int capacity;
private final int mask;
private final int entrySize;
/**
* Create (producer) or open (consumer) a shared ring buffer.
*/
public SharedMemoryRingBuffer(Path path, int capacity, int entrySize,
boolean create) throws Exception {
if (Integer.bitCount(capacity) != 1) {
throw new IllegalArgumentException("Capacity must be power of 2");
}
this.capacity = capacity;
this.mask = capacity - 1;
this.entrySize = entrySize;
int totalSize = DATA_OFFSET + capacity * entrySize;
if (create) {
this.channel = FileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
// Pre-allocate file
channel.write(ByteBuffer.allocate(totalSize), 0);
this.shm = channel.map(FileChannel.MapMode.READ_WRITE, 0, totalSize);
// Write header
shm.putInt(OFF_MAGIC, MAGIC);
shm.putInt(OFF_VERSION, VERSION);
shm.putInt(OFF_CAPACITY, capacity);
shm.putInt(OFF_ENTRY_SIZE, entrySize);
shm.putLong(OFF_PRODUCER_SEQ, 0);
shm.putLong(OFF_CONSUMER_SEQ, 0);
shm.force();
} else {
this.channel = FileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE);
this.shm = channel.map(FileChannel.MapMode.READ_WRITE, 0, totalSize);
// Validate header
if (shm.getInt(OFF_MAGIC) != MAGIC) {
throw new IllegalStateException("Not a valid ring buffer file");
}
if (shm.getInt(OFF_CAPACITY) != capacity ||
shm.getInt(OFF_ENTRY_SIZE) != entrySize) {
throw new IllegalStateException("Ring buffer size mismatch");
}
}
}
// --- Producer operations ---
public boolean offer(ByteBuffer data) {
long producerSeq = shm.getLong(OFF_PRODUCER_SEQ);
long consumerSeq = shm.getLong(OFF_CONSUMER_SEQ);
if (producerSeq - consumerSeq >= capacity) {
return false; // full
}
int index = (int)(producerSeq & mask);
int offset = DATA_OFFSET + index * entrySize;
// Copy data (flip assumes the caller filled 'data' without flipping it)
data.flip();
for (int i = 0; i < Math.min(data.remaining(), entrySize); i++) {
shm.put(offset + i, data.get());
}
// Publish the new sequence. Caveat: plain MappedByteBuffer puts carry no
// release semantics; production IPC code uses VarHandle acquire/release
// access (or Unsafe) over the mapped memory. Note that force() is NOT a
// memory barrier — it flushes pages to disk, which same-machine IPC does
// not need and which would cripple per-message throughput.
shm.putLong(OFF_PRODUCER_SEQ, producerSeq + 1);
return true;
}
// --- Consumer operations ---
public ByteBuffer poll() {
long consumerSeq = shm.getLong(OFF_CONSUMER_SEQ);
long producerSeq = shm.getLong(OFF_PRODUCER_SEQ);
if (consumerSeq >= producerSeq) {
return null; // empty
}
int index = (int)(consumerSeq & mask);
int offset = DATA_OFFSET + index * entrySize;
// Read data
ByteBuffer data = ByteBuffer.allocate(entrySize);
for (int i = 0; i < entrySize; i++) {
data.put(shm.get(offset + i));
}
data.flip();
// Acknowledge (same ordering caveat as the producer side; no force() —
// both processes share the same page cache on one machine)
shm.putLong(OFF_CONSUMER_SEQ, consumerSeq + 1);
return data;
}
public int size() {
return (int)(shm.getLong(OFF_PRODUCER_SEQ) - shm.getLong(OFF_CONSUMER_SEQ));
}
@Override
public void close() throws Exception {
channel.close();
}
}
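The premise underlying this class — that writes through one mapping of a file are immediately visible through another mapping of the same file — can be checked with a minimal sketch (the temp-file name is arbitrary):

```java
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SharedMappingDemo {
    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("ring", ".shm");
        try (FileChannel producerSide = FileChannel.open(path,
                 StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileChannel consumerSide = FileChannel.open(path,
                 StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Two independent mappings of the same 64-byte region
            MappedByteBuffer writer = producerSide.map(FileChannel.MapMode.READ_WRITE, 0, 64);
            MappedByteBuffer reader = consumerSide.map(FileChannel.MapMode.READ_WRITE, 0, 64);
            writer.putLong(0, 42L);                 // write through one mapping
            System.out.println(reader.getLong(0));  // visible through the other: prints 42
        } finally {
            Files.deleteIfExists(path);
        }
    }
}
```

Across two JVM processes the mechanism is identical: both map the same file and share the same physical pages.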
8. The LMAX Disruptor Pattern
The LMAX Disruptor is the inspiration for modern ring buffer designs. Its published benchmarks show it sustaining over 100 million operations per second for simple single-producer hand-offs.
Disruptor Architecture
Producers claim the next sequence from the ring, write into the pre-allocated entry, then publish the sequence. Each consumer tracks its own sequence and reads up to a sequence barrier, which reflects the minimum sequence of everything it depends on. Consumers can form a dependency graph over the same ring (e.g. journal and replicate before applying business logic).
Key Disruptor Innovations
Pre-allocated entries, reused forever — no per-message garbage
Cache-line-padded sequence counters — no false sharing
Sequence barriers instead of locks — coordination via memory ordering only
The batching effect — a lagging consumer catches up in one pass
Pluggable wait strategies — CPU usage traded for latency per deployment
Wait Strategies
// Wait strategy implementations. The method is named waitFor to avoid
// confusion with the final Object.wait overloads.
public interface WaitStrategy {
void waitFor(SPSCRingBuffer ring);
}
// Lowest latency, burns a full core while idle
public class BusySpinWait implements WaitStrategy {
public void waitFor(SPSCRingBuffer ring) {
while (ring.isEmpty()) {
Thread.onSpinWait(); // hint to CPU (PAUSE instruction on x86)
}
}
}
// Balanced: spin briefly, then yield the core
public class YieldWait implements WaitStrategy {
private static final int SPIN_TRIES = 100;
public void waitFor(SPSCRingBuffer ring) {
int counter = SPIN_TRIES;
while (ring.isEmpty()) {
if (counter > 0) {
counter--;
Thread.onSpinWait();
} else {
Thread.yield();
}
}
}
}
// Low CPU usage, higher latency
public class SleepWait implements WaitStrategy {
public void waitFor(SPSCRingBuffer ring) {
while (ring.isEmpty()) {
try {
Thread.sleep(0, 1000); // requests ~1µs; actual granularity is OS-dependent, often ≥1ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
9. Batching and Throughput Optimization
One of the most important optimizations is batching — processing multiple messages per consumer wake-up:
/**
* Batch-aware consumer that processes all available messages at once.
*/
public class BatchConsumer {
private final SPSCRingBuffer ring;
private final int maxBatchSize;
public BatchConsumer(SPSCRingBuffer ring, int maxBatchSize) {
this.ring = ring;
this.maxBatchSize = maxBatchSize;
}
public void consumeLoop() {
while (!Thread.currentThread().isInterrupted()) {
int available = ring.size();
if (available == 0) {
Thread.onSpinWait();
continue;
}
// Process up to maxBatchSize messages
int batchSize = Math.min(available, maxBatchSize);
// Begin batch
long batchStart = System.nanoTime();
for (int i = 0; i < batchSize; i++) {
long value = ring.takeLong();
if (value == Long.MIN_VALUE) break;
processMessage(value);
}
long batchEnd = System.nanoTime();
recordMetrics(batchSize, batchEnd - batchStart);
}
}
private void processMessage(long value) {
// Your business logic here
}
private void recordMetrics(int batchSize, long nanos) {
// Track average batch size, throughput, latency
}
}
Why batching helps:
Fewer sequence updates → fewer cache line invalidations
Better instruction pipeline utilization
Amortizes the cost of wait strategy wake-ups
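The amortization effect is visible even with a standard ArrayBlockingQueue, where drainTo lets one consumer wake-up handle many messages (the queue type and batch size of 256 are our choices for illustration, not part of the ring buffer above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class BatchDrainDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(1024);
        final int N = 100_000;
        Thread producer = new Thread(() -> {
            for (int i = 0; i < N; i++) {
                try { q.put(i); } catch (InterruptedException e) { return; }
            }
        });
        producer.start();

        long sum = 0;
        int received = 0, wakeups = 0;
        List<Integer> batch = new ArrayList<>(256);
        while (received < N) {
            batch.clear();
            int n = q.drainTo(batch, 256);  // one wake-up handles up to 256 messages
            if (n == 0) { Thread.onSpinWait(); continue; }
            wakeups++;
            for (int v : batch) sum += v;
            received += n;
        }
        producer.join();
        System.out.println("sum=" + sum + " wakeups=" + wakeups
                + " avgBatch=" + (N / wakeups));
    }
}
```

The consumer synchronizes once per batch rather than once per message; the average batch size it reports is a direct measure of how much synchronization was amortized away.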
10. Monitoring and Observability
A production ring buffer needs monitoring:
public class RingBufferMetrics {
private final SPSCRingBuffer ring;
public RingBufferMetrics(SPSCRingBuffer ring) {
this.ring = ring;
}
public Snapshot snapshot() {
int currentSize = ring.size();
int capacity = ring.capacity();
double utilizationPct = 100.0 * currentSize / capacity;
return new Snapshot(
currentSize,
capacity,
utilizationPct,
ring.isEmpty(),
ring.isFull()
);
}
record Snapshot(
int size,
int capacity,
double utilizationPct,
boolean isEmpty,
boolean isFull
) {
@Override
public String toString() {
return String.format(
"RingBuffer[size=%d/%d (%.1f%%), empty=%s, full=%s]",
size, capacity, utilizationPct, isEmpty, isFull);
}
}
}
Key metrics to monitor:
| Metric | Warning Level | Why |
|---|---|---|
| Utilization % | > 80% | Consumer can't keep up |
| Full events | > 0/min | Producer is blocked |
| Consumer lag | Growing | Consumer falling behind |
| Batch size | Shrinking | Possible contention |
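A common way to feed such a table is sampling on a fixed schedule. A generic sketch with ScheduledExecutorService — the gauge lambda here is a stand-in for ring.size(), and the capacity is arbitrary:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

public class MetricsSampler {
    public static void main(String[] args) throws Exception {
        IntSupplier gauge = () -> 42;  // stand-in for ring.size()
        int capacity = 1024;
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        // Sample every 50 ms; a real system would export this to its metrics backend
        ses.scheduleAtFixedRate(() -> {
            int size = gauge.getAsInt();
            System.out.printf("utilization=%.1f%%%n", 100.0 * size / capacity);
        }, 0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(120);  // let a few samples print
        ses.shutdown();
    }
}
```

Sampling from a separate thread is safe here because size() only performs volatile reads of the two sequences.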
11. Summary
Key takeaways:
Ring buffers eliminate allocation and provide O(1) enqueue/dequeue
Power-of-2 capacity enables fast index calculation via bitwise AND
Cache line padding prevents false sharing (3-5x speedup)
SPSC ring buffers need no locks — just memory ordering
Wait strategies trade CPU usage for latency
Batching amortizes synchronization overhead
Ring buffers over shared memory enable ultra-low-latency IPC