001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import java.io.PrintStream; 022import java.time.Duration; 023import java.time.Instant; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import java.util.function.LongSupplier; 030import java.util.concurrent.Executors; 031import java.lang.ref.WeakReference; 032import java.util.concurrent.atomic.AtomicReference; 033 034/** 035 * TPS can be used to measure Transactions Per Second (or transactions during 036 * other period of time). 037 * 038 * <p>It can operate in two different modes: 039 * <ul> 040 * <li>Auto update.</li> 041 * <li>Manual update.</li> 042 * </ul> 043 * 044 * <p>When operating in <b>auto update</b> mode, a shared scheduler is used and the 045 * number of transactions (calls to tick()) is automatically calculated for 046 * every period. In this mode, callers should invoke <b>stop()</b> (or 047 * <b>close()</b>) when the TPS object is no longer needed to cancel its scheduled 048 * task. The shared scheduler thread is daemon and reused across instances.</p> 049 * 050 * <p>When operating in <b>manual update</b> mode, user has to call one of its 051 * floatValue() or intValue() method at regular intervals. The returned value 052 * will be the average TPS for the given period since the last call.</p> 053 * 054 * @author Alejandro Revilla, Jeronimo Paolleti and Thiago Moretto 055 * @since 1.6.7 r2912 056 */ 057@SuppressWarnings("unused") 058public class TPS implements Loggeable, AutoCloseable { 059 /** 060 * If set (via setSimulatedNanoTime), getNanoTime() returns this value. 061 * This is intended for deterministic tests. 062 */ 063 protected volatile long simulatedNanoTime = 0L; 064 065 static final long FROM_NANOS = 1_000_000L; // nanos -> millis conversion factor (kept for compatibility) 066 067 private final AtomicLong count = new AtomicLong(0L); 068 069 private final Duration period; 070 071 private volatile boolean autoupdate; 072 073 // Published metrics: written under lock, read lock-free. 074 private volatile float tps; 075 private volatile float avg; 076 private volatile int peak; 077 private volatile Instant peakWhen; 078 079 // State for manual/auto interval measurement (monotonic). 080 private volatile long lastSampleNanos; 081 082 // State for getElapsed() (wall clock), updated when sampling. 083 private volatile Instant startWall; 084 085 // Average computation state (guarded by lock). 086 private long readings; 087 088 // Auto-update scheduler (shared across TPS instances). 089 private static final ScheduledExecutorService SHARED_SCHEDULER = Executors.newSingleThreadScheduledExecutor( 090 new ThreadFactory() { 091 @Override 092 public Thread newThread(Runnable r) { 093 Thread t = new Thread(r, "TPS-auto-update"); 094 t.setDaemon(true); 095 return t; 096 } 097 } 098 ); 099 private ScheduledFuture<?> scheduledTask; 100 101 // Pluggable time source (monotonic nanos). 102 private final LongSupplier nanoTimeSource; 103 104 /** Default constructor. */ 105 public TPS() { 106 this(1000L, false); 107 } 108 109 /** 110 * Creates a TPS meter with the default one-second period. 111 * 112 * @param autoupdate true to auto update 113 */ 114 public TPS(boolean autoupdate) { 115 this(1000L, autoupdate); 116 } 117 118 /** 119 * Creates a TPS meter with the given sampling period. 120 * 121 * @param period sampling period in milliseconds 122 * @param autoupdate true to auto update 123 */ 124 public TPS(final long period, boolean autoupdate) { 125 this(period, autoupdate, null); 126 } 127 128 /** 129 * Internal constructor that allows injecting a nano time source for tests. 130 * If nanoTimeSource is null, System.nanoTime() is used (or simulatedNanoTime when set). 131 */ 132 TPS(final long periodMillis, boolean autoupdate, LongSupplier nanoTimeSource) { 133 if (periodMillis <= 0L) 134 throw new IllegalArgumentException("period must be > 0 ms"); 135 136 this.period = Duration.ofMillis(periodMillis); 137 this.autoupdate = autoupdate; 138 139 this.nanoTimeSource = nanoTimeSource != null ? nanoTimeSource : this::getNanoTime; 140 141 Instant nowWall = Instant.now(); 142 this.startWall = nowWall; 143 this.peakWhen = nowWall; 144 145 long nowNanos = this.nanoTimeSource.getAsLong(); 146 this.lastSampleNanos = nowNanos; 147 148 if (autoupdate) { 149 startScheduler(periodMillis); 150 } 151 } 152 153 /** Increments the transaction counter for the current sampling window. */ 154 public void tick() { 155 count.incrementAndGet(); 156 } 157 158 /** 159 * Returns the current transactions-per-second value. 160 * 161 * @return current TPS as a floating-point value 162 */ 163 public float floatValue() { 164 if (autoupdate) { 165 return tps; 166 } else { 167 return calcTPSIfDue(); 168 } 169 } 170 171 /** 172 * Returns the current transactions-per-second value rounded to an integer. 173 * 174 * @return current TPS rounded to an integer 175 */ 176 public int intValue() { 177 return Math.round(floatValue()); 178 } 179 180 /** 181 * Returns the average TPS across sampled intervals. 182 * 183 * @return average TPS 184 */ 185 public float getAvg() { 186 return avg; 187 } 188 189 /** 190 * Returns the highest recorded TPS peak. 191 * 192 * @return peak TPS 193 */ 194 public int getPeak() { 195 return peak; 196 } 197 198 /** 199 * Returns the wall-clock time when the peak TPS was recorded. 200 * 201 * @return peak timestamp in epoch milliseconds, or {@code -1} if unavailable 202 */ 203 public long getPeakWhen() { 204 Instant pw = peakWhen; 205 return pw != null ? pw.toEpochMilli() : -1L; 206 } 207 208 /** 209 * resets average and peak. 210 */ 211 public void reset() { 212 synchronized (this) { 213 avg = 0f; 214 peak = 0; 215 peakWhen = null; 216 readings = 0L; 217 } 218 } 219 220 /** 221 * Returns the configured sampling period. 222 * 223 * @return period in milliseconds 224 */ 225 public long getPeriod() { 226 return period.toMillis(); 227 } 228 229 /** 230 * Returns elapsed wall-clock time since the current sampling window started. 231 * 232 * @return elapsed time in milliseconds 233 */ 234 public long getElapsed() { 235 // Wall-clock elapsed since last sampling start (manual) or since construction (auto). 236 Instant sw = startWall; 237 return sw != null ? Duration.between(sw, Instant.now()).toMillis() : 0L; 238 } 239 240 public String toString() { 241 return String.format("tps=%d, peak=%d, avg=%.2f", intValue(), getPeak(), getAvg()); 242 } 243 244 /** Stops auto-update scheduling and leaves the meter in manual mode. */ 245 public void stop() { 246 synchronized (this) { 247 autoupdate = false; // can still use it in manual mode 248 if (scheduledTask != null) { 249 scheduledTask.cancel(false); 250 scheduledTask = null; 251 } 252 } 253 } 254 255 @Override 256 public void close() { 257 stop(); 258 } 259 260 public void dump(PrintStream p, String indent) { 261 p.println(indent 262 + "<tps" 263 + (autoupdate ? " auto='true'>" : ">") 264 + this 265 + "</tps>"); 266 } 267 268 private void startScheduler(long periodMillis) { 269 final WeakReference<TPS> ref = new WeakReference<>(this); 270 final AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>(); 271 272 // scheduleAtFixedRate keeps cadence; we still compute based on actual elapsed nanos 273 // to be robust against pauses or scheduling delays. 274 ScheduledFuture<?> future = SHARED_SCHEDULER.scheduleAtFixedRate(() -> { 275 TPS t = ref.get(); 276 if (t == null) { 277 ScheduledFuture<?> f = self.get(); 278 if (f != null) { 279 f.cancel(false); 280 } 281 return; 282 } 283 try { 284 t.calcTPSSampled(); 285 } catch (Throwable ignored) { 286 // Avoid terminating the scheduler thread due to an unchecked exception. 287 // Intentionally swallow: TPS is best-effort telemetry. 288 } 289 }, periodMillis, periodMillis, TimeUnit.MILLISECONDS); 290 291 self.set(future); 292 scheduledTask = future; 293 } 294 295 /** 296 * Manual mode: compute and publish TPS only when at least one period has elapsed. 297 */ 298 private float calcTPSIfDue() { 299 long nowNanos = nanoTimeSource.getAsLong(); 300 long elapsedNanos = nowNanos - lastSampleNanos; 301 302 if (elapsedNanos >= period.toNanos()) { 303 return calcTPS(elapsedNanos, nowNanos); 304 } 305 return tps; 306 } 307 308 /** 309 * Auto mode: compute and publish TPS every scheduler tick, based on actual elapsed nanos. 310 */ 311 private void calcTPSSampled() { 312 if (!autoupdate) 313 return; 314 315 long nowNanos = nanoTimeSource.getAsLong(); 316 long elapsedNanos = nowNanos - lastSampleNanos; 317 318 // If for any reason elapsed is non-positive (e.g., simulated time misuse), skip safely. 319 if (elapsedNanos <= 0L) 320 return; 321 322 calcTPS(elapsedNanos, nowNanos); 323 } 324 325 /** 326 * Computes TPS as transactions per second (count / elapsedSeconds) using monotonic time. 327 * Publishes tps/avg/peak/peakWhen atomically under lock; count is atomically captured. 328 */ 329 private float calcTPS(long elapsedNanos, long nowNanos) { 330 final long c = count.getAndSet(0L); // atomic capture-and-reset; avoids lost ticks 331 final float newTps = (c <= 0L) ? 0f : (c * 1_000_000_000f) / (float) elapsedNanos; 332 333 synchronized (this) { 334 tps = newTps; 335 336 // Online average of sampled TPS values. 337 long r = readings++; 338 avg = (r * avg + newTps) / (r + 1L); 339 340 int rounded = Math.round(newTps); 341 if (rounded > peak) { 342 peak = rounded; 343 peakWhen = Instant.now(); 344 } 345 346 lastSampleNanos = nowNanos; 347 startWall = Instant.now(); 348 return tps; 349 } 350 } 351 352 /** 353 * Overrides the monotonic nano time source for deterministic tests. 354 * 355 * @param simulatedNanoTime simulated monotonic time value in nanoseconds 356 */ 357 public void setSimulatedNanoTime(long simulatedNanoTime) { 358 // This is a monotonic nano time value intended for tests. 359 // We do not attempt to convert it to epoch-based Instants. 360 if (this.simulatedNanoTime == 0L) { 361 long now = simulatedNanoTime; 362 if (now > 0L) { 363 lastSampleNanos = now; 364 } 365 } 366 this.simulatedNanoTime = simulatedNanoTime; 367 } 368 369 /** 370 * Returns the current monotonic time in nanoseconds. 371 * 372 * @return monotonic nano time, simulated when configured 373 */ 374 protected long getNanoTime() { 375 long s = simulatedNanoTime; 376 return s > 0L ? s : System.nanoTime(); 377 } 378}