001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import java.time.Instant;
022
023/**
024 * ThroughputControl limits the throughput 
025 * of a process to a maximum number of transactions in 
026 * a given period of time.
027 *
028 * As an example, the following code will cap the transaction count
029 * at 15 every second (a.k.a. 15 TPS).
030 *
031 * <pre>
032 *
033 *  ThroughputControl throughput = new ThroughputControl(15, 1000);
034 *
035 *  while (isConditionTrue()) {
036 *      throughput.control();
037 *      // Do stuff.
038 *  }
039 *
040 * </pre>
041 */
042public class ThroughputControl {
043    private int[] period;
044    private int[] max;
045    private int[] cnt;
046    private long[] start;
047    private long[] sleep;
048
049    /**
050     * Constructs a throttle limiting throughput to {@code maxTransactions} per period.
051     *
052     * @param maxTransactions Transaction count threshold.
053     * @param periodInMillis Time window, expressed in milliseconds.
054     */
055    public ThroughputControl (int maxTransactions, int periodInMillis) {
056        this (new int[] { maxTransactions },
057              new int[] { periodInMillis });
058    }
059    /**
060     * Constructs a throttle with multiple parallel rate-limit windows.
061     *
062     * @param maxTransactions An array with transaction count thresholds.
063     * @param periodInMillis An array of time windows, expressed in milliseconds.
064     */
065    public ThroughputControl (int[] maxTransactions, int[] periodInMillis) {
066        super();
067        int l = maxTransactions.length;
068        period = new int[l];
069        max = new int[l];
070        cnt = new int[l];
071        start = new long[l];
072        sleep = new long[l];
073        for (int i=0; i<l; i++) {
074            this.max[i]    = maxTransactions[i];
075            this.period[i] = periodInMillis[i];
076            this.sleep[i]  = Math.min(Math.max (periodInMillis[i]/10, 500L),50L);
077            this.start[i]  = Instant.now().toEpochMilli();
078        }
079    }
080
081    /**
082     * This method should be called on every transaction.
083     * It will pause the thread for a while when the threshold is reached 
084     * in order to control the process throughput.
085     * 
086     * @return Returns sleep time in milliseconds when threshold is reached. Otherwise, zero.
087     */
088    public long control() {
089        boolean delayed = false;
090        long init = Instant.now().toEpochMilli();
091        for (int i=0; i<cnt.length; i++) {
092            synchronized (this) {
093                cnt[i]++;
094            }
095            do {
096                if (cnt[i] > max[i]) {
097                    delayed = true;
098                    try { 
099                        Thread.sleep (sleep[i]); 
100                    } catch (InterruptedException e) { }
101                }
102                synchronized (this) {
103                    long now = Instant.now().toEpochMilli();
104                    if (now - start[i] > period[i]) {
105                        long elapsed = now - start[i];
106                        int  allowed = (int) (elapsed * max[i] / period[i]);
107                        start[i] = now;
108                        cnt[i] = Math.max (cnt[i] - allowed, 0);
109                    }
110                }
111            } while (cnt[i] > max[i]);
112        }
113        return delayed ? Instant.now().toEpochMilli() - init : 0L;
114    }
115
116    @Override
117    public String toString() {
118        StringBuilder sb = new StringBuilder("ThroughputControl [");
119        for (int i = 0; i < max.length; i++) {
120            sb.append(String.format(
121                "%d: max = %d, period = %dms",
122                i, max[i], period[i]
123            ));
124            if (i < max.length - 1) {
125                sb.append("; ");
126            }
127        }
128        sb.append("]");
129        return sb.toString();
130    }
131}
132