001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.qbean;
020
021import org.jdom2.Attribute;
022import org.jdom2.Element;
023import org.jpos.core.ConfigurationException;
024import org.jpos.q2.QBeanSupport;
025import org.jpos.util.NameRegistrar;
026import org.jpos.util.NameRegistrar.NotFoundException;
027
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032
033/**
034 * A qbean dedicated to thread pool executor creation and registration by Q2
035 * NameRegistrar registry<br>
036 * 
037 * @author dgrandemange
038 */
039public class QThreadPoolExecutor extends QBeanSupport implements
040        QThreadPoolExecutorMBean {
041    /** Default constructor; no instance state to initialise. */
042    public QThreadPoolExecutor() {}
043
044    /** Prefix used when registering pooled executors in {@link NameRegistrar}. */
045    public static final String THREAD_POOL_EXECUTOR__QBEAN_PREFIX = "thread.pool.executor.";
046
047    /** XML attribute selecting the executor type ({@code fixed}, {@code cached}, {@code scheduled}, {@code single}). */
048    public static final String XML_CONFIG_ATTR__EXEC_SRV_TYPE = "type";
049
050    /** XML attribute setting the executor's core pool size. */
051    public static final String XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE = "corePoolSize";
052
053    /** XML attribute setting the awaitTermination timeout, in seconds. */
054    public static final String XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER = "terminationTimer";
055
056    /** Default {@link #XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER} value, in seconds. */
057    public static final int DEFAULT_TERMINATION_TIMER = 15;
058
059    private String execSrvType;
060
061    private int initialCorePoolSize;
062
063    private int terminationTimer = DEFAULT_TERMINATION_TIMER;
064
065    /**
066     * Handle specific config elements
067     * 
068     * type := "fixed" | "scheduled" | "cached" corePoolSize := integer
069     * (required for "fixed" and "scheduled" kinds, optional for "cached" kind)
070     * 
071     */
072    @Override
073    protected void initService() throws Exception {
074        Element rootElt = this.getPersist();
075
076        Attribute execSrvTypeAttr = getAttribute(rootElt,
077                XML_CONFIG_ATTR__EXEC_SRV_TYPE, true,
078                "(thread pool executor type among {fixed|cached|scheduled|single})");
079        execSrvType = execSrvTypeAttr.getValue().trim();
080
081        if ("fixed".equals(execSrvType)) {
082            Attribute corePoolSizeAttr = getAttribute(rootElt,
083                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true,
084                    "(number of threads in the pool)");
085            initialCorePoolSize = corePoolSizeAttr.getIntValue();
086
087        } else if ("cached".equals(execSrvType)) {
088            Attribute corePoolSizeAttr = getAttribute(rootElt,
089                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, false,
090                    "(number of threads in the pool)");
091            if (null != corePoolSizeAttr) {
092                initialCorePoolSize = corePoolSizeAttr.getIntValue();
093            }
094
095        } else if ("scheduled".equals(execSrvType)) {
096            Attribute corePoolSizeAttr = getAttribute(rootElt,
097                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true,
098                    "(number of threads in the pool)");
099            initialCorePoolSize = corePoolSizeAttr.getIntValue();
100
101        } else {
102            throw new ConfigurationException(
103                    "Invalid thread pool executor type '%s' (valid types={fixed|cached|scheduled} )");
104        }
105
106        Attribute terminationTimerAttr = getAttribute(rootElt,
107                XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER, false,
108                "(termination timer in seconds)");
109        if (null != terminationTimerAttr) {
110            terminationTimer = terminationTimerAttr.getIntValue();
111        }
112
113    }
114
115    @Override
116    protected void startService() throws Exception {
117        ExecutorService execSrv = null;
118
119        try {
120            if ("fixed".equals(execSrvType)) {
121                execSrv = Executors.newFixedThreadPool(initialCorePoolSize);
122            } else if ("cached".equals(execSrvType)) {
123                execSrv = Executors.newCachedThreadPool();
124                if (initialCorePoolSize != 0) {
125                    ((ThreadPoolExecutor) execSrv)
126                            .setCorePoolSize(initialCorePoolSize);
127                }
128            } else if ("scheduled".equals(execSrvType)) {
129                execSrv = Executors.newScheduledThreadPool(initialCorePoolSize);
130            }
131
132            if (null != execSrv) {
133                NameRegistrar.register(getRegistrationName(), execSrv);
134            } else {
135                throw new Exception(
136                        "Unable to start service : thread pool executor instance is null");
137            }
138        } catch (Exception e) {
139            if (null != execSrv) {
140                try {
141                    execSrv.shutdownNow();
142                } catch (Exception ee) {
143                    getLog().warn(ee);
144                }
145            }
146            throw e;
147        }
148    }
149
150    @Override
151    protected void stopService() throws Exception {
152        ThreadPoolExecutor execSrv = getThreadPoolExecutor(getName(),
153                ThreadPoolExecutor.class);
154
155        if (null != execSrv) {
156            execSrv.shutdownNow();
157
158            boolean awaitTermination = execSrv.awaitTermination(
159                    terminationTimer, TimeUnit.SECONDS);
160
161            if (awaitTermination) {
162                NameRegistrar.unregister(getRegistrationName());
163            } else {
164                throw new Exception(
165                        String.format(
166                                "Unable to shutdown thread pool executor : executor termination delay (%d seconds) has expired",
167                                terminationTimer));
168            }
169        } else {
170            throw new Exception(
171                    String.format(
172                            "Unable to stop thread pool executor : no executor '%s' found registered under name '%s'",
173                            getName(), getRegistrationName()));
174        }
175    }
176
177    /**
178     * Returns the {@link NameRegistrar} key under which this bean's executor is registered.
179     *
180     * @return the registration name (prefix concatenated with this bean's configured name)
181     */
182    protected String getRegistrationName() {
183        return THREAD_POOL_EXECUTOR__QBEAN_PREFIX + getName();
184    }
185
186    /**
187     * Returns a required or optional XML attribute, raising a configuration error
188     * with {@code errDesc} as context when a mandatory attribute is missing or empty.
189     *
190     * @param elt source element
191     * @param attrName attribute name to look up
192     * @param mandatory if {@code true}, missing/empty attributes raise an exception
193     * @param errDesc human-readable description appended to the error message
194     * @return the attribute, or {@code null} when not mandatory and absent
195     * @throws ConfigurationException if the attribute is mandatory and missing/empty
196     */
197    protected Attribute getAttribute(Element elt, String attrName,
198            boolean mandatory, String errDesc) throws ConfigurationException {
199        Attribute attr = elt.getAttribute(attrName);
200
201        if (null == attr || "".equals(attr.getValue().trim())) {
202            if (mandatory) {
203                throw new ConfigurationException(String.format(
204                        "'%s' attribute has not been found or is empty %s",
205                        XML_CONFIG_ATTR__EXEC_SRV_TYPE, errDesc));
206            } else {
207                return null;
208            }
209        } else {
210            return attr;
211        }
212    }
213
214    /**
215     * Retrieves a thread pool executor from NameRegistrar given its name.
216     *
217     * @param name bean name (without the {@link #THREAD_POOL_EXECUTOR__QBEAN_PREFIX} prefix)
218     * @return the registered {@link ThreadPoolExecutor}
219     * @throws NotFoundException if no executor is registered under that name
220     */
221    public static ThreadPoolExecutor getThreadPoolExecutor(java.lang.String name)
222            throws NotFoundException {
223        ThreadPoolExecutor res = null;
224        Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX
225                + name);
226        if (object instanceof ThreadPoolExecutor) {
227            res = (ThreadPoolExecutor) object;
228        } else {
229            throw new NotFoundException(name);
230        }
231
232        return res;
233    }
234
235    /**
236     * Retrieves a thread pool executor from NameRegistrar given its name and expected class.
237     *
238     * @param <T> expected concrete executor type
239     * @param name bean name (without the {@link #THREAD_POOL_EXECUTOR__QBEAN_PREFIX} prefix)
240     * @param clazz expected executor class
241     * @return the registered executor, narrowed to {@code T}
242     * @throws NotFoundException if no executor of the expected class is registered under that name
243     */
244    @SuppressWarnings("unchecked")
245    public static <T extends ThreadPoolExecutor> T getThreadPoolExecutor(
246            java.lang.String name, Class<T> clazz) throws NotFoundException {
247        T res = null;
248
249        Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX
250                + name);
251
252        if (clazz.isAssignableFrom(object.getClass())) {
253            res = (T) object;
254        } else {
255            throw new NotFoundException(name);
256        }
257
258        return res;
259    }
260
261    /*
262     * (non-Javadoc)
263     * 
264     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getExecSrvType()
265     */
266    public String getExecSrvType() {
267        return execSrvType;
268    }
269
270    /*
271     * (non-Javadoc)
272     * 
273     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTerminationTimer()
274     */
275    public int getTerminationTimer() {
276        return terminationTimer;
277    }
278
279    /*
280     * (non-Javadoc)
281     * 
282     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getActiveCount()
283     */
284    public int getActiveCount() throws NotFoundException {
285        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
286                ThreadPoolExecutor.class);
287        return executorService.getActiveCount();
288    }
289
290    /*
291     * (non-Javadoc)
292     * 
293     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCompletedTaskCount()
294     */
295    public long getCompletedTaskCount() throws NotFoundException {
296        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
297                ThreadPoolExecutor.class);
298        return executorService.getCompletedTaskCount();
299    }
300
301    /*
302     * (non-Javadoc)
303     * 
304     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCorePoolSize()
305     */
306    public int getCorePoolSize() throws NotFoundException {
307        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
308                ThreadPoolExecutor.class);
309        return executorService.getCorePoolSize();
310    }
311
312    /*
313     * (non-Javadoc)
314     * 
315     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getKeepAliveTimeMS()
316     */
317    public long getKeepAliveTimeMS() throws NotFoundException {
318        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
319                ThreadPoolExecutor.class);
320        return executorService.getKeepAliveTime(TimeUnit.MILLISECONDS);
321    }
322
323    /*
324     * (non-Javadoc)
325     * 
326     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getLargestPoolSize()
327     */
328    public int getLargestPoolSize() throws NotFoundException {
329        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
330                ThreadPoolExecutor.class);
331        return executorService.getLargestPoolSize();
332    }
333
334    /*
335     * (non-Javadoc)
336     * 
337     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getMaximumPoolSize()
338     */
339    public int getMaximumPoolSize() throws NotFoundException {
340        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
341                ThreadPoolExecutor.class);
342        return executorService.getMaximumPoolSize();
343    }
344
345    /*
346     * (non-Javadoc)
347     * 
348     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getPoolSize()
349     */
350    public int getPoolSize() throws NotFoundException {
351        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
352                ThreadPoolExecutor.class);
353        return executorService.getPoolSize();
354    }
355
356    /*
357     * (non-Javadoc)
358     * 
359     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTaskCount()
360     */
361    public long getTaskCount() throws NotFoundException {
362        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
363                ThreadPoolExecutor.class);
364        return executorService.getTaskCount();
365    }
366
367    /*
368     * (non-Javadoc)
369     * 
370     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isShutdown()
371     */
372    public boolean isShutdown() throws NotFoundException {
373        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
374                ThreadPoolExecutor.class);
375        return executorService.isShutdown();
376    }
377
378    /*
379     * (non-Javadoc)
380     * 
381     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminated()
382     */
383    public boolean isTerminated() throws NotFoundException {
384        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
385                ThreadPoolExecutor.class);
386        return executorService.isTerminated();
387    }
388
389    /*
390     * (non-Javadoc)
391     * 
392     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminating()
393     */
394    public boolean isTerminating() throws NotFoundException {
395        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
396                ThreadPoolExecutor.class);
397        return executorService.isTerminating();
398    }
399
400    /**
401     * Returns the core pool size requested at configuration time.
402     *
403     * @return the initially configured core pool size
404     */
405    public int getInitialCorePoolSize() {
406        return initialCorePoolSize;
407    }
408
409    /**
410     * Sets the executor type ({@code fixed}, {@code cached}, {@code scheduled}, or {@code single}).
411     *
412     * @param execSrvType the new executor type
413     */
414    protected void setExecSrvType(String execSrvType) {
415        this.execSrvType = execSrvType;
416    }
417
418    /**
419     * Sets the initial core pool size used when the executor is created.
420     *
421     * @param initialCorePoolSize core pool size
422     */
423    protected void setInitialCorePoolSize(int initialCorePoolSize) {
424        this.initialCorePoolSize = initialCorePoolSize;
425    }
426
427    /**
428     * Sets the awaitTermination timeout, in seconds.
429     *
430     * @param terminationTimer timeout used during shutdown
431     */
432    protected void setTerminationTimer(int terminationTimer) {
433        this.terminationTimer = terminationTimer;
434    }
435
436}