Skip to main content

Command Palette

Search for a command to run...

JVM Low-Level I/O - Part 6

Putting It All Together: High-Performance IPC Message Bus

Published
16 min read

This final article brings together everything from the series — ByteBuffer, encoding, FileChannel, IPC, and ring buffers — into a complete, production-grade IPC message bus. Two (or more) JVM processes will exchange structured messages at a rate of millions per second using shared memory.


1. System Overview

Our message bus uses three files in shared memory:

Why Three Files?

File Purpose Access Pattern
control.shm Handshake, heartbeat, shutdown Infrequent, both read/write
data.shm Variable-length message payloads Sequential write, sequential read
index.shm Fixed-size index entries (ring buffer) Ring buffer pattern

Separating index from data allows fixed-size ring buffer entries in the index (fast!) while supporting variable-length messages in the data region.


2. Protocol Design

Control Channel Layout

Index Entry Layout

Data Region Layout


3. The Message Bus Core

import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.zip.CRC32;

/**
 * High-performance IPC Message Bus using shared memory.
 * 
 * Concepts used:
 * - ByteBuffer (Article 1): Binary data manipulation
 * - Encoding (Article 2): UTF-8 string encoding
 * - FileChannel (Article 3): Memory-mapped I/O
 * - IPC (Article 4): Shared memory communication
 * - Ring Buffer (Article 5): Lock-free message passing
 */
public class MessageBus implements AutoCloseable {

    // --- Constants ---
    private static final int CTRL_MAGIC = 0xB0C7RL;  // "BUS CTRL"
    private static final int VERSION = 1;
    
    // Control offsets
    private static final int CTRL_SIZE = 256;
    private static final int CTRL_MAGIC_OFF = 0;
    private static final int CTRL_VERSION_OFF = 4;
    private static final int CTRL_PRODUCER_STATUS_OFF = 8;
    private static final int CTRL_CONSUMER_STATUS_OFF = 12;
    private static final int CTRL_PRODUCER_HB_OFF = 16;
    private static final int CTRL_CONSUMER_HB_OFF = 24;
    private static final int CTRL_SHUTDOWN_OFF = 32;
    
    // Index ring buffer offsets
    private static final int IDX_PRODUCER_SEQ_OFF = 0;   // cache line 0
    private static final int IDX_CONSUMER_SEQ_OFF = 64;  // cache line 1
    private static final int IDX_ENTRIES_OFF = 128;       // entries start
    private static final int IDX_ENTRY_SIZE = 32;         // bytes per entry
    
    // Index entry field offsets (within an entry)
    private static final int ENTRY_DATA_OFF = 0;
    private static final int ENTRY_DATA_LEN = 8;
    private static final int ENTRY_MSG_TYPE = 12;
    private static final int ENTRY_TIMESTAMP = 16;
    private static final int ENTRY_CRC = 24;
    
    // Configuration
    private final int indexCapacity;  // number of index slots (power of 2)
    private final int indexMask;
    private final int dataCapacity;   // size of data region in bytes
    
    // Memory-mapped buffers
    private final MappedByteBuffer ctrlBuffer;
    private final MappedByteBuffer indexBuffer;
    private final MappedByteBuffer dataBuffer;
    
    // File channels
    private final FileChannel ctrlChannel;
    private final FileChannel indexChannel;
    private final FileChannel dataChannel;
    
    // Data write position (producer only)
    private long dataWritePos = 0;
    
