001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.qbean; 020 021import org.jdom2.Attribute; 022import org.jdom2.Element; 023import org.jpos.core.ConfigurationException; 024import org.jpos.q2.QBeanSupport; 025import org.jpos.util.NameRegistrar; 026import org.jpos.util.NameRegistrar.NotFoundException; 027 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032 033/** 034 * A qbean dedicated to thread pool executor creation and registration by Q2 035 * NameRegistrar registry<br> 036 * 037 * @author dgrandemange 038 */ 039public class QThreadPoolExecutor extends QBeanSupport implements 040 QThreadPoolExecutorMBean { 041 042 public static final String THREAD_POOL_EXECUTOR__QBEAN_PREFIX = "thread.pool.executor."; 043 044 public static final String XML_CONFIG_ATTR__EXEC_SRV_TYPE = "type"; 045 046 public static final String XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE = "corePoolSize"; 047 048 public static final String XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER = "terminationTimer"; 049 050 public static final int DEFAULT_TERMINATION_TIMER = 15; 051 052 private String execSrvType; 053 054 private int initialCorePoolSize; 055 056 private int terminationTimer = DEFAULT_TERMINATION_TIMER; 057 058 /** 059 * Handle specific config elements 060 * 061 * type := "fixed" | "scheduled" | "cached" corePoolSize := integer 062 * (required for "fixed" and "scheduled" kinds, optional for "cached" kind) 063 * 064 */ 065 @Override 066 protected void initService() throws Exception { 067 Element rootElt = this.getPersist(); 068 069 Attribute execSrvTypeAttr = getAttribute(rootElt, 070 XML_CONFIG_ATTR__EXEC_SRV_TYPE, true, 071 "(thread pool executor type among {fixed|cached|scheduled|single})"); 072 execSrvType = execSrvTypeAttr.getValue().trim(); 073 074 if ("fixed".equals(execSrvType)) { 075 Attribute corePoolSizeAttr = getAttribute(rootElt, 076 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true, 077 "(number of threads in the pool)"); 078 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 079 080 } else if ("cached".equals(execSrvType)) { 081 Attribute corePoolSizeAttr = getAttribute(rootElt, 082 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, false, 083 "(number of threads in the pool)"); 084 if (null != corePoolSizeAttr) { 085 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 086 } 087 088 } else if ("scheduled".equals(execSrvType)) { 089 Attribute corePoolSizeAttr = getAttribute(rootElt, 090 XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true, 091 "(number of threads in the pool)"); 092 initialCorePoolSize = corePoolSizeAttr.getIntValue(); 093 094 } else { 095 throw new ConfigurationException( 096 "Invalid thread pool executor type '%s' (valid types={fixed|cached|scheduled} )"); 097 } 098 099 Attribute terminationTimerAttr = getAttribute(rootElt, 100 XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER, false, 101 "(termination timer in seconds)"); 102 if (null != terminationTimerAttr) { 103 terminationTimer = terminationTimerAttr.getIntValue(); 104 } 105 106 } 107 108 @Override 109 protected void startService() throws Exception { 110 ExecutorService execSrv = null; 111 112 try { 113 if ("fixed".equals(execSrvType)) { 114 execSrv = Executors.newFixedThreadPool(initialCorePoolSize); 115 } else if ("cached".equals(execSrvType)) { 116 execSrv = Executors.newCachedThreadPool(); 117 if (initialCorePoolSize != 0) { 118 ((ThreadPoolExecutor) execSrv) 119 .setCorePoolSize(initialCorePoolSize); 120 } 121 } else if ("scheduled".equals(execSrvType)) { 122 execSrv = Executors.newScheduledThreadPool(initialCorePoolSize); 123 } 124 125 if (null != execSrv) { 126 NameRegistrar.register(getRegistrationName(), execSrv); 127 } else { 128 throw new Exception( 129 "Unable to start service : thread pool executor instance is null"); 130 } 131 } catch (Exception e) { 132 if (null != execSrv) { 133 try { 134 execSrv.shutdownNow(); 135 } catch (Exception ee) { 136 getLog().warn(ee); 137 } 138 } 139 throw e; 140 } 141 } 142 143 @Override 144 protected void stopService() throws Exception { 145 ThreadPoolExecutor execSrv = getThreadPoolExecutor(getName(), 146 ThreadPoolExecutor.class); 147 148 if (null != execSrv) { 149 execSrv.shutdownNow(); 150 151 boolean awaitTermination = execSrv.awaitTermination( 152 terminationTimer, TimeUnit.SECONDS); 153 154 if (awaitTermination) { 155 NameRegistrar.unregister(getRegistrationName()); 156 } else { 157 throw new Exception( 158 String.format( 159 "Unable to shutdown thread pool executor : executor termination delay (%d seconds) has expired", 160 terminationTimer)); 161 } 162 } else { 163 throw new Exception( 164 String.format( 165 "Unable to stop thread pool executor : no executor '%s' found registered under name '%s'", 166 getName(), getRegistrationName())); 167 } 168 } 169 170 protected String getRegistrationName() { 171 return THREAD_POOL_EXECUTOR__QBEAN_PREFIX + getName(); 172 } 173 174 /** 175 * @param elt 176 * @param attrName 177 * @param mandatory 178 * @param errDesc 179 * @throws ConfigurationException 180 */ 181 protected Attribute getAttribute(Element elt, String attrName, 182 boolean mandatory, String errDesc) throws ConfigurationException { 183 Attribute attr = elt.getAttribute(attrName); 184 185 if (null == attr || "".equals(attr.getValue().trim())) { 186 if (mandatory) { 187 throw new ConfigurationException(String.format( 188 "'%s' attribute has not been found or is empty %s", 189 XML_CONFIG_ATTR__EXEC_SRV_TYPE, errDesc)); 190 } else { 191 return null; 192 } 193 } else { 194 return attr; 195 } 196 } 197 198 /** 199 * Retrieves a thread pool executor from NameRegistrar given its name 200 * 201 * @param name 202 * @throws NotFoundException 203 */ 204 public static ThreadPoolExecutor getThreadPoolExecutor(java.lang.String name) 205 throws NotFoundException { 206 ThreadPoolExecutor res = null; 207 Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX 208 + name); 209 if (object instanceof ThreadPoolExecutor) { 210 res = (ThreadPoolExecutor) object; 211 } else { 212 throw new NotFoundException(name); 213 } 214 215 return res; 216 } 217 218 /** 219 * Retrieves a thread pool executor from NameRegistrar given its name, and 220 * its expected class 221 * 222 * @param name 223 * @param clazz 224 * @throws NotFoundException 225 */ 226 @SuppressWarnings("unchecked") 227 public static <T extends ThreadPoolExecutor> T getThreadPoolExecutor( 228 java.lang.String name, Class<T> clazz) throws NotFoundException { 229 T res = null; 230 231 Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX 232 + name); 233 234 if (clazz.isAssignableFrom(object.getClass())) { 235 res = (T) object; 236 } else { 237 throw new NotFoundException(name); 238 } 239 240 return res; 241 } 242 243 /* 244 * (non-Javadoc) 245 * 246 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getExecSrvType() 247 */ 248 public String getExecSrvType() { 249 return execSrvType; 250 } 251 252 /* 253 * (non-Javadoc) 254 * 255 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTerminationTimer() 256 */ 257 public int getTerminationTimer() { 258 return terminationTimer; 259 } 260 261 /* 262 * (non-Javadoc) 263 * 264 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getActiveCount() 265 */ 266 public int getActiveCount() throws NotFoundException { 267 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 268 ThreadPoolExecutor.class); 269 return executorService.getActiveCount(); 270 } 271 272 /* 273 * (non-Javadoc) 274 * 275 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCompletedTaskCount() 276 */ 277 public long getCompletedTaskCount() throws NotFoundException { 278 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 279 ThreadPoolExecutor.class); 280 return executorService.getCompletedTaskCount(); 281 } 282 283 /* 284 * (non-Javadoc) 285 * 286 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCorePoolSize() 287 */ 288 public int getCorePoolSize() throws NotFoundException { 289 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 290 ThreadPoolExecutor.class); 291 return executorService.getCorePoolSize(); 292 } 293 294 /* 295 * (non-Javadoc) 296 * 297 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getKeepAliveTimeMS() 298 */ 299 public long getKeepAliveTimeMS() throws NotFoundException { 300 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 301 ThreadPoolExecutor.class); 302 return executorService.getKeepAliveTime(TimeUnit.MILLISECONDS); 303 } 304 305 /* 306 * (non-Javadoc) 307 * 308 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getLargestPoolSize() 309 */ 310 public int getLargestPoolSize() throws NotFoundException { 311 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 312 ThreadPoolExecutor.class); 313 return executorService.getLargestPoolSize(); 314 } 315 316 /* 317 * (non-Javadoc) 318 * 319 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getMaximumPoolSize() 320 */ 321 public int getMaximumPoolSize() throws NotFoundException { 322 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 323 ThreadPoolExecutor.class); 324 return executorService.getMaximumPoolSize(); 325 } 326 327 /* 328 * (non-Javadoc) 329 * 330 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getPoolSize() 331 */ 332 public int getPoolSize() throws NotFoundException { 333 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 334 ThreadPoolExecutor.class); 335 return executorService.getPoolSize(); 336 } 337 338 /* 339 * (non-Javadoc) 340 * 341 * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTaskCount() 342 */ 343 public long getTaskCount() throws NotFoundException { 344 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 345 ThreadPoolExecutor.class); 346 return executorService.getTaskCount(); 347 } 348 349 /* 350 * (non-Javadoc) 351 * 352 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isShutdown() 353 */ 354 public boolean isShutdown() throws NotFoundException { 355 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 356 ThreadPoolExecutor.class); 357 return executorService.isShutdown(); 358 } 359 360 /* 361 * (non-Javadoc) 362 * 363 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminated() 364 */ 365 public boolean isTerminated() throws NotFoundException { 366 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 367 ThreadPoolExecutor.class); 368 return executorService.isTerminated(); 369 } 370 371 /* 372 * (non-Javadoc) 373 * 374 * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminating() 375 */ 376 public boolean isTerminating() throws NotFoundException { 377 ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(), 378 ThreadPoolExecutor.class); 379 return executorService.isTerminating(); 380 } 381 382 public int getInitialCorePoolSize() { 383 return initialCorePoolSize; 384 } 385 386 protected void setExecSrvType(String execSrvType) { 387 this.execSrvType = execSrvType; 388 } 389 390 protected void setInitialCorePoolSize(int initialCorePoolSize) { 391 this.initialCorePoolSize = initialCorePoolSize; 392 } 393 394 protected void setTerminationTimer(int terminationTimer) { 395 this.terminationTimer = terminationTimer; 396 } 397 398}