001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import org.jpos.core.Configurable;
022import org.jpos.core.Configuration;
023import org.jpos.core.ConfigurationException;
024import org.jpos.util.BlockingQueue.Closed;
025import org.jpos.util.NameRegistrar.NotFoundException;
026
027import java.io.PrintStream;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicInteger;
031
032/**
033 * Implements a ThreadPool with the ability to run simple Runnable
034 * tasks as well as Jobs (supervised Runnable tasks)
035 * @since 1.1
036 * @author apr@cs.com.uy
037 * @deprecated Used Executor framework
038 */
039@Deprecated
040public class ThreadPool extends ThreadGroup implements LogSource, Loggeable, Configurable, ThreadPoolMBean {
041    private static AtomicInteger poolNumber = new AtomicInteger(0);
042    private static AtomicInteger threadNumber = new AtomicInteger(0);
043    private int maxPoolSize = 1;
044    private int available;
045    private int running = 0;
046    private int active  = 0;
047    private BlockingQueue pool = new BlockingQueue();
048    private Logger logger;
049    private String realm;
050    private int jobs = 0;
051    private final String namePrefix;
052    /** Default maximum number of threads when no explicit limit is configured. */
053    public static final int DEFAULT_MAX_THREADS = 100;
054
055    
056    private AtomicInteger threadCount = new AtomicInteger();
057    private ExecutorService executor = Executors.newThreadPerTaskExecutor(
058      Thread.ofVirtual()
059        .inheritInheritableThreadLocals(false)
060        .factory());
061
062
063    /**
064     * Marks jobs that can be supervised and interrupted when expired.
065     */
066    public interface Supervised {
067        /**
068         * Indicates whether the supervised job has expired.
069         *
070         * @return {@code true} if the job should be interrupted
071         */
072        boolean expired();
073    }
074
075    private class PooledThread extends Thread {
076        Object currentJob = null;
077
078        public PooledThread() {
079            super (ThreadPool.this,
080                    ThreadPool.this.namePrefix + ".PooledThread-" + threadNumber.getAndIncrement());
081            setDaemon(true);
082        }
083
084        public void run () {
085            String name = getName();
086            try {
087                while (pool.ready()) {
088                    Object job = pool.dequeue();
089                    if (job instanceof Runnable) {
090                        setName (name + "-running");
091                        synchronized (ThreadPool.this) {
092                            currentJob = job;
093                            active++;
094                        }
095                        try {
096                            ((Runnable) job).run();
097                            setName (name + "-idle");
098                        } catch (Throwable t) {
099                            setName (name + "-idle-"+t.getMessage());
100                        }
101                        synchronized (ThreadPool.this) {
102                            currentJob = null;
103                            available++;
104                            active--;
105                        }
106                    } else {
107                        synchronized (ThreadPool.this) {
108                            currentJob = null;
109                            available++;
110                        }
111                    }
112                }
113            } catch (InterruptedException e) {
114                if (logger != null) {
115                    Logger.log(new LogEvent(ThreadPool.this, e.getMessage()));
116                }
117            } catch (Closed e) {
118                if (logger != null) {
119                    Logger.log(new LogEvent(ThreadPool.this, e.getMessage()));
120                }
121            }
122        }
123        public synchronized void supervise () {
124            if (currentJob != null && currentJob instanceof Supervised && ((Supervised)currentJob).expired())
125                this.interrupt();
126        }
127    }
128
129    /**
130     * Constructs a ThreadPool with the default name {@code "ThreadPool"}.
131     *
132     * @param poolSize starting pool size
133     * @param maxPoolSize maximum number of threads on this pool
134     */
135    public ThreadPool (int poolSize, int maxPoolSize) {
136        this(poolSize, maxPoolSize, "ThreadPool");
137    }
138    /**
139     * Constructs a ThreadPool with an explicit name.
140     *
141     * @param name pool name
142     * @param poolSize starting pool size
143     * @param maxPoolSize maximum number of threads on this pool
144     */
145    public ThreadPool (int poolSize, int maxPoolSize, String name) {
146        super(name + "-" + poolNumber.getAndIncrement());
147        this.maxPoolSize = maxPoolSize > 0 ? maxPoolSize : DEFAULT_MAX_THREADS ;
148        this.available = this.maxPoolSize;
149        this.namePrefix = name;
150        init (poolSize);
151    }
152    
153    private void init(int poolSize){
154        while (running < Math.min (poolSize > 0 ? poolSize : 1, maxPoolSize)) {
155            running++;
156            new PooledThread().start();
157        }
158    }
159    /**
160     * Default constructor for ThreadPool
161     */
162    public ThreadPool () {
163        this(1, DEFAULT_MAX_THREADS);
164    }
165
166    /**
167     * Closes the pool queue and stops accepting new jobs.
168     */
169    public void close () {
170        pool.close();
171    }
172
173    /**
174     * Executes a runnable using the pool infrastructure.
175     *
176     * @param action runnable to execute
177     * @throws Closed if the pool is no longer accepting jobs
178     */
179    public synchronized void execute(Runnable action) throws Closed {
180        executor.submit(() -> {
181            threadCount.incrementAndGet();
182            action.run();
183            threadCount.decrementAndGet();
184        });
185
186
187
188        if (!pool.ready())
189            throw new Closed();
190
191        if (++jobs % this.maxPoolSize == 0 || pool.consumerCount() <= 0)
192            supervise();
193
194        if (running < maxPoolSize && pool.consumerDeficit() >= 0) {
195            new PooledThread().start();
196            running++;
197        }
198        available--;
199        pool.enqueue (action);
200    }
201    public void dump (PrintStream p, String indent) {
202        String inner = indent + "  ";
203        p.println (indent + "<thread-pool name=\""+getName()+"\">");
204        if (!pool.ready())
205            p.println (inner  + "<closed/>");
206        p.println (inner  + "<jobs>" + getJobCount() + "</jobs>");
207        p.println (inner  + "<size>" + getPoolSize() + "</size>");
208        p.println (inner  + "<max>"  + getMaxPoolSize() + "</max>");
209        p.println (inner  + "<idle>"  + getIdleCount() + "</idle>");
210        p.println (inner  + "<active>"  + getActiveCount() + "</active>");
211        p.println (inner  + "<pending>" + getPendingCount() + "</pending>");
212        p.println (indent + "</thread-pool>");
213    }
214
215    /**
216     * @return number of jobs processed by this pool
217     */
218    public int getJobCount () {
219        return jobs;
220    }
221    /**
222     * @return number of running threads
223     */
224    public int getPoolSize () {
225        return running;
226    }
227    /**
228     * @return max number of active threads allowed
229     */
230    public int getMaxPoolSize () {
231        return maxPoolSize;
232    }
233    /**
234     * Returns the number of threads currently executing a job.
235     *
236     * @return number of active threads
237     */
238    public int getActiveCount () {
239        return active;
240    }
241    /**
242     * Returns the number of threads currently waiting for work.
243     *
244     * @return number of idle threads
245     */
246    public int getIdleCount () {
247        return pool.consumerCount ();
248    }
249    /**
250     * Returns the number of threads available to accept new work without spawning.
251     *
252     * @return number of available threads
253     */
254    synchronized public int getAvailableCount () {
255        return available;
256    }
257
258    /**
259     * @return number of Pending jobs
260     */
261    public int getPendingCount () {
262        return pool.pending ();
263    }
264
265    /**
266     * Supervises pooled threads and interrupts expired supervised jobs.
267     */
268    public void supervise () {
269        Thread[] t = new Thread[maxPoolSize];
270        int cnt = enumerate (t);
271        for (int i=0; i<cnt; i++) 
272            if (t[i] instanceof PooledThread)
273                ((PooledThread) t[i]).supervise();
274    }
275
276    public void setLogger (Logger logger, String realm) {
277        this.logger = logger;
278        this.realm  = realm;
279    }
280    public String getRealm () {
281        return realm;
282    }
283    public Logger getLogger() {
284        return logger;
285    }
286    
287   /** 
288    * @param cfg Configuration object
289    * @throws ConfigurationException if configuration is invalid
290    */
291    public void setConfiguration(Configuration cfg) throws ConfigurationException {
292        maxPoolSize = cfg.getInt("max-size", DEFAULT_MAX_THREADS);
293        init (cfg.getInt("initial-size"));
294    }
295    
296    /** 
297     * Retrieves a thread pool from NameRegistrar given its name, unique identifier.
298     *
299     * @param name Name of the thread pool to retrieve, must be the same as the name property of the thread-pool tag in the QSP config file
300     * @throws NotFoundException thrown when there is not a thread-pool registered under this name.
301     * @return returns the retrieved instance of thread pool
302     */    
303    public static ThreadPool getThreadPool(java.lang.String name) throws NotFoundException {
304        return (ThreadPool)NameRegistrar.get("thread.pool." + name);
305    }
306}