/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.q2.iso;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
import org.jpos.q2.QBeanSupport;
import org.jpos.q2.QFactory;
import org.jpos.util.DirPoll;
import org.jpos.util.LogSource;
import org.jpos.util.ThreadPool;

/**
 * DirPoll Adaptor.
 *
 * <p>QBean that creates a {@link DirPoll}, configures it from this bean's
 * attributes and persisted configuration, and runs it on a dedicated
 * executor between {@code startService} and {@code stopService}.
 *
 * <p>Configuration properties read directly by this class:
 * <ul>
 *   <li>{@code virtual-threads} (boolean, default {@code true}): selects
 *       virtual or platform threads for the DirPoll worker pool.</li>
 *   <li>{@code shutdown-timeout} (long, milliseconds, default {@code 60000}):
 *       how long {@code stopService} waits for the DirPoll task to end.</li>
 * </ul>
 *
 * @author Alejandro Revilla
 * @version $Revision$ $Date$
 */
public class DirPollAdaptor
    extends QBeanSupport
    implements DirPollAdaptorMBean
{
    /** Base directory path and configuration strings. */
    String path, priorities, processorClass;
    /** Thread pool size for the DirPoll executor. */
    int poolSize;
    /** Polling interval in milliseconds. */
    long pollInterval;
    /** The managed DirPoll instance. */
    protected DirPoll dirPoll;
    /** The executor service driving the DirPoll. */
    protected ExecutorService dirPollExecutor;

    /**
     * Default constructor.
     * Starts with a pool size of 1 and a poll interval of 1000.
     */
    public DirPollAdaptor () {
        super ();
        poolSize = 1;
        pollInterval = 1000;
    }

    /**
     * Creates the {@link DirPoll}, applies path, worker pool, poll interval,
     * priorities, logger and persisted configuration to it, creates its
     * directories, and finally instantiates and attaches the processor.
     *
     * @throws Exception if any of the configuration or instantiation steps fails
     */
    protected void initService () throws Exception {
        QFactory factory = getServer().getFactory();
        dirPoll = createDirPoll();
        dirPoll.setPath (getPath ());
        // Worker pool is always fixed-size; the flag only picks the thread kind.
        if (cfg.getBoolean("virtual-threads", true)) {
            dirPoll.setThreadPool(
                Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory())
            );
        } else {
            dirPoll.setThreadPool(
                Executors.newFixedThreadPool(
                    poolSize,
                    Thread.ofPlatform().inheritInheritableThreadLocals(true).factory()
                )
            );
        }
        dirPoll.setPollInterval (pollInterval);
        if (priorities != null) {
            dirPoll.setPriorities (priorities);
        }
        dirPoll.setLogger (getLog().getLogger(), getLog().getRealm ());
        factory.setConfiguration (dirPoll, getPersist());
        dirPoll.createDirs ();

        Object dpp = factory.newInstance (getProcessor());
        // Processors that can log share this bean's logger and realm.
        if (dpp instanceof LogSource logSource) {
            logSource.setLogger (getLog().getLogger(), getLog().getRealm ());
        }
        factory.setConfiguration (dpp, getPersist());
        dirPoll.setProcessor (dpp);
    }

    /**
     * Creates and returns the {@link DirPoll} instance managed by this adaptor.
     * Subclasses may override to supply a custom DirPoll.
     *
     * @return a new DirPoll instance
     */
    protected DirPoll createDirPoll() {
        return new DirPoll();
    }

    /**
     * Submits the {@link DirPoll} to a fresh virtual-thread-per-task executor.
     *
     * @throws IllegalStateException if the DirPoll has not been created yet
     * @throws Exception declared for subclasses
     */
    protected void startService () throws Exception {
        if (dirPoll == null) {
            throw new IllegalStateException("Not initialized!");
        }
        synchronized (dirPoll) {
            dirPollExecutor = Executors.newVirtualThreadPerTaskExecutor();
            dirPollExecutor.submit(dirPoll);
        }
    }

    /**
     * Destroys the {@link DirPoll} and shuts its executor down, waiting up to
     * {@code shutdown-timeout} milliseconds (default 60000) before forcing
     * termination with {@code shutdownNow()}.
     *
     * <p>Does nothing if the DirPoll was never created; skips the executor
     * shutdown if the service was never started. If the calling thread is
     * interrupted while waiting, the executor is forced down and the
     * interrupt flag is restored.
     *
     * @throws Exception declared for subclasses
     */
    protected void stopService () throws Exception {
        if (dirPoll == null) {
            return; // never initialized, nothing to stop
        }
        dirPoll.destroy ();
        synchronized (dirPoll) {
            if (dirPollExecutor == null) {
                return; // never started, no executor to shut down
            }
            long shutdownTimeout = cfg.getLong("shutdown-timeout", 60000);
            dirPollExecutor.shutdown();
            try {
                if (!dirPollExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
                    dirPollExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Do not leave the task running when the wait is abandoned.
                dirPollExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sets the base path handed to the DirPoll and marks this bean as modified.
     *
     * @param path base directory path; {@code null} makes {@link #getPath()} return {@code "."}
     */
    public synchronized void setPath (String path) {
        this.path = path;
        setModified (true);
    }

    /**
     * Sets the size of the DirPoll worker pool and marks this bean as modified.
     *
     * @param size number of threads in the fixed pool
     */
    public synchronized void setPoolSize (int size) {
        this.poolSize = size;
        setModified (true);
    }

    /**
     * @return the configured worker pool size
     */
    public int getPoolSize () {
        return poolSize;
    }

    /**
     * @return the configured path, or {@code "."} when none has been set
     */
    public String getPath () {
        return path == null ? "." : path;
    }

    /**
     * Sets the poll interval and marks this bean as modified.
     *
     * @param pollInterval polling interval in milliseconds
     */
    public synchronized void setPollInterval (long pollInterval) {
        this.pollInterval = pollInterval;
        setModified (true);
    }

    /**
     * @return the configured polling interval in milliseconds
     */
    public long getPollInterval () {
        return pollInterval;
    }

    /**
     * Sets the priorities string passed to the DirPoll and marks this bean as modified.
     *
     * @param priorities priorities specification; {@code null} leaves the DirPoll's priorities untouched
     */
    public synchronized void setPriorities (String priorities) {
        this.priorities = priorities;
        setModified (true);
    }

    /**
     * @return the configured priorities string, possibly {@code null}
     */
    public String getPriorities () {
        return priorities;
    }

    /**
     * Sets the processor class name and marks this bean as modified.
     *
     * @param processor name passed to the factory to instantiate the processor
     */
    public synchronized void setProcessor (String processor) {
        this.processorClass = processor;
        setModified (true);
    }

    /**
     * @return the configured processor class name, possibly {@code null}
     */
    public String getProcessor() {
        return processorClass;
    }
}