001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.util.BlockingQueue.Closed; 025import org.jpos.util.NameRegistrar.NotFoundException; 026 027import java.io.PrintStream; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicInteger; 031 032/** 033 * Implements a ThreadPool with the ability to run simple Runnable 034 * tasks as well as Jobs (supervised Runnable tasks) 035 * @since 1.1 036 * @author apr@cs.com.uy 037 * @deprecated Used Executor framework 038 */ 039@Deprecated 040public class ThreadPool extends ThreadGroup implements LogSource, Loggeable, Configurable, ThreadPoolMBean { 041 private static AtomicInteger poolNumber = new AtomicInteger(0); 042 private static AtomicInteger threadNumber = new AtomicInteger(0); 043 private int maxPoolSize = 1; 044 private int available; 045 private int running = 0; 046 private int active = 0; 047 private BlockingQueue pool = new BlockingQueue(); 048 private Logger logger; 049 private String realm; 050 private int jobs = 0; 051 private final String namePrefix; 052 /** Default maximum number of threads when no explicit limit is configured. */ 053 public static final int DEFAULT_MAX_THREADS = 100; 054 055 056 private AtomicInteger threadCount = new AtomicInteger(); 057 private ExecutorService executor = Executors.newThreadPerTaskExecutor( 058 Thread.ofVirtual() 059 .inheritInheritableThreadLocals(false) 060 .factory()); 061 062 063 /** 064 * Marks jobs that can be supervised and interrupted when expired. 065 */ 066 public interface Supervised { 067 /** 068 * Indicates whether the supervised job has expired. 069 * 070 * @return {@code true} if the job should be interrupted 071 */ 072 boolean expired(); 073 } 074 075 private class PooledThread extends Thread { 076 Object currentJob = null; 077 078 public PooledThread() { 079 super (ThreadPool.this, 080 ThreadPool.this.namePrefix + ".PooledThread-" + threadNumber.getAndIncrement()); 081 setDaemon(true); 082 } 083 084 public void run () { 085 String name = getName(); 086 try { 087 while (pool.ready()) { 088 Object job = pool.dequeue(); 089 if (job instanceof Runnable) { 090 setName (name + "-running"); 091 synchronized (ThreadPool.this) { 092 currentJob = job; 093 active++; 094 } 095 try { 096 ((Runnable) job).run(); 097 setName (name + "-idle"); 098 } catch (Throwable t) { 099 setName (name + "-idle-"+t.getMessage()); 100 } 101 synchronized (ThreadPool.this) { 102 currentJob = null; 103 available++; 104 active--; 105 } 106 } else { 107 synchronized (ThreadPool.this) { 108 currentJob = null; 109 available++; 110 } 111 } 112 } 113 } catch (InterruptedException e) { 114 if (logger != null) { 115 Logger.log(new LogEvent(ThreadPool.this, e.getMessage())); 116 } 117 } catch (Closed e) { 118 if (logger != null) { 119 Logger.log(new LogEvent(ThreadPool.this, e.getMessage())); 120 } 121 } 122 } 123 public synchronized void supervise () { 124 if (currentJob != null && currentJob instanceof Supervised && ((Supervised)currentJob).expired()) 125 this.interrupt(); 126 } 127 } 128 129 /** 130 * Constructs a ThreadPool with the default name {@code "ThreadPool"}. 131 * 132 * @param poolSize starting pool size 133 * @param maxPoolSize maximum number of threads on this pool 134 */ 135 public ThreadPool (int poolSize, int maxPoolSize) { 136 this(poolSize, maxPoolSize, "ThreadPool"); 137 } 138 /** 139 * Constructs a ThreadPool with an explicit name. 140 * 141 * @param name pool name 142 * @param poolSize starting pool size 143 * @param maxPoolSize maximum number of threads on this pool 144 */ 145 public ThreadPool (int poolSize, int maxPoolSize, String name) { 146 super(name + "-" + poolNumber.getAndIncrement()); 147 this.maxPoolSize = maxPoolSize > 0 ? maxPoolSize : DEFAULT_MAX_THREADS ; 148 this.available = this.maxPoolSize; 149 this.namePrefix = name; 150 init (poolSize); 151 } 152 153 private void init(int poolSize){ 154 while (running < Math.min (poolSize > 0 ? poolSize : 1, maxPoolSize)) { 155 running++; 156 new PooledThread().start(); 157 } 158 } 159 /** 160 * Default constructor for ThreadPool 161 */ 162 public ThreadPool () { 163 this(1, DEFAULT_MAX_THREADS); 164 } 165 166 /** 167 * Closes the pool queue and stops accepting new jobs. 168 */ 169 public void close () { 170 pool.close(); 171 } 172 173 /** 174 * Executes a runnable using the pool infrastructure. 175 * 176 * @param action runnable to execute 177 * @throws Closed if the pool is no longer accepting jobs 178 */ 179 public synchronized void execute(Runnable action) throws Closed { 180 executor.submit(() -> { 181 threadCount.incrementAndGet(); 182 action.run(); 183 threadCount.decrementAndGet(); 184 }); 185 186 187 188 if (!pool.ready()) 189 throw new Closed(); 190 191 if (++jobs % this.maxPoolSize == 0 || pool.consumerCount() <= 0) 192 supervise(); 193 194 if (running < maxPoolSize && pool.consumerDeficit() >= 0) { 195 new PooledThread().start(); 196 running++; 197 } 198 available--; 199 pool.enqueue (action); 200 } 201 public void dump (PrintStream p, String indent) { 202 String inner = indent + " "; 203 p.println (indent + "<thread-pool name=\""+getName()+"\">"); 204 if (!pool.ready()) 205 p.println (inner + "<closed/>"); 206 p.println (inner + "<jobs>" + getJobCount() + "</jobs>"); 207 p.println (inner + "<size>" + getPoolSize() + "</size>"); 208 p.println (inner + "<max>" + getMaxPoolSize() + "</max>"); 209 p.println (inner + "<idle>" + getIdleCount() + "</idle>"); 210 p.println (inner + "<active>" + getActiveCount() + "</active>"); 211 p.println (inner + "<pending>" + getPendingCount() + "</pending>"); 212 p.println (indent + "</thread-pool>"); 213 } 214 215 /** 216 * @return number of jobs processed by this pool 217 */ 218 public int getJobCount () { 219 return jobs; 220 } 221 /** 222 * @return number of running threads 223 */ 224 public int getPoolSize () { 225 return running; 226 } 227 /** 228 * @return max number of active threads allowed 229 */ 230 public int getMaxPoolSize () { 231 return maxPoolSize; 232 } 233 /** 234 * Returns the number of threads currently executing a job. 235 * 236 * @return number of active threads 237 */ 238 public int getActiveCount () { 239 return active; 240 } 241 /** 242 * Returns the number of threads currently waiting for work. 243 * 244 * @return number of idle threads 245 */ 246 public int getIdleCount () { 247 return pool.consumerCount (); 248 } 249 /** 250 * Returns the number of threads available to accept new work without spawning. 251 * 252 * @return number of available threads 253 */ 254 synchronized public int getAvailableCount () { 255 return available; 256 } 257 258 /** 259 * @return number of Pending jobs 260 */ 261 public int getPendingCount () { 262 return pool.pending (); 263 } 264 265 /** 266 * Supervises pooled threads and interrupts expired supervised jobs. 267 */ 268 public void supervise () { 269 Thread[] t = new Thread[maxPoolSize]; 270 int cnt = enumerate (t); 271 for (int i=0; i<cnt; i++) 272 if (t[i] instanceof PooledThread) 273 ((PooledThread) t[i]).supervise(); 274 } 275 276 public void setLogger (Logger logger, String realm) { 277 this.logger = logger; 278 this.realm = realm; 279 } 280 public String getRealm () { 281 return realm; 282 } 283 public Logger getLogger() { 284 return logger; 285 } 286 287 /** 288 * @param cfg Configuration object 289 * @throws ConfigurationException if configuration is invalid 290 */ 291 public void setConfiguration(Configuration cfg) throws ConfigurationException { 292 maxPoolSize = cfg.getInt("max-size", DEFAULT_MAX_THREADS); 293 init (cfg.getInt("initial-size")); 294 } 295 296 /** 297 * Retrieves a thread pool from NameRegistrar given its name, unique identifier. 298 * 299 * @param name Name of the thread pool to retrieve, must be the same as the name property of the thread-pool tag in the QSP config file 300 * @throws NotFoundException thrown when there is not a thread-pool registered under this name. 301 * @return returns the retrieved instance of thread pool 302 */ 303 public static ThreadPool getThreadPool(java.lang.String name) throws NotFoundException { 304 return (ThreadPool)NameRegistrar.get("thread.pool." + name); 305 } 306}