001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.qbean; 020 021import org.jdom2.Attribute; 022import org.jdom2.Element; 023import org.jpos.core.ConfigurationException; 024import org.jpos.q2.QBeanSupport; 025import org.jpos.util.NameRegistrar; 026import org.jpos.util.NameRegistrar.NotFoundException; 027 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032 033/** 034 * A qbean dedicated to thread pool executor creation and registration by Q2 035 * NameRegistrar registry<br> 036 * 037 * @author dgrandemange 038 */ 039public class QThreadPoolExecutor extends QBeanSupport implements 040 QThreadPoolExecutorMBean { 041 /** Default constructor; no instance state to initialise. */ 042 public QThreadPoolExecutor() {} 043 044 /** Prefix used when registering pooled executors in {@link NameRegistrar}. */ 045 public static final String THREAD_POOL_EXECUTOR__QBEAN_PREFIX = "thread.pool.executor."; 046 047 /** XML attribute selecting the executor type ({@code fixed}, {@code cached}, {@code scheduled}, {@code single}). */ 048 public static final String XML_CONFIG_ATTR__EXEC_SRV_TYPE = "type"; 049 050 /** XML attribute setting the executor's core pool size. */ 051 public static final String XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE = "corePoolSize"; 052 053 /** XML attribute setting the awaitTermination timeout, in seconds. */ 054 public static final String XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER = "terminationTimer"; 055 056 /** Default {@link #XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER} value, in seconds. */ 057 public static final int DEFAULT_TERMINATION_TIMER = 15; 058 059 private String execSrvType; 060 061 private int initialCorePoolSize; 062 063 private int terminationTimer = DEFAULT_TERMINATION_TIMER; 064 065 /** 066 * Handle specific config elements 067 * 068 * type := "fixed" | "scheduled" | "cached" corePoolSize := integer 069 * (required for "fixed" and "scheduled" kinds, optional for "cached" kind) 070 * 071 */ 072 @Override 073 protected void initService() throws Exception { 074 Element rootElt = this.getPersist(); 075 076 Attribute execSrvTypeAttr = getAttribute(rootElt, 077 XML_CONFIG_ATTR__EXEC_SRV_TYPE, true, 078 "(thread pool executor type among {fixed|cached|scheduled|single})"); 079 execSrvType = execSrvTypeAttr.getValue().trim(); 080 081 if ("fixed".equals(execSrvType)) { 082 Attribute corePoolSizeAttr = getAttribute(rootElt, 083 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true, 084 "(number of threads in the pool)"); 085 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 086 087 } else if ("cached".equals(execSrvType)) { 088 Attribute corePoolSizeAttr = getAttribute(rootElt, 089 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, false, 090 "(number of threads in the pool)"); 091 if (null != corePoolSizeAttr) { 092 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 093 } 094 095 } else if ("scheduled".equals(execSrvType)) { 096 Attribute corePoolSizeAttr = getAttribute(rootElt, 097 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true, 098 "(number of threads in the pool)"); 099 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 100 101 } else { 102 throw new ConfigurationException( 103 "Invalid thread pool executor type '%s' (valid types={fixed|cached|scheduled} )"); 104 } 105 106 Attribute terminationTimerAttr = getAttribute(rootElt, 107 XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER, false, 108 "(termination timer in seconds)"); 109 if (null != terminationTimerAttr) { 110 terminationTimer = terminationTimerAttr.getIntValue(); 111 } 112 113 } 114 115 @Override 116 protected void startService() throws Exception { 117 ExecutorService execSrv = null; 118 119 try { 120 if ("fixed".equals(execSrvType)) { 121 execSrv = Executors.newFixedThreadPool(initialCorePoolSize); 122 } else if ("cached".equals(execSrvType)) { 123 execSrv = Executors.newCachedThreadPool(); 124 if (initialCorePoolSize != 0) { 125 ((ThreadPoolExecutor) execSrv) 126 .setCorePoolSize(initialCorePoolSize); 127 } 128 } else if ("scheduled".equals(execSrvType)) { 129 execSrv = Executors.newScheduledThreadPool(initialCorePoolSize); 130 } 131 132 if (null != execSrv) { 133 NameRegistrar.register(getRegistrationName(), execSrv); 134 } else { 135 throw new Exception( 136 "Unable to start service : thread pool executor instance is null"); 137 } 138 } catch (Exception e) { 139 if (null != execSrv) { 140 try { 141 execSrv.shutdownNow(); 142 } catch (Exception ee) { 143 getLog().warn(ee); 144 } 145 } 146 throw e; 147 } 148 } 149 150 @Override 151 protected void stopService() throws Exception { 152 ThreadPoolExecutor execSrv = getThreadPoolExecutor(getName(), 153 ThreadPoolExecutor.class); 154 155 if (null != execSrv) { 156 execSrv.shutdownNow(); 157 158 boolean awaitTermination = execSrv.awaitTermination( 159 terminationTimer, TimeUnit.SECONDS); 160 161 if (awaitTermination) { 162 NameRegistrar.unregister(getRegistrationName()); 163 } else { 164 throw new Exception( 165 String.format( 166 "Unable to shutdown thread pool executor : executor termination delay (%d seconds) has expired", 167 terminationTimer)); 168 } 169 } else { 170 throw new Exception( 171 String.format( 172 "Unable to stop thread pool executor : no executor '%s' found registered under name '%s'", 173 getName(), getRegistrationName())); 174 } 175 } 176 177 /** 178 * Returns the {@link NameRegistrar} key under which this bean's executor is registered. 179 * 180 * @return the registration name (prefix concatenated with this bean's configured name) 181 */ 182 protected String getRegistrationName() { 183 return THREAD_POOL_EXECUTOR__QBEAN_PREFIX + getName(); 184 } 185 186 /** 187 * Returns a required or optional XML attribute, raising a configuration error 188 * with {@code errDesc} as context when a mandatory attribute is missing or empty. 189 * 190 * @param elt source element 191 * @param attrName attribute name to look up 192 * @param mandatory if {@code true}, missing/empty attributes raise an exception 193 * @param errDesc human-readable description appended to the error message 194 * @return the attribute, or {@code null} when not mandatory and absent 195 * @throws ConfigurationException if the attribute is mandatory and missing/empty 196 */ 197 protected Attribute getAttribute(Element elt, String attrName, 198 boolean mandatory, String errDesc) throws ConfigurationException { 199 Attribute attr = elt.getAttribute(attrName); 200 201 if (null == attr || "".equals(attr.getValue().trim())) { 202 if (mandatory) { 203 throw new ConfigurationException(String.format( 204 "'%s' attribute has not been found or is empty %s", 205 XML_CONFIG_ATTR__EXEC_SRV_TYPE, errDesc)); 206 } else { 207 return null; 208 } 209 } else { 210 return attr; 211 } 212 } 213 214 /** 215 * Retrieves a thread pool executor from NameRegistrar given its name. 216 * 217 * @param name bean name (without the {@link #THREAD_POOL_EXECUTOR__QBEAN_PREFIX} prefix) 218 * @return the registered {@link ThreadPoolExecutor} 219 * @throws NotFoundException if no executor is registered under that name 220 */ 221 public static ThreadPoolExecutor getThreadPoolExecutor(java.lang.String name) 222 throws NotFoundException { 223 ThreadPoolExecutor res = null; 224 Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX 225 + name); 226 if (object instanceof ThreadPoolExecutor) { 227 res = (ThreadPoolExecutor) object; 228 } else { 229 throw new NotFoundException(name); 230 } 231 232 return res; 233 } 234 235 /** 236 * Retrieves a thread pool executor from NameRegistrar given its name and expected class. 237 * 238 * @param <T> expected concrete executor type 239 * @param name bean name (without the {@link #THREAD_POOL_EXECUTOR__QBEAN_PREFIX} prefix) 240 * @param clazz expected executor class 241 * @return the registered executor, narrowed to {@code T} 242 * @throws NotFoundException if no executor of the expected class is registered under that name 243 */ 244 @SuppressWarnings("unchecked") 245 public static <T extends ThreadPoolExecutor> T getThreadPoolExecutor( 246 java.lang.String name, Class<T> clazz) throws NotFoundException { 247 T res = null; 248 249 Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX 250 + name); 251 252 if (clazz.isAssignableFrom(object.getClass())) { 253 res = (T) object; 254 } else { 255 throw new NotFoundException(name); 256 } 257 258 return res; 259 } 260 261 /* 262 * (non-Javadoc) 263 * 264 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getExecSrvType() 265 */ 266 public String getExecSrvType() { 267 return execSrvType; 268 } 269 270 /* 271 * (non-Javadoc) 272 * 273 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTerminationTimer() 274 */ 275 public int getTerminationTimer() { 276 return terminationTimer; 277 } 278 279 /* 280 * (non-Javadoc) 281 * 282 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getActiveCount() 283 */ 284 public int getActiveCount() throws NotFoundException { 285 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 286 ThreadPoolExecutor.class); 287 return executorService.getActiveCount(); 288 } 289 290 /* 291 * (non-Javadoc) 292 * 293 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCompletedTaskCount() 294 */ 295 public long getCompletedTaskCount() throws NotFoundException { 296 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 297 ThreadPoolExecutor.class); 298 return executorService.getCompletedTaskCount(); 299 } 300 301 /* 302 * (non-Javadoc) 303 * 304 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCorePoolSize() 305 */ 306 public int getCorePoolSize() throws NotFoundException { 307 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 308 ThreadPoolExecutor.class); 309 return executorService.getCorePoolSize(); 310 } 311 312 /* 313 * (non-Javadoc) 314 * 315 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getKeepAliveTimeMS() 316 */ 317 public long getKeepAliveTimeMS() throws NotFoundException { 318 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 319 ThreadPoolExecutor.class); 320 return executorService.getKeepAliveTime(TimeUnit.MILLISECONDS); 321 } 322 323 /* 324 * (non-Javadoc) 325 * 326 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getLargestPoolSize() 327 */ 328 public int getLargestPoolSize() throws NotFoundException { 329 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 330 ThreadPoolExecutor.class); 331 return executorService.getLargestPoolSize(); 332 } 333 334 /* 335 * (non-Javadoc) 336 * 337 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getMaximumPoolSize() 338 */ 339 public int getMaximumPoolSize() throws NotFoundException { 340 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 341 ThreadPoolExecutor.class); 342 return executorService.getMaximumPoolSize(); 343 } 344 345 /* 346 * (non-Javadoc) 347 * 348 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getPoolSize() 349 */ 350 public int getPoolSize() throws NotFoundException { 351 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 352 ThreadPoolExecutor.class); 353 return executorService.getPoolSize(); 354 } 355 356 /* 357 * (non-Javadoc) 358 * 359 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTaskCount() 360 */ 361 public long getTaskCount() throws NotFoundException { 362 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 363 ThreadPoolExecutor.class); 364 return executorService.getTaskCount(); 365 } 366 367 /* 368 * (non-Javadoc) 369 * 370 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isShutdown() 371 */ 372 public boolean isShutdown() throws NotFoundException { 373 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 374 ThreadPoolExecutor.class); 375 return executorService.isShutdown(); 376 } 377 378 /* 379 * (non-Javadoc) 380 * 381 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminated() 382 */ 383 public boolean isTerminated() throws NotFoundException { 384 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 385 ThreadPoolExecutor.class); 386 return executorService.isTerminated(); 387 } 388 389 /* 390 * (non-Javadoc) 391 * 392 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminating() 393 */ 394 public boolean isTerminating() throws NotFoundException { 395 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 396 ThreadPoolExecutor.class); 397 return executorService.isTerminating(); 398 } 399 400 /** 401 * Returns the core pool size requested at configuration time. 402 * 403 * @return the initially configured core pool size 404 */ 405 public int getInitialCorePoolSize() { 406 return initialCorePoolSize; 407 } 408 409 /** 410 * Sets the executor type ({@code fixed}, {@code cached}, {@code scheduled}, or {@code single}). 411 * 412 * @param execSrvType the new executor type 413 */ 414 protected void setExecSrvType(String execSrvType) { 415 this.execSrvType = execSrvType; 416 } 417 418 /** 419 * Sets the initial core pool size used when the executor is created. 420 * 421 * @param initialCorePoolSize core pool size 422 */ 423 protected void setInitialCorePoolSize(int initialCorePoolSize) { 424 this.initialCorePoolSize = initialCorePoolSize; 425 } 426 427 /** 428 * Sets the awaitTermination timeout, in seconds. 429 * 430 * @param terminationTimer timeout used during shutdown 431 */ 432 protected void setTerminationTimer(int terminationTimer) { 433 this.terminationTimer = terminationTimer; 434 } 435 436}