001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023import java.util.concurrent.TimeUnit;
024
025import org.jpos.core.Configurable;
026import org.jpos.core.Configuration;
027import org.jpos.q2.QBeanSupport;
028import org.jpos.q2.QFactory;
029import org.jpos.util.DirPoll;
030import org.jpos.util.LogSource;
031import org.jpos.util.ThreadPool;
032
033/**
034 * DirPoll Adaptor
035 *
036 * @author Alejandro Revilla
037 * @version $Revision$ $Date$
038 */
039public class DirPollAdaptor
040    extends QBeanSupport
041    implements DirPollAdaptorMBean
042{
043    String path, priorities, processorClass;
044    int poolSize;
045    long pollInterval;
046    protected DirPoll dirPoll;
047    protected ExecutorService dirPollExecutor;
048
049    public DirPollAdaptor () {
050        super ();
051        poolSize = 1;
052        pollInterval = 1000;
053    }
054
055    protected void initService () throws Exception {
056        QFactory factory = getServer().getFactory();
057        dirPoll  = createDirPoll();
058        dirPoll.setPath (getPath ());
059        if(cfg.getBoolean("virtual-threads", true)) {
060            dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory()));
061        }else {
062            dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory()));
063        }
064        dirPoll.setPollInterval (pollInterval);
065        if (priorities != null)
066            dirPoll.setPriorities (priorities);
067        dirPoll.setLogger (getLog().getLogger(), getLog().getRealm ());
068        Configuration cfg = factory.getConfiguration (getPersist());
069        dirPoll.setConfiguration (cfg);
070        dirPoll.createDirs ();
071        Object dpp = factory.newInstance (getProcessor());
072        if (dpp instanceof LogSource) {
073            ((LogSource) dpp).setLogger (
074                getLog().getLogger(), getLog().getRealm ()
075            );
076        }
077        if (dpp instanceof Configurable) {
078            ((Configurable) dpp).setConfiguration (cfg);
079        }
080        dirPoll.setProcessor (dpp);
081    }
082
083    protected DirPoll createDirPoll() {
084        return new DirPoll();
085    }
086
087    protected void startService () throws Exception {
088        if (dirPoll == null) {
089            throw new IllegalStateException("Not initialized!");
090        }
091        synchronized (dirPoll) {
092            dirPollExecutor =  Executors.newVirtualThreadPerTaskExecutor();
093            dirPollExecutor.submit(dirPoll);
094        }
095    }
096
097    protected void stopService () throws Exception {
098        dirPoll.destroy ();
099        synchronized (dirPoll) {
100            long shutdownTimeout = cfg.getLong("shutdown-timeout", 60000);
101            dirPollExecutor.shutdown();
102            try {
103                if (!dirPollExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
104                    dirPollExecutor.shutdownNow();
105                }
106            } catch (InterruptedException e) {
107                Thread.currentThread().interrupt();
108            }
109        }
110    }
111
112
113    public synchronized void setPath (String path) {
114        this.path = path;
115        setModified (true);
116    }
117
118    public synchronized void setPoolSize (int size) {
119        this.poolSize = size;
120        setModified (true);
121    }
122
123    public int getPoolSize () {
124        return poolSize;
125    }
126
127    public String getPath () {
128        return path == null ? "." : path;
129    }
130
131    public synchronized void setPollInterval (long pollInterval) {
132        this.pollInterval = pollInterval;
133        setModified (true);
134    }
135
136    public long getPollInterval () {
137        return pollInterval;
138    }
139
140    public synchronized void setPriorities (String priorities) {
141        this.priorities = priorities;
142        setModified (true);
143    }
144
145    public String getPriorities () {
146        return priorities;
147    }
148
149    public synchronized void setProcessor (String processor) {
150        this.processorClass = processor;
151        setModified (true);
152    }
153
154    public String getProcessor() {
155        return processorClass;
156    }
157}