001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.util.BlockingQueue.Closed; 025import org.jpos.util.NameRegistrar.NotFoundException; 026 027import java.io.PrintStream; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicInteger; 031 032/** 033 * Implements a ThreadPool with the ability to run simple Runnable 034 * tasks as well as Jobs (supervised Runnable tasks) 035 * @since 1.1 036 * @author apr@cs.com.uy 037 * @deprecated Used Executor framework 038 */ 039@Deprecated 040public class ThreadPool extends ThreadGroup implements LogSource, Loggeable, Configurable, ThreadPoolMBean { 041 private static AtomicInteger poolNumber = new AtomicInteger(0); 042 private static AtomicInteger threadNumber = new AtomicInteger(0); 043 private int maxPoolSize = 1; 044 private int available; 045 private int running = 0; 046 private int active = 0; 047 private BlockingQueue pool = new BlockingQueue(); 048 private Logger logger; 049 private String realm; 050 private int jobs = 0; 051 private final String namePrefix; 052 public static final int DEFAULT_MAX_THREADS = 100; 053 054 055 private AtomicInteger threadCount = new AtomicInteger(); 056 private ExecutorService executor = Executors.newThreadPerTaskExecutor( 057 Thread.ofVirtual() 058 .inheritInheritableThreadLocals(false) 059 .factory()); 060 061 062 public interface Supervised { 063 boolean expired(); 064 } 065 066 private class PooledThread extends Thread { 067 Object currentJob = null; 068 069 public PooledThread() { 070 super (ThreadPool.this, 071 ThreadPool.this.namePrefix + ".PooledThread-" + threadNumber.getAndIncrement()); 072 setDaemon(true); 073 } 074 075 public void run () { 076 String name = getName(); 077 try { 078 while (pool.ready()) { 079 Object job = pool.dequeue(); 080 if (job instanceof Runnable) { 081 setName (name + "-running"); 082 synchronized (ThreadPool.this) { 083 currentJob = job; 084 active++; 085 } 086 try { 087 ((Runnable) job).run(); 088 setName (name + "-idle"); 089 } catch (Throwable t) { 090 setName (name + "-idle-"+t.getMessage()); 091 } 092 synchronized (ThreadPool.this) { 093 currentJob = null; 094 available++; 095 active--; 096 } 097 } else { 098 synchronized (ThreadPool.this) { 099 currentJob = null; 100 available++; 101 } 102 } 103 } 104 } catch (InterruptedException e) { 105 if (logger != null) { 106 Logger.log(new LogEvent(ThreadPool.this, e.getMessage())); 107 } 108 } catch (Closed e) { 109 if (logger != null) { 110 Logger.log(new LogEvent(ThreadPool.this, e.getMessage())); 111 } 112 } 113 } 114 public synchronized void supervise () { 115 if (currentJob != null && currentJob instanceof Supervised && ((Supervised)currentJob).expired()) 116 this.interrupt(); 117 } 118 } 119 120 /** 121 * @param poolSize starting pool size 122 * @param maxPoolSize maximum number of threads on this pool 123 */ 124 public ThreadPool (int poolSize, int maxPoolSize) { 125 this(poolSize, maxPoolSize, "ThreadPool"); 126 } 127 /** 128 * @param name pool name 129 * @param poolSize starting pool size 130 * @param maxPoolSize maximum number of threads on this pool 131 */ 132 public ThreadPool (int poolSize, int maxPoolSize, String name) { 133 super(name + "-" + poolNumber.getAndIncrement()); 134 this.maxPoolSize = maxPoolSize > 0 ? maxPoolSize : DEFAULT_MAX_THREADS ; 135 this.available = this.maxPoolSize; 136 this.namePrefix = name; 137 init (poolSize); 138 } 139 140 private void init(int poolSize){ 141 while (running < Math.min (poolSize > 0 ? poolSize : 1, maxPoolSize)) { 142 running++; 143 new PooledThread().start(); 144 } 145 } 146 /** 147 * Default constructor for ThreadPool 148 */ 149 public ThreadPool () { 150 this(1, DEFAULT_MAX_THREADS); 151 } 152 public void close () { 153 pool.close(); 154 } 155 156 public synchronized void execute(Runnable action) throws Closed { 157 executor.submit(() -> { 158 threadCount.incrementAndGet(); 159 action.run(); 160 threadCount.decrementAndGet(); 161 }); 162 163 164 165 if (!pool.ready()) 166 throw new Closed(); 167 168 if (++jobs % this.maxPoolSize == 0 || pool.consumerCount() <= 0) 169 supervise(); 170 171 if (running < maxPoolSize && pool.consumerDeficit() >= 0) { 172 new PooledThread().start(); 173 running++; 174 } 175 available--; 176 pool.enqueue (action); 177 } 178 public void dump (PrintStream p, String indent) { 179 String inner = indent + " "; 180 p.println (indent + "<thread-pool name=\""+getName()+"\">"); 181 if (!pool.ready()) 182 p.println (inner + "<closed/>"); 183 p.println (inner + "<jobs>" + getJobCount() + "</jobs>"); 184 p.println (inner + "<size>" + getPoolSize() + "</size>"); 185 p.println (inner + "<max>" + getMaxPoolSize() + "</max>"); 186 p.println (inner + "<idle>" + getIdleCount() + "</idle>"); 187 p.println (inner + "<active>" + getActiveCount() + "</active>"); 188 p.println (inner + "<pending>" + getPendingCount() + "</pending>"); 189 p.println (indent + "</thread-pool>"); 190 } 191 192 /** 193 * @return number of jobs processed by this pool 194 */ 195 public int getJobCount () { 196 return jobs; 197 } 198 /** 199 * @return number of running threads 200 */ 201 public int getPoolSize () { 202 return running; 203 } 204 /** 205 * @return max number of active threads allowed 206 */ 207 public int getMaxPoolSize () { 208 return maxPoolSize; 209 } 210 /** 211 * @return number of active threads 212 */ 213 public int getActiveCount () { 214 return active; 215 } 216 /** 217 * @return number of idle threads 218 */ 219 public int getIdleCount () { 220 return pool.consumerCount (); 221 } 222 /** 223 * @return number of available threads 224 */ 225 synchronized public int getAvailableCount () { 226 return available; 227 } 228 229 /** 230 * @return number of Pending jobs 231 */ 232 public int getPendingCount () { 233 return pool.pending (); 234 } 235 236 public void supervise () { 237 Thread[] t = new Thread[maxPoolSize]; 238 int cnt = enumerate (t); 239 for (int i=0; i<cnt; i++) 240 if (t[i] instanceof PooledThread) 241 ((PooledThread) t[i]).supervise(); 242 } 243 244 public void setLogger (Logger logger, String realm) { 245 this.logger = logger; 246 this.realm = realm; 247 } 248 public String getRealm () { 249 return realm; 250 } 251 public Logger getLogger() { 252 return logger; 253 } 254 255 /** 256 * @param cfg Configuration object 257 * @throws ConfigurationException 258 */ 259 public void setConfiguration(Configuration cfg) throws ConfigurationException { 260 maxPoolSize = cfg.getInt("max-size", DEFAULT_MAX_THREADS); 261 init (cfg.getInt("initial-size")); 262 } 263 264 /** 265 * Retrieves a thread pool from NameRegistrar given its name, unique identifier. 266 * 267 * @param name Name of the thread pool to retrieve, must be the same as the name property of the thread-pool tag in the QSP config file 268 * @throws NotFoundException thrown when there is not a thread-pool registered under this name. 269 * @return returns the retrieved instance of thread pool 270 */ 271 public static ThreadPool getThreadPool(java.lang.String name) throws NotFoundException { 272 return (ThreadPool)NameRegistrar.get("thread.pool." + name); 273 } 274}