    /**
     * Create or open the message bus.
     *
     * @param directory  Directory for shared memory files
     * @param indexSlots Number of index slots (power of 2)
     * @param dataSize   Size of data region in bytes
     * @param create     true to create (producer), false to open (consumer)
     */
    public MessageBus(Path directory, int indexSlots, int dataSize, 
                      boolean create) throws Exception {
        
        if (Integer.bitCount(indexSlots) != 1) {
            throw new IllegalArgumentException("indexSlots must be power of 2");
        }
        
        this.indexCapacity = indexSlots;
        this.indexMask = indexSlots - 1;
        this.dataCapacity = dataSize;
        
        Files.createDirectories(directory);
        
        Path ctrlPath = directory.resolve("control.shm");
        Path indexPath = directory.resolve("index.shm");
        Path dataPath = directory.resolve("data.shm");
        
        int indexFileSize = IDX_ENTRIES_OFF + indexSlots * IDX_ENTRY_SIZE;
        
        if (create) {
            // Producer creates files
            ctrlChannel = openAndSize(ctrlPath, CTRL_SIZE, true);
            indexChannel = openAndSize(indexPath, indexFileSize, true);
            dataChannel = openAndSize(dataPath, dataSize, true);
            
            ctrlBuffer = ctrlChannel.map(FileChannel.MapMode.READ_WRITE, 0, CTRL_SIZE);
            indexBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexFileSize);
            dataBuffer = dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, dataSize);
            
            // Initialize control
            ctrlBuffer.putInt(CTRL_MAGIC_OFF, CTRL_MAGIC);
            ctrlBuffer.putInt(CTRL_VERSION_OFF, VERSION);
            ctrlBuffer.putInt(CTRL_PRODUCER_STATUS_OFF, 1); // ready
            ctrlBuffer.putLong(CTRL_PRODUCER_HB_OFF, System.currentTimeMillis());
            ctrlBuffer.force();
            
