Using the Agrona library (Part 7)
Timers, Clocks & Error Handling
"The utilities that make your low-latency service production-ready: timer wheels for scheduling, cached clocks for speed, and distinct error logs for sanity."
This final article covers Agrona's utility classes—the infrastructure components that turn a prototype into a production system.
DeadlineTimerWheel — O(1) Timer Scheduling
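DeadlineTimerWheel is a timer wheel that provides O(1) timer registration and cancellation. java.util.Timer and ScheduledThreadPoolExecutor both keep pending tasks in a binary heap, paying O(log n) per insertion and allocating an object per scheduled task, so the wheel is far more efficient when you have thousands of pending timers.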
⚠️ Not thread-safe.
DeadlineTimerWheel is designed for the single-threaded duty-cycle model from the agents article: schedule, cancel, and poll from the same thread (or guard it externally).
How a Timer Wheel Works
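In outline, a timer wheel is a circular array of ticks, and each tick covers tickResolution units of time. Scheduling hashes a deadline to its tick ((deadline - startTime) / tickResolution, wrapped around the wheel) and drops it into a free slot there, so scheduling and cancelling each touch a single slot. Polling only inspects the slots of the current tick. Deadlines more than one revolution away share slots with nearer ones and are simply skipped until they come due. The toy below is a minimal, JDK-only sketch of that idea. It assumes a fixed number of slots per tick, and the class ToyTimerWheel and its methods are invented for illustration. They are not Agrona's internals.

```java
import java.util.Arrays;
import java.util.function.LongConsumer;

// Toy single-threaded timer wheel (JDK only, not Agrona's implementation).
// Each tick owns a fixed number of slots holding absolute deadlines; EMPTY marks a free slot.
final class ToyTimerWheel {
    private static final long EMPTY = Long.MAX_VALUE;
    private final long startTime;
    private final int resolutionShift;  // log2(tickResolution)
    private final int tickMask;         // ticksPerWheel - 1
    private final int slotsPerTick;
    private final long[] deadlines;     // ticksPerWheel * slotsPerTick entries
    private long currentTick;

    ToyTimerWheel(long startTime, int tickResolution, int ticksPerWheel, int slotsPerTick) {
        if (Integer.bitCount(tickResolution) != 1 || Integer.bitCount(ticksPerWheel) != 1) {
            throw new IllegalArgumentException("tickResolution and ticksPerWheel must be powers of 2");
        }
        this.startTime = startTime;
        this.resolutionShift = Integer.numberOfTrailingZeros(tickResolution);
        this.tickMask = ticksPerWheel - 1;
        this.slotsPerTick = slotsPerTick;
        this.deadlines = new long[ticksPerWheel * slotsPerTick];
        Arrays.fill(deadlines, EMPTY);
    }

    /** O(1): hash the deadline to its tick and take a free slot; the slot index is the timer ID. */
    long schedule(long deadline) {
        long tick = Math.max((deadline - startTime) >> resolutionShift, currentTick);
        int base = (int) (tick & tickMask) * slotsPerTick;   // bit mask instead of modulo
        for (int i = 0; i < slotsPerTick; i++) {
            if (deadlines[base + i] == EMPTY) {
                deadlines[base + i] = deadline;
                return base + i;
            }
        }
        throw new IllegalStateException("tick is full; a real wheel would grow its slots");
    }

    /** O(1): the timer ID points straight at its slot. */
    boolean cancel(long timerId) {
        int index = (int) timerId;
        if (deadlines[index] == EMPTY) {
            return false;
        }
        deadlines[index] = EMPTY;
        return true;
    }

    /**
     * Fires expired timers in the current tick's slots, then advances at most one tick.
     * Deadlines from a later revolution share the slot but are skipped until due.
     */
    int poll(long now, LongConsumer onExpiry) {
        int fired = 0;
        int base = (int) (currentTick & tickMask) * slotsPerTick;
        for (int i = 0; i < slotsPerTick; i++) {
            long deadline = deadlines[base + i];
            if (deadline != EMPTY && deadline <= now) {
                deadlines[base + i] = EMPTY;
                onExpiry.accept(base + i);
                fired++;
            }
        }
        if (now >= startTime + ((currentTick + 1) << resolutionShift)) {
            currentTick++;
        }
        return fired;
    }
}
```

Because this toy's poll() advances at most one tick per call, it has to run on every duty cycle to keep pace with the clock. After a long stall, it needs several calls to catch up.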
Usage
```java
import org.agrona.DeadlineTimerWheel;
import java.util.concurrent.TimeUnit;

DeadlineTimerWheel timerWheel = new DeadlineTimerWheel(
    TimeUnit.MILLISECONDS,       // Time unit
    System.currentTimeMillis(),  // Start time
    16,                          // Tick resolution in time units (16ms per tick, power of 2)
    256                          // Ticks per wheel (power of 2)
);

// Schedule a timer (returns a timer ID for cancellation)
long timerId1 = timerWheel.scheduleTimer(
    System.currentTimeMillis() + 500   // Deadline: 500ms from now
);
long timerId2 = timerWheel.scheduleTimer(
    System.currentTimeMillis() + 2000  // Deadline: 2 seconds from now
);

// Cancel a timer: O(1)!
boolean cancelled = timerWheel.cancelTimer(timerId1);

// In your duty cycle: advance time and fire expired timers
int expired = timerWheel.poll(
    System.currentTimeMillis(),      // Current time
    (timeUnit, now, timerId) -> {    // Handler for expired timers
        System.out.println("Timer fired: " + timerId);
        // Return true to consume the timer.
        // Return false to keep it active and abort further polling
        // (useful for backpressure: poll again on the next duty cycle).
        return true;
    },
    Integer.MAX_VALUE                // Max timers to process
);
```
Two things to watch when constructing the wheel:
- Both tickResolution and ticksPerWheel must be powers of 2. The wheel uses bit masking instead of modulo for speed, and the constructor throws IllegalArgumentException otherwise.
- Tick resolution is your accuracy/overhead trade-off. Deadlines are bucketed by tick, so treat one tick as your timing granularity: with a 16ms resolution, a deadline can fire up to about one tick (~16ms) late, plus whatever delay there is until your next poll(). Finer resolution costs more wheel slots (or more wrap-around scans) for long deadlines, so pick the coarsest resolution your timeouts can tolerate.
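Here is a quick sketch of the arithmetic behind both points, using the two configurations that appear in this article (16ms × 256 ticks and 64ms × 1024 ticks). One revolution spans tickResolution × ticksPerWheel. For non-negative ticks, the mask tick & (ticksPerWheel - 1) gives the same slot as tick % ticksPerWheel. WheelMath is a hypothetical helper and is not part of Agrona.

```java
// Wheel arithmetic for the two configurations in this article (illustrative helper, not an Agrona API).
final class WheelMath {
    /** Time covered by one full revolution of the wheel. */
    static long revolutionSpan(long tickResolution, int ticksPerWheel) {
        return tickResolution * ticksPerWheel;
    }

    /** Tick a deadline falls into, counted from the wheel's start time. */
    static long tickFor(long deadline, long startTime, long tickResolution) {
        return (deadline - startTime) / tickResolution;
    }

    /** Slot on the wheel; the mask equals tick % ticksPerWheel because ticksPerWheel is a power of 2. */
    static int slotFor(long tick, int ticksPerWheel) {
        return (int) (tick & (ticksPerWheel - 1));
    }

    public static void main(String[] args) {
        // Usage example: 16ms x 256 ticks
        System.out.println(revolutionSpan(16, 256));               // 4096 (ms per revolution)
        System.out.println(slotFor(tickFor(2_000, 0, 16), 256));   // tick 125 -> slot 125
        System.out.println(slotFor(tickFor(5_000, 0, 16), 256));   // tick 312 wraps -> slot 56
        // ConnectionManager: 64ms x 1024 ticks
        System.out.println(revolutionSpan(64, 1024));              // 65536, so a 30s timeout never wraps
        System.out.println(slotFor(tickFor(30_000, 0, 64), 1024)); // tick 468 -> slot 468
    }
}
```

The 16ms wheel therefore wraps every ~4 seconds, so the 2-second deadline from the usage example fits within one revolution. A 5-second deadline lands in slot 56 and is skipped on earlier passes until it comes due. The 64ms wheel in the connection example covers about 65 seconds per revolution.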
Connection Timeout Example
```java
import org.agrona.DeadlineTimerWheel;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import java.util.concurrent.TimeUnit;

// Connection is your own application type (not shown): id(), close(reason), getTimerId(), setTimerId()
class ConnectionManager implements Agent {
    private final DeadlineTimerWheel timerWheel = new DeadlineTimerWheel(
        TimeUnit.MILLISECONDS, System.currentTimeMillis(), 64, 1024);
    private final Long2ObjectHashMap<Connection> connections = new Long2ObjectHashMap<>();
    private final Long2ObjectHashMap<Connection> connectionByTimerId = new Long2ObjectHashMap<>();

    void onNewConnection(Connection conn) {
        long timerId = timerWheel.scheduleTimer(
            System.currentTimeMillis() + 30_000 // 30-second timeout
        );
        conn.setTimerId(timerId);
        connections.put(conn.id(), conn);
        connectionByTimerId.put(timerId, conn);
    }

    void onMessageReceived(Connection conn) {
        // Timer IDs are reused once a timer fires or is cancelled. If this connection has
        // already timed out, its old ID may now belong to another connection's timer.
        if (connectionByTimerId.get(conn.getTimerId()) != conn) {
            return;
        }

        // Reset the timeout
        timerWheel.cancelTimer(conn.getTimerId());
        connectionByTimerId.remove(conn.getTimerId());
        long newTimerId = timerWheel.scheduleTimer(
            System.currentTimeMillis() + 30_000
        );
        conn.setTimerId(newTimerId);
        connectionByTimerId.put(newTimerId, conn);
    }

    @Override
    public int doWork() {
        return timerWheel.poll(
            System.currentTimeMillis(),
            (timeUnit, now, timerId) -> {
                // Timer expired: close the connection
                Connection conn = connectionByTimerId.remove(timerId);
                if (conn != null) {
                    conn.close("Timeout");
                    connections.remove(conn.id());
                }
                return true; // consume the timer
            },
            10
        );
    }

    @Override
    public String roleName() {
        return "connection-manager";
    }
}
```
Note the second Long2ObjectHashMap keyed by timer ID — the wheel hands you back only the timerId on expiry, so you need your own reverse index to find the owning connection. If a connection churns messages frequently, the cancel-and-reschedule pattern above is O(1) per message, which is exactly what the wheel is built for.
Clocks — CachedEpochClock & SystemEpochClock
The Problem with System.currentTimeMillis()
Calling System.currentTimeMillis() on every message is comparatively expensive — depending on the OS and clock source it's a VDSO call or a full system call, and on a typical Linux box it costs on the order of tens of nanoseconds versus a sub-nanosecond field read. (The exact numbers vary by platform and clock source — measure with JMH on your own hardware before trusting anyone's figures, including these.)
CachedEpochClock — Read the Clock Once Per Tick
```java
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;

// System clock: reads the real time (expensive)
EpochClock systemClock = SystemEpochClock.INSTANCE;
long now = systemClock.time(); // System.currentTimeMillis()

// Cached clock: returns the last cached value (cheap!)
CachedEpochClock cachedClock = new CachedEpochClock();

// In your duty cycle:
class MyAgent implements Agent {
    private final CachedEpochClock clock = new CachedEpochClock();

    @Override
    public int doWork() {
        // Update the clock ONCE per duty cycle iteration
        clock.update(System.currentTimeMillis());
        // All messages in this batch use the same timestamp
        return processMessages(clock);
    }

    int processMessages(EpochClock clock) {
        long now = clock.time(); // Returns cached value: a field read, not a syscall
        // Process 1000 messages, all with the same "now"
        return 0;
    }

    @Override
    public String roleName() {
        return "my-agent";
    }
}
```
CachedEpochClock follows the single-writer principle: one thread calls update() (your duty cycle), while time() can be read safely from any thread. The staleness of the timestamp is bounded by your duty-cycle iteration time — fine for message timestamps and timeout checks, not for measuring sub-millisecond latencies (use a NanoClock for that).
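Here is a minimal JDK-only analogue that shows what the single-writer arrangement involves. TinyCachedClock is a hypothetical class, not Agrona's CachedEpochClock, and it assumes a volatile field is an acceptable publication mechanism. The duty-cycle thread is the only writer, and any thread may read the latest published value.

```java
// Single-writer cached clock: one thread calls update(), any thread may call time().
// JDK-only illustration, not Agrona's CachedEpochClock.
final class TinyCachedClock {
    private volatile long timeMs;   // volatile: reads are atomic and see the latest published value

    /** Called by the owning duty-cycle thread only. */
    void update(long nowMs) {
        timeMs = nowMs;
    }

    /** Cheap read from any thread: a field load, no system call. */
    long time() {
        return timeMs;
    }

    public static void main(String[] args) throws InterruptedException {
        TinyCachedClock clock = new TinyCachedClock();
        Thread dutyCycle = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                clock.update(System.currentTimeMillis()); // one real clock read per iteration
                // ... process a batch of messages using clock.time() ...
            }
        });
        dutyCycle.start();
        dutyCycle.join();
        System.out.println("last cached time: " + clock.time());
    }
}
```

With exactly one writer, a release store (for example VarHandle.setRelease) would also suffice and is slightly cheaper than a volatile write. Volatile just keeps the sketch short.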
CachedNanoClock — For High-Resolution Timing
```java
import org.agrona.concurrent.CachedNanoClock;

CachedNanoClock nanoClock = new CachedNanoClock();
nanoClock.update(System.nanoTime());
long nanos = nanoClock.nanoTime(); // Cached nanosecond timestamp
```
Why Two Clock Interfaces?
```java
// EpochClock: milliseconds since Unix epoch
interface EpochClock {
    long time(); // like System.currentTimeMillis()
}

// NanoClock: nanoseconds (monotonic, no wall-clock meaning)
interface NanoClock {
    long nanoTime(); // like System.nanoTime()
}
```
Both are interfaces, making them easy to mock for testing:
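```java
// In tests: control time precisely!
CachedEpochClock testClock = new CachedEpochClock();
testClock.update(1000L);
myService.setEpochClock(testClock); // your service reads time only through the injected clock

// Advance time past the 30-second timeout
testClock.advance(30_001L);

// Now test timeout behavior without Thread.sleep()!
// DeadlineTimerWheel.poll() moves the wheel forward at most one tick per call,
// so run enough duty cycles for it to catch up with the jump
for (int i = 0; i < 1_000 && !connection.isClosed(); i++) {
    myService.doWork();
}
assertTrue(connection.isClosed()); // 30s timeout fired deterministically
```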
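The same testing pattern works without any Agrona types, which makes it easy to try in isolation. In the sketch below, LongSupplier stands in for EpochClock, and IdleTracker, ManualClock and the 30-second limit are hypothetical. The component never calls System.currentTimeMillis() itself, so the test decides exactly when time moves.

```java
import java.util.function.LongSupplier;

// Hypothetical component that reads time only through an injected clock.
final class IdleTracker {
    private final LongSupplier clockMs;   // plays the role of an EpochClock
    private final long timeoutMs;
    private long lastActivityMs;

    IdleTracker(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.timeoutMs = timeoutMs;
        this.lastActivityMs = clockMs.getAsLong();
    }

    void onMessage() {
        lastActivityMs = clockMs.getAsLong();   // activity resets the idle timer
    }

    boolean isTimedOut() {
        return clockMs.getAsLong() - lastActivityMs >= timeoutMs;
    }
}

// Test clock: time moves only when the test calls advance().
final class ManualClock implements LongSupplier {
    private long nowMs;

    ManualClock(long startMs) {
        nowMs = startMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}
```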
DistinctErrorLog — Deduplication for Errors
In production, the same error often repeats thousands of times. Traditional logging fills disks. Agrona's DistinctErrorLog records each distinct error only once, with a count and timestamps:
```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.errors.DistinctErrorLog;
import java.nio.ByteBuffer;

UnsafeBuffer errorBuffer = new UnsafeBuffer(
    ByteBuffer.allocateDirect(1024 * 1024) // 1 MB for error log
);
DistinctErrorLog errorLog = new DistinctErrorLog(errorBuffer, SystemEpochClock.INSTANCE);

// Record errors: duplicates are counted, not re-logged.
// The same failure must come from the same code path, because the
// stack trace (including the calling line) is part of what makes it "the same".
for (int i = 0; i < 2; i++) {
    try {
        riskyOperation(); // your own code
    } catch (Exception e) {
        // 1st time: stores the full stack trace
        // 2nd time: just increments the counter and updates the timestamp
        errorLog.record(e);
    }
}
```
Unlike the timer wheel, DistinctErrorLog is thread-safe — any thread can record(). Two operational details worth knowing:
- Errors are considered duplicates when the exception class, message, and stack trace match.
- record() returns boolean: false means the buffer is full and a new distinct error couldn't be stored (existing ones still get counted). Size the buffer for your worst day, not your average one, and alert on a false return.
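The deduplication rule can be sketched in plain Java to show why repeats are cheap. ToyDistinctErrorLog is a hypothetical, heap-based illustration, whereas Agrona's class encodes observations into the shared buffer. In the sketch, the first occurrence keeps the full exception and later ones only bump a counter and the last-seen timestamp. A fixed capacity makes record() return false rather than grow without bound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;

// In-heap illustration of distinct-error deduplication (not Agrona's implementation).
final class ToyDistinctErrorLog {
    static final class Observation {
        final Throwable first;   // full details kept only for the first occurrence
        final long firstSeenMs;
        long lastSeenMs;
        int count;

        Observation(Throwable first, long nowMs) {
            this.first = first;
            this.firstSeenMs = nowMs;
            this.lastSeenMs = nowMs;
            this.count = 1;
        }
    }

    private final List<Observation> observations = new ArrayList<>();
    private final int capacity;
    private final LongSupplier clockMs;

    ToyDistinctErrorLog(int capacity, LongSupplier clockMs) {
        this.capacity = capacity;
        this.clockMs = clockMs;
    }

    /** Returns false only when a new distinct error does not fit; repeats are always counted. */
    synchronized boolean record(Throwable error) {
        long now = clockMs.getAsLong();
        for (Observation o : observations) {
            if (sameError(o.first, error)) {
                o.count++;
                o.lastSeenMs = now;
                return true;
            }
        }
        if (observations.size() >= capacity) {
            return false;
        }
        observations.add(new Observation(error, now));
        return true;
    }

    synchronized int distinctCount() {
        return observations.size();
    }

    synchronized int countOf(int index) {
        return observations.get(index).count;
    }

    // Same class, same message, same stack trace
    private static boolean sameError(Throwable a, Throwable b) {
        return a.getClass() == b.getClass()
            && Objects.equals(a.getMessage(), b.getMessage())
            && Arrays.equals(a.getStackTrace(), b.getStackTrace());
    }
}
```

Because the stack trace is part of the key, the same exception type thrown from two different call sites counts as two distinct errors.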
Reading Errors from Another Process
Because the log lives in a plain buffer, you can place it in a memory-mapped file and read it from a separate monitoring process while the service runs:
```java
import org.agrona.concurrent.errors.ErrorLogReader;

// Monitoring process reads the error log
// (errorBuffer here wraps the monitor's own mapping of the same file)
ErrorLogReader.read(errorBuffer, (observationCount, firstObservedTimestamp,
    lastObservedTimestamp, encodedException) -> {
    System.out.printf("Error (seen %d times, last at %d):%n%s%n",
        observationCount, lastObservedTimestamp, encodedException);
});
```
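Both processes see the same bytes because both map the same file. The JDK-only sketch below demonstrates just that mechanism. It maps one file twice, as the service and the monitor would, and reads back through the second mapping what was written through the first. SharedFileDemo and the temp-file name are invented for illustration. With Agrona, you would wrap each mapping in an UnsafeBuffer, passing one to DistinctErrorLog and the other to ErrorLogReader.read().

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Two independent mappings of one file, standing in for a service and a monitoring process.
final class SharedFileDemo {
    static MappedByteBuffer map(Path file, int size) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // READ_WRITE mapping grows the file if needed; it stays valid after the channel closes
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    static long writeThenRead(Path file, long value) throws IOException {
        MappedByteBuffer writer = map(file, 4096);   // "service" side
        MappedByteBuffer reader = map(file, 4096);   // "monitor" side
        writer.putLong(0, value);
        return reader.getLong(0);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("error-log-demo", ".dat");
        file.toFile().deleteOnExit();
        System.out.println(writeThenRead(file, 42L)); // 42
    }
}
```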
SnowflakeIdGenerator — Distributed Unique IDs
Agrona includes a lock-free Snowflake-style ID generator for generating unique IDs across distributed nodes. The interface is org.agrona.concurrent.IdGenerator; the implementation is SnowflakeIdGenerator:
```java
import org.agrona.concurrent.IdGenerator;
import org.agrona.concurrent.SnowflakeIdGenerator;

// Each node gets a unique nodeId (0-1023 with the default 10 node-id bits)
IdGenerator idGenerator = new SnowflakeIdGenerator(nodeId);

// Generate unique, roughly time-ordered IDs
long uniqueId = idGenerator.nextId();

// Format: [timestamp bits][nodeId bits (10)][sequence bits (12)]
// Unique across nodes without coordination!
```
The defaults give you 1,024 nodes and 4,096 IDs per millisecond per node; if a node exhausts its sequence within a millisecond, nextId() busy-spins until the next one. A longer constructor lets you rebalance the node/sequence bits and supply a custom epoch offset and EpochClock:
```java
new SnowflakeIdGenerator(
    nodeIdBits, sequenceBits, nodeId, timestampOffsetMs, clock);
```
One caveat inherent to all Snowflake schemes: uniqueness depends on the wall clock never going backwards, so run NTP in a mode that slews rather than steps time.
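The bit layout above is easy to compose and decompose by hand, which also makes the capacity figures simple to check. 10 node bits give 2^10 = 1,024 nodes, and 12 sequence bits give 2^12 = 4,096 IDs per millisecond. SnowflakeLayout is an illustrative sketch of that layout, not Agrona's code. It assumes the default 10/12 split, with the timestamp in the remaining high bits.

```java
// Packs and unpacks a Snowflake-style ID: [timestamp][nodeId: 10 bits][sequence: 12 bits].
final class SnowflakeLayout {
    static final int NODE_ID_BITS = 10;
    static final int SEQUENCE_BITS = 12;
    static final long MAX_NODE_ID = (1L << NODE_ID_BITS) - 1;     // 1023
    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;   // 4095

    static long compose(long timestampMs, long nodeId, long sequence) {
        if (nodeId < 0 || nodeId > MAX_NODE_ID || sequence < 0 || sequence > MAX_SEQUENCE) {
            throw new IllegalArgumentException("nodeId or sequence out of range");
        }
        return (timestampMs << (NODE_ID_BITS + SEQUENCE_BITS)) | (nodeId << SEQUENCE_BITS) | sequence;
    }

    static long timestamp(long id) {
        return id >>> (NODE_ID_BITS + SEQUENCE_BITS);
    }

    static long nodeId(long id) {
        return (id >>> SEQUENCE_BITS) & MAX_NODE_ID;
    }

    static long sequence(long id) {
        return id & MAX_SEQUENCE;
    }

    public static void main(String[] args) {
        long id = compose(1_000L, 7, 42);
        System.out.println(timestamp(id) + " " + nodeId(id) + " " + sequence(id)); // 1000 7 42
        System.out.println((MAX_NODE_ID + 1) + " nodes, " + (MAX_SEQUENCE + 1) + " IDs/ms per node");
    }
}
```

Because the timestamp sits in the high bits, an ID from a later millisecond always compares greater, which is where "roughly time-ordered" comes from. It is also why a clock that steps backwards can hand out IDs that were already issued.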
CloseHelper & Other Utilities
CloseHelper — Safe Resource Cleanup
```java
import org.agrona.CloseHelper;

// Closes an AutoCloseable, dealing with nulls; exceptions are swallowed silently
CloseHelper.quietClose(agentRunner);

// Prefer the ErrorHandler variants in production so close failures are visible
CloseHelper.close(errorHandler, agentRunner);

// Close multiple AutoCloseables at once (with or without an ErrorHandler)
CloseHelper.closeAll(agentRunner, counter1, counter2);
CloseHelper.closeAll(errorHandler, agentRunner, counter1, counter2);
```
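The "close everything and keep going" behaviour is worth understanding even if you never write it yourself. Below is a plain-JDK sketch of one reasonable policy. It skips nulls, closes every resource in order even when an earlier one fails, and reports the first failure with later ones attached as suppressed exceptions. The helper name closeAllSketch is invented. This sketch makes no claim about how CloseHelper itself reports failures, so check its Javadoc for that.

```java
// Close every resource even if some fail; report the first failure with the rest suppressed.
final class CloseAllSketch {
    static void closeAllSketch(AutoCloseable... resources) {
        RuntimeException failure = null;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;   // tolerate nulls
            }
            try {
                resource.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = new RuntimeException("close failed", e);  // first failure becomes the cause
                } else {
                    failure.addSuppressed(e);                           // later failures are kept, not lost
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```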
BitUtil — Bit Manipulation
```java
import org.agrona.BitUtil;

// Align to cache line boundaries
int aligned = BitUtil.align(100, 64); // 128 (next multiple of 64)

// Check power of 2
BitUtil.isPowerOfTwo(1024); // true
BitUtil.isPowerOfTwo(1000); // false

// Find next power of 2
BitUtil.findNextPositivePowerOfTwo(1000); // 1024
```
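Each of these helpers boils down to a line or two of bit arithmetic. The plain-Java versions below show what the masks do: aligning adds alignment - 1 and clears the low bits, and a power of two is a positive number with exactly one bit set. BitTricks and its methods are invented names, and their edge-case handling may differ from BitUtil's.

```java
// Plain-Java versions of the three bit tricks above (illustrative, not Agrona's BitUtil).
final class BitTricks {
    /** Round value up to a multiple of alignment; alignment must be a power of two. */
    static int align(int value, int alignment) {
        return (value + (alignment - 1)) & ~(alignment - 1);
    }

    /** A power of two is positive and has exactly one bit set. */
    static boolean isPowerOfTwo(int value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Smallest power of two >= value, for 1 <= value <= 2^30. */
    static int nextPowerOfTwo(int value) {
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }
}
```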
BufferUtil — Buffer Operations
```java
import org.agrona.BufferUtil;
import java.nio.ByteBuffer;

// Allocate page-aligned direct ByteBuffer
ByteBuffer aligned = BufferUtil.allocateDirectAligned(4096, 4096);
// Useful for O_DIRECT file IO and DMA operations

// Free a direct ByteBuffer immediately (don't wait for GC), and never touch it afterwards
BufferUtil.free(directByteBuffer);
```
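If you want to see how alignment can be obtained from the JDK alone, Java 9+ exposes a buffer's address modulo a power of two through ByteBuffer.alignmentOffset(). The sketch below over-allocates by one alignment unit and slices at the first aligned address. AlignedAlloc is a hypothetical helper, not Agrona's implementation.

```java
import java.nio.ByteBuffer;

// Aligned direct buffer using only JDK APIs (Java 9+); illustrative, not Agrona's code.
final class AlignedAlloc {
    static ByteBuffer allocateDirectAligned(int capacity, int alignment) {
        if (Integer.bitCount(alignment) != 1) {
            throw new IllegalArgumentException("alignment must be a power of two");
        }
        // Over-allocate by one alignment unit so an aligned region of 'capacity' bytes always fits
        ByteBuffer raw = ByteBuffer.allocateDirect(capacity + alignment);
        int misalignment = raw.alignmentOffset(0, alignment);   // start address % alignment
        int start = misalignment == 0 ? 0 : alignment - misalignment;
        raw.position(start);
        raw.limit(start + capacity);
        return raw.slice();   // view beginning at the aligned address, exactly 'capacity' bytes
    }
}
```

The returned slice keeps the whole over-allocated block alive, so each buffer carries up to one alignment unit of padding.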
SystemUtil — System Information
```java
import org.agrona.SystemUtil;
import java.util.concurrent.TimeUnit;

// Load system properties from files or URLs
SystemUtil.loadPropertiesFiles("config.properties");

// Parse durations and sizes (first arg is the property name, used in error messages)
long durationNs = SystemUtil.parseDuration("my.timeout", "5s"); // also "100ms", "10us", "1ns"
long bytes = SystemUtil.parseSize("my.buffer.size", "256m");    // k/m/g suffixes → bytes

// Format durations and sizes for human-readable output
String formatted = SystemUtil.formatDuration(TimeUnit.SECONDS.toNanos(125));
String size = SystemUtil.formatSize(1024 * 1024 * 256);
```
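The size suffixes are simple multipliers. As an illustration only, here is a minimal parser in the spirit of parseSize. SizeParser is hypothetical and assumes 1024-based k/m/g multipliers, and unlike SystemUtil it takes no property name for error messages.

```java
import java.util.Locale;

// Suffix-based size parsing in the spirit of SystemUtil.parseSize (illustrative; 1024-based multipliers assumed).
final class SizeParser {
    static long parseSize(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        long multiplier;
        switch (v.charAt(v.length() - 1)) {
            case 'k': multiplier = 1L << 10; break;
            case 'm': multiplier = 1L << 20; break;
            case 'g': multiplier = 1L << 30; break;
            default:  return Long.parseLong(v);   // no suffix: plain byte count
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        return Math.multiplyExact(amount, multiplier);   // fail loudly on overflow
    }
}
```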
Putting It All Together
Here's how these components compose into a production service:
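```java
import org.agrona.CloseHelper;
import org.agrona.DeadlineTimerWheel;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.ShutdownSignalBarrier;
import org.agrona.concurrent.errors.DistinctErrorLog;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.status.AtomicCounter;

public class ProductionService implements Agent {
    private final OneToOneRingBuffer inbound;
    private final OneToOneRingBuffer outbound;
    private final DeadlineTimerWheel timerWheel;
    private final CachedEpochClock clock;
    private final AtomicCounter messagesProcessed;
    private final DistinctErrorLog errorLog;
    private final Int2IntHashMap instrumentPrices;

    // Constructor assigning the fields above, processAndPublish(buffer, index, length) and
    // boolean onTimerExpired(TimeUnit timeUnit, long now, long timerId) omitted for brevity

    @Override
    public int doWork() {
        int work = 0;

        // Update clock once per duty-cycle iteration
        clock.update(System.currentTimeMillis());

        // Process inbound messages (at most 10 per iteration)
        work += inbound.read((msgTypeId, buf, idx, len) -> {
            try {
                processAndPublish(buf, idx, len);
                messagesProcessed.increment();
            } catch (Exception e) {
                errorLog.record(e);
            }
        }, 10);

        // Fire expired timers (at most 5 per iteration)
        work += timerWheel.poll(clock.time(), this::onTimerExpired, 5);

        return work;
    }

    @Override
    public String roleName() {
        return "production-service";
    }

    public static void main(String[] args) {
        // ... build all components, including a DistinctErrorLog (errorLog)
        //     and an AtomicCounter for agent errors (errorCounter) ...
        ProductionService service = new ProductionService(/* ... */);

        AgentRunner runner = new AgentRunner(
            new BackoffIdleStrategy(100, 10, 1, 1_000_000),
            errorLog::record, // the local errorLog: instance fields aren't reachable from static main
            errorCounter,
            service
        );
        AgentRunner.startOnThread(runner);

        ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
        barrier.await();

        // Closing the runner stops the thread and invokes the agent's onClose()
        CloseHelper.quietClose(runner);
    }
}
```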
A note on the duty-cycle shape: everything in doWork() is bounded — at most 10 messages and 5 timers per iteration. Bounded batches keep the loop responsive (timers still fire while a message flood is being drained) and make per-iteration latency predictable.
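A small simulation makes the responsiveness point concrete. In the sketch below, DutyCycleSketch, the backlog of 1,000 messages, and the single already-due timer are all invented for illustration. With a batch limit of 10, the timer fires after 10 messages, whereas an unbounded drain makes it wait for the entire backlog.

```java
import java.util.ArrayDeque;

// Simulated duty cycle: a message backlog plus one timer that is already due (illustrative only).
final class DutyCycleSketch {
    private final ArrayDeque<Integer> inbound = new ArrayDeque<>();
    private final int batchLimit;
    private long processed;
    private long processedWhenTimerFired = -1;   // -1 until the timer has fired

    DutyCycleSketch(int backlog, int batchLimit) {
        for (int i = 0; i < backlog; i++) {
            inbound.add(i);
        }
        this.batchLimit = batchLimit;
    }

    /** One iteration: at most batchLimit messages, then the timer check. */
    int doWork() {
        int work = 0;
        while (work < batchLimit && !inbound.isEmpty()) {
            inbound.poll();
            processed++;
            work++;
        }
        if (processedWhenTimerFired < 0) {
            processedWhenTimerFired = processed;   // the due timer fires on this iteration
            work++;
        }
        return work;
    }

    /** Runs iterations until idle and reports how many messages were handled before the timer fired. */
    long messagesProcessedBeforeTimerFired() {
        while (doWork() > 0) {
            // keep cycling until there is no work left
        }
        return processedWhenTimerFired;
    }
}
```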
Summary
The one-liner: Agrona's utilities — timer wheels for O(1) scheduling, cached clocks for cheap timestamps, distinct error logs for deduplicated monitoring, and Snowflake ID generators — are the production-grade infrastructure pieces that complement the core buffers, collections, and agents.
Choosing quickly:
| You need… | Reach for… | Instead of… |
|---|---|---|
| Thousands of timeouts on one thread | DeadlineTimerWheel | ScheduledThreadPoolExecutor |
| Timestamps on every message | CachedEpochClock | System.currentTimeMillis() per call |
| Error visibility without log spam | DistinctErrorLog | unbounded SLF4J error logging |
| Coordination-free unique IDs | SnowflakeIdGenerator | UUID.randomUUID() (128-bit, allocates) |