001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import org.jpos.core.Configurable;
022import org.jpos.core.Configuration;
023import org.jpos.core.ConfigurationException;
024import org.jpos.util.BlockingQueue.Closed;
025import org.jpos.util.NameRegistrar.NotFoundException;
026
027import java.io.PrintStream;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicInteger;
031
032/**
033 * Implements a ThreadPool with the ability to run simple Runnable
034 * tasks as well as Jobs (supervised Runnable tasks)
035 * @since 1.1
036 * @author apr@cs.com.uy
037 * @deprecated Used Executor framework
038 */
039@Deprecated
040public class ThreadPool extends ThreadGroup implements LogSource, Loggeable, Configurable, ThreadPoolMBean {
041    private static AtomicInteger poolNumber = new AtomicInteger(0);
042    private static AtomicInteger threadNumber = new AtomicInteger(0);
043    private int maxPoolSize = 1;
044    private int available;
045    private int running = 0;
046    private int active  = 0;
047    private BlockingQueue pool = new BlockingQueue();
048    private Logger logger;
049    private String realm;
050    private int jobs = 0;
051    private final String namePrefix;
052    public static final int DEFAULT_MAX_THREADS = 100;
053
054    
055    private AtomicInteger threadCount = new AtomicInteger();
056    private ExecutorService executor = Executors.newThreadPerTaskExecutor(
057      Thread.ofVirtual()
058        .inheritInheritableThreadLocals(false)
059        .factory());
060
061
062    public interface Supervised {
063        boolean expired();
064    }
065
066    private class PooledThread extends Thread {
067        Object currentJob = null;
068
069        public PooledThread() {
070            super (ThreadPool.this,
071                    ThreadPool.this.namePrefix + ".PooledThread-" + threadNumber.getAndIncrement());
072            setDaemon(true);
073        }
074
075        public void run () {
076            String name = getName();
077            try {
078                while (pool.ready()) {
079                    Object job = pool.dequeue();
080                    if (job instanceof Runnable) {
081                        setName (name + "-running");
082                        synchronized (ThreadPool.this) {
083                            currentJob = job;
084                            active++;
085                        }
086                        try {
087                            ((Runnable) job).run();
088                            setName (name + "-idle");
089                        } catch (Throwable t) {
090                            setName (name + "-idle-"+t.getMessage());
091                        }
092                        synchronized (ThreadPool.this) {
093                            currentJob = null;
094                            available++;
095                            active--;
096                        }
097                    } else {
098                        synchronized (ThreadPool.this) {
099                            currentJob = null;
100                            available++;
101                        }
102                    }
103                }
104            } catch (InterruptedException e) {
105                if (logger != null) {
106                    Logger.log(new LogEvent(ThreadPool.this, e.getMessage()));
107                }
108            } catch (Closed e) {
109                if (logger != null) {
110                    Logger.log(new LogEvent(ThreadPool.this, e.getMessage()));
111                }
112            }
113        }
114        public synchronized void supervise () {
115            if (currentJob != null && currentJob instanceof Supervised && ((Supervised)currentJob).expired())
116                this.interrupt();
117        }
118    }
119
120    /**
121     * @param poolSize starting pool size
122     * @param maxPoolSize maximum number of threads on this pool
123     */
124    public ThreadPool (int poolSize, int maxPoolSize) {
125        this(poolSize, maxPoolSize, "ThreadPool");
126    }
127    /**
128     * @param name pool name
129     * @param poolSize starting pool size
130     * @param maxPoolSize maximum number of threads on this pool
131     */
132    public ThreadPool (int poolSize, int maxPoolSize, String name) {
133        super(name + "-" + poolNumber.getAndIncrement());
134        this.maxPoolSize = maxPoolSize > 0 ? maxPoolSize : DEFAULT_MAX_THREADS ;
135        this.available = this.maxPoolSize;
136        this.namePrefix = name;
137        init (poolSize);
138    }
139    
140    private void init(int poolSize){
141        while (running < Math.min (poolSize > 0 ? poolSize : 1, maxPoolSize)) {
142            running++;
143            new PooledThread().start();
144        }
145    }
146    /**
147     * Default constructor for ThreadPool
148     */
149    public ThreadPool () {
150        this(1, DEFAULT_MAX_THREADS);
151    }
152    public void close () {
153        pool.close();
154    }
155
156    public synchronized void execute(Runnable action) throws Closed {
157        executor.submit(() -> {
158            threadCount.incrementAndGet();
159            action.run();
160            threadCount.decrementAndGet();
161        });
162
163
164
165        if (!pool.ready())
166            throw new Closed();
167
168        if (++jobs % this.maxPoolSize == 0 || pool.consumerCount() <= 0)
169            supervise();
170
171        if (running < maxPoolSize && pool.consumerDeficit() >= 0) {
172            new PooledThread().start();
173            running++;
174        }
175        available--;
176        pool.enqueue (action);
177    }
178    public void dump (PrintStream p, String indent) {
179        String inner = indent + "  ";
180        p.println (indent + "<thread-pool name=\""+getName()+"\">");
181        if (!pool.ready())
182            p.println (inner  + "<closed/>");
183        p.println (inner  + "<jobs>" + getJobCount() + "</jobs>");
184        p.println (inner  + "<size>" + getPoolSize() + "</size>");
185        p.println (inner  + "<max>"  + getMaxPoolSize() + "</max>");
186        p.println (inner  + "<idle>"  + getIdleCount() + "</idle>");
187        p.println (inner  + "<active>"  + getActiveCount() + "</active>");
188        p.println (inner  + "<pending>" + getPendingCount() + "</pending>");
189        p.println (indent + "</thread-pool>");
190    }
191
192    /**
193     * @return number of jobs processed by this pool
194     */
195    public int getJobCount () {
196        return jobs;
197    }
198    /**
199     * @return number of running threads
200     */
201    public int getPoolSize () {
202        return running;
203    }
204    /**
205     * @return max number of active threads allowed
206     */
207    public int getMaxPoolSize () {
208        return maxPoolSize;
209    }
210    /**
211     * @return number of active threads
212     */
213    public int getActiveCount () {
214        return active;
215    }
216    /**
217     * @return number of idle threads
218     */
219    public int getIdleCount () {
220        return pool.consumerCount ();
221    }
222    /**
223     * @return number of available threads
224     */
225    synchronized public int getAvailableCount () {
226        return available;
227    }
228
229    /**
230     * @return number of Pending jobs
231     */
232    public int getPendingCount () {
233        return pool.pending ();
234    }
235
236    public void supervise () {
237        Thread[] t = new Thread[maxPoolSize];
238        int cnt = enumerate (t);
239        for (int i=0; i<cnt; i++) 
240            if (t[i] instanceof PooledThread)
241                ((PooledThread) t[i]).supervise();
242    }
243
244    public void setLogger (Logger logger, String realm) {
245        this.logger = logger;
246        this.realm  = realm;
247    }
248    public String getRealm () {
249        return realm;
250    }
251    public Logger getLogger() {
252        return logger;
253    }
254    
255   /** 
256    * @param cfg Configuration object
257    * @throws ConfigurationException
258    */
259    public void setConfiguration(Configuration cfg) throws ConfigurationException {
260        maxPoolSize = cfg.getInt("max-size", DEFAULT_MAX_THREADS);
261        init (cfg.getInt("initial-size"));
262    }
263    
264    /** 
265     * Retrieves a thread pool from NameRegistrar given its name, unique identifier.
266     *
267     * @param name Name of the thread pool to retrieve, must be the same as the name property of the thread-pool tag in the QSP config file
268     * @throws NotFoundException thrown when there is not a thread-pool registered under this name.
269     * @return returns the retrieved instance of thread pool
270     */    
271    public static ThreadPool getThreadPool(java.lang.String name) throws NotFoundException {
272        return (ThreadPool)NameRegistrar.get("thread.pool." + name);
273    }
274}