001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import java.io.PrintStream;
022import java.time.Duration;
023import java.time.Instant;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.function.LongSupplier;
030import java.util.concurrent.Executors;
031import java.lang.ref.WeakReference;
032import java.util.concurrent.atomic.AtomicReference;
033
034/**
035 * TPS can be used to measure Transactions Per Second (or transactions during
036 * other period of time).
037 *
038 * <p>It can operate in two different modes:
039 * <ul>
040 *   <li>Auto update.</li>
041 *   <li>Manual update.</li>
042 * </ul>
043 *
044 * <p>When operating in <b>auto update</b> mode, a shared scheduler is used and the
045 * number of transactions (calls to tick()) is automatically calculated for
046 * every period. In this mode, callers should invoke <b>stop()</b> (or
047 * <b>close()</b>) when the TPS object is no longer needed to cancel its scheduled
048 * task. The shared scheduler thread is daemon and reused across instances.</p>
049 *
050 * <p>When operating in <b>manual update</b> mode, user has to call one of its
051 * floatValue() or intValue() method at regular intervals. The returned value
052 * will be the average TPS for the given period since the last call.</p>
053 *
054 * @author Alejandro Revilla, Jeronimo Paolleti and Thiago Moretto
055 * @since 1.6.7 r2912
056 */
057@SuppressWarnings("unused")
058public class TPS implements Loggeable, AutoCloseable {
059    /**
060     * If set (via setSimulatedNanoTime), getNanoTime() returns this value.
061     * This is intended for deterministic tests.
062     */
063    protected volatile long simulatedNanoTime = 0L;
064
065    static final long FROM_NANOS = 1_000_000L; // nanos -> millis conversion factor (kept for compatibility)
066
067    private final AtomicLong count = new AtomicLong(0L);
068
069    private final Duration period;
070
071    private volatile boolean autoupdate;
072
073    // Published metrics: written under lock, read lock-free.
074    private volatile float tps;
075    private volatile float avg;
076    private volatile int peak;
077    private volatile Instant peakWhen;
078
079    // State for manual/auto interval measurement (monotonic).
080    private volatile long lastSampleNanos;
081
082    // State for getElapsed() (wall clock), updated when sampling.
083    private volatile Instant startWall;
084
085    // Average computation state (guarded by lock).
086    private long readings;
087
088    // Auto-update scheduler (shared across TPS instances).
089    private static final ScheduledExecutorService SHARED_SCHEDULER = Executors.newSingleThreadScheduledExecutor(
090      new ThreadFactory() {
091          @Override
092          public Thread newThread(Runnable r) {
093              Thread t = new Thread(r, "TPS-auto-update");
094              t.setDaemon(true);
095              return t;
096          }
097      }
098    );
099    private ScheduledFuture<?> scheduledTask;
100
101    // Pluggable time source (monotonic nanos).
102    private final LongSupplier nanoTimeSource;
103
104    /** Default constructor. */
105    public TPS() {
106        this(1000L, false);
107    }
108
109    /**
110     * Creates a TPS meter with the default one-second period.
111     *
112     * @param autoupdate true to auto update
113     */
114    public TPS(boolean autoupdate) {
115        this(1000L, autoupdate);
116    }
117
118    /**
119     * Creates a TPS meter with the given sampling period.
120     *
121     * @param period sampling period in milliseconds
122     * @param autoupdate true to auto update
123     */
124    public TPS(final long period, boolean autoupdate) {
125        this(period, autoupdate, null);
126    }
127
128    /**
129     * Internal constructor that allows injecting a nano time source for tests.
130     * If nanoTimeSource is null, System.nanoTime() is used (or simulatedNanoTime when set).
131     */
132    TPS(final long periodMillis, boolean autoupdate, LongSupplier nanoTimeSource) {
133        if (periodMillis <= 0L)
134            throw new IllegalArgumentException("period must be > 0 ms");
135
136        this.period = Duration.ofMillis(periodMillis);
137        this.autoupdate = autoupdate;
138
139        this.nanoTimeSource = nanoTimeSource != null ? nanoTimeSource : this::getNanoTime;
140
141        Instant nowWall = Instant.now();
142        this.startWall = nowWall;
143        this.peakWhen = nowWall;
144
145        long nowNanos = this.nanoTimeSource.getAsLong();
146        this.lastSampleNanos = nowNanos;
147
148        if (autoupdate) {
149            startScheduler(periodMillis);
150        }
151    }
152
153    /** Increments the transaction counter for the current sampling window. */
154    public void tick() {
155        count.incrementAndGet();
156    }
157
158    /**
159     * Returns the current transactions-per-second value.
160     *
161     * @return current TPS as a floating-point value
162     */
163    public float floatValue() {
164        if (autoupdate) {
165            return tps;
166        } else {
167            return calcTPSIfDue();
168        }
169    }
170
171    /**
172     * Returns the current transactions-per-second value rounded to an integer.
173     *
174     * @return current TPS rounded to an integer
175     */
176    public int intValue() {
177        return Math.round(floatValue());
178    }
179
180    /**
181     * Returns the average TPS across sampled intervals.
182     *
183     * @return average TPS
184     */
185    public float getAvg() {
186        return avg;
187    }
188
189    /**
190     * Returns the highest recorded TPS peak.
191     *
192     * @return peak TPS
193     */
194    public int getPeak() {
195        return peak;
196    }
197
198    /**
199     * Returns the wall-clock time when the peak TPS was recorded.
200     *
201     * @return peak timestamp in epoch milliseconds, or {@code -1} if unavailable
202     */
203    public long getPeakWhen() {
204        Instant pw = peakWhen;
205        return pw != null ? pw.toEpochMilli() : -1L;
206    }
207
208    /**
209     * resets average and peak.
210     */
211    public void reset() {
212        synchronized (this) {
213            avg = 0f;
214            peak = 0;
215            peakWhen = null;
216            readings = 0L;
217        }
218    }
219
220    /**
221     * Returns the configured sampling period.
222     *
223     * @return period in milliseconds
224     */
225    public long getPeriod() {
226        return period.toMillis();
227    }
228
229    /**
230     * Returns elapsed wall-clock time since the current sampling window started.
231     *
232     * @return elapsed time in milliseconds
233     */
234    public long getElapsed() {
235        // Wall-clock elapsed since last sampling start (manual) or since construction (auto).
236        Instant sw = startWall;
237        return sw != null ? Duration.between(sw, Instant.now()).toMillis() : 0L;
238    }
239
240    public String toString() {
241        return String.format("tps=%d, peak=%d, avg=%.2f", intValue(), getPeak(), getAvg());
242    }
243
244    /** Stops auto-update scheduling and leaves the meter in manual mode. */
245    public void stop() {
246        synchronized (this) {
247            autoupdate = false; // can still use it in manual mode
248            if (scheduledTask != null) {
249                scheduledTask.cancel(false);
250                scheduledTask = null;
251            }
252        }
253    }
254
255    @Override
256    public void close() {
257        stop();
258    }
259
260    public void dump(PrintStream p, String indent) {
261        p.println(indent
262          + "<tps"
263          + (autoupdate ? " auto='true'>" : ">")
264          + this
265          + "</tps>");
266    }
267
268    private void startScheduler(long periodMillis) {
269        final WeakReference<TPS> ref = new WeakReference<>(this);
270        final AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
271
272        // scheduleAtFixedRate keeps cadence; we still compute based on actual elapsed nanos
273        // to be robust against pauses or scheduling delays.
274        ScheduledFuture<?> future = SHARED_SCHEDULER.scheduleAtFixedRate(() -> {
275            TPS t = ref.get();
276            if (t == null) {
277                ScheduledFuture<?> f = self.get();
278                if (f != null) {
279                    f.cancel(false);
280                }
281                return;
282            }
283            try {
284                t.calcTPSSampled();
285            } catch (Throwable ignored) {
286                // Avoid terminating the scheduler thread due to an unchecked exception.
287                // Intentionally swallow: TPS is best-effort telemetry.
288            }
289        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
290
291        self.set(future);
292        scheduledTask = future;
293    }
294
295    /**
296     * Manual mode: compute and publish TPS only when at least one period has elapsed.
297     */
298    private float calcTPSIfDue() {
299        long nowNanos = nanoTimeSource.getAsLong();
300        long elapsedNanos = nowNanos - lastSampleNanos;
301
302        if (elapsedNanos >= period.toNanos()) {
303            return calcTPS(elapsedNanos, nowNanos);
304        }
305        return tps;
306    }
307
308    /**
309     * Auto mode: compute and publish TPS every scheduler tick, based on actual elapsed nanos.
310     */
311    private void calcTPSSampled() {
312        if (!autoupdate)
313            return;
314
315        long nowNanos = nanoTimeSource.getAsLong();
316        long elapsedNanos = nowNanos - lastSampleNanos;
317
318        // If for any reason elapsed is non-positive (e.g., simulated time misuse), skip safely.
319        if (elapsedNanos <= 0L)
320            return;
321
322        calcTPS(elapsedNanos, nowNanos);
323    }
324
325    /**
326     * Computes TPS as transactions per second (count / elapsedSeconds) using monotonic time.
327     * Publishes tps/avg/peak/peakWhen atomically under lock; count is atomically captured.
328     */
329    private float calcTPS(long elapsedNanos, long nowNanos) {
330        final long c = count.getAndSet(0L); // atomic capture-and-reset; avoids lost ticks
331        final float newTps = (c <= 0L) ? 0f : (c * 1_000_000_000f) / (float) elapsedNanos;
332
333        synchronized (this) {
334            tps = newTps;
335
336            // Online average of sampled TPS values.
337            long r = readings++;
338            avg = (r * avg + newTps) / (r + 1L);
339
340            int rounded = Math.round(newTps);
341            if (rounded > peak) {
342                peak = rounded;
343                peakWhen = Instant.now();
344            }
345
346            lastSampleNanos = nowNanos;
347            startWall = Instant.now();
348            return tps;
349        }
350    }
351
352    /**
353     * Overrides the monotonic nano time source for deterministic tests.
354     *
355     * @param simulatedNanoTime simulated monotonic time value in nanoseconds
356     */
357    public void setSimulatedNanoTime(long simulatedNanoTime) {
358        // This is a monotonic nano time value intended for tests.
359        // We do not attempt to convert it to epoch-based Instants.
360        if (this.simulatedNanoTime == 0L) {
361            long now = simulatedNanoTime;
362            if (now > 0L) {
363                lastSampleNanos = now;
364            }
365        }
366        this.simulatedNanoTime = simulatedNanoTime;
367    }
368
369    /**
370     * Returns the current monotonic time in nanoseconds.
371     *
372     * @return monotonic nano time, simulated when configured
373     */
374    protected long getNanoTime() {
375        long s = simulatedNanoTime;
376        return s > 0L ? s : System.nanoTime();
377    }
378}