JVM Low-Level I/O - Part 6
Putting It All Together: High-Performance IPC Message Bus
This final article brings together everything from the series (ByteBuffer, encoding, FileChannel, IPC, and ring buffers) into a complete, production-oriented IPC message bus. Two JVM processes, one producer and one consumer, will exchange structured messages through shared memory at rates on the order of a million messages per second.
1. System Overview
Our message bus uses three files in shared memory: control.shm, index.shm, and data.shm.
Why Three Files?
| File | Purpose | Access Pattern |
|---|---|---|
| control.shm | Handshake, heartbeat, shutdown | Infrequent, both read/write |
| data.shm | Variable-length message payloads | Sequential write, sequential read |
| index.shm | Fixed-size index entries (ring buffer) | Ring buffer pattern |
Separating index from data allows fixed-size ring buffer entries in the index (fast!) while supporting variable-length messages in the data region.
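To make the benefit of fixed-size entries concrete, here is a minimal sketch of how a sequence number maps to a byte offset in the index file. The header size (128) and entry size (32) match the constants used in the MessageBus class later in this article; the class and method names are illustrative only.

```java
/** Sketch: mapping a sequence number to an index-entry offset. Names are illustrative. */
final class IndexSlotMath {
    static final int ENTRIES_OFFSET = 128; // two 64-byte header cache lines
    static final int ENTRY_SIZE = 32;      // fixed size of one index entry

    /** Slot for a sequence; capacity must be a power of two so the mask works. */
    static int slot(long sequence, int capacity) {
        return (int) (sequence & (capacity - 1));
    }

    /** Byte offset of the entry that holds the given sequence. */
    static int entryOffset(long sequence, int capacity) {
        return ENTRIES_OFFSET + slot(sequence, capacity) * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        int capacity = 8192;
        System.out.println(entryOffset(0, capacity));    // first slot
        System.out.println(entryOffset(8191, capacity)); // last slot
        System.out.println(entryOffset(8192, capacity)); // wraps back to the first slot
    }
}
```

Because every entry has the same size, locating a message costs one mask and one multiply, regardless of how long the payloads are.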
2. Protocol Design
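Control Channel Layout
The control file is 256 bytes. The fields below follow the offsets defined in the MessageBus constants in section 3:
| Offset | Size (bytes) | Field |
|---|---|---|
| 0 | 4 | Magic number |
| 4 | 4 | Protocol version |
| 8 | 4 | Producer status (1 = ready) |
| 12 | 4 | Consumer status (1 = ready) |
| 16 | 8 | Producer heartbeat (epoch milliseconds) |
| 24 | 8 | Consumer heartbeat (epoch milliseconds) |
| 32 | 4 | Shutdown flag (1 = shutdown requested) |
| 36 | 220 | Unused |
Index Entry Layout
The index file starts with the producer sequence at offset 0 and the consumer sequence at offset 64, each on its own cache line. Entries start at offset 128, and each entry is 32 bytes:
| Offset in entry | Size (bytes) | Field |
|---|---|---|
| 0 | 8 | Offset of the payload in the data region |
| 8 | 4 | Payload length |
| 12 | 4 | Message type |
| 16 | 8 | Timestamp (System.nanoTime() at publish) |
| 24 | 4 | CRC32 of the payload |
| 28 | 4 | Padding |
Data Region Layout
The data region holds raw payload bytes written back to back, with no per-message header. A payload that reaches the end of the region wraps around and continues at offset 0.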
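The wrap-around behavior of the data region can be sketched in isolation. The example below is a minimal illustration on a heap ByteBuffer, assuming Java 13 or later for the absolute bulk put and get methods; the class name WrapAroundCopy and its methods are hypothetical helpers, not part of the message bus.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch: copying a payload into and out of a circular byte region. */
final class WrapAroundCopy {
    /** Writes payload at logical position pos, splitting it at the end of the region. */
    static void write(ByteBuffer region, long pos, byte[] payload) {
        int capacity = region.capacity();
        int offset = (int) (pos % capacity);
        int first = Math.min(payload.length, capacity - offset);
        region.put(offset, payload, 0, first);                 // part up to the end
        region.put(0, payload, first, payload.length - first); // wrapped remainder, if any
    }

    /** Reads length bytes starting at logical position pos, following the wrap. */
    static byte[] read(ByteBuffer region, long pos, int length) {
        int capacity = region.capacity();
        int offset = (int) (pos % capacity);
        int first = Math.min(length, capacity - offset);
        byte[] out = new byte[length];
        region.get(offset, out, 0, first);
        region.get(0, out, first, length - first);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer region = ByteBuffer.allocate(8);
        byte[] payload = {1, 2, 3, 4, 5, 6};
        write(region, 5, payload); // bytes 1..3 land at offsets 5..7, bytes 4..6 at 0..2
        System.out.println(Arrays.equals(payload, read(region, 5, payload.length)));
    }
}
```

The sketch assumes the payload is no larger than the region; a caller would need to check that first.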
3. The Message Bus Core
import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.zip.CRC32;
/**
* High-performance IPC Message Bus using shared memory.
*
* Concepts used:
* - ByteBuffer (Article 1): Binary data manipulation
* - Encoding (Article 2): UTF-8 string encoding
* - FileChannel (Article 3): Memory-mapped I/O
* - IPC (Article 4): Shared memory communication
* - Ring Buffer (Article 5): Lock-free message passing
*/
public class MessageBus implements AutoCloseable {
// --- Constants ---
private static final int CTRL_MAGIC = 0x42555343; // ASCII "BUSC" (bus control)
private static final int VERSION = 1;
// Control offsets
private static final int CTRL_SIZE = 256;
private static final int CTRL_MAGIC_OFF = 0;
private static final int CTRL_VERSION_OFF = 4;
private static final int CTRL_PRODUCER_STATUS_OFF = 8;
private static final int CTRL_CONSUMER_STATUS_OFF = 12;
private static final int CTRL_PRODUCER_HB_OFF = 16;
private static final int CTRL_CONSUMER_HB_OFF = 24;
private static final int CTRL_SHUTDOWN_OFF = 32;
// Index ring buffer offsets
private static final int IDX_PRODUCER_SEQ_OFF = 0; // cache line 0
private static final int IDX_CONSUMER_SEQ_OFF = 64; // cache line 1
private static final int IDX_ENTRIES_OFF = 128; // entries start
private static final int IDX_ENTRY_SIZE = 32; // bytes per entry
// Index entry field offsets (within an entry)
private static final int ENTRY_DATA_OFF = 0;
private static final int ENTRY_DATA_LEN = 8;
private static final int ENTRY_MSG_TYPE = 12;
private static final int ENTRY_TIMESTAMP = 16;
private static final int ENTRY_CRC = 24;
// Configuration
private final int indexCapacity; // number of index slots (power of 2)
private final int indexMask;
private final int dataCapacity; // size of data region in bytes
// Memory-mapped buffers
private final MappedByteBuffer ctrlBuffer;
private final MappedByteBuffer indexBuffer;
private final MappedByteBuffer dataBuffer;
// File channels
private final FileChannel ctrlChannel;
private final FileChannel indexChannel;
private final FileChannel dataChannel;
// Data write position (producer only)
private long dataWritePos = 0;
/**
* Create or open the message bus.
*
* @param directory Directory for shared memory files
* @param indexSlots Number of index slots (power of 2)
* @param dataSize Size of data region in bytes
* @param create true to create (producer), false to open (consumer)
*/
public MessageBus(Path directory, int indexSlots, int dataSize,
boolean create) throws Exception {
if (Integer.bitCount(indexSlots) != 1) {
throw new IllegalArgumentException("indexSlots must be power of 2");
}
this.indexCapacity = indexSlots;
this.indexMask = indexSlots - 1;
this.dataCapacity = dataSize;
Files.createDirectories(directory);
Path ctrlPath = directory.resolve("control.shm");
Path indexPath = directory.resolve("index.shm");
Path dataPath = directory.resolve("data.shm");
int indexFileSize = IDX_ENTRIES_OFF + indexSlots * IDX_ENTRY_SIZE;
if (create) {
// Producer creates files
ctrlChannel = openAndSize(ctrlPath, CTRL_SIZE, true);
indexChannel = openAndSize(indexPath, indexFileSize, true);
dataChannel = openAndSize(dataPath, dataSize, true);
ctrlBuffer = ctrlChannel.map(FileChannel.MapMode.READ_WRITE, 0, CTRL_SIZE);
indexBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexFileSize);
dataBuffer = dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, dataSize);
// Initialize control
ctrlBuffer.putInt(CTRL_MAGIC_OFF, CTRL_MAGIC);
ctrlBuffer.putInt(CTRL_VERSION_OFF, VERSION);
ctrlBuffer.putInt(CTRL_PRODUCER_STATUS_OFF, 1); // ready
ctrlBuffer.putLong(CTRL_PRODUCER_HB_OFF, System.currentTimeMillis());
ctrlBuffer.force();
// Initialize index sequences
indexBuffer.putLong(IDX_PRODUCER_SEQ_OFF, 0);
indexBuffer.putLong(IDX_CONSUMER_SEQ_OFF, 0);
indexBuffer.force();
} else {
// Consumer opens existing files
ctrlChannel = FileChannel.open(ctrlPath,
StandardOpenOption.READ, StandardOpenOption.WRITE);
indexChannel = FileChannel.open(indexPath,
StandardOpenOption.READ, StandardOpenOption.WRITE);
dataChannel = FileChannel.open(dataPath,
StandardOpenOption.READ, StandardOpenOption.WRITE);
ctrlBuffer = ctrlChannel.map(FileChannel.MapMode.READ_WRITE, 0, CTRL_SIZE);
indexBuffer = indexChannel.map(
FileChannel.MapMode.READ_WRITE, 0, indexFileSize);
dataBuffer = dataChannel.map(
FileChannel.MapMode.READ_WRITE, 0, dataSize);
// Validate
int magic = ctrlBuffer.getInt(CTRL_MAGIC_OFF);
if (magic != CTRL_MAGIC) {
throw new IllegalStateException(
"Invalid control file: magic=0x" + Integer.toHexString(magic));
}
// Mark consumer as ready
ctrlBuffer.putInt(CTRL_CONSUMER_STATUS_OFF, 1);
ctrlBuffer.putLong(CTRL_CONSUMER_HB_OFF, System.currentTimeMillis());
ctrlBuffer.force();
}
}
private FileChannel openAndSize(Path path, int size, boolean create)
throws Exception {
StandardOpenOption[] opts = create
? new StandardOpenOption[]{StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING}
: new StandardOpenOption[]{StandardOpenOption.READ,
StandardOpenOption.WRITE};
FileChannel ch = FileChannel.open(path, opts);
if (create && ch.size() < size) {
ch.write(ByteBuffer.allocate(size), 0);
}
return ch;
}
// ===============================================
// PRODUCER API
// ===============================================
/**
* Publish a message to the bus.
*
* @param type Message type identifier
* @param payload Message payload bytes
* @return true if published, false if bus is full
*/
public boolean publish(int type, byte[] payload) {
long producerSeq = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
long consumerSeq = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
// Acquire fence: read the consumer sequence before reusing any space it has freed
java.lang.invoke.VarHandle.acquireFence();
// Check if index ring is full
if (producerSeq - consumerSeq >= indexCapacity) {
return false;
}
// Reject payloads that can never fit in the data region.
// Free space in the data region is not tracked, so size it to hold at least
// indexCapacity maximum-size payloads; otherwise unread data can be overwritten.
if (payload.length > dataCapacity) {
throw new IllegalArgumentException("Payload too large");
}
// Write payload to data region (with wrap-around)
int dataOffset = (int) (dataWritePos % dataCapacity);
int firstPart = Math.min(payload.length, dataCapacity - dataOffset);
// Absolute bulk copy (Java 13+): up to the end of the region, then any remainder from offset 0
dataBuffer.put(dataOffset, payload, 0, firstPart);
dataBuffer.put(0, payload, firstPart, payload.length - firstPart);
// Calculate CRC32
CRC32 crc = new CRC32();
crc.update(payload);
int checksum = (int) crc.getValue();
// Write index entry
int indexSlot = (int)(producerSeq & indexMask);
int entryOffset = IDX_ENTRIES_OFF + indexSlot * IDX_ENTRY_SIZE;
indexBuffer.putLong(entryOffset + ENTRY_DATA_OFF, dataOffset);
indexBuffer.putInt(entryOffset + ENTRY_DATA_LEN, payload.length);
indexBuffer.putInt(entryOffset + ENTRY_MSG_TYPE, type);
indexBuffer.putLong(entryOffset + ENTRY_TIMESTAMP, System.nanoTime());
indexBuffer.putInt(entryOffset + ENTRY_CRC, checksum);
// Advance data write position
dataWritePos += payload.length;
// Release fence: payload and index entry must be visible before the new sequence.
// force() is not a memory barrier; it flushes pages to storage and costs a
// system call per message, so it is not used on the hot path.
java.lang.invoke.VarHandle.releaseFence();
indexBuffer.putLong(IDX_PRODUCER_SEQ_OFF, producerSeq + 1);
return true;
}
/**
* Convenience: publish a string message.
*/
public boolean publishString(int type, String message) {
return publish(type, message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
// ===============================================
// CONSUMER API
// ===============================================
/**
* Receive the next message. Returns null if no message available.
*/
public Message receive() {
long consumerSeq = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
long producerSeq = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
// Acquire fence: the entry and payload reads below must not move ahead of the sequence read
java.lang.invoke.VarHandle.acquireFence();
if (consumerSeq >= producerSeq) {
return null; // no new messages
}
// Read index entry
int indexSlot = (int)(consumerSeq & indexMask);
int entryOffset = IDX_ENTRIES_OFF + indexSlot * IDX_ENTRY_SIZE;
long dataOffset = indexBuffer.getLong(entryOffset + ENTRY_DATA_OFF);
int dataLength = indexBuffer.getInt(entryOffset + ENTRY_DATA_LEN);
int msgType = indexBuffer.getInt(entryOffset + ENTRY_MSG_TYPE);
long timestamp = indexBuffer.getLong(entryOffset + ENTRY_TIMESTAMP);
int expectedCrc = indexBuffer.getInt(entryOffset + ENTRY_CRC);
// Read payload from data region (with wrap-around)
byte[] payload = new byte[dataLength];
int firstPart = Math.min(dataLength, dataCapacity - (int) dataOffset);
// Absolute bulk copy (Java 13+): up to the end of the region, then any remainder from offset 0
dataBuffer.get((int) dataOffset, payload, 0, firstPart);
dataBuffer.get(0, payload, firstPart, dataLength - firstPart);
// Verify CRC32
CRC32 crc = new CRC32();
crc.update(payload);
int actualCrc = (int) crc.getValue();
if (actualCrc != expectedCrc) {
throw new RuntimeException(
"CRC mismatch! Expected: " + expectedCrc + " Actual: " + actualCrc);
}
// Release fence: finish reading the slot before handing it back to the producer.
// No force() here; flushing to storage is not needed for visibility between processes.
java.lang.invoke.VarHandle.releaseFence();
indexBuffer.putLong(IDX_CONSUMER_SEQ_OFF, consumerSeq + 1);
return new Message(consumerSeq, msgType, timestamp, payload);
}
// ===============================================
// CONTROL API
// ===============================================
/** Update heartbeat (call periodically from both sides). */
public void heartbeat(boolean isProducer) {
int offset = isProducer ? CTRL_PRODUCER_HB_OFF : CTRL_CONSUMER_HB_OFF;
ctrlBuffer.putLong(offset, System.currentTimeMillis());
}
/** Check if the other side is alive. */
public boolean isPeerAlive(boolean checkProducer, long timeoutMs) {
int offset = checkProducer ? CTRL_PRODUCER_HB_OFF : CTRL_CONSUMER_HB_OFF;
long lastHb = ctrlBuffer.getLong(offset);
return (System.currentTimeMillis() - lastHb) < timeoutMs;
}
/** Signal shutdown. */
public void requestShutdown() {
ctrlBuffer.putInt(CTRL_SHUTDOWN_OFF, 1);
ctrlBuffer.force();
}
/** Check if shutdown was requested. */
public boolean isShutdownRequested() {
return ctrlBuffer.getInt(CTRL_SHUTDOWN_OFF) == 1;
}
/** Get the number of pending messages. */
public int pendingCount() {
long prod = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
long cons = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
return (int)(prod - cons);
}
@Override
public void close() throws Exception {
ctrlChannel.close();
indexChannel.close();
dataChannel.close();
}
// ===============================================
// MESSAGE RECORD
// ===============================================
public record Message(long sequence, int type, long timestamp, byte[] payload) {
/** Get payload as UTF-8 string. */
public String payloadAsString() {
return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
}
@Override
public String toString() {
String preview = payload.length > 50
? new String(payload, 0, 50, java.nio.charset.StandardCharsets.UTF_8) + "..."
: payloadAsString();
return String.format("Message[seq=%d, type=%d, len=%d, data='%s']",
sequence, type, payload.length, preview);
}
}
}
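The sequence counters are the hand-off point between the two processes: the producer has to make an entry visible before the new sequence value, and the consumer has to read the sequence before it reads the entry. One way to express that ordering is with release and acquire accesses through a ByteBuffer view VarHandle. The sketch below is a minimal illustration under stated assumptions, not the MessageBus implementation: the class name SequencePublication, the method names, and the offsets are hypothetical. Non-plain access modes need an aligned address, so it uses a direct buffer and 8-byte-aligned offsets.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Sketch: publishing a sequence with release/acquire semantics on a ByteBuffer. */
final class SequencePublication {
    // Views the buffer as longs. Big-endian matches ByteBuffer's default byte order,
    // so values stay compatible with plain getLong/putLong on the same buffer.
    static final VarHandle LONG_VIEW =
        MethodHandles.byteBufferViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);

    /** Release store: writes made before this call are visible to a reader that observes newSeq. */
    static void publish(ByteBuffer buf, int seqOffset, long newSeq) {
        LONG_VIEW.setRelease(buf, seqOffset, newSeq);
    }

    /** Acquire load: reads made after this call cannot be reordered before it. */
    static long observe(ByteBuffer buf, int seqOffset) {
        return (long) LONG_VIEW.getAcquire(buf, seqOffset);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(128); // direct buffers are suitably aligned
        buf.putLong(64, 42L);  // plain write of the "entry" data
        publish(buf, 0, 1L);   // then publish the sequence
        if (observe(buf, 0) == 1L) {
            System.out.println(buf.getLong(64)); // safe to read the entry now
        }
    }
}
```

A MappedByteBuffer is also a direct buffer, so the same VarHandle could in principle be applied to the index file, provided the sequence offsets stay 8-byte aligned.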
4. Message Serialization
For structured messages, define a serialization layer:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
* Structured message types for the bus.
* Each message type has a fixed type ID and a serialization format.
*/
public class Messages {
// Message type constants
public static final int TYPE_ORDER = 1;
public static final int TYPE_TRADE = 2;
public static final int TYPE_HEARTBEAT = 3;
public static final int TYPE_STATUS = 4;
/**
* Order message:
* ┌───────────┬───────────┬──────────┬──────────────────┐
* │ orderId │ quantity │ price │ symbol (UTF-8) │
* │ 8 bytes │ 4 bytes │ 8 bytes │ 2+N bytes │
* └───────────┴───────────┴──────────┴──────────────────┘
*/
public record Order(long orderId, int quantity, double price, String symbol) {
public byte[] serialize() {
byte[] symbolBytes = symbol.getBytes(StandardCharsets.UTF_8);
ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 8 + 2 + symbolBytes.length);
buf.putLong(orderId);
buf.putInt(quantity);
buf.putDouble(price);
buf.putShort((short) symbolBytes.length);
buf.put(symbolBytes);
return buf.array();
}
public static Order deserialize(byte[] data) {
ByteBuffer buf = ByteBuffer.wrap(data);
long orderId = buf.getLong();
int quantity = buf.getInt();
double price = buf.getDouble();
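// The length prefix is an unsigned 16-bit value; avoid a negative array size above 32,767
int symbolLen = Short.toUnsignedInt(buf.getShort());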
byte[] symbolBytes = new byte[symbolLen];
buf.get(symbolBytes);
String symbol = new String(symbolBytes, StandardCharsets.UTF_8);
return new Order(orderId, quantity, price, symbol);
}
}
/**
* Trade message:
* ┌───────────┬───────────┬──────────┬──────────┬───────────┐
* │ tradeId │ orderId │ quantity │ price │ timestamp │
* │ 8 bytes │ 8 bytes │ 4 bytes │ 8 bytes │ 8 bytes │
* └───────────┴───────────┴──────────┴──────────┴───────────┘
*/
public record Trade(long tradeId, long orderId, int quantity,
double price, long timestamp) {
public byte[] serialize() {
ByteBuffer buf = ByteBuffer.allocate(36);
buf.putLong(tradeId);
buf.putLong(orderId);
buf.putInt(quantity);
buf.putDouble(price);
buf.putLong(timestamp);
return buf.array();
}
public static Trade deserialize(byte[] data) {
ByteBuffer buf = ByteBuffer.wrap(data);
return new Trade(
buf.getLong(), buf.getLong(), buf.getInt(),
buf.getDouble(), buf.getLong()
);
}
}
}
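The Messages class defines TYPE_STATUS but does not show a matching record. As an illustration of how a further message type could follow the same length-prefixed pattern as Order, here is a minimal sketch. The StatusMessage record, its fields, and its layout (4-byte code, 2-byte text length, UTF-8 text) are assumptions made for this example, not part of the original design.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical status message: code (4 bytes), text length (2 bytes), UTF-8 text. */
record StatusMessage(int code, String text) {
    byte[] serialize() {
        byte[] textBytes = text.getBytes(StandardCharsets.UTF_8);
        if (textBytes.length > 0xFFFF) {
            throw new IllegalArgumentException("text too long for a 2-byte length prefix");
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + 2 + textBytes.length);
        buf.putInt(code);
        buf.putShort((short) textBytes.length); // stored as an unsigned 16-bit length
        buf.put(textBytes);
        return buf.array();
    }

    static StatusMessage deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int code = buf.getInt();
        int textLen = Short.toUnsignedInt(buf.getShort());
        byte[] textBytes = new byte[textLen];
        buf.get(textBytes);
        return new StatusMessage(code, new String(textBytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        StatusMessage original = new StatusMessage(200, "h\u00e9llo");
        StatusMessage copy = deserialize(original.serialize());
        System.out.println(original.equals(copy));
    }
}
```

Checking the length on the way in and reading it as unsigned on the way out keeps the two sides of the format consistent for text up to 65,535 bytes.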
5. The Producer API
import java.nio.file.*;
/**
* Example producer application.
* Simulates an order management system sending orders through the bus.
*/
public class ProducerApp {
public static void main(String[] args) throws Exception {
Path busDir = Path.of(System.getProperty("java.io.tmpdir"), "message-bus");
// Create bus: 8192 index slots, 4MB data region
try (MessageBus bus = new MessageBus(busDir, 8192, 4 * 1024 * 1024, true)) {
System.out.println("Producer started. Bus directory: " + busDir);
System.out.println("Waiting for consumer...");
// Wait for consumer to connect
while (!bus.isPeerAlive(false, 5000)) {
Thread.sleep(100);
bus.heartbeat(true);
}
System.out.println("Consumer connected!");
// Send orders
long startTime = System.nanoTime();
int messageCount = 1_000_000;
int published = 0;
for (int i = 0; i < messageCount; i++) {
Messages.Order order = new Messages.Order(
i, // orderId
100 + (i % 900), // quantity
150.50 + (i % 100) * 0.01, // price
"AAPL" // symbol
);
byte[] payload = order.serialize();
// Retry loop for back-pressure
while (!bus.publish(Messages.TYPE_ORDER, payload)) {
Thread.onSpinWait();
}
published++;
// Periodic heartbeat
if (i % 10_000 == 0) {
bus.heartbeat(true);
}
}
long elapsed = System.nanoTime() - startTime;
System.out.printf("%nPublished %,d messages in %,d ms%n",
published, elapsed / 1_000_000);
System.out.printf("Throughput: %,.0f messages/sec%n",
published / (elapsed / 1_000_000_000.0));
// Wait for consumer to finish
System.out.println("Waiting for consumer to drain...");
while (bus.pendingCount() > 0) {
Thread.sleep(10);
bus.heartbeat(true);
}
// Shutdown
bus.requestShutdown();
System.out.println("Producer shutdown complete.");
}
}
}
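The retry loop in the producer spins indefinitely while the bus is full. If the consumer has stopped, that loop never ends. One possible refinement is a bounded retry that spins briefly, then parks, and gives up after a timeout. The sketch below is a minimal illustration; the Backoff class, the spin limit of 1,000, and the 1 microsecond park are arbitrary assumptions, not tuned values.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

/** Sketch: bounded retry with spin-then-park backoff. */
final class Backoff {
    /**
     * Calls attempt until it returns true or the timeout elapses.
     * Returns true on success, false on timeout.
     */
    static boolean retry(BooleanSupplier attempt, long timeoutNanos) {
        long deadline = System.nanoTime() + timeoutNanos;
        int spins = 0;
        while (!attempt.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up; the caller decides what to do next
            }
            if (spins++ < 1_000) {
                Thread.onSpinWait();          // cheap busy-wait for short stalls
            } else {
                LockSupport.parkNanos(1_000); // yield the core during longer stalls
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean ok = retry(() -> ++calls[0] >= 5, 1_000_000_000L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

In the producer, the attempt could be a lambda such as `() -> bus.publish(Messages.TYPE_ORDER, payload)`, with a timeout result treated as a sign that the consumer needs checking.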
6. The Consumer API
import java.nio.file.*;
/**
* Example consumer application.
* Processes orders from the message bus.
*/
public class ConsumerApp {
public static void main(String[] args) throws Exception {
Path busDir = Path.of(System.getProperty("java.io.tmpdir"), "message-bus");
// Wait for producer to create files
while (!Files.exists(busDir.resolve("control.shm"))) {
System.out.println("Waiting for producer to start...");
Thread.sleep(500);
}
Thread.sleep(100); // let producer finish initialization
try (MessageBus bus = new MessageBus(busDir, 8192, 4 * 1024 * 1024, false)) {
System.out.println("Consumer started. Connected to bus.");
long startTime = System.nanoTime();
long messagesReceived = 0;
long totalQuantity = 0;
double totalValue = 0.0;
while (!bus.isShutdownRequested() || bus.pendingCount() > 0) {
MessageBus.Message msg = bus.receive();
if (msg == null) {
Thread.onSpinWait();
continue;
}
// Dispatch by message type
switch (msg.type()) {
case Messages.TYPE_ORDER -> {
Messages.Order order = Messages.Order.deserialize(msg.payload());
totalQuantity += order.quantity();
totalValue += order.quantity() * order.price();
messagesReceived++;
}
case Messages.TYPE_TRADE -> {
Messages.Trade trade = Messages.Trade.deserialize(msg.payload());
// Process trade...
messagesReceived++;
}
default -> {
System.out.println("Unknown message type: " + msg.type());
}
}
// Periodic heartbeat and stats
if (messagesReceived % 100_000 == 0) {
bus.heartbeat(false);
System.out.printf("Processed %,d messages...%n", messagesReceived);
}
}
long elapsed = System.nanoTime() - startTime;
System.out.printf("%n=== Consumer Summary ===%n");
System.out.printf("Messages received: %,d%n", messagesReceived);
System.out.printf("Total quantity: %,d%n", totalQuantity);
System.out.printf("Total value: $%,.2f%n", totalValue);
System.out.printf("Elapsed time: %,d ms%n", elapsed / 1_000_000);
System.out.printf("Throughput: %,.0f messages/sec%n",
messagesReceived / (elapsed / 1_000_000_000.0));
System.out.printf("Avg latency: %.0f ns/message%n",
(double) elapsed / messagesReceived);
}
}
}
7. The Control Channel
The control channel handles lifecycle management: the startup handshake, periodic heartbeats, and shutdown signaling.
Detecting Dead Peers
/**
* Watchdog that monitors the peer and takes action on failure.
*/
public class BusWatchdog implements Runnable {
private final MessageBus bus;
private final boolean monitorProducer;
private final long timeoutMs;
private final Runnable onPeerDeath;
public BusWatchdog(MessageBus bus, boolean monitorProducer,
long timeoutMs, Runnable onPeerDeath) {
this.bus = bus;
this.monitorProducer = monitorProducer;
this.timeoutMs = timeoutMs;
this.onPeerDeath = onPeerDeath;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(timeoutMs / 2);
// Update our heartbeat
bus.heartbeat(!monitorProducer);
// Check peer
if (!bus.isPeerAlive(monitorProducer, timeoutMs)) {
System.err.println("⚠️ Peer heartbeat timeout! " +
"Last seen > " + timeoutMs + "ms ago");
onPeerDeath.run();
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
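The timeout decision inside isPeerAlive depends on the wall clock, which makes it awkward to test. A possible approach is to inject the clock and keep the decision in a small pure helper. The sketch below assumes a hypothetical HeartbeatCheck class; treating a heartbeat value of 0 as "never connected" is also an assumption, based on the control file starting out zero-filled.

```java
import java.util.function.LongSupplier;

/** Sketch: heartbeat staleness check with an injectable clock. */
final class HeartbeatCheck {
    private final LongSupplier clockMillis;
    private final long timeoutMs;

    HeartbeatCheck(LongSupplier clockMillis, long timeoutMs) {
        this.clockMillis = clockMillis;
        this.timeoutMs = timeoutMs;
    }

    /** True if the peer has written a heartbeat within the timeout window. */
    boolean isAlive(long lastHeartbeatMillis) {
        if (lastHeartbeatMillis == 0) {
            return false; // zero-filled field: the peer has never written a heartbeat
        }
        return clockMillis.getAsLong() - lastHeartbeatMillis < timeoutMs;
    }

    public static void main(String[] args) {
        HeartbeatCheck check = new HeartbeatCheck(System::currentTimeMillis, 5_000);
        System.out.println(check.isAlive(System.currentTimeMillis())); // fresh heartbeat
        System.out.println(check.isAlive(0));                          // never connected
    }
}
```

In tests, the clock can be a fixed value such as `() -> 10_000L`, which makes the boundary cases deterministic.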
8. Running the Full System
Step 1: Compile
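# Create a project directory and enter it
mkdir -p ipc-bus/src
cd ipc-bus
# Copy all Java files to src/
# (LatencyBenchmark also needs SPSCRingBuffer, which is not defined in this article)
# Compile
javac -d out src/*.java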
Step 2: Run Consumer (Terminal 1)
java -cp out ConsumerApp
Step 3: Run Producer (Terminal 2)
java -cp out ProducerApp
Expected Output
Producer:
Producer started. Bus directory: /tmp/message-bus
Waiting for consumer...
Consumer connected!
Published 1,000,000 messages in 850 ms
Throughput: 1,176,471 messages/sec
Waiting for consumer to drain...
Producer shutdown complete.
Consumer:
Consumer started. Connected to bus.
Processed 100,000 messages...
Processed 200,000 messages...
...
Processed 1,000,000 messages...
=== Consumer Summary ===
Messages received: 1,000,000
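Total quantity: 549,460,000
Total value: $82,974,045,200.00
Elapsed time: 920 ms
Throughput: 1,086,957 messages/sec
Avg latency: 920 ns/message
Timing and throughput figures vary by machine. The totals follow from the generated orders and are deterministic, although the last digits of the total value can differ slightly because of floating-point rounding.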
9. Performance Testing
Latency Histogram
/**
* Latency measurement using coordinated timestamps.
* Producer embeds nanoTime in message, consumer measures delta.
*/
public class LatencyBenchmark {
public static void main(String[] args) throws Exception {
// Use SPSCRingBuffer for intra-JVM latency test
SPSCRingBuffer ring = new SPSCRingBuffer(1 << 16, 8);
long[] latencies = new long[1_000_000];
Thread consumer = new Thread(() -> {
for (int i = 0; i < latencies.length; i++) {
long value;
while ((value = ring.takeLong()) == Long.MIN_VALUE) {
Thread.onSpinWait();
}
latencies[i] = System.nanoTime() - value;
}
});
consumer.start();
// Give the consumer thread time to start (this is not a JIT warm-up)
Thread.sleep(10);
for (int i = 0; i < latencies.length; i++) {
while (!ring.offerLong(System.nanoTime())) {
Thread.onSpinWait();
}
}
consumer.join();
// Calculate percentiles
java.util.Arrays.sort(latencies);
System.out.printf("Latency Results:%n");
System.out.printf(" Min: %,d ns%n", latencies[0]);
System.out.printf(" p50: %,d ns%n", latencies[latencies.length / 2]);
System.out.printf(" p90: %,d ns%n", latencies[(int)(latencies.length * 0.9)]);
System.out.printf(" p99: %,d ns%n", latencies[(int)(latencies.length * 0.99)]);
System.out.printf(" p99.9: %,d ns%n", latencies[(int)(latencies.length * 0.999)]);
System.out.printf(" Max: %,d ns%n", latencies[latencies.length - 1]);
}
}
Typical results (intra-JVM SPSC ring buffer):
Latency Results:
Min: 28 ns
p50: 52 ns
p90: 78 ns
p99: 145 ns
p99.9: 890 ns
Max: 12,340 ns
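The benchmark picks percentiles by indexing directly into the sorted array. A slightly more careful variant is the nearest-rank method, which also guards the array bounds. The sketch below is a minimal illustration; the Percentiles class and its method are hypothetical names.

```java
import java.util.Arrays;

/** Sketch: nearest-rank percentile over a sorted sample. */
final class Percentiles {
    /** Returns the p-th percentile (0 < p <= 100) of an ascending-sorted array. */
    static long percentile(long[] sorted, double p) {
        if (sorted.length == 0) {
            throw new IllegalArgumentException("empty sample");
        }
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
        int index = Math.max(0, Math.min(sorted.length - 1, rank - 1));
        return sorted[index];
    }

    public static void main(String[] args) {
        long[] samples = {7, 3, 9, 1, 5, 10, 2, 8, 4, 6};
        Arrays.sort(samples);
        System.out.println(percentile(samples, 50));  // median by nearest rank
        System.out.println(percentile(samples, 100)); // maximum
    }
}
```

With one million samples the difference from direct indexing is at most one position, so this mainly matters for small samples and for keeping the index in range.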
10. Production Considerations
Checklist
JVM Flags for Production
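# MaxDirectMemorySize: limit direct buffer memory (does not cap memory-mapped files)
# UseG1GC, MaxGCPauseMillis: G1 with a 5 ms pause-time target (a goal, not a guarantee)
# -RestrictContended: allow the @Contended annotation in application classes
# AlwaysPreTouch: pre-touch the Java heap at startup
# UseTransparentHugePages: transparent huge pages for the heap (Linux only)
# java.io.tmpdir=/dev/shm: place the bus files on RAM-backed tmpfs
# Comments are kept above the command because text after a trailing backslash breaks line continuation.
java \
-XX:MaxDirectMemorySize=256m \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=5 \
-XX:-RestrictContended \
-XX:+AlwaysPreTouch \
-XX:+UseTransparentHugePages \
-Djava.io.tmpdir=/dev/shm \
-cp out ProducerApp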
Using /dev/shm (Linux) for True Shared Memory
// On Linux, /dev/shm is a RAM-backed filesystem (tmpfs)
// Files here live in memory and are not written to a disk-backed file system
// (tmpfs pages can still be swapped out if swap is enabled)
Path shmDir = Path.of("/dev/shm", "my-app-bus");
// This gives us the performance of shared memory
// with the simplicity of file-based APIs
MessageBus bus = new MessageBus(shmDir, 8192, 4 * 1024 * 1024, true);
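Since /dev/shm exists only on Linux, a portable application might choose the directory at startup and fall back to the regular temp directory elsewhere. The sketch below shows one way to do that; the BusDirectory class and its resolve method are hypothetical helpers for illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: prefer /dev/shm when it is usable, otherwise fall back to java.io.tmpdir. */
final class BusDirectory {
    static Path resolve(String name) {
        Path shm = Path.of("/dev/shm");
        Path base = (Files.isDirectory(shm) && Files.isWritable(shm))
            ? shm
            : Path.of(System.getProperty("java.io.tmpdir"));
        return base.resolve(name);
    }

    public static void main(String[] args) {
        // Prints a path under /dev/shm on most Linux systems, under the temp directory elsewhere
        System.out.println(resolve("my-app-bus"));
    }
}
```

Producer and consumer need to resolve the same directory, so both processes should use the same helper and the same name.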
11. Series Recap
Let's trace how every concept from the series comes together in our message bus:
What We Built
| Component | Concepts Used |
|---|---|
| Data serialization | ByteBuffer (put/get), Encoding (UTF-8) |
| Shared memory mapping | FileChannel (memory-mapped I/O) |
| Message routing | IPC patterns (shared files) |
| Index management | Ring buffer (power-of-2, sequences) |
| Integrity verification | Encoding (CRC32), ByteBuffer (checksums) |
| Lifecycle management | IPC (handshake, heartbeat, shutdown) |
Key Lessons
ByteBuffer is the foundation — all JVM binary I/O flows through it
Always specify encoding — UTF-8 is the safe default
Memory-mapped I/O bridges the gap between files and memory
Shared memory is the fastest IPC — sub-microsecond latency
Ring buffers enable lock-free messaging — millions of messages/sec
Mechanical sympathy matters — cache lines, padding, alignment
Simple protocols are robust — magic numbers, CRC, heartbeats
Further Reading
LMAX Disruptor: Production-grade ring buffer implementation
Chronicle Queue: Persistent, memory-mapped message queue for Java
Aeron: Ultra-low-latency transport from Real Logic
Java Foreign Function & Memory API (JEP 454): Modern, safe alternative to sun.misc.Unsafe
io_uring: Linux's next-gen async I/O (accessible via JNI/FFI)