Using the Agrona library (Part 4)
Ring Buffers: Lock-Free IPC
"The fastest way for one thread to talk to another — or one process to talk to another — on the same machine."
Agrona's ring buffers are lock-free message-passing structures, usually placed off-heap, designed for ultra-low-latency communication. Aeron uses them for the command channel from clients to its media driver, and they are widely used in low-latency trading systems.
What is an Agrona Ring Buffer?
An Agrona ring buffer is a fixed-size circular buffer stored in a contiguous chunk of memory (on-heap or off-heap). Messages are written at the tail and read at the head, wrapping around when the end is reached.
Ring Buffer Layout
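The underlying memory is divided into two regions: the message space, followed by a trailer that holds the ring's metadata (tail position, cached head position, head position, a correlation counter and a consumer heartbeat).
// Total required memory:
int totalLength = capacity + TRAILER_LENGTH;
// capacity       = your message space (must be a power of 2)
// TRAILER_LENGTH = metadata at the END of the buffer (768 bytes in recent
//                  Agrona versions; always use RingBufferDescriptor.TRAILER_LENGTH)
Each trailer field is padded to sit two 64-byte cache lines (128 bytes) away from its neighbours. This prevents false sharing: the producer's tail and the consumer's head live on separate cache lines, so the two threads don't contend on the same line.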
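The padding scheme is easiest to see as plain arithmetic. The class below is a hypothetical, JDK-only model (`TrailerLayoutSketch` is not an Agrona type) that places each trailer field two cache lines after the previous one. That spacing, the 64-byte cache line and the resulting offsets are assumptions that mirror recent Agrona releases as far as we know, so production code should read the real constants from `RingBufferDescriptor` instead.

```java
// Hypothetical sketch of the trailer layout; offsets are ASSUMED, not read from Agrona.
final class TrailerLayoutSketch {
    static final int CACHE_LINE_LENGTH = 64;                 // assumed cache-line size
    static final int FIELD_SPACING = CACHE_LINE_LENGTH * 2;  // two lines per field

    // Offsets are relative to the start of the trailer, which begins at index 'capacity'.
    static final int TAIL_POSITION_OFFSET = FIELD_SPACING;
    static final int HEAD_CACHE_POSITION_OFFSET = TAIL_POSITION_OFFSET + FIELD_SPACING;
    static final int HEAD_POSITION_OFFSET = HEAD_CACHE_POSITION_OFFSET + FIELD_SPACING;
    static final int CORRELATION_COUNTER_OFFSET = HEAD_POSITION_OFFSET + FIELD_SPACING;
    static final int CONSUMER_HEARTBEAT_OFFSET = CORRELATION_COUNTER_OFFSET + FIELD_SPACING;
    static final int TRAILER_LENGTH = CONSUMER_HEARTBEAT_OFFSET + FIELD_SPACING;

    /** Absolute index of a trailer field inside the whole backing buffer. */
    static int absoluteIndex(int capacity, int trailerOffset) {
        return capacity + trailerOffset;
    }

    /** Total bytes to allocate for a given message capacity. */
    static int totalLength(int capacity) {
        return capacity + TRAILER_LENGTH;
    }

    public static void main(String[] args) {
        int capacity = 64 * 1024;
        System.out.println("trailer length = " + TRAILER_LENGTH);
        System.out.println("tail index     = " + absoluteIndex(capacity, TAIL_POSITION_OFFSET));
        System.out.println("head index     = " + absoluteIndex(capacity, HEAD_POSITION_OFFSET));
        System.out.println("total length   = " + totalLength(capacity));
    }
}
```

Under these assumptions a 64 KB ring puts the tail field at index 65,664 and the head at 65,920: 256 bytes apart, with the producer's cached head between them.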
OneToOneRingBuffer — Single Producer, Single Consumer
The fastest variant. One thread writes, one thread reads:
import java.nio.ByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
// 1. Allocate backing memory: message space + trailer
int capacity = 1024 * 64; // 64 KB for messages (must be a power of 2)
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(
    capacity + RingBufferDescriptor.TRAILER_LENGTH
);
// 2. Create the ring buffer
OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
    new UnsafeBuffer(byteBuffer)
);
Writing Messages
// Prepare a message buffer (reusable!)
UnsafeBuffer msgBuffer = new UnsafeBuffer(new byte[256]);
// Encode a message
int msgTypeId = 1; // Application-defined message type
msgBuffer.putLong(0, System.nanoTime()); // timestamp
msgBuffer.putInt(8, 1001); // instrumentId
msgBuffer.putDouble(12, 150.25); // price
msgBuffer.putInt(20, 100); // quantity
int length = 24; // Total message bytes
// Write to ring buffer (returns true on success)
boolean success = ringBuffer.write(msgTypeId, msgBuffer, 0, length);
if (!success) {
    // Ring buffer is full! Back-pressure.
    // Options: retry, drop, block, apply idle strategy
}
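The magic offsets 0, 8, 12 and 20 above are easy to get wrong once encoder and decoder drift apart. A common remedy is a small flyweight that names each field once and reads or writes in place without allocating. The sketch below is hypothetical (`OrderMessageFlyweight` and its accessors are illustrative names) and uses `java.nio.ByteBuffer` so it runs with only the JDK; with Agrona you would wrap an `UnsafeBuffer` the same way.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical flyweight over the 24-byte order message encoded above. */
final class OrderMessageFlyweight {
    static final int TIMESTAMP_OFFSET = 0;   // long, 8 bytes
    static final int INSTRUMENT_OFFSET = 8;  // int, 4 bytes
    static final int PRICE_OFFSET = 12;      // double, 8 bytes
    static final int QUANTITY_OFFSET = 20;   // int, 4 bytes
    static final int LENGTH = 24;            // total encoded bytes

    private ByteBuffer buffer;
    private int offset;

    /** Point the flyweight at a region of a buffer: no copying, no allocation. */
    OrderMessageFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    OrderMessageFlyweight timestamp(long v) { buffer.putLong(offset + TIMESTAMP_OFFSET, v); return this; }
    OrderMessageFlyweight instrumentId(int v) { buffer.putInt(offset + INSTRUMENT_OFFSET, v); return this; }
    OrderMessageFlyweight price(double v) { buffer.putDouble(offset + PRICE_OFFSET, v); return this; }
    OrderMessageFlyweight quantity(int v) { buffer.putInt(offset + QUANTITY_OFFSET, v); return this; }

    long timestamp() { return buffer.getLong(offset + TIMESTAMP_OFFSET); }
    int instrumentId() { return buffer.getInt(offset + INSTRUMENT_OFFSET); }
    double price() { return buffer.getDouble(offset + PRICE_OFFSET); }
    int quantity() { return buffer.getInt(offset + QUANTITY_OFFSET); }

    public static void main(String[] args) {
        // Agrona's UnsafeBuffer defaults to native byte order; ByteBuffer defaults to
        // big-endian, so set the order explicitly (little-endian matches x86).
        ByteBuffer msg = ByteBuffer.allocate(256).order(ByteOrder.LITTLE_ENDIAN);
        OrderMessageFlyweight order = new OrderMessageFlyweight().wrap(msg, 0);
        order.timestamp(System.nanoTime()).instrumentId(1001).price(150.25).quantity(100);

        // A reader would wrap the buffer handed to its callback at 'index'.
        OrderMessageFlyweight view = new OrderMessageFlyweight().wrap(msg, 0);
        System.out.println(view.instrumentId() + " " + view.price() + " x " + view.quantity());
    }
}
```

Because the same constants drive both sides, changing a field's position is a one-line edit instead of a hunt through every handler.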
Reading Messages
// Read callback: invoked once per available message
ringBuffer.read((msgTypeId, buffer, index, length) -> {
    long timestamp = buffer.getLong(index);
    int instrument = buffer.getInt(index + 8);
    double price = buffer.getDouble(index + 12);
    int quantity = buffer.getInt(index + 20);
    processOrder(timestamp, instrument, price, quantity);
});
// Read at most N messages per call (bounded batch). Creating the handler once
// (org.agrona.concurrent.MessageHandler) and reusing it avoids building a new
// capturing lambda on every poll.
MessageHandler handler = (msgTypeId, buffer, index, length) -> { /* decode as above */ };
int messagesRead = ringBuffer.read(handler, 10); // read up to 10
ManyToOneRingBuffer — Multiple Producers, Single Consumer
When multiple threads need to write concurrently:
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
ManyToOneRingBuffer ringBuffer = new ManyToOneRingBuffer(
    new UnsafeBuffer(byteBuffer)
);
// Multiple producer threads can call write() concurrently
// They coordinate via CAS on the tail position
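The CAS coordination can be sketched with `java.util.concurrent.atomic.AtomicLong`. In this hypothetical model (`CasTailClaimSketch` is not an Agrona type), each producer reads the tail, checks free space against the consumer's head, and tries to move the tail forward with `compareAndSet`; a producer that loses the race simply re-reads and retries. It deliberately omits the wrap-around padding, head caching and record publication that a real multi-producer ring also needs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: producers claim disjoint regions by CAS on a shared tail. */
final class CasTailClaimSketch {
    private final AtomicLong tail = new AtomicLong(); // contended by all producers
    private final AtomicLong head = new AtomicLong(); // advanced by the single consumer
    private final int capacity;

    CasTailClaimSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Returns the start position of the claimed region, or -1 if there is no room. */
    long claim(int alignedLength) {
        while (true) {
            long currentHead = head.get();   // a stale head only under-reports free space
            long currentTail = tail.get();
            int free = capacity - (int) (currentTail - currentHead);
            if (alignedLength > free) {
                return -1;                   // back-pressure: the caller decides what to do
            }
            if (tail.compareAndSet(currentTail, currentTail + alignedLength)) {
                return currentTail;          // this producer now owns the claimed region
            }
            // Another producer moved the tail first: loop, re-read and try again.
        }
    }

    /** Called by the consumer once everything before newHead has been processed. */
    void advanceHead(long newHead) {
        head.set(newHead);
    }

    long tail() {
        return tail.get();
    }

    public static void main(String[] args) {
        CasTailClaimSketch ring = new CasTailClaimSketch(64);
        System.out.println(ring.claim(16) + " " + ring.claim(16)); // 0 16
    }
}
```

The single CAS is the only point of contention between producers; once a region is claimed, each producer writes its record without further coordination.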
How Write Works
OneToOneRingBuffer Write (Simplified)
public boolean write(int msgTypeId, DirectBuffer srcBuffer, int srcIndex, int length) {
    // 1. Calculate required space (message + header, aligned to 8 bytes)
    int recordLength = length + HEADER_LENGTH;          // Add 8-byte header (length + type)
    int alignedLength = align(recordLength, ALIGNMENT); // Align to 8 bytes
    // 2. Check if there's enough space
    long tail = buffer.getLong(TAIL_OFFSET);            // Plain read (only writer)
    long head = this.headCache;                         // Cached head position
    int availableCapacity = capacity - (int)(tail - head);
    if (alignedLength > availableCapacity) {
        head = buffer.getLongVolatile(HEAD_OFFSET);     // Refresh head from consumer
        this.headCache = head;                          // Cache it
        if (alignedLength > capacity - (int)(tail - head)) {
            return false;                               // Truly full! Back-pressure.
        }
    }
    // (Omitted: if the record would run past the end of the buffer, the real
    //  implementation first writes a padding record up to the end and then
    //  places the message at index 0.)
    // 3. Write the message
    int tailIndex = (int)(tail & mask);                 // Bitmask for wrap-around
    buffer.putInt(tailIndex, -recordLength);            // Negative length = "writing in progress"
    buffer.putBytes(tailIndex + HEADER_LENGTH, srcBuffer, srcIndex, length);
    buffer.putInt(tailIndex + 4, msgTypeId);            // Type id: second int of the header
    buffer.putIntOrdered(tailIndex, recordLength);      // Positive length commits the record
    // ^^^^^^^^ ORDERED write publishes the message: body and type become visible first
    // 4. Advance tail
    buffer.putLongOrdered(TAIL_OFFSET, tail + alignedLength);
    return true;
}
Head Caching — A Performance Trick
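The producer caches the consumer's head position and only performs the expensive volatile read when the cached value suggests the buffer is full. Since only the consumer moves the head, and only forward, the cached value can lag behind the real head but never run ahead of it. A stale cache therefore under-reports free space and never over-reports it: the worst case is one extra volatile read, not an overwritten message.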
How Read Works
public int read(MessageHandler handler, int messageCountLimit) {
    long head = buffer.getLong(HEAD_OFFSET);            // Plain read (only reader)
    int headIndex = (int)(head & mask);
    int maxBlockLength = capacity - headIndex;          // Never read past the end in one call
    int bytesRead = 0;
    int messagesRead = 0;
    try {
        while (bytesRead < maxBlockLength && messagesRead < messageCountLimit) {
            int recordIndex = headIndex + bytesRead;
            int recordLength = buffer.getIntVolatile(recordIndex); // Length field of the header
            if (recordLength <= 0) {
                // Negative = write in progress (not yet committed)
                // Zero     = no message here
                break;
            }
            bytesRead += align(recordLength, ALIGNMENT);
            int msgTypeId = buffer.getInt(recordIndex + 4);        // Type field of the header
            if (msgTypeId == PADDING_MSG_TYPE_ID) {
                continue;                                          // Skip end-of-buffer padding
            }
            // Invoke callback with the message body only
            handler.onMessage(msgTypeId, buffer, recordIndex + HEADER_LENGTH,
                recordLength - HEADER_LENGTH);
            messagesRead++;
        }
    } finally {
        // Advance head (make space for producer), even if the handler threw
        if (bytesRead > 0) {
            buffer.setMemory(headIndex, bytesRead, (byte) 0);     // Zero out read messages
            buffer.putLongOrdered(HEAD_OFFSET, head + bytesRead);
        }
    }
    return messagesRead;
}
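To see the write and read paths interact, including the wrap-around padding that the simplified listings gloss over, here is a small runnable model. It is a hypothetical, JDK-only sketch (`SpscRingSketch` is not an Agrona class): it keeps tail, head and the head cache in Java fields rather than in a trailer, and it uses `VarHandle` fences as a stand-in for Agrona's ordered writes.

```java
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical teaching model of the SPSC protocol above; NOT Agrona's code.
// JDK fences stand in for Agrona's ordered (release) writes.
final class SpscRingSketch {
    static final int HEADER_LENGTH = 8;     // int record length + int type id
    static final int ALIGNMENT = 8;
    static final int PADDING_TYPE_ID = -1;  // filler record at the end of the buffer
    interface Handler { void onMessage(int typeId, ByteBuffer buffer, int index, int length); }

    private final ByteBuffer buffer;
    private final int capacity;
    private final int mask;
    private long tail;                      // written only by the producer
    private long headCache;                 // producer's possibly stale copy of head
    private volatile long head;             // written only by the consumer

    SpscRingSketch(int capacity) {
        if (capacity < ALIGNMENT || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2 >= 8: " + capacity);
        }
        this.capacity = capacity;
        this.mask = capacity - 1;
        this.buffer = ByteBuffer.allocateDirect(capacity).order(ByteOrder.nativeOrder());
    }

    static int align(int value, int alignment) {
        return (value + alignment - 1) & -alignment;
    }

    boolean write(int typeId, byte[] payload) {
        int recordLength = payload.length + HEADER_LENGTH;
        int alignedLength = align(recordLength, ALIGNMENT);
        int index = (int) (tail & mask);
        int toEnd = capacity - index;
        // Records never straddle the end: if one doesn't fit, pad to the end and restart at 0.
        int required = alignedLength <= toEnd ? alignedLength : toEnd + alignedLength;
        if (required > capacity - (int) (tail - headCache)) {
            headCache = head;               // volatile read only when the cache says "full"
            if (required > capacity - (int) (tail - headCache)) {
                return false;               // genuinely full: back-pressure
            }
        }
        if (alignedLength > toEnd) {
            buffer.putInt(index + 4, PADDING_TYPE_ID);
            VarHandle.releaseFence();
            buffer.putInt(index, toEnd);    // padding record covers the rest of the buffer
            tail += toEnd;
            index = 0;
        }
        buffer.putInt(index, -recordLength); // negative length: write in progress
        VarHandle.releaseFence();
        ByteBuffer body = buffer.duplicate();
        body.position(index + HEADER_LENGTH);
        body.put(payload);
        buffer.putInt(index + 4, typeId);
        VarHandle.releaseFence();           // body and type are visible before...
        buffer.putInt(index, recordLength); // ...the positive length commits the record
        tail += alignedLength;
        return true;
    }

    int read(Handler handler, int messageCountLimit) {
        long h = head;
        int headIndex = (int) (h & mask);
        int maxBlockLength = capacity - headIndex; // never wrap within one call
        int bytesRead = 0;
        int messagesRead = 0;
        try {
            while (bytesRead < maxBlockLength && messagesRead < messageCountLimit) {
                int index = headIndex + bytesRead;
                int recordLength = buffer.getInt(index);
                VarHandle.acquireFence();
                if (recordLength <= 0) {
                    break;                  // 0 = empty, negative = not committed yet
                }
                bytesRead += align(recordLength, ALIGNMENT);
                int typeId = buffer.getInt(index + 4);
                if (typeId == PADDING_TYPE_ID) {
                    continue;               // skip filler without counting it
                }
                handler.onMessage(typeId, buffer, index + HEADER_LENGTH, recordLength - HEADER_LENGTH);
                messagesRead++;
            }
        } finally {
            if (bytesRead > 0) {
                for (int i = headIndex; i < headIndex + bytesRead; i++) {
                    buffer.put(i, (byte) 0); // zero consumed bytes so old headers read as 0
                }
                head = h + bytesRead;       // volatile write returns the space to the producer
            }
        }
        return messagesRead;
    }
}
```

Note the behaviour at the wrap: a read that starts just before the end consumes only the padding record and returns 0, and the next read picks up the message at index 0. Stopping at the end of the buffer on each call keeps the read loop free of wrap-around arithmetic.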
Message Format
Every message in the ring buffer has a fixed 8-byte header:
Byte layout of a single record:
┌──────────────────────────┬──────────────────────────┐
│ Record Length (4 bytes)  │ Msg Type (4 bytes)       │ ← Header (8 bytes)
├──────────────────────────┴──────────────────────────┤
│ Message Body                                        │ ← Your data
│ (variable length)                                   │
├─────────────────────────────────────────────────────┤
│ Padding (0-7 bytes)                                 │ ← 8-byte alignment
└─────────────────────────────────────────────────────┘
Record Length covers the header plus the message body (not the alignment padding); the reader hands your handler a message length of Record Length - 8.
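The header and padding arithmetic can be worked through directly. `RecordLayoutSketch` below is a hypothetical, JDK-only helper that assumes the layout in the diagram: the length field at offset 0 (header included, padding excluded) and the type id at offset 4.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper illustrating the record format described above. */
final class RecordLayoutSketch {
    static final int LENGTH_OFFSET = 0;   // int: record length, header included, padding excluded
    static final int TYPE_OFFSET = 4;     // int: application message type id
    static final int HEADER_LENGTH = 8;
    static final int ALIGNMENT = 8;

    static int recordLength(int payloadLength) {
        return payloadLength + HEADER_LENGTH;
    }

    static int alignedLength(int payloadLength) {
        return (recordLength(payloadLength) + ALIGNMENT - 1) & -ALIGNMENT;
    }

    static int paddingBytes(int payloadLength) {
        return alignedLength(payloadLength) - recordLength(payloadLength);
    }

    public static void main(String[] args) {
        for (int payload : new int[] {24, 13, 1}) {
            System.out.printf("payload=%d record=%d aligned=%d padding=%d%n",
                payload, recordLength(payload), alignedLength(payload), paddingBytes(payload));
        }
        // Decoding a header in place: both fields are plain 4-byte ints.
        ByteBuffer buf = ByteBuffer.allocate(32).order(ByteOrder.nativeOrder());
        buf.putInt(LENGTH_OFFSET, recordLength(13)).putInt(TYPE_OFFSET, 1);
        System.out.println("length=" + buf.getInt(LENGTH_OFFSET) + " type=" + buf.getInt(TYPE_OFFSET));
    }
}
```

For a 13-byte payload the record is 21 bytes, its aligned slot is 24 bytes and 3 bytes are padding; the 24-byte order message from earlier occupies exactly 32 bytes with no padding.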
IPC via Memory-Mapped Files
The killer feature: use ring buffers for inter-process communication by backing them with memory-mapped files:
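import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
int capacity = 1024 * 1024; // 1 MB message space
int totalLength = capacity + RingBufferDescriptor.TRAILER_LENGTH;
// Process 1: Create and size the ring buffer file
File file = new File("/dev/shm/my-ring-buffer"); // Linux tmpfs: RAM-backed, no disk I/O
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
    raf.setLength(totalLength);
    MappedByteBuffer mappedBuffer = raf.getChannel().map(
        FileChannel.MapMode.READ_WRITE, 0, totalLength
    );
    // The mapping stays valid after the file is closed
    OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
        new UnsafeBuffer(mappedBuffer)
    );
    // Write messages (msgBuffer/msgLength: an encoded message, as in Writing Messages)...
    ringBuffer.write(1, msgBuffer, 0, msgLength);
}
// Process 2: Open the same file (without resizing it) and read.
// "rw" and READ_WRITE are required: the consumer writes the head position
// and zeroes the records it has consumed.
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
    MappedByteBuffer mappedBuffer = raf.getChannel().map(
        FileChannel.MapMode.READ_WRITE, 0, totalLength
    );
    OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
        new UnsafeBuffer(mappedBuffer)
    );
    // Read messages from another process! In practice, call read() in a polling
    // loop with an idle strategy: each call only drains what is there right now.
    ringBuffer.read((msgTypeId, buffer, index, length) -> {
        // Process message from Process 1!
    });
}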
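The whole trick rests on one property: two independent READ_WRITE mappings of the same file see the same bytes. The hypothetical, JDK-only sketch below (`SharedMappingSketch` is an illustrative name) demonstrates that with two mappings inside one JVM standing in for two processes, and uses a temporary file so it runs anywhere; on Linux you would normally point it at `/dev/shm` as above.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch: two READ_WRITE mappings of one file share the same pages.
// Both "processes" live in one JVM here to keep the example self-contained.
final class SharedMappingSketch {
    /** Maps [0, totalLength) read-write, growing (never shrinking) the file if needed. */
    static MappedByteBuffer map(File file, int totalLength) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            if (raf.length() < totalLength) {
                raf.setLength(totalLength);  // only the first opener actually sizes the file
            }
            MappedByteBuffer mapped = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, totalLength);
            mapped.order(ByteOrder.nativeOrder());
            return mapped;                   // stays valid after the file is closed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("ring-sketch", ".dat");
        file.deleteOnExit();
        MappedByteBuffer producerView = map(file, 4096);
        MappedByteBuffer consumerView = map(file, 4096);
        producerView.putLong(0, 42L);                // "process 1" writes
        System.out.println(consumerView.getLong(0)); // "process 2" reads the same bytes
    }
}
```

Growing but never shrinking the file matters: if the consumer truncated and re-sized an existing file, it could wipe a ring the producer is already using.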
Performance Characteristics
| Metric | OneToOneRingBuffer | ManyToOneRingBuffer |
|---|---|---|
| Write latency | ~20-40 ns | ~40-80 ns |
| Read latency | ~20-40 ns | ~20-40 ns |
| Throughput | ~100M+ msg/sec | ~50-80M msg/sec |
| Producers | 1 | Multiple |
| Consumers | 1 | 1 |
| Write mechanism | Ordered store to tail (no CAS) | CAS on tail |
| Allocation | Zero (steady state) | Zero (steady state) |
Figures are approximate and vary with hardware, message size and JVM settings.
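Why So Fast?
No locks: a single producer publishes with ordered (release) writes, and multiple producers coordinate through one CAS on the tail. Head caching keeps volatile reads of the consumer's position off the common path. Tail and head sit on separate, padded cache lines, so producer and consumer don't false-share. A power-of-2 capacity turns wrap-around into a cheap bitmask instead of a modulo. And messages are copied into pre-allocated, contiguous (typically off-heap) memory, so the steady state allocates nothing and gives the garbage collector no work.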
Common Pitfalls
Pitfall 1: Capacity Must Be a Power of 2
// ❌ Throws at construction: 1000 bytes of message space is not a power of 2
new OneToOneRingBuffer(new UnsafeBuffer(
    ByteBuffer.allocateDirect(1000 + RingBufferDescriptor.TRAILER_LENGTH)
));
// ✅ Capacity (buffer length minus TRAILER_LENGTH) must be a power of 2
int capacity = 1024; // 2^10
new OneToOneRingBuffer(new UnsafeBuffer(
    ByteBuffer.allocateDirect(capacity + RingBufferDescriptor.TRAILER_LENGTH)
));
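When a requested size is not a power of 2, rounding it up is usually the simplest fix. The JDK-only sketch below is hypothetical (`CapacitySketch` is an illustrative name); Agrona's `BitUtil` already offers equivalents such as `isPowerOfTwo` and `findNextPositivePowerOfTwo`.

```java
/** Hypothetical helpers for choosing a valid ring capacity (JDK-only). */
final class CapacitySketch {
    static boolean isPowerOfTwo(int value) {
        // A power of 2 has exactly one bit set, so clearing the lowest set bit yields 0.
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Smallest power of 2 that is >= requested, for 1 <= requested <= 2^30. */
    static int roundUpToPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("out of range: " + requested);
        }
        return requested == 1 ? 1 : Integer.highestOneBit(requested - 1) << 1;
    }

    public static void main(String[] args) {
        int requested = 1000;
        int capacity = roundUpToPowerOfTwo(requested);
        System.out.println(requested + " -> " + capacity + " (power of 2: " + isPowerOfTwo(capacity) + ")");
    }
}
```

Subtracting 1 before taking the highest set bit is what keeps an exact power of 2 such as 1024 unchanged instead of doubling it.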
Pitfall 2: Don't Forget the Trailer
// ❌ Missing trailer! The message space is buffer length - TRAILER_LENGTH,
//    so this does not give you 1024 bytes for messages
ByteBuffer tooSmall = ByteBuffer.allocateDirect(1024);
// ✅ Always add TRAILER_LENGTH
ByteBuffer buf = ByteBuffer.allocateDirect(
    1024 + RingBufferDescriptor.TRAILER_LENGTH // 1024 bytes of messages + metadata
);
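The trailer pitfall is easiest to see as arithmetic: the ring derives its message capacity by subtracting the trailer from whatever buffer it is given. In this hypothetical sketch (`BufferSizingSketch` is an illustrative name) the trailer length is a parameter, and the 768 used in `main` is an assumed value, matching recent Agrona versions as far as we know; real code should use `RingBufferDescriptor.TRAILER_LENGTH`.

```java
/** Hypothetical sketch: message capacity = backing buffer length - trailer length. */
final class BufferSizingSketch {
    /** Bytes to allocate for a power-of-2 message capacity. */
    static int bufferLengthFor(int capacity, int trailerLength) {
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
            throw new IllegalArgumentException("capacity must be a power of 2: " + capacity);
        }
        return capacity + trailerLength;
    }

    /** Message capacity the ring would derive from a given backing buffer. */
    static int capacityOf(int bufferLength, int trailerLength) {
        return bufferLength - trailerLength;
    }

    public static void main(String[] args) {
        int trailerLength = 768; // ASSUMED value; use RingBufferDescriptor.TRAILER_LENGTH in real code
        System.out.println("correct:        " + capacityOf(bufferLengthFor(1024, trailerLength), trailerLength));
        System.out.println("forgot trailer: " + capacityOf(1024, trailerLength));
    }
}
```

Under that assumption, forgetting the trailer on a 1024-byte buffer leaves only 256 bytes of message space. Because 256 is itself a power of 2, a capacity check would not necessarily catch the mistake; the ring would simply be far smaller than intended.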
Pitfall 3: Handling a Full Buffer
// ❌ Silently dropping messages!
ringBuffer.write(msgType, buffer, 0, length); // Returns false when full!
// ✅ Handle back-pressure
while (!ringBuffer.write(msgType, buffer, 0, length)) {
    idleStrategy.idle(); // Wait/yield before retrying, e.g. Agrona's BackoffIdleStrategy
}
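The unbounded loop above never gives up, which can hang a producer if the consumer has stopped. A bounded variant with a spin, then yield, then park backoff is sketched below. It is hypothetical (`RetryingWriter` and its tuning constants are illustrative) and only loosely follows the idea behind Agrona's `BackoffIdleStrategy`; it is not that class's implementation.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical helper: retry a write with spin -> yield -> park backoff and give up
// after maxAttempts. Inspired by the idea behind Agrona's BackoffIdleStrategy, not its code.
final class RetryingWriter {
    private static final int SPINS = 10;
    private static final int YIELDS = 5;
    private static final long MIN_PARK_NS = 1_000;     // 1 microsecond
    private static final long MAX_PARK_NS = 1_000_000; // 1 millisecond cap

    /** Makes at least one attempt; returns true as soon as a write succeeds. */
    static boolean writeWithRetry(BooleanSupplier write, int maxAttempts) {
        long parkNs = MIN_PARK_NS;
        for (int attempt = 1; ; attempt++) {
            if (write.getAsBoolean()) {
                return true;
            }
            if (attempt >= maxAttempts) {
                return false;                  // caller decides: drop, log, or escalate
            }
            if (attempt <= SPINS) {
                Thread.onSpinWait();           // busy-spin: lowest wake-up latency
            } else if (attempt <= SPINS + YIELDS) {
                Thread.yield();                // give other threads the core
            } else {
                LockSupport.parkNanos(parkNs); // sleep, doubling up to the cap
                parkNs = Math.min(parkNs * 2, MAX_PARK_NS);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for () -> ringBuffer.write(msgType, buffer, 0, length): succeeds on try 3.
        boolean ok = writeWithRetry(() -> ++calls[0] >= 3, 100);
        System.out.println("ok=" + ok + " after " + calls[0] + " attempts");
    }
}
```

Spinning first keeps latency low for brief stalls, while parking with a growing delay stops a stuck consumer from burning a whole core on the producer side.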
Pitfall 4: Message Size Limits
// Maximum message size = capacity / 8 (ringBuffer.maxMsgLength() returns it)
// For a 64 KB ring buffer: maxMsgLength = 64 KB / 8 = 8 KB
// ❌ Writing a larger message does not return false: write() throws IllegalArgumentException
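Checking the size before calling `write()` turns an oversized message into a clear, application-level error. This is a JDK-only sketch; `MessageSizeGuard` is hypothetical, and with Agrona you would compare against `ringBuffer.maxMsgLength()` rather than recomputing the limit yourself.

```java
// Hypothetical pre-flight check mirroring the rule above: max message length = capacity / 8.
final class MessageSizeGuard {
    static int maxMessageLength(int capacity) {
        return capacity >> 3; // same as capacity / 8 for positive capacities
    }

    static void checkFits(int length, int capacity) {
        int max = maxMessageLength(capacity);
        if (length > max) {
            throw new IllegalArgumentException(
                "message length " + length + " exceeds max " + max + " for capacity " + capacity);
        }
    }

    public static void main(String[] args) {
        int capacity = 64 * 1024;
        System.out.println("max = " + maxMessageLength(capacity)); // 8192
        checkFits(8 * 1024, capacity); // exactly at the limit: accepted
        try {
            checkFits(8 * 1024 + 1, capacity);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```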
Summary
The one-liner: Agrona ring buffers provide lock-free message passing with zero allocation in steady state at up to 100M+ messages/second, and, when backed by memory-mapped files, sub-microsecond IPC between processes on the same machine.