001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Executors; 023import java.util.concurrent.TimeUnit; 024 025import org.jpos.core.Configurable; 026import org.jpos.core.Configuration; 027import org.jpos.q2.QBeanSupport; 028import org.jpos.q2.QFactory; 029import org.jpos.util.DirPoll; 030import org.jpos.util.LogSource; 031import org.jpos.util.ThreadPool; 032 033/** 034 * DirPoll Adaptor 035 * 036 * @author Alejandro Revilla 037 * @version $Revision$ $Date$ 038 */ 039public class DirPollAdaptor 040 extends QBeanSupport 041 implements DirPollAdaptorMBean 042{ 043 String path, priorities, processorClass; 044 int poolSize; 045 long pollInterval; 046 protected DirPoll dirPoll; 047 protected ExecutorService dirPollExecutor; 048 049 public DirPollAdaptor () { 050 super (); 051 poolSize = 1; 052 pollInterval = 1000; 053 } 054 055 protected void initService () throws Exception { 056 QFactory factory = getServer().getFactory(); 057 dirPoll = createDirPoll(); 058 dirPoll.setPath (getPath ()); 059 if(cfg.getBoolean("virtual-threads", true)) { 060 dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofVirtual().factory())); 061 }else { 062 dirPoll.setThreadPool(Executors.newFixedThreadPool(poolSize, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory())); 063 } 064 dirPoll.setPollInterval (pollInterval); 065 if (priorities != null) 066 dirPoll.setPriorities (priorities); 067 dirPoll.setLogger (getLog().getLogger(), getLog().getRealm ()); 068 Configuration cfg = factory.getConfiguration (getPersist()); 069 dirPoll.setConfiguration (cfg); 070 dirPoll.createDirs (); 071 Object dpp = factory.newInstance (getProcessor()); 072 if (dpp instanceof LogSource) { 073 ((LogSource) dpp).setLogger ( 074 getLog().getLogger(), getLog().getRealm () 075 ); 076 } 077 if (dpp instanceof Configurable) { 078 ((Configurable) dpp).setConfiguration (cfg); 079 } 080 dirPoll.setProcessor (dpp); 081 } 082 083 protected DirPoll createDirPoll() { 084 return new DirPoll(); 085 } 086 087 protected void startService () throws Exception { 088 if (dirPoll == null) { 089 throw new IllegalStateException("Not initialized!"); 090 } 091 synchronized (dirPoll) { 092 dirPollExecutor = Executors.newVirtualThreadPerTaskExecutor(); 093 dirPollExecutor.submit(dirPoll); 094 } 095 } 096 097 protected void stopService () throws Exception { 098 dirPoll.destroy (); 099 synchronized (dirPoll) { 100 long shutdownTimeout = cfg.getLong("shutdown-timeout", 60000); 101 dirPollExecutor.shutdown(); 102 try { 103 if (!dirPollExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { 104 dirPollExecutor.shutdownNow(); 105 } 106 } catch (InterruptedException e) { 107 Thread.currentThread().interrupt(); 108 } 109 } 110 } 111 112 113 public synchronized void setPath (String path) { 114 this.path = path; 115 setModified (true); 116 } 117 118 public synchronized void setPoolSize (int size) { 119 this.poolSize = size; 120 setModified (true); 121 } 122 123 public int getPoolSize () { 124 return poolSize; 125 } 126 127 public String getPath () { 128 return path == null ? "." : path; 129 } 130 131 public synchronized void setPollInterval (long pollInterval) { 132 this.pollInterval = pollInterval; 133 setModified (true); 134 } 135 136 public long getPollInterval () { 137 return pollInterval; 138 } 139 140 public synchronized void setPriorities (String priorities) { 141 this.priorities = priorities; 142 setModified (true); 143 } 144 145 public String getPriorities () { 146 return priorities; 147 } 148 149 public synchronized void setProcessor (String processor) { 150 this.processorClass = processor; 151 setModified (true); 152 } 153 154 public String getProcessor() { 155 return processorClass; 156 } 157}