001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.qbean;
020
021import org.jdom2.Attribute;
022import org.jdom2.Element;
023import org.jpos.core.ConfigurationException;
024import org.jpos.q2.QBeanSupport;
025import org.jpos.util.NameRegistrar;
026import org.jpos.util.NameRegistrar.NotFoundException;
027
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032
033/**
034 * A qbean dedicated to thread pool executor creation and registration by Q2
035 * NameRegistrar registry<br>
036 * 
037 * @author dgrandemange
038 */
039public class QThreadPoolExecutor extends QBeanSupport implements
040        QThreadPoolExecutorMBean {
041
042    public static final String THREAD_POOL_EXECUTOR__QBEAN_PREFIX = "thread.pool.executor.";
043
044    public static final String XML_CONFIG_ATTR__EXEC_SRV_TYPE = "type";
045
046    public static final String XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE = "corePoolSize";
047
048    public static final String XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER = "terminationTimer";
049
050    public static final int DEFAULT_TERMINATION_TIMER = 15;
051
052    private String execSrvType;
053
054    private int initialCorePoolSize;
055
056    private int terminationTimer = DEFAULT_TERMINATION_TIMER;
057
058    /**
059     * Handle specific config elements
060     * 
061     * type := "fixed" | "scheduled" | "cached" corePoolSize := integer
062     * (required for "fixed" and "scheduled" kinds, optional for "cached" kind)
063     * 
064     */
065    @Override
066    protected void initService() throws Exception {
067        Element rootElt = this.getPersist();
068
069        Attribute execSrvTypeAttr = getAttribute(rootElt,
070                XML_CONFIG_ATTR__EXEC_SRV_TYPE, true,
071                "(thread pool executor type among {fixed|cached|scheduled|single})");
072        execSrvType = execSrvTypeAttr.getValue().trim();
073
074        if ("fixed".equals(execSrvType)) {
075            Attribute corePoolSizeAttr = getAttribute(rootElt,
076                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true,
077                    "(number of threads in the pool)");
078            initialCorePoolSize = corePoolSizeAttr.getIntValue();
079
080        } else if ("cached".equals(execSrvType)) {
081            Attribute corePoolSizeAttr = getAttribute(rootElt,
082                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, false,
083                    "(number of threads in the pool)");
084            if (null != corePoolSizeAttr) {
085                initialCorePoolSize = corePoolSizeAttr.getIntValue();
086            }
087
088        } else if ("scheduled".equals(execSrvType)) {
089            Attribute corePoolSizeAttr = getAttribute(rootElt,
090                    XML_CONFIG_ATTR__EXEC_SRV_COREPOOLSIZE, true,
091                    "(number of threads in the pool)");
092            initialCorePoolSize = corePoolSizeAttr.getIntValue();
093
094        } else {
095            throw new ConfigurationException(
096                    "Invalid thread pool executor type '%s' (valid types={fixed|cached|scheduled} )");
097        }
098
099        Attribute terminationTimerAttr = getAttribute(rootElt,
100                XML_CONFIG_ATTR__EXEC_SRV_TERMINATION_TIMER, false,
101                "(termination timer in seconds)");
102        if (null != terminationTimerAttr) {
103            terminationTimer = terminationTimerAttr.getIntValue();
104        }
105
106    }
107
108    @Override
109    protected void startService() throws Exception {
110        ExecutorService execSrv = null;
111
112        try {
113            if ("fixed".equals(execSrvType)) {
114                execSrv = Executors.newFixedThreadPool(initialCorePoolSize);
115            } else if ("cached".equals(execSrvType)) {
116                execSrv = Executors.newCachedThreadPool();
117                if (initialCorePoolSize != 0) {
118                    ((ThreadPoolExecutor) execSrv)
119                            .setCorePoolSize(initialCorePoolSize);
120                }
121            } else if ("scheduled".equals(execSrvType)) {
122                execSrv = Executors.newScheduledThreadPool(initialCorePoolSize);
123            }
124
125            if (null != execSrv) {
126                NameRegistrar.register(getRegistrationName(), execSrv);
127            } else {
128                throw new Exception(
129                        "Unable to start service : thread pool executor instance is null");
130            }
131        } catch (Exception e) {
132            if (null != execSrv) {
133                try {
134                    execSrv.shutdownNow();
135                } catch (Exception ee) {
136                    getLog().warn(ee);
137                }
138            }
139            throw e;
140        }
141    }
142
143    @Override
144    protected void stopService() throws Exception {
145        ThreadPoolExecutor execSrv = getThreadPoolExecutor(getName(),
146                ThreadPoolExecutor.class);
147
148        if (null != execSrv) {
149            execSrv.shutdownNow();
150
151            boolean awaitTermination = execSrv.awaitTermination(
152                    terminationTimer, TimeUnit.SECONDS);
153
154            if (awaitTermination) {
155                NameRegistrar.unregister(getRegistrationName());
156            } else {
157                throw new Exception(
158                        String.format(
159                                "Unable to shutdown thread pool executor : executor termination delay (%d seconds) has expired",
160                                terminationTimer));
161            }
162        } else {
163            throw new Exception(
164                    String.format(
165                            "Unable to stop thread pool executor : no executor '%s' found registered under name '%s'",
166                            getName(), getRegistrationName()));
167        }
168    }
169
170    protected String getRegistrationName() {
171        return THREAD_POOL_EXECUTOR__QBEAN_PREFIX + getName();
172    }
173
174    /**
175     * @param elt
176     * @param attrName
177     * @param mandatory
178     * @param errDesc
179     * @throws ConfigurationException
180     */
181    protected Attribute getAttribute(Element elt, String attrName,
182            boolean mandatory, String errDesc) throws ConfigurationException {
183        Attribute attr = elt.getAttribute(attrName);
184
185        if (null == attr || "".equals(attr.getValue().trim())) {
186            if (mandatory) {
187                throw new ConfigurationException(String.format(
188                        "'%s' attribute has not been found or is empty %s",
189                        XML_CONFIG_ATTR__EXEC_SRV_TYPE, errDesc));
190            } else {
191                return null;
192            }
193        } else {
194            return attr;
195        }
196    }
197
198    /**
199     * Retrieves a thread pool executor from NameRegistrar given its name
200     * 
201     * @param name
202     * @throws NotFoundException
203     */
204    public static ThreadPoolExecutor getThreadPoolExecutor(java.lang.String name)
205            throws NotFoundException {
206        ThreadPoolExecutor res = null;
207        Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX
208                + name);
209        if (object instanceof ThreadPoolExecutor) {
210            res = (ThreadPoolExecutor) object;
211        } else {
212            throw new NotFoundException(name);
213        }
214
215        return res;
216    }
217
218    /**
219     * Retrieves a thread pool executor from NameRegistrar given its name, and
220     * its expected class
221     * 
222     * @param name
223     * @param clazz
224     * @throws NotFoundException
225     */
226    @SuppressWarnings("unchecked")
227    public static <T extends ThreadPoolExecutor> T getThreadPoolExecutor(
228            java.lang.String name, Class<T> clazz) throws NotFoundException {
229        T res = null;
230
231        Object object = NameRegistrar.get(THREAD_POOL_EXECUTOR__QBEAN_PREFIX
232                + name);
233
234        if (clazz.isAssignableFrom(object.getClass())) {
235            res = (T) object;
236        } else {
237            throw new NotFoundException(name);
238        }
239
240        return res;
241    }
242
243    /*
244     * (non-Javadoc)
245     * 
246     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getExecSrvType()
247     */
248    public String getExecSrvType() {
249        return execSrvType;
250    }
251
252    /*
253     * (non-Javadoc)
254     * 
255     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTerminationTimer()
256     */
257    public int getTerminationTimer() {
258        return terminationTimer;
259    }
260
261    /*
262     * (non-Javadoc)
263     * 
264     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getActiveCount()
265     */
266    public int getActiveCount() throws NotFoundException {
267        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
268                ThreadPoolExecutor.class);
269        return executorService.getActiveCount();
270    }
271
272    /*
273     * (non-Javadoc)
274     * 
275     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCompletedTaskCount()
276     */
277    public long getCompletedTaskCount() throws NotFoundException {
278        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
279                ThreadPoolExecutor.class);
280        return executorService.getCompletedTaskCount();
281    }
282
283    /*
284     * (non-Javadoc)
285     * 
286     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getCorePoolSize()
287     */
288    public int getCorePoolSize() throws NotFoundException {
289        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
290                ThreadPoolExecutor.class);
291        return executorService.getCorePoolSize();
292    }
293
294    /*
295     * (non-Javadoc)
296     * 
297     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getKeepAliveTimeMS()
298     */
299    public long getKeepAliveTimeMS() throws NotFoundException {
300        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
301                ThreadPoolExecutor.class);
302        return executorService.getKeepAliveTime(TimeUnit.MILLISECONDS);
303    }
304
305    /*
306     * (non-Javadoc)
307     * 
308     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getLargestPoolSize()
309     */
310    public int getLargestPoolSize() throws NotFoundException {
311        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
312                ThreadPoolExecutor.class);
313        return executorService.getLargestPoolSize();
314    }
315
316    /*
317     * (non-Javadoc)
318     * 
319     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getMaximumPoolSize()
320     */
321    public int getMaximumPoolSize() throws NotFoundException {
322        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
323                ThreadPoolExecutor.class);
324        return executorService.getMaximumPoolSize();
325    }
326
327    /*
328     * (non-Javadoc)
329     * 
330     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getPoolSize()
331     */
332    public int getPoolSize() throws NotFoundException {
333        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
334                ThreadPoolExecutor.class);
335        return executorService.getPoolSize();
336    }
337
338    /*
339     * (non-Javadoc)
340     * 
341     * @see org.jpos.q2.qbean.QExecutorServiceMBean#getTaskCount()
342     */
343    public long getTaskCount() throws NotFoundException {
344        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
345                ThreadPoolExecutor.class);
346        return executorService.getTaskCount();
347    }
348
349    /*
350     * (non-Javadoc)
351     * 
352     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isShutdown()
353     */
354    public boolean isShutdown() throws NotFoundException {
355        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
356                ThreadPoolExecutor.class);
357        return executorService.isShutdown();
358    }
359
360    /*
361     * (non-Javadoc)
362     * 
363     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminated()
364     */
365    public boolean isTerminated() throws NotFoundException {
366        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
367                ThreadPoolExecutor.class);
368        return executorService.isTerminated();
369    }
370
371    /*
372     * (non-Javadoc)
373     * 
374     * @see org.jpos.q2.qbean.QExecutorServiceMBean#isTerminating()
375     */
376    public boolean isTerminating() throws NotFoundException {
377        ThreadPoolExecutor executorService = getThreadPoolExecutor(getName(),
378                ThreadPoolExecutor.class);
379        return executorService.isTerminating();
380    }
381
382    public int getInitialCorePoolSize() {
383        return initialCorePoolSize;
384    }
385
386    protected void setExecSrvType(String execSrvType) {
387        this.execSrvType = execSrvType;
388    }
389
390    protected void setInitialCorePoolSize(int initialCorePoolSize) {
391        this.initialCorePoolSize = initialCorePoolSize;
392    }
393
394    protected void setTerminationTimer(int terminationTimer) {
395        this.terminationTimer = terminationTimer;
396    }
397
398}