Using the Agrona library (Part 7)

Timers, Clocks & Error Handling

"The utilities that make your low-latency service production-ready: timer wheels for scheduling, cached clocks for speed, and distinct error logs for sanity."

This final article covers Agrona's utility classes—the infrastructure components that turn a prototype into a production system.


DeadlineTimerWheel — O(1) Timer Scheduling

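DeadlineTimerWheel is a single-level (hashed) timer wheel that schedules and cancels timers in O(1). It is not thread-safe, so it belongs inside a single agent's duty cycle. With thousands of pending timers it is typically far cheaper than java.util.Timer or ScheduledThreadPoolExecutor, which keep their tasks in a heap-based queue with O(log n) insertion.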

How a Timer Wheel Works
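
The idea can be illustrated with a deliberately simplified wheel written in plain Java. This is a sketch for intuition only, not Agrona's implementation: the class name SimpleTimerWheel and its methods are made up for this example, it allocates objects where the real class uses primitive arrays, and it leaves out cancellation. A deadline is converted to a tick number, and the tick number masked by (ticksPerWheel - 1) selects a slot, which is why the slot count must be a power of 2.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified single-level timer wheel (illustrative, not Agrona's code). */
final class SimpleTimerWheel {
    private final long startTime;
    private final long tickResolution; // duration of one tick, same unit as deadlines
    private final int tickMask;        // ticksPerWheel - 1 (ticksPerWheel is a power of 2)
    private final List<List<long[]>> slots = new ArrayList<>(); // entries are {timerId, deadline}
    private long currentTick;
    private long nextTimerId;

    SimpleTimerWheel(long startTime, long tickResolution, int ticksPerWheel) {
        if (Integer.bitCount(ticksPerWheel) != 1) {
            throw new IllegalArgumentException("ticksPerWheel must be a power of 2");
        }
        this.startTime = startTime;
        this.tickResolution = tickResolution;
        this.tickMask = ticksPerWheel - 1;
        for (int i = 0; i < ticksPerWheel; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** O(1): derive the slot from the deadline and append the timer to it. */
    long schedule(long deadline) {
        // Deadlines already in the past are placed in the current tick.
        long deadlineTick = Math.max((deadline - startTime) / tickResolution, currentTick);
        long timerId = nextTimerId++;
        slots.get((int) (deadlineTick & tickMask)).add(new long[] {timerId, deadline});
        return timerId;
    }

    /** Walk the wheel tick by tick up to 'now' and return the ids of expired timers. */
    List<Long> poll(long now) {
        List<Long> expired = new ArrayList<>();
        while (true) {
            Iterator<long[]> it = slots.get((int) (currentTick & tickMask)).iterator();
            while (it.hasNext()) {
                long[] timer = it.next();
                // Timers that are a full rotation (or more) away share this slot
                // but have a later deadline, so they are left in place.
                if (timer[1] <= now) {
                    expired.add(timer[0]);
                    it.remove();
                }
            }
            long tickEnd = startTime + (currentTick + 1) * tickResolution;
            if (tickEnd > now) {
                break; // the current tick has not finished yet
            }
            currentTick++;
        }
        return expired;
    }
}
```

Scheduling never searches or sorts: the cost is one division, one mask, and one append, regardless of how many timers are pending. The price is resolution, since a timer can only be noticed when its tick is polled.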

Usage

import org.agrona.DeadlineTimerWheel;
import java.util.concurrent.TimeUnit;

DeadlineTimerWheel timerWheel = new DeadlineTimerWheel(
    TimeUnit.MILLISECONDS,      // Time unit
    System.currentTimeMillis(), // Start time
    16,                         // Tick resolution in time units (16ms per tick, must be a power of 2)
    128                         // Ticks per wheel (must be a power of 2)
);

// Schedule a timer (returns a timer ID for cancellation)
long timerId1 = timerWheel.scheduleTimer(
    System.currentTimeMillis() + 500  // Deadline: 500ms from now
);

long timerId2 = timerWheel.scheduleTimer(
    System.currentTimeMillis() + 2000  // Deadline: 2 seconds from now
);

// Cancel a timer — O(1)!
boolean cancelled = timerWheel.cancelTimer(timerId1);

// In your duty cycle: advance time and fire expired timers.
// Each poll() works through the current tick, so call it on every cycle.
int expired = timerWheel.poll(
    System.currentTimeMillis(),          // Current time
    (timeUnit, now, timerId) -> {        // Handler for expired timers
        System.out.println("Timer fired: " + timerId);

        // Return true to consume the timer; returning false keeps it
        // active and aborts the current poll
        return true;
    },
    Integer.MAX_VALUE                    // Max timers to expire in this call
);

Connection Timeout Example

class ConnectionManager implements Agent {
    private final DeadlineTimerWheel timerWheel;
    private final Long2ObjectHashMap<Connection> connections;
    
    void onNewConnection(Connection conn) {
        long timerId = timerWheel.scheduleTimer(
            System.currentTimeMillis() + 30_000  // 30-second timeout
        );
        conn.setTimerId(timerId);
        connections.put(conn.id(), conn);
    }
    
    void onMessageReceived(Connection conn) {
        // Reset the timeout
        timerWheel.cancelTimer(conn.getTimerId());
        long newTimerId = timerWheel.scheduleTimer(
            System.currentTimeMillis() + 30_000
        );
        conn.setTimerId(newTimerId);
    }
    
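    @Override
    public int doWork() {
        return timerWheel.poll(
            System.currentTimeMillis(),
            (timeUnit, now, timerId) -> {
                // Timer expired: close the connection.
                // findByTimerId is an application-level lookup (not shown),
                // e.g. a second map keyed by timer ID.
                Connection conn = findByTimerId(timerId);
                if (conn != null) {
                    conn.close("Timeout");
                    connections.remove(conn.id());
                }
                return true;  // Consume the expired timer
            },
            10
        );
    }

    @Override
    public String roleName() {
        return "connection-manager";
    }
}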

Clocks — CachedEpochClock & SystemEpochClock

The Problem with System.currentTimeMillis()

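Calling System.currentTimeMillis() for every message adds up on a hot path. On Linux the call is usually served through the vDSO without a full kernel transition, but it still costs tens of nanoseconds per call, and on platforms or clock sources without that fast path it becomes a real system call.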

CachedEpochClock — Read the Clock Once Per Tick

import org.agrona.concurrent.Agent;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;

// System clock — reads the real time (expensive)
EpochClock systemClock = SystemEpochClock.INSTANCE;
long now = systemClock.time(); // System.currentTimeMillis()

// Cached clock — returns the last cached value (cheap!)
CachedEpochClock cachedClock = new CachedEpochClock();

// In your duty cycle:
class MyAgent implements Agent {
    private final CachedEpochClock clock = new CachedEpochClock();
    
    @Override
    public int doWork() {
        // Update the clock ONCE per duty cycle iteration
        clock.update(System.currentTimeMillis());
        
        // All messages in this batch use the same timestamp
        return processMessages(clock);
    }

    int processMessages(EpochClock clock) {
        long now = clock.time(); // Returns cached value, ~1ns instead of ~30ns
        // Process 1000 messages, all with the same "now"
        return 0; // Placeholder: return the number of messages processed
    }

    @Override
    public String roleName() {
        return "my-agent";
    }
}

CachedNanoClock — For High-Resolution Timing

import org.agrona.concurrent.CachedNanoClock;

CachedNanoClock nanoClock = new CachedNanoClock();
nanoClock.update(System.nanoTime());
long nanos = nanoClock.nanoTime(); // Cached nanosecond timestamp

Why Two Clock Interfaces?

// EpochClock — milliseconds since Unix epoch
interface EpochClock {
    long time(); // like System.currentTimeMillis()
}

// NanoClock — nanoseconds (monotonic, no wall-clock meaning)
interface NanoClock {
    long nanoTime(); // like System.nanoTime()
}

Both are interfaces, making them easy to mock for testing:

// In tests — control time precisely!
CachedEpochClock testClock = new CachedEpochClock();
testClock.update(1000L);
myService.setEpochClock(testClock);

// Advance time by 5 seconds
testClock.advance(5000L);
// Now test timeout behavior without Thread.sleep()!
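
To make the testing benefit concrete, here is a small library-free sketch of the same pattern. Everything in it is hypothetical: MillisClock stands in for Agrona's EpochClock, ManualClock plays the role CachedEpochClock plays in the test above, and Session is an invented class with an inactivity timeout. The point is only that code which reads time through an interface can be driven deterministically.

```java
/** Hypothetical stand-in for EpochClock so the sketch compiles without Agrona. */
interface MillisClock {
    long time();
}

/** Manually driven clock: time only moves when the test says so. */
final class ManualClock implements MillisClock {
    private long nowMs;

    @Override
    public long time() {
        return nowMs;
    }

    void update(long nowMs) {
        this.nowMs = nowMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }
}

/** Hypothetical session that expires after a period of inactivity. */
final class Session {
    private final MillisClock clock;
    private final long timeoutMs;
    private long lastActivityMs;

    Session(MillisClock clock, long timeoutMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
        this.lastActivityMs = clock.time();
    }

    /** Record activity, which restarts the inactivity window. */
    void onActivity() {
        lastActivityMs = clock.time();
    }

    /** True once timeoutMs has elapsed since the last recorded activity. */
    boolean isExpired() {
        return clock.time() - lastActivityMs >= timeoutMs;
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        clock.update(1000L);
        Session session = new Session(clock, 5000L);

        clock.advance(4999L);
        System.out.println("expired after 4999ms: " + session.isExpired());

        clock.advance(1L);
        System.out.println("expired after 5000ms: " + session.isExpired());
    }
}
```

Because the production code never calls System.currentTimeMillis() directly, the boundary case (one millisecond before the timeout versus exactly at it) can be checked without sleeping or flakiness.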

DistinctErrorLog — Deduplication for Errors

In production, the same error often repeats thousands of times. Traditional logging fills disks. Agrona's DistinctErrorLog records each distinct error only once, with a count and timestamps:

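import java.nio.ByteBuffer;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.errors.DistinctErrorLog;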

UnsafeBuffer errorBuffer = new UnsafeBuffer(
    ByteBuffer.allocateDirect(1024 * 1024)  // 1 MB for error log
);

DistinctErrorLog errorLog = new DistinctErrorLog(errorBuffer, SystemEpochClock.INSTANCE);

// Record errors — duplicates are counted, not re-logged
try {
    riskyOperation();
} catch (Exception e) {
    errorLog.record(e);  // First time: stores full stack trace
}

// Same exception again:
try {
    riskyOperation();
} catch (Exception e) {
    errorLog.record(e);  // Just increments counter + updates timestamp
}
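
The deduplication idea itself is simple enough to sketch with a map. The class below is an illustration only, not how DistinctErrorLog is implemented: the real class encodes observations into a buffer so other processes can read them, whereas this sketch keeps them on the heap. The name SimpleDistinctErrors and the choice of key (exception class, message, and stack trace) are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Heap-based illustration of distinct error counting (not Agrona's code). */
final class SimpleDistinctErrors {
    /** What is kept per distinct error: a count plus first and last timestamps. */
    static final class Observation {
        int count;
        long firstTimestampMs;
        long lastTimestampMs;
    }

    private final Map<String, Observation> observations = new LinkedHashMap<>();

    /** Record an error; a repeat only bumps the counter and the last timestamp. */
    Observation record(Throwable error, long nowMs) {
        // Assumed notion of "same error": same type, message, and stack trace.
        String key = error.getClass().getName()
            + '|' + error.getMessage()
            + '|' + Arrays.toString(error.getStackTrace());

        Observation observation = observations.get(key);
        if (observation == null) {
            observation = new Observation();
            observation.firstTimestampMs = nowMs;
            observations.put(key, observation);
        }
        observation.count++;
        observation.lastTimestampMs = nowMs;
        return observation;
    }

    /** Number of distinct errors seen so far. */
    int distinctCount() {
        return observations.size();
    }
}
```

However many times the same failure repeats, storage grows only with the number of distinct failures, which is what keeps a fixed-size error buffer from overflowing during an error storm.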

Reading Errors from Another Process

import org.agrona.concurrent.errors.ErrorLogReader;

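// Monitoring process reads the error log. For a real cross-process reader,
// errorBuffer must wrap shared memory such as a memory-mapped file.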
ErrorLogReader.read(errorBuffer, (observationCount, firstObservedTimestamp,
        lastObservedTimestamp, encodedException) -> {
    System.out.printf("Error (seen %d times, last at %d):%n%s%n",
        observationCount, lastObservedTimestamp, encodedException);
});

IdGenerator — Distributed Unique IDs

Agrona includes a lock-free Snowflake-style ID generator for generating unique IDs across distributed nodes:

import org.agrona.concurrent.IdGenerator;
import org.agrona.concurrent.SnowflakeIdGenerator;

// Each node gets a unique nodeId (0-1023 with the default 10 node ID bits)
IdGenerator idGenerator = new SnowflakeIdGenerator(nodeId);

// Generate unique, roughly time-ordered IDs
long uniqueId = idGenerator.nextId();
// Format: [timestamp bits][nodeId bits][sequence bits]
// Unique across nodes without coordination, provided every node has a distinct nodeId
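
The bit layout is easier to see when packed and unpacked by hand. The sketch below assumes the common Snowflake split of 10 node ID bits (matching the 0-1023 range above) and 12 sequence bits, with the timestamp in the remaining high bits. SnowflakeLayout is a made-up helper for illustration; it does no clock handling or sequence management, which is the part the real generator takes care of.

```java
/** Illustrative packing of a Snowflake-style id (assumed 10 node bits, 12 sequence bits). */
final class SnowflakeLayout {
    static final int NODE_ID_BITS = 10;  // node ids 0..1023
    static final int SEQUENCE_BITS = 12; // assumed: 4096 ids per node per millisecond

    private SnowflakeLayout() {
    }

    /** Timestamp in the high bits, then node id, then sequence in the low bits. */
    static long pack(long timestampMs, long nodeId, long sequence) {
        return (timestampMs << (NODE_ID_BITS + SEQUENCE_BITS))
            | (nodeId << SEQUENCE_BITS)
            | sequence;
    }

    static long timestampMs(long id) {
        return id >>> (NODE_ID_BITS + SEQUENCE_BITS);
    }

    static long nodeId(long id) {
        return (id >>> SEQUENCE_BITS) & ((1L << NODE_ID_BITS) - 1);
    }

    static long sequence(long id) {
        return id & ((1L << SEQUENCE_BITS) - 1);
    }

    public static void main(String[] args) {
        long id = pack(1_000L, 7L, 42L);
        System.out.println("timestamp=" + timestampMs(id)
            + " node=" + nodeId(id)
            + " sequence=" + sequence(id));
    }
}
```

Putting the timestamp in the most significant bits is what makes the ids roughly time-ordered: an id from a later millisecond compares greater than any id from an earlier one, whichever node produced it.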

CloseHelper & Other Utilities

CloseHelper — Safe Resource Cleanup

import org.agrona.CloseHelper;

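// Closes an AutoCloseable without throwing: nulls are ignored and exceptions are suppressed
CloseHelper.quietClose(agentRunner);
CloseHelper.quietClose(counter1);
CloseHelper.quietClose(counter2);

// Close multiple at once (every argument must be AutoCloseable);
// unlike quietClose, closeAll rethrows the first failure after attempting them all
CloseHelper.closeAll(agentRunner, counter1, counter2);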

BitUtil — Bit Manipulation

import org.agrona.BitUtil;

// Align to cache line boundaries
int aligned = BitUtil.align(100, 64);  // 128 (next multiple of 64)

// Check power of 2
BitUtil.isPowerOfTwo(1024);  // true
BitUtil.isPowerOfTwo(1000);  // false

// Find next power of 2
BitUtil.findNextPositivePowerOfTwo(1000);  // 1024
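
The arithmetic behind these helpers is worth seeing once. The sketch below re-derives the three operations in plain Java; BitTricks is a name invented for this example, and it assumes a power-of-2 alignment and values between 1 and 2^30 for the rounding function.

```java
/** Plain-Java versions of the bit tricks above (illustrative helper, not part of Agrona). */
final class BitTricks {
    private BitTricks() {
    }

    /** Round value up to the next multiple of alignment (alignment must be a power of 2). */
    static int align(int value, int alignment) {
        // Adding (alignment - 1) pushes value past the next boundary unless it is
        // already on one; the mask then clears the low bits.
        return (value + (alignment - 1)) & ~(alignment - 1);
    }

    /** A power of 2 has exactly one bit set, so clearing the lowest set bit leaves zero. */
    static boolean isPowerOfTwo(int value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Smallest power of 2 that is >= value, for 1 <= value <= 2^30. */
    static int nextPowerOfTwo(int value) {
        return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(value - 1));
    }

    public static void main(String[] args) {
        System.out.println(align(100, 64));       // 128
        System.out.println(isPowerOfTwo(1024));   // true
        System.out.println(nextPowerOfTwo(1000)); // 1024
    }
}
```

These are the same tricks that make power-of-2 sizes attractive throughout the library: with a power-of-2 capacity, "index modulo capacity" becomes a single AND with (capacity - 1).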

BufferUtil — Buffer Operations

import org.agrona.BufferUtil;

// Allocate page-aligned direct ByteBuffer
ByteBuffer aligned = BufferUtil.allocateDirectAligned(4096, 4096);
// Useful for O_DIRECT file IO and DMA operations

// Free a direct ByteBuffer immediately (don't wait for GC)
BufferUtil.free(directByteBuffer);

SystemUtil — System Information

import org.agrona.SystemUtil;

// Load system properties from a file
SystemUtil.loadPropertiesFiles("config.properties");

// Parse durations and sizes. The first argument is the property name, used in
// error messages; the names here are placeholders. Durations take an
// s, ms, us or ns suffix and are returned in nanoseconds.
long durationNs = SystemUtil.parseDuration("my.timeout", "5s");
long bytes = SystemUtil.parseSize("my.buffer.length", "256m");  // 256 * 1024 * 1024

// Format durations and sizes
String formatted = SystemUtil.formatDuration(TimeUnit.SECONDS.toNanos(125));
String size = SystemUtil.formatSize(1024L * 1024 * 256);

Putting It All Together

Here's how these components compose into a production service:

public class ProductionService implements Agent {
    private final OneToOneRingBuffer inbound;
    private final OneToOneRingBuffer outbound;
    private final DeadlineTimerWheel timerWheel;
    private final CachedEpochClock clock;
    private final AtomicCounter messagesProcessed;
    private final DistinctErrorLog errorLog;
    private final Int2IntHashMap instrumentPrices;
    
    @Override
    public int doWork() {
        int work = 0;
        
        // Update clock once per tick
        clock.update(System.currentTimeMillis());
        
        // Process inbound messages
        work += inbound.read((msgTypeId, buf, idx, len) -> {
            try {
                processAndPublish(buf, idx, len);
                messagesProcessed.increment();
            } catch (Exception e) {
                errorLog.record(e);
            }
        }, 10);
        
        // Fire expired timers
        work += timerWheel.poll(clock.time(), this::onTimerExpired, 5);
        
        return work;
    }
    
    @Override
    public String roleName() {
        return "production-service";
    }
    
    public static void main(String[] args) {
        // ... build all components ...
        
        ProductionService service = new ProductionService(/* ... */);
        
        AgentRunner runner = new AgentRunner(
            new BackoffIdleStrategy(100, 10, 1, 1_000_000),
            (t) -> { /* log error */ },
            errorCounter,
            service
        );
        
        AgentRunner.startOnThread(runner);
        
        ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
        barrier.await();
        // Closing the runner stops the thread and invokes the agent's onClose()
        CloseHelper.close(runner);
    }
}

Summary

The one-liner: Agrona's utilities — timer wheels for O(1) scheduling, cached clocks for cheap timestamps, distinct error logs for deduplicated monitoring, and Snowflake ID generators — are the production-grade infrastructure pieces that complement the core buffers, collections, and agents.