Skip to main content

Command Palette

Search for a command to run...

Using the Agrona library (Part 5)

Broadcast Buffers & Counters

Published
5 min read

"Ring buffers are for guaranteed delivery. Broadcast buffers are for 'hear this if you can.' Counters are for 'how much happened.'"

This article covers two more Agrona communication primitives: Broadcast Buffers for one-to-many messaging, and Counters for off-heap telemetry and coordination.


Broadcast Buffers — One-to-Many Messaging

Unlike ring buffers (1 consumer), broadcast buffers support multiple independent consumers. Each receiver tracks its own position, and the transmitter doesn't wait for slow receivers.

Key difference from ring buffers:

  • Ring buffer: consumer controls back-pressure (producer waits if full)

  • Broadcast: transmitter never waits — slow receivers are "lapped" (they miss messages)


BroadcastTransmitter & BroadcastReceiver

import org.agrona.concurrent.broadcast.*;
import org.agrona.concurrent.UnsafeBuffer;

// Shared broadcast buffer memory
int capacity = 1024 * 64; // 64 KB
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(
    capacity + BroadcastBufferDescriptor.TRAILER_LENGTH
);
UnsafeBuffer broadcastBuffer = new UnsafeBuffer(byteBuffer);

// --- Transmitter (producer) ---
BroadcastTransmitter transmitter = new BroadcastTransmitter(broadcastBuffer);

UnsafeBuffer msg = new UnsafeBuffer(new byte[64]);
msg.putLong(0, System.nanoTime());
msg.putInt(8, 42);

transmitter.transmit(1, msg, 0, 12); // msgTypeId=1, 12 bytes

// --- Receiver (consumer) — one per consumer thread ---
BroadcastReceiver receiver = new BroadcastReceiver(broadcastBuffer);
CopyBroadcastReceiver copyReceiver = new CopyBroadcastReceiver(receiver);

// Each receiver independently tracks its position
int messagesReceived = copyReceiver.receive((msgTypeId, buffer, index, length) -> {
    long timestamp = buffer.getLong(index);
    int value = buffer.getInt(index + 8);
    process(timestamp, value);
});

Lapped Receivers — Handling Slow Consumers

When the transmitter overwrites messages that a slow receiver hasn't read yet, the receiver detects a lap and must discard its current batch:

CopyBroadcastReceiver copyReceiver = new CopyBroadcastReceiver(receiver);

// CopyBroadcastReceiver copies the message to a scratch buffer before
// delivering it — this provides a consistent snapshot even if the
// transmitter overwrites the original during processing.

// If a lap is detected, receive() returns 0 for that batch
// and the receiver resets to the latest position.
int received = copyReceiver.receive(handler);
// received = 0 means "lapped — try again from latest position"

Counters — Off-Heap Telemetry

Agrona's counters provide off-heap, atomically updated values for:

  • Application telemetry (messages processed, errors, latency)

  • Position tracking (consumer position in a log)

  • Coordination (heartbeat timestamps, state flags)


CountersManager — The Counter Registry

import org.agrona.concurrent.status.*;
import org.agrona.concurrent.UnsafeBuffer;

// Two buffers: values and metadata
int maxCounters = 1024;
UnsafeBuffer valuesBuffer = new UnsafeBuffer(
    ByteBuffer.allocateDirect(maxCounters * CountersReader.COUNTER_LENGTH)
);
UnsafeBuffer metaDataBuffer = new UnsafeBuffer(
    ByteBuffer.allocateDirect(maxCounters * CountersReader.METADATA_LENGTH)
);

// Create the manager
CountersManager countersManager = new CountersManager(
    metaDataBuffer, valuesBuffer
);

// Allocate a named counter
int counterId = countersManager.allocate("messages-processed");
int errorId   = countersManager.allocate("errors-total");

// Get an AtomicCounter for writing
AtomicCounter messagesCounter = new AtomicCounter(valuesBuffer, counterId);
AtomicCounter errorsCounter   = new AtomicCounter(valuesBuffer, errorId);

Counter Layout in Memory