            // Initialize index sequences
            indexBuffer.putLong(IDX_PRODUCER_SEQ_OFF, 0);
            indexBuffer.putLong(IDX_CONSUMER_SEQ_OFF, 0);
            indexBuffer.force();
            
        } else {
            // Consumer opens existing files
            ctrlChannel = FileChannel.open(ctrlPath, 
                StandardOpenOption.READ, StandardOpenOption.WRITE);
            indexChannel = FileChannel.open(indexPath, 
                StandardOpenOption.READ, StandardOpenOption.WRITE);
            dataChannel = FileChannel.open(dataPath, 
                StandardOpenOption.READ, StandardOpenOption.WRITE);
            
            ctrlBuffer = ctrlChannel.map(FileChannel.MapMode.READ_WRITE, 0, CTRL_SIZE);
            indexBuffer = indexChannel.map(
                FileChannel.MapMode.READ_WRITE, 0, indexFileSize);
            dataBuffer = dataChannel.map(
                FileChannel.MapMode.READ_WRITE, 0, dataSize);
            
            // Validate
            int magic = ctrlBuffer.getInt(CTRL_MAGIC_OFF);
            if (magic != CTRL_MAGIC) {
                throw new IllegalStateException(
                    "Invalid control file: magic=0x" + Integer.toHexString(magic));
            }
            
            // Mark consumer as ready
            ctrlBuffer.putInt(CTRL_CONSUMER_STATUS_OFF, 1);
            ctrlBuffer.putLong(CTRL_CONSUMER_HB_OFF, System.currentTimeMillis());
            ctrlBuffer.force();
        }
    }
    
    private FileChannel openAndSize(Path path, int size, boolean create) 
            throws Exception {
        StandardOpenOption[] opts = create 
            ? new StandardOpenOption[]{StandardOpenOption.READ, 
                StandardOpenOption.WRITE, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING}
            : new StandardOpenOption[]{StandardOpenOption.READ, 
                StandardOpenOption.WRITE};
        
        FileChannel ch = FileChannel.open(path, opts);
        if (create && ch.size() < size) {
            ch.write(ByteBuffer.allocate(size), 0);
        }
        return ch;
    }

    // ===============================================
    // PRODUCER API
    // ===============================================
    
    /**
     * Publish a message to the bus.
     * 
     * @param type    Message type identifier
     * @param payload Message payload bytes
     * @return true if published, false if bus is full
     */
    public boolean publish(int type, byte[] payload) {
        long producerSeq = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
        long consumerSeq = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
        
        // Check if index ring is full
        if (producerSeq - consumerSeq >= indexCapacity) {
            return false;
        }
        
        // Check if data region has enough space
        if (payload.length > dataCapacity) {
            throw new IllegalArgumentException("Payload too large");
        }
        
        // Write payload to data region (with wrap-around)
        long dataOffset = dataWritePos % dataCapacity;
        int remaining = dataCapacity - (int) dataOffset;
        
        if (remaining >= payload.length) {
            // Contiguous write
            for (int i = 0; i < payload.length; i++) {
                dataBuffer.put((int) dataOffset + i, payload[i]);
            }
        } else {
            // Split write (wraps around)
            for (int i = 0; i < remaining; i++) {
                dataBuffer.put((int) dataOffset + i, payload[i]);
            }
            for (int i = 0; i < payload.length - remaining; i++) {
                dataBuffer.put(i, payload[remaining + i]);
            }
        }
        
        // Calculate CRC32
        CRC32 crc = new CRC32();
        crc.update(payload);
        int checksum = (int) crc.getValue();
        
        // Write index entry
        int indexSlot = (int)(producerSeq & indexMask);
        int entryOffset = IDX_ENTRIES_OFF + indexSlot * IDX_ENTRY_SIZE;
        
        indexBuffer.putLong(entryOffset + ENTRY_DATA_OFF, dataOffset);
        indexBuffer.putInt(entryOffset + ENTRY_DATA_LEN, payload.length);
        indexBuffer.putInt(entryOffset + ENTRY_MSG_TYPE, type);
        indexBuffer.putLong(entryOffset + ENTRY_TIMESTAMP, System.nanoTime());
        indexBuffer.putInt(entryOffset + ENTRY_CRC, checksum);
        
        // Advance data write position
        dataWritePos += payload.length;
        
        // Memory barrier + publish sequence
        indexBuffer.force();
        indexBuffer.putLong(IDX_PRODUCER_SEQ_OFF, producerSeq + 1);
        indexBuffer.force();
        
        return true;
    }
    
    /**
     * Convenience: publish a string message.
     */
    public boolean publishString(int type, String message) {
        return publish(type, message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    
    // ===============================================
    // CONSUMER API
    // ===============================================
    
    /**
     * Receive the next message. Returns null if no message available.
     */
    public Message receive() {
        long consumerSeq = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
        long producerSeq = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
        
        if (consumerSeq >= producerSeq) {
            return null; // no new messages
        }
        
        // Read index entry
        int indexSlot = (int)(consumerSeq & indexMask);
        int entryOffset = IDX_ENTRIES_OFF + indexSlot * IDX_ENTRY_SIZE;
        
        long dataOffset = indexBuffer.getLong(entryOffset + ENTRY_DATA_OFF);
        int dataLength = indexBuffer.getInt(entryOffset + ENTRY_DATA_LEN);
        int msgType = indexBuffer.getInt(entryOffset + ENTRY_MSG_TYPE);
        long timestamp = indexBuffer.getLong(entryOffset + ENTRY_TIMESTAMP);
        int expectedCrc = indexBuffer.getInt(entryOffset + ENTRY_CRC);
        
        // Read payload from data region (with wrap-around)
        byte[] payload = new byte[dataLength];
        int remaining = dataCapacity - (int) dataOffset;
        
        if (remaining >= dataLength) {
            for (int i = 0; i < dataLength; i++) {
                payload[i] = dataBuffer.get((int) dataOffset + i);
            }
        } else {
            for (int i = 0; i < remaining; i++) {
                payload[i] = dataBuffer.get((int) dataOffset + i);
            }
            for (int i = 0; i < dataLength - remaining; i++) {
                payload[remaining + i] = dataBuffer.get(i);
            }
        }
        
        // Verify CRC32
        CRC32 crc = new CRC32();
        crc.update(payload);
        int actualCrc = (int) crc.getValue();
        
        if (actualCrc != expectedCrc) {
            throw new RuntimeException(
                "CRC mismatch! Expected: " + expectedCrc + " Actual: " + actualCrc);
        }
        
        // Advance consumer sequence
        indexBuffer.putLong(IDX_CONSUMER_SEQ_OFF, consumerSeq + 1);
        indexBuffer.force();
        
        return new Message(consumerSeq, msgType, timestamp, payload);
    }
    
    // ===============================================
    // CONTROL API
    // ===============================================
    
    /** Update heartbeat (call periodically from both sides). */
    public void heartbeat(boolean isProducer) {
        int offset = isProducer ? CTRL_PRODUCER_HB_OFF : CTRL_CONSUMER_HB_OFF;
        ctrlBuffer.putLong(offset, System.currentTimeMillis());
    }
    
    /** Check if the other side is alive. */
    public boolean isPeerAlive(boolean checkProducer, long timeoutMs) {
        int offset = checkProducer ? CTRL_PRODUCER_HB_OFF : CTRL_CONSUMER_HB_OFF;
        long lastHb = ctrlBuffer.getLong(offset);
        return (System.currentTimeMillis() - lastHb) < timeoutMs;
    }
    
    /** Signal shutdown. */
    public void requestShutdown() {
        ctrlBuffer.putInt(CTRL_SHUTDOWN_OFF, 1);
        ctrlBuffer.force();
    }
    
    /** Check if shutdown was requested. */
    public boolean isShutdownRequested() {
        return ctrlBuffer.getInt(CTRL_SHUTDOWN_OFF) == 1;
    }
    
    /** Get the number of pending messages. */
    public int pendingCount() {
        long prod = indexBuffer.getLong(IDX_PRODUCER_SEQ_OFF);
        long cons = indexBuffer.getLong(IDX_CONSUMER_SEQ_OFF);
        return (int)(prod - cons);
    }
    
    @Override
    public void close() throws Exception {
        ctrlChannel.close();
        indexChannel.close();
        dataChannel.close();
    }
    
    // ===============================================
    // MESSAGE RECORD
    // ===============================================
    
    public record Message(long sequence, int type, long timestamp, byte[] payload) {
        /** Get payload as UTF-8 string. */
        public String payloadAsString() {
            return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        }
        
        @Override
        public String toString() {
            String preview = payload.length > 50 
                ? new String(payload, 0, 50, java.nio.charset.StandardCharsets.UTF_8) + "..."
                : payloadAsString();
            return String.format("Message[seq=%d, type=%d, len=%d, data='%s']",
                sequence, type, payload.length, preview);
        }
    }
}

4. Message Serialization

For structured messages, define a serialization layer:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Structured message types for the bus.
 * Each message type has a fixed type ID and a serialization format.
 */
public class Messages {
    
    // Message type constants
    public static final int TYPE_ORDER = 1;
    public static final int TYPE_TRADE = 2;
    public static final int TYPE_HEARTBEAT = 3;
    public static final int TYPE_STATUS = 4;
    
    /**
     * Order message:
     * ┌───────────┬───────────┬──────────┬──────────────────┐
     * │ orderId   │ quantity  │ price    │ symbol (UTF-8)   │
     * │ 8 bytes   │ 4 bytes   │ 8 bytes  │ 2+N bytes        │
     * └───────────┴───────────┴──────────┴──────────────────┘
     */
    public record Order(long orderId, int quantity, double price, String symbol) {
        
        public byte[] serialize() {
            byte[] symbolBytes = symbol.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 8 + 2 + symbolBytes.length);
            buf.putLong(orderId);
            buf.putInt(quantity);
            buf.putDouble(price);
            buf.putShort((short) symbolBytes.length);
            buf.put(symbolBytes);
            return buf.array();
        }
        
        public static Order deserialize(byte[] data) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            long orderId = buf.getLong();
            int quantity = buf.getInt();
            double price = buf.getDouble();
            short symbolLen = buf.getShort();
            byte[] symbolBytes = new byte[symbolLen];
            buf.get(symbolBytes);
            String symbol = new String(symbolBytes, StandardCharsets.UTF_8);
            return new Order(orderId, quantity, price, symbol);
        }
    }
    
    /**
     * Trade message:
     * ┌───────────┬───────────┬──────────┬──────────┬───────────┐
     * │ tradeId   │ orderId   │ quantity │ price    │ timestamp │
     * │ 8 bytes   │ 8 bytes   │ 4 bytes  │ 8 bytes  │ 8 bytes   │
     * └───────────┴───────────┴──────────┴──────────┴───────────┘
     */
    public record Trade(long tradeId, long orderId, int quantity, 
                        double price, long timestamp) {
        
        public byte[] serialize() {
            ByteBuffer buf = ByteBuffer.allocate(36);
            buf.putLong(tradeId);
            buf.putLong(orderId);
            buf.putInt(quantity);
            buf.putDouble(price);
            buf.putLong(timestamp);
            return buf.array();
        }
        
        public static Trade deserialize(byte[] data) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            return new Trade(
                buf.getLong(), buf.getLong(), buf.getInt(),
                buf.getDouble(), buf.getLong()
            );
        }
    }
}

