001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023import java.util.concurrent.TimeUnit;
024
025import org.jpos.core.Configurable;
026import org.jpos.core.Configuration;
027import org.jpos.q2.QBeanSupport;
028import org.jpos.q2.QFactory;
029import org.jpos.util.DirPoll;
030import org.jpos.util.LogSource;
031import org.jpos.util.ThreadPool;
032
033/**
034 * DirPoll Adaptor
035 *
036 * @author Alejandro Revilla
037 * @version $Revision$ $Date$
038 */
039public class DirPollAdaptor
040    extends QBeanSupport
041    implements DirPollAdaptorMBean
042{
043    /** Base directory path and configuration strings. */
044    String path, priorities, processorClass;
045    /** Thread pool size for the DirPoll executor. */
046    int poolSize;
047    /** Polling interval in milliseconds. */
048    long pollInterval;
049    /** The managed DirPoll instance. */
050    protected DirPoll dirPoll;
051    /** The executor service driving the DirPoll. */
052    protected ExecutorService dirPollExecutor;
053
054    /** Default constructor. */
055    public DirPollAdaptor () {
056        super ();
057        poolSize = 1;
058        pollInterval = 1000;
059    }
060
061    protected void initService () throws Exception {
062        QFactory factory = getServer().getFactory();
063        dirPoll  = createDirPoll();
064        dirPoll.setPath (getPath ());
065        if(cfg.getBoolean("virtual-threads", true)) {
066            dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory()));
067        }else {
068            dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory()));
069        }
070        dirPoll.setPollInterval (pollInterval);
071        if (priorities != null)
072            dirPoll.setPriorities (priorities);
073        dirPoll.setLogger (getLog().getLogger(), getLog().getRealm ());
074        factory.setConfiguration(dirPoll, getPersist());
075        dirPoll.createDirs ();
076        Object dpp = factory.newInstance (getProcessor());
077        if (dpp instanceof LogSource) {
078            ((LogSource) dpp).setLogger (
079                getLog().getLogger(), getLog().getRealm ()
080            );
081        }
082        factory.setConfiguration(dpp, getPersist());
083        dirPoll.setProcessor (dpp);
084    }
085
086    /**
087     * Creates and returns the {@link DirPoll} instance managed by this adaptor.
088     * Subclasses may override to supply a custom DirPoll.
089     * @return a new DirPoll instance
090     */
091    protected DirPoll createDirPoll() {
092        return new DirPoll();
093    }
094
095    protected void startService () throws Exception {
096        if (dirPoll == null) {
097            throw new IllegalStateException("Not initialized!");
098        }
099        synchronized (dirPoll) {
100            dirPollExecutor =  Executors.newVirtualThreadPerTaskExecutor();
101            dirPollExecutor.submit(dirPoll);
102        }
103    }
104
105    protected void stopService () throws Exception {
106        dirPoll.destroy ();
107        synchronized (dirPoll) {
108            long shutdownTimeout = cfg.getLong("shutdown-timeout", 60000);
109            dirPollExecutor.shutdown();
110            try {
111                if (!dirPollExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
112                    dirPollExecutor.shutdownNow();
113                }
114            } catch (InterruptedException e) {
115                Thread.currentThread().interrupt();
116            }
117        }
118    }
119
120
121    public synchronized void setPath (String path) {
122        this.path = path;
123        setModified (true);
124    }
125
126    public synchronized void setPoolSize (int size) {
127        this.poolSize = size;
128        setModified (true);
129    }
130
131    public int getPoolSize () {
132        return poolSize;
133    }
134
135    public String getPath () {
136        return path == null ? "." : path;
137    }
138
139    public synchronized void setPollInterval (long pollInterval) {
140        this.pollInterval = pollInterval;
141        setModified (true);
142    }
143
144    public long getPollInterval () {
145        return pollInterval;
146    }
147
148    public synchronized void setPriorities (String priorities) {
149        this.priorities = priorities;
150        setModified (true);
151    }
152
153    public String getPriorities () {
154        return priorities;
155    }
156
157    public synchronized void setProcessor (String processor) {
158        this.processorClass = processor;
159        setModified (true);
160    }
161
162    public String getProcessor() {
163        return processorClass;
164    }
165}