Each counter value is padded to a full 64-byte cache line. This prevents false sharing when different threads update different counters — a critical performance detail.


AtomicCounter — Thread-Safe Counting

AtomicCounter counter = new AtomicCounter(valuesBuffer, counterId);

// Increment (atomic)
counter.increment();              // += 1
counter.getAndAdd(10);            // += 10, returns old value

// Set (ordered write — visible to readers after publication)
counter.setOrdered(42L);

// Set (volatile — immediate visibility)
counter.set(100L);

// Read
long value = counter.get();        // volatile read
long ordered = counter.getWeak();  // plain read (fastest)

// Compare-and-set
boolean success = counter.compareAndSet(100L, 200L);

// Close (marks as free for reuse)
counter.close();
// Typical usage in an event loop:
class OrderProcessor implements Agent {
    private final AtomicCounter messagesProcessed;
    private final AtomicCounter latencyNs;
    
    public int doWork() {
        int work = 0;
        int messagesRead = ringBuffer.read((msgType, buf, idx, len) -> {
            long start = System.nanoTime();
            processOrder(buf, idx, len);
            latencyNs.setOrdered(System.nanoTime() - start);
        });
        
        if (messagesRead > 0) {
            messagesProcessed.getAndAdd(messagesRead);
            work += messagesRead;
        }
        return work;
    }
}

CountersReader — Reading from Another Process

The counters can be read from another process via memory-mapped files:

// Monitoring process — reads counters published by the application
CountersReader countersReader = new CountersReader(metaDataBuffer, valuesBuffer);

// Scan all allocated counters
countersReader.forEach((counterId, typeId, keyBuffer, label) -> {
    long value = countersReader.getCounterValue(counterId);
    System.out.println(label + " = " + value);
});

// Output:
// messages-processed = 150235
// errors-total = 3

This is how Aeron exposes its internal telemetry — the Aeron stat utility reads counters from a shared memory file without any RPC or serialization.


Real-World Patterns

Pattern 1: Application Metrics Dashboard

// In your application:
AtomicCounter ordersReceived = countersManager.newCounter("orders.received");
AtomicCounter ordersFilled   = countersManager.newCounter("orders.filled");
AtomicCounter ordersRejected = countersManager.newCounter("orders.rejected");
AtomicCounter p99LatencyNs   = countersManager.newCounter("latency.p99.ns");

// In your processing loop:
ordersReceived.increment();
if (fill(order)) {
    ordersFilled.increment();
} else {
    ordersRejected.increment();
}

// Monitoring tool reads these from another process — zero overhead!

Pattern 2: Broadcast Configuration Updates

// Configuration service broadcasts updates to all microservices
BroadcastTransmitter configBroadcast = new BroadcastTransmitter(sharedBuffer);

// Encode config update
UnsafeBuffer msg = new UnsafeBuffer(new byte[256]);
msg.putStringAscii(0, "max.connections=500");
configBroadcast.transmit(CONFIG_UPDATE_MSG_TYPE, msg, 0, msg.capacity());

// Each microservice has its own CopyBroadcastReceiver
CopyBroadcastReceiver receiver = new CopyBroadcastReceiver(
    new BroadcastReceiver(sharedBuffer)
);
receiver.receive((type, buf, idx, len) -> {
    String config = buf.getStringAscii(idx);
    applyConfig(config);
});

Pattern 3: Heartbeat Monitoring

// Worker process: write heartbeat timestamp
AtomicCounter heartbeat = countersManager.newCounter("worker.heartbeat.ns");

// In duty cycle:
heartbeat.setOrdered(System.nanoTime());

// Monitor process: check liveness
long lastHeartbeat = countersReader.getCounterValue(heartbeatCounterId);
long elapsed = System.nanoTime() - lastHeartbeat;
if (elapsed > TimeUnit.SECONDS.toNanos(5)) {
    alert("Worker appears dead! Last heartbeat: " + elapsed + " ns ago");
}

Summary

The one-liner: Agrona broadcast buffers provide one-to-many messaging where slow receivers are lapped (not blocked), while counters provide cache-line-aligned off-heap atomic values for zero-overhead cross-process telemetry.