5. The Producer API

import java.nio.file.*;

/**
 * Example producer application.
 * Simulates an order management system sending orders through the bus.
 */
public class ProducerApp {
    
    public static void main(String[] args) throws Exception {
        Path busDir = Path.of(System.getProperty("java.io.tmpdir"), "message-bus");
        
        // Create bus: 8192 index slots, 4MB data region
        try (MessageBus bus = new MessageBus(busDir, 8192, 4 * 1024 * 1024, true)) {
            System.out.println("Producer started. Bus directory: " + busDir);
            System.out.println("Waiting for consumer...");
            
            // Wait for consumer to connect
            while (!bus.isPeerAlive(false, 5000)) {
                Thread.sleep(100);
                bus.heartbeat(true);
            }
            System.out.println("Consumer connected!");
            
            // Send orders
            long startTime = System.nanoTime();
            int messageCount = 1_000_000;
            int published = 0;
            
            for (int i = 0; i < messageCount; i++) {
                Messages.Order order = new Messages.Order(
                    i,                                    // orderId
                    100 + (i % 900),                     // quantity
                    150.50 + (i % 100) * 0.01,           // price
                    "AAPL"                                // symbol
                );
                
                byte[] payload = order.serialize();
                
                // Retry loop for back-pressure
                while (!bus.publish(Messages.TYPE_ORDER, payload)) {
                    Thread.onSpinWait();
                }
                
                published++;
                
                // Periodic heartbeat
                if (i % 10_000 == 0) {
                    bus.heartbeat(true);
                }
            }
            
            long elapsed = System.nanoTime() - startTime;
            
            System.out.printf("%nPublished %,d messages in %,d ms%n", 
                published, elapsed / 1_000_000);
            System.out.printf("Throughput: %,.0f messages/sec%n", 
                published / (elapsed / 1_000_000_000.0));
            
            // Wait for consumer to finish
            System.out.println("Waiting for consumer to drain...");
            while (bus.pendingCount() > 0) {
                Thread.sleep(10);
                bus.heartbeat(true);
            }
            
            // Shutdown
            bus.requestShutdown();
            System.out.println("Producer shutdown complete.");
        }
    }
}

