/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
018
019package org.jpos.util;
020
021import org.jpos.core.Configurable;
022import org.jpos.core.Configuration;
023import org.jpos.core.ConfigurationException;
024import org.jpos.iso.ISOException;
025
026import java.io.*;
027import java.text.SimpleDateFormat;
028import java.util.*;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032
033/**
034 * DirPoll operates on a set of directories which defaults to
035 * <ul>
036 *  <li>request
037 *  <li>response
038 *  <li>tmp
039 *  <li>run
040 *  <li>bad
041 *  <li>archive
042 * </ul>
043 * scanning for incoming requests (of varying priorities)
044 * on the request directory and processing them by means of
045 * DirPoll.Processor or DirPoll.FileProcessor
046 * 
047 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a>
048 * @author <a href="mailto:mmilliss@moneyswitch.net">Matthew Milliss</a>
049 * @since jPOS 1.2.7
050 * @version $Revision$ $Date$
051 */
052@SuppressWarnings("unchecked")
053public class DirPoll extends SimpleLogSource
054    implements Runnable, FilenameFilter, Configurable, Destroyable
055{
056    private long pollInterval;
057    private File requestDir;
058    private File responseDir;
059    private File tmpDir;
060    private File badDir;
061    private File runDir;
062    private File archiveDir;
063    private Vector prio;
064    private int currentPriority;
065    private String basePath;
066    private String responseSuffix;
067    private ExecutorService executor;
068    private Object processor;
069    private final Object shutdownMonitor = new Object();
070
071    private boolean shutdown;
072    private boolean paused = false;
073    private boolean shouldArchive;
074    private boolean shouldCompressArchive;
075    private boolean shouldTimestampArchive;
076    private String archiveDateFormat;
077    private boolean acceptZeroLength = false;
078    private boolean regexPriorityMatching = false;
079    private List<String> poolBatchFiles = new ArrayList<>();
080    private static final long SHUTDOWN_WAIT = 15000;
081    protected Configuration cfg;
082    
083    public DirPoll () {
084        prio = new Vector();
085        setPollInterval(1000);
086        setPath (".");
087        executor = null;
088    }
089    public synchronized void setPath(String base) {
090        this.basePath = base;
091        requestDir  = new File(base, "request");
092        responseDir = new File(base, "response");
093        tmpDir      = new File(base, "tmp");
094        badDir      = new File(base, "bad");
095        runDir      = new File(base, "run");
096        archiveDir  = new File(base, "archive");
097    }
098    public void setShouldTimestampArchive(boolean shouldTimestampArchive) {
099        this.shouldTimestampArchive = shouldTimestampArchive;
100    }
101    public void setArchiveDateFormat(String dateFormat) {
102        this.archiveDateFormat = dateFormat;
103    }
104    public void setShouldArchive(boolean shouldArchive) {
105        this.shouldArchive = shouldArchive;
106    }
107    public void setShouldCompressArchive(boolean shouldCompressArchive) {
108        this.shouldCompressArchive = shouldCompressArchive;
109    }
110    public void setAcceptZeroLength(boolean acceptZeroLength) {
111        this.acceptZeroLength = acceptZeroLength;
112    }
113    public String getPath() {
114        return basePath;
115    }
116    public void setRequestDir (String dir) {
117        requestDir = new File (basePath, dir);
118    }
119    public void setResponseDir (String dir) {
120        responseDir = new File (basePath, dir);
121    }
122    public void setTmpDir (String dir) {
123        tmpDir = new File (basePath, dir);
124    }
125    public void setBadDir (String dir) {
126        badDir = new File (basePath, dir);
127    }
128    public void setRunDir (String dir) {
129        runDir = new File (basePath, dir);
130    }
131    public void setArchiveDir (String dir) {
132        archiveDir = new File (basePath, dir);
133    }
134    public void setPollInterval(long pollInterval) {
135        this.pollInterval = pollInterval;
136    }
137    public void setResponseSuffix (String suffix) {
138        this.responseSuffix = suffix;
139    }
140    public long getPollInterval() {
141        return pollInterval;
142    }
143    public void setProcessor (Object processor) {
144        this.processor = processor;
145    }
146
147    protected File getRequestDir() {
148        return requestDir;
149    }
150
151    protected File getResponseDir() {
152        return responseDir;
153    }
154
155    protected File getTmpDir() {
156        return tmpDir;
157    }
158
159    protected File getBadDir() {
160        return badDir;
161    }
162
163    protected File getRunDir() {
164        return runDir;
165    }
166
167    protected File getArchiveDir() {
168        return archiveDir;
169    }
170
171    public boolean isRegexPriorityMatching() {
172        return regexPriorityMatching;
173    }
174
175    public void setRegexPriorityMatching(boolean regexPriorityMatching) {
176        this.regexPriorityMatching = regexPriorityMatching;
177    }
178
179    /**
180     * Return instance implementing {@link FileProcessor} or {@link Processor}
181     * @return
182     * Object - need to be casted to {@link FileProcessor} or {@link Processor}
183     */
184    public Object getProcessor()
185    {
186        return this.processor;
187    }
188    /**
189     * DirPool receives Configuration requests
190     * and pass along them to the underlying processor.
191     * @param cfg Configuration object
192     * @throws ConfigurationException on errors
193     */
194    public void setConfiguration (Configuration cfg)
195        throws ConfigurationException
196    {
197        this.cfg = cfg;
198        if (cfg != null) {
199            if (processor instanceof Configurable) {
200                ((Configurable) processor).setConfiguration (cfg);
201            }
202            setRequestDir  (cfg.get ("request.dir",  "request"));
203            setResponseDir(cfg.get("response.dir", "response"));
204            setTmpDir(cfg.get("tmp.dir", "tmp"));
205            setRunDir(cfg.get("run.dir", "run"));
206            setBadDir(cfg.get("bad.dir", "bad"));
207            setArchiveDir(cfg.get("archive.dir", "archive"));
208            setResponseSuffix(cfg.get("response.suffix", null));
209            setShouldArchive(cfg.getBoolean("archive", false));
210            setShouldCompressArchive(cfg.getBoolean("archive.compress", false));
211            setAcceptZeroLength (cfg.getBoolean ("zero-length", false));
212            setArchiveDateFormat (
213                cfg.get ("archive.dateformat", "yyyyMMddHHmmss")
214            );
215            setShouldTimestampArchive (cfg.getBoolean ("archive.timestamp", false));
216            setRegexPriorityMatching(cfg.getBoolean("priority.regex", false));
217        }
218    }
219    /**
220     * @param priorities blank separated list of extensions
221     */
222    public void setPriorities (String priorities) {
223        StringTokenizer st = new StringTokenizer (priorities);
224        Vector v = new Vector();
225        while (st.hasMoreTokens()) {
226            String ext = st.nextToken();
227            v.addElement (ext.equals ("*") ? "" : ext);
228        }
229        if (v.isEmpty())
230            v.addElement ("");
231        synchronized (this) {
232            prio = v;
233        }
234    }
235    public synchronized void setThreadPool (ExecutorService executor) {
236        this.executor = executor;
237    }
238    
239    //--------------------------------------- FilenameFilter implementation
240    public boolean accept(File dir, String name) {
241        boolean result;
242        String ext = currentPriority >= 0 ?
243                (String) prio.elementAt(currentPriority) : null;
244        if (ext != null) {
245            if (isRegexPriorityMatching()) {
246                if (!name.matches(ext))
247                    return false;
248            } else {
249                if (!name.endsWith(ext))
250                    return false;
251            }
252        }
253        File f = new File (dir, name);
254        if (acceptZeroLength){
255             result = f.isFile();
256        } else {
257             result = f.isFile() && f.length() > 0;
258        }
259        return result;
260    }
261    //--------------------------------------------- Runnable implementation
262
263    public void run() { 
264        Thread.currentThread().setName ("DirPoll-"+basePath);
265        if (prio.isEmpty())
266            addPriority("");
267        while (!shutdown) {
268            synchronized (this) {
269                if (paused) {
270                    try {
271                        wait();
272                        paused = false;
273                    } catch (InterruptedException e) {
274                    }
275                }
276            }
277
278            try {
279                File f;
280                synchronized (this) {
281                    f = scan();
282                }
283                if (f != null) {
284                    executor.submit(new ProcessorRunner (f));
285                    Thread.yield(); // be nice
286                }
287                else {
288                    synchronized (shutdownMonitor) {
289                        if (!shutdown && pollInterval > 0L) {
290                            shutdownMonitor.wait(pollInterval);
291                        }
292                    }
293                }
294            } catch (InterruptedException e) {
295            } catch (Throwable e) {
296                Logger.log (new LogEvent (this, "dirpoll", e));
297                try {
298                    synchronized (shutdownMonitor) {
299                        if (!shutdown && pollInterval > 0L) {
300                            shutdownMonitor.wait(pollInterval * 10);
301                        }
302                    }
303                } catch (InterruptedException ex) { }
304            }
305        }
306    }
307    public void destroy () {
308        synchronized (shutdownMonitor) {
309            shutdown = true;
310            shutdownMonitor.notifyAll();
311
312            if (executor != null) {
313                executor.shutdown();
314                try {
315                    if (!executor.awaitTermination(SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
316                        executor.shutdownNow();
317                    }
318                } catch (InterruptedException e) {
319                    Thread.currentThread().interrupt();
320                }
321            }
322        }
323    }
324
325    //----------------------------------------------------- public helpers
326
327    public void createDirs() {
328        requestDir.mkdirs();
329        responseDir.mkdirs();
330        tmpDir.mkdirs();
331        badDir.mkdirs();
332        runDir.mkdirs();
333        archiveDir.mkdirs();
334    }
335    public void addPriority(String fileExtension) {
336        prio.addElement (fileExtension);
337    }
338
339    //----------------------------------------------------- private helpers
340    private byte[] readRequest (File f) throws IOException {
341        byte[] b = new byte[(int) f.length()];
342        FileInputStream in = new FileInputStream(f);
343        in.read(b);
344        in.close();
345        return b;
346    }
347    private void writeResponse (String requestName, byte[] b) 
348        throws IOException
349    {
350        if (responseSuffix != null) {
351            int pos = requestName.lastIndexOf ('.');
352            if (pos > 0)
353                requestName = requestName.substring (0, pos) + responseSuffix;
354        }
355
356        File tmp = new File(tmpDir, requestName);
357        FileOutputStream out = new FileOutputStream(tmp);
358        out.write(b);
359        out.close();
360        moveTo (tmp, responseDir);
361    }
362
363    private File moveTo(File f, File dir) throws IOException {
364        File destination = new File(dir, f.getName());
365        if (!f.renameTo(destination))
366            throw new IOException("Unable to move "+f.getName());
367        return destination;
368    }
369
370    private File store(File f, File destinationDirectory) throws IOException {
371        String storedFilename = f.getName();
372        if (shouldTimestampArchive)
373            storedFilename = f.getName() + "." + new SimpleDateFormat(archiveDateFormat).format(new Date());
374        File destination = new File(destinationDirectory, storedFilename);
375        if (!f.renameTo(destination))
376            throw new IOException("Unable to archive " + "'" + f.getName() + "' in directory " + destinationDirectory);
377        return destination;
378    }
379
380    private void compress(File f) throws IOException {
381        ZipUtil.zipFile(f, new File(f.getAbsolutePath() + ".zip"));
382        f.delete();
383    }
384
385    protected File scan() {
386        if (prio.size() > 1) {
387            for (currentPriority = 0; currentPriority < prio.size(); currentPriority++) {
388                if (poolBatchFiles.isEmpty()) {
389                    String[] files = requestDir.list(this);
390                    if (files != null && files.length > 0) {
391                        poolBatchFiles = new ArrayList(Arrays.asList(files));
392                        return new File(requestDir, poolBatchFiles.remove(0));
393                    }
394                } else {
395                    return new File(requestDir, poolBatchFiles.remove(0));
396                }
397            }
398        } else {
399            if (poolBatchFiles.isEmpty()) {
400                String[] files = requestDir.list();
401                if (files != null && files.length > 0) {
402                    poolBatchFiles = new ArrayList(Arrays.asList(files));
403                    return new File(requestDir, poolBatchFiles.remove(0));
404                }
405            } else {
406                return new File(requestDir, poolBatchFiles.remove(0));
407            }
408        }
409        return null;
410    }
411
412    private synchronized ExecutorService getExecutor() {
413        if (executor == null) {
414                if (cfg.getBoolean("virtual-threads", true)) {
415                         executor = Executors.newFixedThreadPool(10, Thread.ofVirtual().factory());
416                } else {
417                        executor = Executors.newFixedThreadPool(10, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory());
418                }
419        }
420        return executor;
421    }
422
423    // ------------------------------------------------ inner interfaces
424    public interface Processor {
425        /**
426         * @param name request name
427         * @param request request image
428         * @return response (or null)
429         */
430        byte[] process(String name, byte[] request)
431            throws DirPollException;
432    }
433    public interface FileProcessor {
434        /**
435         * @param name request File
436         * @throws org.jpos.util.DirPoll.DirPollException on errors
437         */
438        void process(File name) throws DirPollException;
439    }
440    public class ProcessorRunner implements Runnable {
441        File request;
442        LogEvent logEvent;
443        public ProcessorRunner (File request) throws IOException {
444            this.request = moveTo (request, runDir);
445            this.logEvent = null;
446        }
447        public void run() {
448            LogEvent evt = 
449                new LogEvent (
450                    DirPoll.this, "dirpoll", request.getName()
451                );
452            try {
453                if (processor == null) 
454                    throw new DirPollException 
455                        ("null processor - nothing to do");
456                else if (processor instanceof Processor) {
457                    byte[] resp = ((Processor) processor).process (
458                        request.getName(), readRequest (request)
459                    );
460                    if (resp != null) 
461                        writeResponse (request.getName(), resp);
462                } else if (processor instanceof FileProcessor) 
463                    ((FileProcessor) processor).process (request);
464
465                if (shouldArchive) {
466                    File archivedFile = store(request, archiveDir);
467                    if (shouldCompressArchive) {
468                        compress(archivedFile);
469                    }
470                } else {
471                    if (!request.delete ())
472                        throw new DirPollException 
473                            ("error: can't unlink request " + request.getName());                    
474                }
475
476            } catch (Throwable e) {
477                logEvent = evt;
478                evt.addMessage (e);
479                try {
480                    if (e instanceof DirPollException && ((DirPollException)e).isRetry()) {
481                        synchronized (shutdownMonitor) {
482                            if (!shutdown) {
483                                try {
484                                    if (pollInterval > 0L)
485                                        shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms)
486                                } catch (InterruptedException ie) {
487                                }
488                            }
489                        }
490                        evt.addMessage("retrying");
491                        moveTo(request, requestDir);
492                    } else {
493                        store(request, badDir);
494                    }
495                } catch (IOException _e) {
496                    evt.addMessage ("Can't move to "+badDir.getPath());
497                    evt.addMessage (_e);
498                }
499            } finally {
500                if (logEvent != null) 
501                    Logger.log (logEvent);
502            }
503        }
504    }
505    public static class DirPollException extends ISOException {
506        boolean retry;
507        public DirPollException () {
508            super();
509        }
510        public DirPollException (String detail) {
511            super(detail);
512        }
513        public DirPollException (Exception nested) {
514            super(nested);
515        }
516        public DirPollException (String detail, Exception nested) {
517            super(detail, nested);
518        }
519        public boolean isRetry() {
520            return retry;
521        }
522        public void setRetry(boolean retry) {
523            this.retry = retry;
524        }
525    }
526    
527    public void pause() {
528        synchronized (this) {
529            if (!paused) {
530                paused = true;
531                // Wake up the run() method from sleeping and tell it to pause
532                notify();
533            }
534        }
535    }
536
537    public void unpause() {
538        synchronized (this) {
539            if (paused) {
540                paused = false;
541                // Wake up the wait()ing thread from being paused
542                notify();
543                // The run() method will reset the paused flag
544            }
545        }
546    }
547    
548    public boolean isPaused() {
549        synchronized (this) {
550            return paused;
551        }
552    }
553}