001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import java.io.PrintStream;
022import java.time.Duration;
023import java.time.Instant;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicLong;
029import java.util.function.LongSupplier;
030import java.util.concurrent.Executors;
031import java.lang.ref.WeakReference;
032import java.util.concurrent.atomic.AtomicReference;
033
034/**
035 * TPS can be used to measure Transactions Per Second (or transactions during
036 * other period of time).
037 *
038 * <p>It can operate in two different modes:
039 * <ul>
040 *   <li>Auto update.</li>
041 *   <li>Manual update.</li>
042 * </ul></p>
043 *
044 * <p>When operating in <b>auto update</b> mode, a shared scheduler is used and the
045 * number of transactions (calls to tick()) is automatically calculated for
046 * every period. In this mode, callers should invoke <b>stop()</b> (or
047 * <b>close()</b>) when the TPS object is no longer needed to cancel its scheduled
048 * task. The shared scheduler thread is daemon and reused across instances.</p>
049 *
050 * <p>When operating in <b>manual update</b> mode, user has to call one of its
051 * floatValue() or intValue() method at regular intervals. The returned value
052 * will be the average TPS for the given period since the last call.</p>
053 *
054 * @author Alejandro Revilla, Jeronimo Paolleti and Thiago Moretto
055 * @since 1.6.7 r2912
056 */
057@SuppressWarnings("unused")
058public class TPS implements Loggeable, AutoCloseable {
059    /**
060     * If set (via setSimulatedNanoTime), getNanoTime() returns this value.
061     * This is intended for deterministic tests.
062     */
063    protected volatile long simulatedNanoTime = 0L;
064
065    static final long FROM_NANOS = 1_000_000L; // nanos -> millis conversion factor (kept for compatibility)
066
067    private final AtomicLong count = new AtomicLong(0L);
068
069    private final Duration period;
070
071    private volatile boolean autoupdate;
072
073    // Published metrics: written under lock, read lock-free.
074    private volatile float tps;
075    private volatile float avg;
076    private volatile int peak;
077    private volatile Instant peakWhen;
078
079    // State for manual/auto interval measurement (monotonic).
080    private volatile long lastSampleNanos;
081
082    // State for getElapsed() (wall clock), updated when sampling.
083    private volatile Instant startWall;
084
085    // Average computation state (guarded by lock).
086    private long readings;
087
088    // Auto-update scheduler (shared across TPS instances).
089    private static final ScheduledExecutorService SHARED_SCHEDULER = Executors.newSingleThreadScheduledExecutor(
090      new ThreadFactory() {
091          @Override
092          public Thread newThread(Runnable r) {
093              Thread t = new Thread(r, "TPS-auto-update");
094              t.setDaemon(true);
095              return t;
096          }
097      }
098    );
099    private ScheduledFuture<?> scheduledTask;
100
101    // Pluggable time source (monotonic nanos).
102    private final LongSupplier nanoTimeSource;
103
104    public TPS() {
105        this(1000L, false);
106    }
107
108    /**
109     * @param autoupdate true to auto update.
110     */
111    public TPS(boolean autoupdate) {
112        this(1000L, autoupdate);
113    }
114
115    /**
116     * @param period in millis.
117     * @param autoupdate true to autoupdate.
118     */
119    public TPS(final long period, boolean autoupdate) {
120        this(period, autoupdate, null);
121    }
122
123    /**
124     * Internal constructor that allows injecting a nano time source for tests.
125     * If nanoTimeSource is null, System.nanoTime() is used (or simulatedNanoTime when set).
126     */
127    TPS(final long periodMillis, boolean autoupdate, LongSupplier nanoTimeSource) {
128        if (periodMillis <= 0L)
129            throw new IllegalArgumentException("period must be > 0 ms");
130
131        this.period = Duration.ofMillis(periodMillis);
132        this.autoupdate = autoupdate;
133
134        this.nanoTimeSource = nanoTimeSource != null ? nanoTimeSource : this::getNanoTime;
135
136        Instant nowWall = Instant.now();
137        this.startWall = nowWall;
138        this.peakWhen = nowWall;
139
140        long nowNanos = this.nanoTimeSource.getAsLong();
141        this.lastSampleNanos = nowNanos;
142
143        if (autoupdate) {
144            startScheduler(periodMillis);
145        }
146    }
147
148    public void tick() {
149        count.incrementAndGet();
150    }
151
152    public float floatValue() {
153        if (autoupdate) {
154            return tps;
155        } else {
156            return calcTPSIfDue();
157        }
158    }
159
160    public int intValue() {
161        return Math.round(floatValue());
162    }
163
164    public float getAvg() {
165        return avg;
166    }
167
168    public int getPeak() {
169        return peak;
170    }
171
172    public long getPeakWhen() {
173        Instant pw = peakWhen;
174        return pw != null ? pw.toEpochMilli() : -1L;
175    }
176
177    /**
178     * resets average and peak.
179     */
180    public void reset() {
181        synchronized (this) {
182            avg = 0f;
183            peak = 0;
184            peakWhen = null;
185            readings = 0L;
186        }
187    }
188
189    public long getPeriod() {
190        return period.toMillis();
191    }
192
193    public long getElapsed() {
194        // Wall-clock elapsed since last sampling start (manual) or since construction (auto).
195        Instant sw = startWall;
196        return sw != null ? Duration.between(sw, Instant.now()).toMillis() : 0L;
197    }
198
199    public String toString() {
200        return String.format("tps=%d, peak=%d, avg=%.2f", intValue(), getPeak(), getAvg());
201    }
202
203    public void stop() {
204        synchronized (this) {
205            autoupdate = false; // can still use it in manual mode
206            if (scheduledTask != null) {
207                scheduledTask.cancel(false);
208                scheduledTask = null;
209            }
210        }
211    }
212
213    @Override
214    public void close() {
215        stop();
216    }
217
218    public void dump(PrintStream p, String indent) {
219        p.println(indent
220          + "<tps"
221          + (autoupdate ? " auto='true'>" : ">")
222          + this
223          + "</tps>");
224    }
225
226    private void startScheduler(long periodMillis) {
227        final WeakReference<TPS> ref = new WeakReference<>(this);
228        final AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
229
230        // scheduleAtFixedRate keeps cadence; we still compute based on actual elapsed nanos
231        // to be robust against pauses or scheduling delays.
232        ScheduledFuture<?> future = SHARED_SCHEDULER.scheduleAtFixedRate(() -> {
233            TPS t = ref.get();
234            if (t == null) {
235                ScheduledFuture<?> f = self.get();
236                if (f != null) {
237                    f.cancel(false);
238                }
239                return;
240            }
241            try {
242                t.calcTPSSampled();
243            } catch (Throwable ignored) {
244                // Avoid terminating the scheduler thread due to an unchecked exception.
245                // Intentionally swallow: TPS is best-effort telemetry.
246            }
247        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
248
249        self.set(future);
250        scheduledTask = future;
251    }
252
253    /**
254     * Manual mode: compute and publish TPS only when at least one period has elapsed.
255     */
256    private float calcTPSIfDue() {
257        long nowNanos = nanoTimeSource.getAsLong();
258        long elapsedNanos = nowNanos - lastSampleNanos;
259
260        if (elapsedNanos >= period.toNanos()) {
261            return calcTPS(elapsedNanos, nowNanos);
262        }
263        return tps;
264    }
265
266    /**
267     * Auto mode: compute and publish TPS every scheduler tick, based on actual elapsed nanos.
268     */
269    private void calcTPSSampled() {
270        if (!autoupdate)
271            return;
272
273        long nowNanos = nanoTimeSource.getAsLong();
274        long elapsedNanos = nowNanos - lastSampleNanos;
275
276        // If for any reason elapsed is non-positive (e.g., simulated time misuse), skip safely.
277        if (elapsedNanos <= 0L)
278            return;
279
280        calcTPS(elapsedNanos, nowNanos);
281    }
282
283    /**
284     * Computes TPS as transactions per second (count / elapsedSeconds) using monotonic time.
285     * Publishes tps/avg/peak/peakWhen atomically under lock; count is atomically captured.
286     */
287    private float calcTPS(long elapsedNanos, long nowNanos) {
288        final long c = count.getAndSet(0L); // atomic capture-and-reset; avoids lost ticks
289        final float newTps = (c <= 0L) ? 0f : (c * 1_000_000_000f) / (float) elapsedNanos;
290
291        synchronized (this) {
292            tps = newTps;
293
294            // Online average of sampled TPS values.
295            long r = readings++;
296            avg = (r * avg + newTps) / (r + 1L);
297
298            int rounded = Math.round(newTps);
299            if (rounded > peak) {
300                peak = rounded;
301                peakWhen = Instant.now();
302            }
303
304            lastSampleNanos = nowNanos;
305            startWall = Instant.now();
306            return tps;
307        }
308    }
309
310    public void setSimulatedNanoTime(long simulatedNanoTime) {
311        // This is a monotonic nano time value intended for tests.
312        // We do not attempt to convert it to epoch-based Instants.
313        if (this.simulatedNanoTime == 0L) {
314            long now = simulatedNanoTime;
315            if (now > 0L) {
316                lastSampleNanos = now;
317            }
318        }
319        this.simulatedNanoTime = simulatedNanoTime;
320    }
321
322    protected long getNanoTime() {
323        long s = simulatedNanoTime;
324        return s > 0L ? s : System.nanoTime();
325    }
326}