6. The Consumer API

import java.nio.file.*;

/**
 * Example consumer application.
 * Processes orders from the message bus.
 */
public class ConsumerApp {
    
    public static void main(String[] args) throws Exception {
        Path busDir = Path.of(System.getProperty("java.io.tmpdir"), "message-bus");
        
        // Wait for producer to create files
        while (!Files.exists(busDir.resolve("control.shm"))) {
            System.out.println("Waiting for producer to start...");
            Thread.sleep(500);
        }
        Thread.sleep(100); // let producer finish initialization
        
        try (MessageBus bus = new MessageBus(busDir, 8192, 4 * 1024 * 1024, false)) {
            System.out.println("Consumer started. Connected to bus.");
            
            long startTime = System.nanoTime();
            long messagesReceived = 0;
            long totalQuantity = 0;
            double totalValue = 0.0;
            
            while (!bus.isShutdownRequested() || bus.pendingCount() > 0) {
                MessageBus.Message msg = bus.receive();
                
                if (msg == null) {
                    Thread.onSpinWait();
                    continue;
                }
                
                // Dispatch by message type
                switch (msg.type()) {
                    case Messages.TYPE_ORDER -> {
                        Messages.Order order = Messages.Order.deserialize(msg.payload());
                        totalQuantity += order.quantity();
                        totalValue += order.quantity() * order.price();
                        messagesReceived++;
                    }
                    case Messages.TYPE_TRADE -> {
                        Messages.Trade trade = Messages.Trade.deserialize(msg.payload());
                        // Process trade...
                        messagesReceived++;
                    }
                    default -> {
                        System.out.println("Unknown message type: " + msg.type());
                    }
                }
                
                // Periodic heartbeat and stats
                if (messagesReceived % 100_000 == 0) {
                    bus.heartbeat(false);
                    System.out.printf("Processed %,d messages...%n", messagesReceived);
                }
            }
            
            long elapsed = System.nanoTime() - startTime;
            
            System.out.printf("%n=== Consumer Summary ===%n");
            System.out.printf("Messages received:  %,d%n", messagesReceived);
            System.out.printf("Total quantity:     %,d%n", totalQuantity);
            System.out.printf("Total value:        $%,.2f%n", totalValue);
            System.out.printf("Elapsed time:       %,d ms%n", elapsed / 1_000_000);
            System.out.printf("Throughput:         %,.0f messages/sec%n", 
                messagesReceived / (elapsed / 1_000_000_000.0));
            System.out.printf("Avg latency:        %.0f ns/message%n", 
                (double) elapsed / messagesReceived);
        }
    }
}

