Using the Agrona library (Part 7)
Timers, Clocks & Error Handling
"The utilities that make your low-latency service production-ready: timer wheels for scheduling, cached clocks for speed, and distinct error logs for sanity."
This final article covers Agrona's utility classes—the infrastructure components that turn a prototype into a production system.
DeadlineTimerWheel — O(1) Timer Scheduling
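DeadlineTimerWheel is a single-level (hashed) timer wheel that provides O(1) timer registration and cancellation. It is not thread-safe, so it belongs inside a single agent's duty cycle. With thousands of pending timers it is far more efficient than java.util.Timer or ScheduledThreadPoolExecutor, both of which keep their tasks in a heap with O(log n) insertion.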
How a Timer Wheel Works
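The core idea can be sketched in plain Java. The class below is a toy written only to show how a deadline maps onto a slot and how polling walks the ticks; the name ToyTimerWheel and everything inside it are assumptions for illustration, not Agrona's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy single-level timer wheel (illustration only, not Agrona's code). */
final class ToyTimerWheel {
    private final long startTime;
    private final long tickResolution; // time units per tick
    private final int mask;            // ticksPerWheel - 1 (ticksPerWheel is a power of two)
    private final List<List<Long>> slots = new ArrayList<>();
    private long currentTick;

    ToyTimerWheel(long startTime, long tickResolution, int ticksPerWheel) {
        if (ticksPerWheel <= 0 || Integer.bitCount(ticksPerWheel) != 1) {
            throw new IllegalArgumentException("ticksPerWheel must be a power of two");
        }
        this.startTime = startTime;
        this.tickResolution = tickResolution;
        this.mask = ticksPerWheel - 1;
        for (int i = 0; i < ticksPerWheel; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** O(1): divide to find the tick, then mask to wrap it onto the wheel. */
    int slotFor(long deadline) {
        long tick = Math.max((deadline - startTime) / tickResolution, currentTick);
        return (int) (tick & mask);
    }

    void schedule(long deadline) {
        slots.get(slotFor(deadline)).add(deadline);
    }

    /** Walks the ticks up to 'now' and returns the deadlines that have passed. */
    List<Long> poll(long now) {
        List<Long> expired = new ArrayList<>();
        long targetTick = (now - startTime) / tickResolution;
        while (true) {
            Iterator<Long> it = slots.get((int) (currentTick & mask)).iterator();
            while (it.hasNext()) {
                long deadline = it.next();
                if (deadline <= now) { // a slot may also hold timers for a later rotation
                    expired.add(deadline);
                    it.remove();
                }
            }
            if (currentTick >= targetTick) {
                return expired;
            }
            currentTick++;
        }
    }

    public static void main(String[] args) {
        ToyTimerWheel wheel = new ToyTimerWheel(0, 16, 8);
        wheel.schedule(40);
        wheel.schedule(200);
        System.out.println(wheel.poll(48));  // the deadline at 40 has passed
        System.out.println(wheel.poll(200)); // now the deadline at 200 has passed
    }
}
```

Scheduling never searches or sorts: the cost is one division and one mask regardless of how many timers are pending, which is where the O(1) claim comes from.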
Usage
import org.agrona.DeadlineTimerWheel;
import java.util.concurrent.TimeUnit;
DeadlineTimerWheel timerWheel = new DeadlineTimerWheel(
TimeUnit.MILLISECONDS, // Time unit
System.currentTimeMillis(), // Start time
16, // Tick resolution in time units (16 ms per tick; must be a power of 2)
128 // Ticks per wheel (must be a power of 2)
);
// Schedule a timer (returns a timer ID for cancellation)
long timerId1 = timerWheel.scheduleTimer(
System.currentTimeMillis() + 500 // Deadline: 500ms from now
);
long timerId2 = timerWheel.scheduleTimer(
System.currentTimeMillis() + 2000 // Deadline: 2 seconds from now
);
// Cancel a timer — O(1)!
boolean cancelled = timerWheel.cancelTimer(timerId1);
// In your duty cycle — advance time and fire expired timers
int expired = timerWheel.poll(
System.currentTimeMillis(), // Current time
(timeUnit, now, timerId) -> { // Handler for expired timers
System.out.println("Timer fired: " + timerId);
// Return true to consume the timer; returning false keeps it
// active and aborts the rest of this poll
return true;
},
Integer.MAX_VALUE // Max timers to process
);
Connection Timeout Example
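import org.agrona.DeadlineTimerWheel;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;

// Connection is an application type (not shown) with close(String),
// getTimerId() and setTimerId(long).
class ConnectionManager implements Agent {
    private static final long TIMEOUT_MS = 30_000; // 30-second timeout

    private final DeadlineTimerWheel timerWheel;
    // Keyed by timer ID so an expired timer maps straight back to its connection
    private final Long2ObjectHashMap<Connection> connectionsByTimerId = new Long2ObjectHashMap<>();

    ConnectionManager(DeadlineTimerWheel timerWheel) {
        this.timerWheel = timerWheel;
    }

    void onNewConnection(Connection conn) {
        scheduleTimeout(conn);
    }

    void onMessageReceived(Connection conn) {
        // Reset the timeout: cancel the old timer, then schedule a new one
        timerWheel.cancelTimer(conn.getTimerId());
        connectionsByTimerId.remove(conn.getTimerId());
        scheduleTimeout(conn);
    }

    private void scheduleTimeout(Connection conn) {
        long timerId = timerWheel.scheduleTimer(System.currentTimeMillis() + TIMEOUT_MS);
        conn.setTimerId(timerId);
        connectionsByTimerId.put(timerId, conn);
    }

    @Override
    public int doWork() {
        return timerWheel.poll(
            System.currentTimeMillis(),
            (timeUnit, now, timerId) -> {
                // Timer expired: close the connection it belongs to
                Connection conn = connectionsByTimerId.remove(timerId);
                if (conn != null) {
                    conn.close("Timeout");
                }
                return true; // true consumes the timer; false would keep it and stop this poll
            },
            10); // Expire at most 10 timers per duty cycle
    }

    @Override
    public String roleName() {
        return "connection-manager";
    }
}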
Clocks — CachedEpochClock & SystemEpochClock
The Problem with System.currentTimeMillis()
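Calling System.currentTimeMillis() on every message adds up. On Linux the call is usually served through the vDSO without a full user/kernel transition, but it still costs tens of nanoseconds, and on some platforms or clock sources it falls back to a real system call.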
CachedEpochClock — Read the Clock Once Per Tick
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
// System clock — reads the real time (expensive)
EpochClock systemClock = SystemEpochClock.INSTANCE;
long now = systemClock.time(); // System.currentTimeMillis()
// Cached clock — returns the last cached value (cheap!)
CachedEpochClock cachedClock = new CachedEpochClock();
// In your duty cycle:
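class MyAgent implements Agent {
    private final CachedEpochClock clock = new CachedEpochClock();

    @Override
    public int doWork() {
        // Update the clock ONCE per duty cycle iteration
        clock.update(System.currentTimeMillis());
        // All messages in this batch use the same timestamp
        return processMessages(clock);
    }

    // Returns the number of messages processed so doWork() can report its work count
    int processMessages(EpochClock clock) {
        long now = clock.time(); // Returns the cached value: roughly 1 ns instead of roughly 30 ns
        int processed = 0;
        // Process the batch (say, 1000 messages), all with the same "now",
        // incrementing processed for each message
        return processed;
    }

    @Override
    public String roleName() {
        return "my-agent";
    }
}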
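To see why the cached read is so cheap, here is a minimal sketch of the pattern in plain Java. ToyCachedClock is a hypothetical stand-in written for illustration, not Agrona's CachedEpochClock: the whole trick is that reading the time becomes a field read, while only the duty cycle pays for sampling the real clock.

```java
/** Toy cached clock (illustration only; use Agrona's CachedEpochClock in real code). */
final class ToyCachedClock {
    private volatile long timeMs;

    /** Called once per duty cycle with a freshly sampled time. */
    void update(long nowMs) {
        timeMs = nowMs;
    }

    /** Read-modify-write on a volatile is not atomic, so this assumes a single writer. */
    void advance(long deltaMs) {
        timeMs += deltaMs;
    }

    /** A plain field read: no call into the operating system. */
    long time() {
        return timeMs;
    }

    /** Stamps 'count' messages with the cached time and returns the stamps. */
    static long[] stampBatch(ToyCachedClock clock, int count) {
        long[] stamps = new long[count];
        for (int i = 0; i < count; i++) {
            stamps[i] = clock.time();
        }
        return stamps;
    }

    public static void main(String[] args) {
        ToyCachedClock clock = new ToyCachedClock();
        clock.update(System.currentTimeMillis()); // the only real clock read
        long[] stamps = stampBatch(clock, 1000);
        System.out.println("first == last: " + (stamps[0] == stamps[999]));
    }
}
```

The trade-off is resolution: every message in a batch carries the time of the last update, so the duty cycle must be short relative to the precision you need.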
CachedNanoClock — For High-Resolution Timing
import org.agrona.concurrent.CachedNanoClock;
CachedNanoClock nanoClock = new CachedNanoClock();
nanoClock.update(System.nanoTime());
long nanos = nanoClock.nanoTime(); // Cached nanosecond timestamp
Why Two Clock Interfaces?
// EpochClock — milliseconds since Unix epoch
interface EpochClock {
long time(); // like System.currentTimeMillis()
}
// NanoClock — nanoseconds (monotonic, no wall-clock meaning)
interface NanoClock {
long nanoTime(); // like System.nanoTime()
}
Both are interfaces, making them easy to mock for testing:
// In tests — control time precisely!
CachedEpochClock testClock = new CachedEpochClock();
testClock.update(1000L);
myService.setEpochClock(testClock);
// Advance time by 5 seconds
testClock.advance(5000L);
// Now test timeout behavior without Thread.sleep()!
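A fuller version of such a test might look like the sketch below. IdleTracker is a hypothetical class invented for this example, and it takes a plain java.util.function.LongSupplier so the sketch runs without Agrona on the classpath.

```java
import java.util.function.LongSupplier;

/** Hypothetical component whose timeout logic depends only on an injected clock. */
final class IdleTracker {
    private final LongSupplier clockMs;
    private final long timeoutMs;
    private long lastActivityMs;

    IdleTracker(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.timeoutMs = timeoutMs;
        this.lastActivityMs = clockMs.getAsLong();
    }

    void onActivity() {
        lastActivityMs = clockMs.getAsLong();
    }

    boolean isTimedOut() {
        return clockMs.getAsLong() - lastActivityMs >= timeoutMs;
    }

    public static void main(String[] args) {
        long[] now = {1000L}; // test-controlled time
        IdleTracker tracker = new IdleTracker(() -> now[0], 5000L);

        now[0] += 4999L;
        System.out.println("timed out just before the limit: " + tracker.isTimedOut());

        now[0] += 1L;
        System.out.println("timed out at the limit: " + tracker.isTimedOut());
    }
}
```

Because EpochClock.time() takes no arguments and returns a long, a method reference such as testClock::time should fit the LongSupplier parameter, which lets the same component run against a CachedEpochClock in tests and a real clock in production.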
DistinctErrorLog — Deduplication for Errors
In production, the same error often repeats thousands of times. Traditional logging fills disks. Agrona's DistinctErrorLog records each distinct error only once, with a count and timestamps:
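import java.nio.ByteBuffer;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.errors.DistinctErrorLog;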
UnsafeBuffer errorBuffer = new UnsafeBuffer(
ByteBuffer.allocateDirect(1024 * 1024) // 1 MB for error log
);
DistinctErrorLog errorLog = new DistinctErrorLog(errorBuffer, SystemEpochClock.INSTANCE);
// Record errors — duplicates are counted, not re-logged
try {
riskyOperation();
} catch (Exception e) {
errorLog.record(e); // First time: stores full stack trace
}
// Same exception again:
try {
riskyOperation();
} catch (Exception e) {
errorLog.record(e); // Just increments counter + updates timestamp
}
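The deduplication idea itself is simple enough to sketch with a map. The class below is a toy on-heap version with hypothetical names; Agrona's DistinctErrorLog writes into a buffer instead and differs in its details, including what record returns.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy distinct error log (illustration only, not Agrona's implementation). */
final class ToyDistinctErrorLog {
    static final class Observation {
        final String description;
        final long firstTimestampMs;
        long lastTimestampMs;
        int count = 1;

        Observation(String description, long nowMs) {
            this.description = description;
            this.firstTimestampMs = nowMs;
            this.lastTimestampMs = nowMs;
        }
    }

    private final Map<String, Observation> observations = new LinkedHashMap<>();

    /** Two errors are "the same" here if type, message and stack trace all match. */
    private static String keyOf(Throwable error) {
        return error.getClass().getName() + '|' + error.getMessage() + '|'
            + Arrays.toString(error.getStackTrace());
    }

    /** Returns true if this is the first observation of the error. */
    boolean record(Throwable error, long nowMs) {
        Observation existing = observations.get(keyOf(error));
        if (existing == null) {
            observations.put(keyOf(error), new Observation(error.toString(), nowMs));
            return true;
        }
        existing.count++;                 // repeat: bump the counter...
        existing.lastTimestampMs = nowMs; // ...and the last-seen timestamp
        return false;
    }

    Observation find(Throwable error) {
        return observations.get(keyOf(error));
    }

    int distinctCount() {
        return observations.size();
    }

    public static void main(String[] args) {
        ToyDistinctErrorLog log = new ToyDistinctErrorLog();
        RuntimeException boom = new RuntimeException("boom");
        log.record(boom, 10L);
        log.record(boom, 25L);
        System.out.println(log.distinctCount() + " distinct, seen " + log.find(boom).count + " times");
    }
}
```

However often an error repeats, the storage cost stays at one entry per distinct error, which is what keeps a noisy failure from filling the log.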
Reading Errors from Another Process
import org.agrona.concurrent.errors.ErrorLogReader;
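// Monitoring process reads the error log.
// To read from a separate process, errorBuffer must wrap a memory-mapped file
// shared with the writer; a buffer from allocateDirect is only visible inside one JVM.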
ErrorLogReader.read(errorBuffer, (observationCount, firstObservedTimestamp,
lastObservedTimestamp, encodedException) -> {
System.out.printf("Error (seen %d times, last at %d):%n%s%n",
observationCount, lastObservedTimestamp, encodedException);
});
IdGenerator — Distributed Unique IDs
Agrona includes SnowflakeIdGenerator, a lock-free Snowflake-style implementation of its IdGenerator interface, for generating unique IDs across distributed nodes:
import org.agrona.concurrent.SnowflakeIdGenerator;
// Each node gets a unique nodeId (0-1023 with the default 10 node-ID bits)
SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(nodeId);
// Generate unique, roughly time-ordered IDs
long uniqueId = idGenerator.nextId();
// Format: [timestamp bits][nodeId bits][sequence bits]
// Unique across nodes without coordination, provided every node uses a distinct nodeId
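The bit layout in that comment can be made concrete with a few shifts and masks. This is a sketch of the packing only, under assumed widths: 10 node bits (matching the 0-1023 range above) and 12 sequence bits are choices made for this example, and ToySnowflake is a hypothetical helper rather than part of Agrona.

```java
/** Toy Snowflake-style ID layout (illustration only; bit widths are assumptions). */
final class ToySnowflake {
    static final int NODE_ID_BITS = 10;  // assumption: node IDs 0-1023
    static final int SEQUENCE_BITS = 12; // assumption: 4096 IDs per node per millisecond

    /** Packs [timestamp][nodeId][sequence] into one long, timestamp in the high bits. */
    static long compose(long timestampMs, long nodeId, long sequence) {
        return (timestampMs << (NODE_ID_BITS + SEQUENCE_BITS))
            | (nodeId << SEQUENCE_BITS)
            | sequence;
    }

    static long timestampOf(long id) {
        return id >>> (NODE_ID_BITS + SEQUENCE_BITS);
    }

    static long nodeIdOf(long id) {
        return (id >>> SEQUENCE_BITS) & ((1L << NODE_ID_BITS) - 1);
    }

    static long sequenceOf(long id) {
        return id & ((1L << SEQUENCE_BITS) - 1);
    }

    public static void main(String[] args) {
        long id = compose(5L, 3L, 7L);
        System.out.println(timestampOf(id) + " / " + nodeIdOf(id) + " / " + sequenceOf(id));
    }
}
```

Putting the timestamp in the high bits is what makes the IDs roughly time-ordered: an ID from a later millisecond compares greater than any ID from an earlier one, whichever node produced it.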
CloseHelper & Other Utilities
CloseHelper — Safe Resource Cleanup
import org.agrona.CloseHelper;
// Closes an AutoCloseable without throwing: nulls are ignored and any exception is swallowed
CloseHelper.quietClose(agentRunner);
CloseHelper.quietClose(counter1);
CloseHelper.quietClose(counter2);
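// Close multiple at once: every resource is closed even if one fails, then the failure is rethrown
// (arguments must be AutoCloseable, so a ring buffer does not belong here)
CloseHelper.closeAll(agentRunner, counter1, counter2);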
BitUtil — Bit Manipulation
import org.agrona.BitUtil;
// Align to cache line boundaries
int aligned = BitUtil.align(100, 64); // 128 (next multiple of 64)
// Check power of 2
BitUtil.isPowerOfTwo(1024); // true
BitUtil.isPowerOfTwo(1000); // false
// Find next power of 2
BitUtil.findNextPositivePowerOfTwo(1000); // 1024
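All three helpers reduce to short bit tricks. The sketch below shows one common way to write them in plain Java; ToyBits is a hypothetical class for illustration and is not copied from Agrona's source.

```java
/** Toy versions of common power-of-two bit tricks (illustration only). */
final class ToyBits {
    /** Rounds value up to the next multiple of alignment (alignment must be a power of two). */
    static int align(int value, int alignment) {
        return (value + (alignment - 1)) & ~(alignment - 1);
    }

    /** A positive power of two has exactly one bit set, so clearing the lowest set bit gives zero. */
    static boolean isPowerOfTwo(int value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Smallest power of two greater than or equal to value (for 1 <= value <= 2^30). */
    static int nextPowerOfTwo(int value) {
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    public static void main(String[] args) {
        System.out.println(align(100, 64));       // 128
        System.out.println(isPowerOfTwo(1000));   // false
        System.out.println(nextPowerOfTwo(1000)); // 1024
    }
}
```

These tricks are why the ring buffers and the timer wheel insist on power-of-two sizes: wrapping an index becomes a single mask instead of a modulo.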
BufferUtil — Buffer Operations
import org.agrona.BufferUtil;
// Allocate page-aligned direct ByteBuffer
ByteBuffer aligned = BufferUtil.allocateDirectAligned(4096, 4096);
// Useful for O_DIRECT file IO and DMA operations
// Free a direct ByteBuffer immediately (don't wait for GC)
BufferUtil.free(directByteBuffer);
SystemUtil — System Information
import org.agrona.SystemUtil;
// Load system properties from a file
SystemUtil.loadPropertiesFiles("config.properties");
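// Parse durations and sizes (the first argument is the property name, used in error messages)
long durationNs = SystemUtil.parseDuration("timeout", "5s"); // 5 seconds, returned in nanoseconds
long bytes = SystemUtil.parseSize("buffer.size", "256m"); // 256 * 1024 * 1024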
// Format durations and sizes
String formatted = SystemUtil.formatDuration(TimeUnit.SECONDS.toNanos(125));
String size = SystemUtil.formatSize(1024 * 1024 * 256);
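For a sense of what suffix parsing involves, here is a toy parser for size strings. ToySizeParser is a hypothetical helper written for this example; it assumes binary multiples for k, m and g and does not reproduce SystemUtil's validation or error messages.

```java
/** Toy size-string parser (illustration only, not Agrona's SystemUtil). */
final class ToySizeParser {
    /** Parses values such as "100", "4k", "256m" or "1g" into a byte count. */
    static long parseSize(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("size must not be empty");
        }
        int last = value.length() - 1;
        char suffix = Character.toLowerCase(value.charAt(last));
        switch (suffix) {
            case 'k':
                return Long.parseLong(value.substring(0, last)) * 1024L;
            case 'm':
                return Long.parseLong(value.substring(0, last)) * 1024L * 1024L;
            case 'g':
                return Long.parseLong(value.substring(0, last)) * 1024L * 1024L * 1024L;
            default:
                return Long.parseLong(value); // no suffix: plain bytes
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSize("256m")); // 268435456
        System.out.println(parseSize("4k"));   // 4096
    }
}
```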
Putting It All Together
Here's how these components compose into a production service:
public class ProductionService implements Agent {
private final OneToOneRingBuffer inbound;
private final OneToOneRingBuffer outbound;
private final DeadlineTimerWheel timerWheel;
private final CachedEpochClock clock;
private final AtomicCounter messagesProcessed;
private final DistinctErrorLog errorLog;
private final Int2IntHashMap instrumentPrices;
@Override
public int doWork() {
int work = 0;
// Update clock once per tick
clock.update(System.currentTimeMillis());
// Process inbound messages
work += inbound.read((msgTypeId, buf, idx, len) -> {
try {
processAndPublish(buf, idx, len);
messagesProcessed.increment();
} catch (Exception e) {
errorLog.record(e);
}
}, 10);
// Fire expired timers
work += timerWheel.poll(clock.time(), this::onTimerExpired, 5);
return work;
}
@Override
public String roleName() {
return "production-service";
}
public static void main(String[] args) {
// ... build all components ...
ProductionService service = new ProductionService(/* ... */);
AgentRunner runner = new AgentRunner(
new BackoffIdleStrategy(100, 10, 1, 1_000_000),
(t) -> { /* log error */ },
errorCounter,
service
);
AgentRunner.startOnThread(runner);
ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
barrier.await();
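// Closing the runner stops the agent thread and invokes service.onClose();
// the service is an Agent, not an AutoCloseable, so it is not passed to CloseHelper
CloseHelper.close(runner);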
}
}
Summary
The one-liner: Agrona's utilities — timer wheels for O(1) scheduling, cached clocks for cheap timestamps, distinct error logs for deduplicated monitoring, and Snowflake ID generators — are the production-grade infrastructure pieces that complement the core buffers, collections, and agents.