001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import java.io.PrintStream; 022import java.time.Duration; 023import java.time.Instant; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ScheduledFuture; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicLong; 029import java.util.function.LongSupplier; 030import java.util.concurrent.Executors; 031import java.lang.ref.WeakReference; 032import java.util.concurrent.atomic.AtomicReference; 033 034/** 035 * TPS can be used to measure Transactions Per Second (or transactions during 036 * other period of time). 037 * 038 * <p>It can operate in two different modes: 039 * <ul> 040 * <li>Auto update.</li> 041 * <li>Manual update.</li> 042 * </ul></p> 043 * 044 * <p>When operating in <b>auto update</b> mode, a shared scheduler is used and the 045 * number of transactions (calls to tick()) is automatically calculated for 046 * every period. In this mode, callers should invoke <b>stop()</b> (or 047 * <b>close()</b>) when the TPS object is no longer needed to cancel its scheduled 048 * task. The shared scheduler thread is daemon and reused across instances.</p> 049 * 050 * <p>When operating in <b>manual update</b> mode, user has to call one of its 051 * floatValue() or intValue() method at regular intervals. The returned value 052 * will be the average TPS for the given period since the last call.</p> 053 * 054 * @author Alejandro Revilla, Jeronimo Paolleti and Thiago Moretto 055 * @since 1.6.7 r2912 056 */ 057@SuppressWarnings("unused") 058public class TPS implements Loggeable, AutoCloseable { 059 /** 060 * If set (via setSimulatedNanoTime), getNanoTime() returns this value. 061 * This is intended for deterministic tests. 062 */ 063 protected volatile long simulatedNanoTime = 0L; 064 065 static final long FROM_NANOS = 1_000_000L; // nanos -> millis conversion factor (kept for compatibility) 066 067 private final AtomicLong count = new AtomicLong(0L); 068 069 private final Duration period; 070 071 private volatile boolean autoupdate; 072 073 // Published metrics: written under lock, read lock-free. 074 private volatile float tps; 075 private volatile float avg; 076 private volatile int peak; 077 private volatile Instant peakWhen; 078 079 // State for manual/auto interval measurement (monotonic). 080 private volatile long lastSampleNanos; 081 082 // State for getElapsed() (wall clock), updated when sampling. 083 private volatile Instant startWall; 084 085 // Average computation state (guarded by lock). 086 private long readings; 087 088 // Auto-update scheduler (shared across TPS instances). 089 private static final ScheduledExecutorService SHARED_SCHEDULER = Executors.newSingleThreadScheduledExecutor( 090 new ThreadFactory() { 091 @Override 092 public Thread newThread(Runnable r) { 093 Thread t = new Thread(r, "TPS-auto-update"); 094 t.setDaemon(true); 095 return t; 096 } 097 } 098 ); 099 private ScheduledFuture<?> scheduledTask; 100 101 // Pluggable time source (monotonic nanos). 102 private final LongSupplier nanoTimeSource; 103 104 public TPS() { 105 this(1000L, false); 106 } 107 108 /** 109 * @param autoupdate true to auto update. 110 */ 111 public TPS(boolean autoupdate) { 112 this(1000L, autoupdate); 113 } 114 115 /** 116 * @param period in millis. 117 * @param autoupdate true to autoupdate. 118 */ 119 public TPS(final long period, boolean autoupdate) { 120 this(period, autoupdate, null); 121 } 122 123 /** 124 * Internal constructor that allows injecting a nano time source for tests. 125 * If nanoTimeSource is null, System.nanoTime() is used (or simulatedNanoTime when set). 126 */ 127 TPS(final long periodMillis, boolean autoupdate, LongSupplier nanoTimeSource) { 128 if (periodMillis <= 0L) 129 throw new IllegalArgumentException("period must be > 0 ms"); 130 131 this.period = Duration.ofMillis(periodMillis); 132 this.autoupdate = autoupdate; 133 134 this.nanoTimeSource = nanoTimeSource != null ? nanoTimeSource : this::getNanoTime; 135 136 Instant nowWall = Instant.now(); 137 this.startWall = nowWall; 138 this.peakWhen = nowWall; 139 140 long nowNanos = this.nanoTimeSource.getAsLong(); 141 this.lastSampleNanos = nowNanos; 142 143 if (autoupdate) { 144 startScheduler(periodMillis); 145 } 146 } 147 148 public void tick() { 149 count.incrementAndGet(); 150 } 151 152 public float floatValue() { 153 if (autoupdate) { 154 return tps; 155 } else { 156 return calcTPSIfDue(); 157 } 158 } 159 160 public int intValue() { 161 return Math.round(floatValue()); 162 } 163 164 public float getAvg() { 165 return avg; 166 } 167 168 public int getPeak() { 169 return peak; 170 } 171 172 public long getPeakWhen() { 173 Instant pw = peakWhen; 174 return pw != null ? pw.toEpochMilli() : -1L; 175 } 176 177 /** 178 * resets average and peak. 179 */ 180 public void reset() { 181 synchronized (this) { 182 avg = 0f; 183 peak = 0; 184 peakWhen = null; 185 readings = 0L; 186 } 187 } 188 189 public long getPeriod() { 190 return period.toMillis(); 191 } 192 193 public long getElapsed() { 194 // Wall-clock elapsed since last sampling start (manual) or since construction (auto). 195 Instant sw = startWall; 196 return sw != null ? Duration.between(sw, Instant.now()).toMillis() : 0L; 197 } 198 199 public String toString() { 200 return String.format("tps=%d, peak=%d, avg=%.2f", intValue(), getPeak(), getAvg()); 201 } 202 203 public void stop() { 204 synchronized (this) { 205 autoupdate = false; // can still use it in manual mode 206 if (scheduledTask != null) { 207 scheduledTask.cancel(false); 208 scheduledTask = null; 209 } 210 } 211 } 212 213 @Override 214 public void close() { 215 stop(); 216 } 217 218 public void dump(PrintStream p, String indent) { 219 p.println(indent 220 + "<tps" 221 + (autoupdate ? " auto='true'>" : ">") 222 + this 223 + "</tps>"); 224 } 225 226 private void startScheduler(long periodMillis) { 227 final WeakReference<TPS> ref = new WeakReference<>(this); 228 final AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>(); 229 230 // scheduleAtFixedRate keeps cadence; we still compute based on actual elapsed nanos 231 // to be robust against pauses or scheduling delays. 232 ScheduledFuture<?> future = SHARED_SCHEDULER.scheduleAtFixedRate(() -> { 233 TPS t = ref.get(); 234 if (t == null) { 235 ScheduledFuture<?> f = self.get(); 236 if (f != null) { 237 f.cancel(false); 238 } 239 return; 240 } 241 try { 242 t.calcTPSSampled(); 243 } catch (Throwable ignored) { 244 // Avoid terminating the scheduler thread due to an unchecked exception. 245 // Intentionally swallow: TPS is best-effort telemetry. 246 } 247 }, periodMillis, periodMillis, TimeUnit.MILLISECONDS); 248 249 self.set(future); 250 scheduledTask = future; 251 } 252 253 /** 254 * Manual mode: compute and publish TPS only when at least one period has elapsed. 255 */ 256 private float calcTPSIfDue() { 257 long nowNanos = nanoTimeSource.getAsLong(); 258 long elapsedNanos = nowNanos - lastSampleNanos; 259 260 if (elapsedNanos >= period.toNanos()) { 261 return calcTPS(elapsedNanos, nowNanos); 262 } 263 return tps; 264 } 265 266 /** 267 * Auto mode: compute and publish TPS every scheduler tick, based on actual elapsed nanos. 268 */ 269 private void calcTPSSampled() { 270 if (!autoupdate) 271 return; 272 273 long nowNanos = nanoTimeSource.getAsLong(); 274 long elapsedNanos = nowNanos - lastSampleNanos; 275 276 // If for any reason elapsed is non-positive (e.g., simulated time misuse), skip safely. 277 if (elapsedNanos <= 0L) 278 return; 279 280 calcTPS(elapsedNanos, nowNanos); 281 } 282 283 /** 284 * Computes TPS as transactions per second (count / elapsedSeconds) using monotonic time. 285 * Publishes tps/avg/peak/peakWhen atomically under lock; count is atomically captured. 286 */ 287 private float calcTPS(long elapsedNanos, long nowNanos) { 288 final long c = count.getAndSet(0L); // atomic capture-and-reset; avoids lost ticks 289 final float newTps = (c <= 0L) ? 0f : (c * 1_000_000_000f) / (float) elapsedNanos; 290 291 synchronized (this) { 292 tps = newTps; 293 294 // Online average of sampled TPS values. 295 long r = readings++; 296 avg = (r * avg + newTps) / (r + 1L); 297 298 int rounded = Math.round(newTps); 299 if (rounded > peak) { 300 peak = rounded; 301 peakWhen = Instant.now(); 302 } 303 304 lastSampleNanos = nowNanos; 305 startWall = Instant.now(); 306 return tps; 307 } 308 } 309 310 public void setSimulatedNanoTime(long simulatedNanoTime) { 311 // This is a monotonic nano time value intended for tests. 312 // We do not attempt to convert it to epoch-based Instants. 313 if (this.simulatedNanoTime == 0L) { 314 long now = simulatedNanoTime; 315 if (now > 0L) { 316 lastSampleNanos = now; 317 } 318 } 319 this.simulatedNanoTime = simulatedNanoTime; 320 } 321 322 protected long getNanoTime() { 323 long s = simulatedNanoTime; 324 return s > 0L ? s : System.nanoTime(); 325 } 326}