7. The Control Channel

The control channel handles lifecycle management:

Detecting Dead Peers

/**
 * Watchdog that monitors the peer and takes action on failure.
 */
public class BusWatchdog implements Runnable {
    private final MessageBus bus;
    private final boolean monitorProducer;
    private final long timeoutMs;
    private final Runnable onPeerDeath;
    
    public BusWatchdog(MessageBus bus, boolean monitorProducer,
                       long timeoutMs, Runnable onPeerDeath) {
        this.bus = bus;
        this.monitorProducer = monitorProducer;
        this.timeoutMs = timeoutMs;
        this.onPeerDeath = onPeerDeath;
    }
    
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(timeoutMs / 2);
                
                // Update our heartbeat
                bus.heartbeat(!monitorProducer);
                
                // Check peer
                if (!bus.isPeerAlive(monitorProducer, timeoutMs)) {
                    System.err.println("⚠️ Peer heartbeat timeout! " +
                        "Last seen > " + timeoutMs + "ms ago");
                    onPeerDeath.run();
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

8. Running the Full System

Step 1: Compile

# Create a project directory
mkdir -p ipc-bus/src
# Copy all Java files to src/

# Compile
javac -d out src/*.java

Step 2: Run Consumer (Terminal 1)

java -cp out ConsumerApp

Step 3: Run Producer (Terminal 2)

java -cp out ProducerApp

Expected Output

Producer:

Producer started. Bus directory: /tmp/message-bus
Waiting for consumer...
Consumer connected!

Published 1,000,000 messages in 850 ms
Throughput: 1,176,471 messages/sec
Waiting for consumer to drain...
Producer shutdown complete.

Consumer:

Consumer started. Connected to bus.
Processed 100,000 messages...
Processed 200,000 messages...
...
Processed 1,000,000 messages...

=== Consumer Summary ===
Messages received:  1,000,000
Total quantity:     549,500,000
Total value:        $82,785,750,000.00
Elapsed time:       920 ms
Throughput:         1,086,957 messages/sec
Avg latency:        920 ns/message

9. Performance Testing

Latency Histogram

/**
 * Latency measurement using coordinated timestamps.
 * Producer embeds nanoTime in message, consumer measures delta.
 */
public class LatencyBenchmark {
    
    public static void main(String[] args) throws Exception {
        // Use SPSCRingBuffer for intra-JVM latency test
        SPSCRingBuffer ring = new SPSCRingBuffer(1 << 16, 8);
        
        long[] latencies = new long[1_000_000];
        
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < latencies.length; i++) {
                long value;
                while ((value = ring.takeLong()) == Long.MIN_VALUE) {
                    Thread.onSpinWait();
                }
                latencies[i] = System.nanoTime() - value;
            }
        });
        consumer.start();
        
        // Warm up
        Thread.sleep(10);
        
        for (int i = 0; i < latencies.length; i++) {
            while (!ring.offerLong(System.nanoTime())) {
                Thread.onSpinWait();
            }
        }
        
        consumer.join();
        
        // Calculate percentiles
        java.util.Arrays.sort(latencies);
        System.out.printf("Latency Results:%n");
        System.out.printf("  Min:    %,d ns%n", latencies[0]);
        System.out.printf("  p50:    %,d ns%n", latencies[latencies.length / 2]);
        System.out.printf("  p90:    %,d ns%n", latencies[(int)(latencies.length * 0.9)]);
        System.out.printf("  p99:    %,d ns%n", latencies[(int)(latencies.length * 0.99)]);
        System.out.printf("  p99.9:  %,d ns%n", latencies[(int)(latencies.length * 0.999)]);
        System.out.printf("  Max:    %,d ns%n", latencies[latencies.length - 1]);
    }
}

Typical results (intra-JVM SPSC ring buffer):

Latency Results:
  Min:       28 ns
  p50:       52 ns
  p90:       78 ns
  p99:      145 ns
  p99.9:    890 ns
  Max:   12,340 ns

10. Production Considerations

Checklist

JVM Flags for Production

java \
  -XX:MaxDirectMemorySize=256m \     # Limit direct buffer memory
  -XX:+UseG1GC \                     # G1 GC for low pause times
  -XX:MaxGCPauseMillis=5 \           # Target max GC pause
  -XX:-RestrictContended \           # Enable @Contended annotation
  -XX:+AlwaysPreTouch \              # Pre-fault memory pages
  -XX:+UseTransparentHugePages \     # Large pages for performance
  -Djava.io.tmpdir=/dev/shm \        # Use RAM-backed tmpfs
  -cp out ProducerApp

Using /dev/shm (Linux) for True Shared Memory

// On Linux, /dev/shm is a RAM-backed filesystem (tmpfs)
// Files here NEVER touch disk — pure shared memory!

Path shmDir = Path.of("/dev/shm", "my-app-bus");

// This gives us the performance of shared memory
// with the simplicity of file-based APIs
MessageBus bus = new MessageBus(shmDir, 8192, 4 * 1024 * 1024, true);

11. Series Recap

Let's trace how every concept from the series comes together in our message bus:

What We Built

Component Concepts Used
Data serialization ByteBuffer (put/get), Encoding (UTF-8)
Shared memory mapping FileChannel (memory-mapped I/O)
Message routing IPC patterns (shared files)
Index management Ring buffer (power-of-2, sequences)
Integrity verification Encoding (CRC32), ByteBuffer (checksums)
Lifecycle management IPC (handshake, heartbeat, shutdown)

Key Lessons

  1. ByteBuffer is the foundation — all JVM binary I/O flows through it

  2. Always specify encoding — UTF-8 is the safe default

  3. Memory-mapped I/O bridges the gap between files and memory

  4. Shared memory is the fastest IPC — sub-microsecond latency

  5. Ring buffers enable lock-free messaging — millions of messages/sec

  6. Mechanical sympathy matters — cache lines, padding, alignment

  7. Simple protocols are robust — magic numbers, CRC, heartbeats

Further Reading