001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import org.jpos.core.Configurable;
022import org.jpos.core.Configuration;
023import org.jpos.core.ConfigurationException;
024import org.jpos.iso.ISOException;
025
026import java.io.*;
027import java.text.SimpleDateFormat;
028import java.util.*;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.TimeUnit;
032
033/**
034 * DirPoll operates on a set of directories which defaults to
035 * <ul>
036 *  <li>request
037 *  <li>response
038 *  <li>tmp
039 *  <li>run
040 *  <li>bad
041 *  <li>archive
042 * </ul>
043 * scanning for incoming requests (of varying priorities)
044 * on the request directory and processing them by means of
045 * DirPoll.Processor or DirPoll.FileProcessor
046 * 
047 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a>
048 * @author <a href="mailto:mmilliss@moneyswitch.net">Matthew Milliss</a>
049 * @since jPOS 1.2.7
050 * @version $Revision$ $Date$
051 */
052@SuppressWarnings("unchecked")
053public class DirPoll extends SimpleLogSource
054    implements Runnable, FilenameFilter, Configurable, Destroyable
055{
056    private long pollInterval;
057    private File requestDir;
058    private File responseDir;
059    private File tmpDir;
060    private File badDir;
061    private File runDir;
062    private File archiveDir;
063    private Vector prio;
064    private int currentPriority;
065    private String basePath;
066    private String responseSuffix;
067    private ExecutorService executor;
068    private Object processor;
069    private final Object shutdownMonitor = new Object();
070
071    private boolean shutdown;
072    private boolean paused = false;
073    private boolean shouldArchive;
074    private boolean shouldCompressArchive;
075    private boolean shouldTimestampArchive;
076    private String archiveDateFormat;
077    private boolean acceptZeroLength = false;
078    private boolean regexPriorityMatching = false;
079    private List<String> poolBatchFiles = new ArrayList<>();
080    private static final long SHUTDOWN_WAIT = 15000;
081    /** Configuration for this DirPoll instance. */
082    protected Configuration cfg;
083
084    /** Default constructor. */
085    public DirPoll () {
086        prio = new Vector();
087        setPollInterval(1000);
088        setPath (".");
089        executor = null;
090    }
091    /**
092     * Sets the base poll directory and derives the standard subdirectories from it.
093     * @param base the base directory path
094     */
095    public synchronized void setPath(String base) {
096        this.basePath = base;
097        requestDir  = new File(base, "request");
098        responseDir = new File(base, "response");
099        tmpDir      = new File(base, "tmp");
100        badDir      = new File(base, "bad");
101        runDir      = new File(base, "run");
102        archiveDir  = new File(base, "archive");
103    }
104    /**
105     * When {@code true}, archive files are timestamped.
106     * @param shouldTimestampArchive true to timestamp archived files
107     */
108    public void setShouldTimestampArchive(boolean shouldTimestampArchive) {
109        this.shouldTimestampArchive = shouldTimestampArchive;
110    }
111    /**
112     * Sets the date format pattern used when timestamping archived files.
113     * @param dateFormat the date format pattern (see {@link java.text.SimpleDateFormat})
114     */
115    public void setArchiveDateFormat(String dateFormat) {
116        this.archiveDateFormat = dateFormat;
117    }
118    /**
119     * When {@code true}, processed request files are moved to the archive directory.
120     * @param shouldArchive true to archive processed files
121     */
122    public void setShouldArchive(boolean shouldArchive) {
123        this.shouldArchive = shouldArchive;
124    }
125    /**
126     * When {@code true}, archived files are compressed.
127     * @param shouldCompressArchive true to compress archived files
128     */
129    public void setShouldCompressArchive(boolean shouldCompressArchive) {
130        this.shouldCompressArchive = shouldCompressArchive;
131    }
132    /**
133     * When {@code true}, zero-length request files are accepted.
134     * @param acceptZeroLength true to accept zero-length files
135     */
136    public void setAcceptZeroLength(boolean acceptZeroLength) {
137        this.acceptZeroLength = acceptZeroLength;
138    }
139    /**
140     * Returns the base poll directory path.
141     * @return the base path
142     */
143    public String getPath() {
144        return basePath;
145    }
146    /**
147     * Sets the request sub-directory name relative to the base path.
148     * @param dir the sub-directory name
149     */
150    public void setRequestDir (String dir) {
151        requestDir = new File (basePath, dir);
152    }
153    /**
154     * Sets the response sub-directory name relative to the base path.
155     * @param dir the sub-directory name
156     */
157    public void setResponseDir (String dir) {
158        responseDir = new File (basePath, dir);
159    }
160    /**
161     * Sets the tmp sub-directory name relative to the base path.
162     * @param dir the sub-directory name
163     */
164    public void setTmpDir (String dir) {
165        tmpDir = new File (basePath, dir);
166    }
167    /**
168     * Sets the bad (failed) sub-directory name relative to the base path.
169     * @param dir the sub-directory name
170     */
171    public void setBadDir (String dir) {
172        badDir = new File (basePath, dir);
173    }
174    /**
175     * Sets the run (in-progress) sub-directory name relative to the base path.
176     * @param dir the sub-directory name
177     */
178    public void setRunDir (String dir) {
179        runDir = new File (basePath, dir);
180    }
181    /**
182     * Sets the archive sub-directory name relative to the base path.
183     * @param dir the sub-directory name
184     */
185    public void setArchiveDir (String dir) {
186        archiveDir = new File (basePath, dir);
187    }
188    /**
189     * Sets the polling interval in milliseconds.
190     * @param pollInterval the interval in milliseconds
191     */
192    public void setPollInterval(long pollInterval) {
193        this.pollInterval = pollInterval;
194    }
195    /**
196     * Sets the suffix appended to response file names.
197     * @param suffix the response file suffix
198     */
199    public void setResponseSuffix (String suffix) {
200        this.responseSuffix = suffix;
201    }
202    /**
203     * Returns the polling interval in milliseconds.
204     * @return poll interval
205     */
206    public long getPollInterval() {
207        return pollInterval;
208    }
209    /**
210     * Sets the processor used to handle request files.
211     * @param processor a {@link Processor} or {@link FileProcessor} instance
212     */
213    public void setProcessor (Object processor) {
214        this.processor = processor;
215    }
216
217    /**
218     * Returns the request directory.
219     * @return request directory
220     */
221    protected File getRequestDir() {
222        return requestDir;
223    }
224
225    /**
226     * Returns the response directory.
227     * @return response directory
228     */
229    protected File getResponseDir() {
230        return responseDir;
231    }
232
233    /**
234     * Returns the tmp directory.
235     * @return tmp directory
236     */
237    protected File getTmpDir() {
238        return tmpDir;
239    }
240
241    /**
242     * Returns the bad (failed) directory.
243     * @return bad directory
244     */
245    protected File getBadDir() {
246        return badDir;
247    }
248
249    /**
250     * Returns the run (in-progress) directory.
251     * @return run directory
252     */
253    protected File getRunDir() {
254        return runDir;
255    }
256
257    /**
258     * Returns the archive directory.
259     * @return archive directory
260     */
261    protected File getArchiveDir() {
262        return archiveDir;
263    }
264
265    /**
266     * Returns whether regex-based priority matching is enabled.
267     * @return true if regex priority matching is active
268     */
269    public boolean isRegexPriorityMatching() {
270        return regexPriorityMatching;
271    }
272
273    /**
274     * Enables or disables regex-based file extension priority matching.
275     * @param regexPriorityMatching true to enable regex priority matching
276     */
277    public void setRegexPriorityMatching(boolean regexPriorityMatching) {
278        this.regexPriorityMatching = regexPriorityMatching;
279    }
280
281    /**
282     * Return instance implementing {@link FileProcessor} or {@link Processor}
283     * @return
284     * Object - need to be casted to {@link FileProcessor} or {@link Processor}
285     */
286    public Object getProcessor()
287    {
288        return this.processor;
289    }
290    /**
291     * DirPool receives Configuration requests
292     * and pass along them to the underlying processor.
293     * @param cfg Configuration object
294     * @throws ConfigurationException on errors
295     */
296    public void setConfiguration (Configuration cfg)
297        throws ConfigurationException
298    {
299        this.cfg = cfg;
300        if (cfg != null) {
301            if (processor instanceof Configurable) {
302                ((Configurable) processor).setConfiguration (cfg);
303            }
304            setRequestDir  (cfg.get ("request.dir",  "request"));
305            setResponseDir(cfg.get("response.dir", "response"));
306            setTmpDir(cfg.get("tmp.dir", "tmp"));
307            setRunDir(cfg.get("run.dir", "run"));
308            setBadDir(cfg.get("bad.dir", "bad"));
309            setArchiveDir(cfg.get("archive.dir", "archive"));
310            setResponseSuffix(cfg.get("response.suffix", null));
311            setShouldArchive(cfg.getBoolean("archive", false));
312            setShouldCompressArchive(cfg.getBoolean("archive.compress", false));
313            setAcceptZeroLength (cfg.getBoolean ("zero-length", false));
314            setArchiveDateFormat (
315                cfg.get ("archive.dateformat", "yyyyMMddHHmmss")
316            );
317            setShouldTimestampArchive (cfg.getBoolean ("archive.timestamp", false));
318            setRegexPriorityMatching(cfg.getBoolean("priority.regex", false));
319        }
320    }
321    /**
322     * Sets the file extension priority order for polling.
323     * @param priorities blank-separated list of file extensions in priority order
324     */
325    public void setPriorities (String priorities) {
326        StringTokenizer st = new StringTokenizer (priorities);
327        Vector v = new Vector();
328        while (st.hasMoreTokens()) {
329            String ext = st.nextToken();
330            v.addElement (ext.equals ("*") ? "" : ext);
331        }
332        if (v.isEmpty())
333            v.addElement ("");
334        synchronized (this) {
335            prio = v;
336        }
337    }
338    /**
339     * Sets the thread pool used to execute processor tasks.
340     * @param executor the executor service to use
341     */
342    public synchronized void setThreadPool (ExecutorService executor) {
343        this.executor = executor;
344    }
345    
346    //--------------------------------------- FilenameFilter implementation
347    /**
348     * {@link java.io.FilenameFilter} implementation that selects files matching the current priority extension.
349     * @param dir the directory
350     * @param name the file name
351     * @return true if the file should be accepted
352     */
353    public boolean accept(File dir, String name) {
354        boolean result;
355        String ext = currentPriority >= 0 ?
356                (String) prio.elementAt(currentPriority) : null;
357        if (ext != null) {
358            if (isRegexPriorityMatching()) {
359                if (!name.matches(ext))
360                    return false;
361            } else {
362                if (!name.endsWith(ext))
363                    return false;
364            }
365        }
366        File f = new File (dir, name);
367        if (acceptZeroLength){
368             result = f.isFile();
369        } else {
370             result = f.isFile() && f.length() > 0;
371        }
372        return result;
373    }
374    //--------------------------------------------- Runnable implementation
375
376    public void run() { 
377        Thread.currentThread().setName ("DirPoll-"+basePath);
378        if (prio.isEmpty())
379            addPriority("");
380        while (!shutdown) {
381            synchronized (this) {
382                if (paused) {
383                    try {
384                        wait();
385                        paused = false;
386                    } catch (InterruptedException e) {
387                    }
388                }
389            }
390
391            try {
392                File f;
393                synchronized (this) {
394                    f = scan();
395                }
396                if (f != null) {
397                    executor.submit(new ProcessorRunner (f));
398                    Thread.yield(); // be nice
399                }
400                else {
401                    synchronized (shutdownMonitor) {
402                        if (!shutdown && pollInterval > 0L) {
403                            shutdownMonitor.wait(pollInterval);
404                        }
405                    }
406                }
407            } catch (InterruptedException e) {
408            } catch (Throwable e) {
409                Logger.log (new LogEvent (this, "dirpoll", e));
410                try {
411                    synchronized (shutdownMonitor) {
412                        if (!shutdown && pollInterval > 0L) {
413                            shutdownMonitor.wait(pollInterval * 10);
414                        }
415                    }
416                } catch (InterruptedException ex) { }
417            }
418        }
419    }
420    public void destroy () {
421        synchronized (shutdownMonitor) {
422            shutdown = true;
423            shutdownMonitor.notifyAll();
424
425            if (executor != null) {
426                executor.shutdown();
427                try {
428                    if (!executor.awaitTermination(SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
429                        executor.shutdownNow();
430                    }
431                } catch (InterruptedException e) {
432                    Thread.currentThread().interrupt();
433                }
434            }
435        }
436    }
437
438    //----------------------------------------------------- public helpers
439
440    /** Creates all required poll directories (request, response, tmp, bad, run, archive). */
441    public void createDirs() {
442        requestDir.mkdirs();
443        responseDir.mkdirs();
444        tmpDir.mkdirs();
445        badDir.mkdirs();
446        runDir.mkdirs();
447        archiveDir.mkdirs();
448    }
449    /**
450     * Adds a file extension to the priority list.
451     * @param fileExtension the extension to add (e.g. {@code "xml"})
452     */
453    public void addPriority(String fileExtension) {
454        prio.addElement (fileExtension);
455    }
456
457    //----------------------------------------------------- private helpers
458    private byte[] readRequest (File f) throws IOException {
459        byte[] b = new byte[(int) f.length()];
460        try (FileInputStream in = new FileInputStream(f)) {
461            in.read(b);
462        }
463        return b;
464    }
465    private void writeResponse (String requestName, byte[] b)
466        throws IOException
467    {
468        if (responseSuffix != null) {
469            int pos = requestName.lastIndexOf ('.');
470            if (pos > 0)
471                requestName = requestName.substring (0, pos) + responseSuffix;
472        }
473
474        File tmp = new File(tmpDir, requestName);
475        try (FileOutputStream out = new FileOutputStream(tmp)) {
476            out.write(b);
477        }
478        moveTo (tmp, responseDir);
479    }
480
481    private File moveTo(File f, File dir) throws IOException {
482        File destination = new File(dir, f.getName());
483        if (!f.renameTo(destination))
484            throw new IOException("Unable to move "+f.getName());
485        return destination;
486    }
487
488    private File store(File f, File destinationDirectory) throws IOException {
489        String storedFilename = f.getName();
490        if (shouldTimestampArchive)
491            storedFilename = f.getName() + "." + new SimpleDateFormat(archiveDateFormat).format(new Date());
492        File destination = new File(destinationDirectory, storedFilename);
493        if (!f.renameTo(destination))
494            throw new IOException("Unable to archive " + "'" + f.getName() + "' in directory " + destinationDirectory);
495        return destination;
496    }
497
498    private void compress(File f) throws IOException {
499        ZipUtil.zipFile(f, new File(f.getAbsolutePath() + ".zip"));
500        f.delete();
501    }
502
503    /**
504     * Scans the request directory for the next file to process, respecting priority order.
505     * @return the next request {@link File}, or {@code null} if none is available
506     */
507    protected File scan() {
508        if (prio.size() > 1) {
509            for (currentPriority = 0; currentPriority < prio.size(); currentPriority++) {
510                if (poolBatchFiles.isEmpty()) {
511                    String[] files = requestDir.list(this);
512                    if (files != null && files.length > 0) {
513                        poolBatchFiles = new ArrayList(Arrays.asList(files));
514                        return new File(requestDir, poolBatchFiles.remove(0));
515                    }
516                } else {
517                    return new File(requestDir, poolBatchFiles.remove(0));
518                }
519            }
520        } else {
521            if (poolBatchFiles.isEmpty()) {
522                String[] files = requestDir.list();
523                if (files != null && files.length > 0) {
524                    poolBatchFiles = new ArrayList(Arrays.asList(files));
525                    return new File(requestDir, poolBatchFiles.remove(0));
526                }
527            } else {
528                return new File(requestDir, poolBatchFiles.remove(0));
529            }
530        }
531        return null;
532    }
533
534    private synchronized ExecutorService getExecutor() {
535        if (executor == null) {
536                if (cfg.getBoolean("virtual-threads", true)) {
537                         executor = Executors.newFixedThreadPool(10, Thread.ofVirtual().factory());
538                } else {
539                        executor = Executors.newFixedThreadPool(10, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory());
540                }
541        }
542        return executor;
543    }
544
545    // ------------------------------------------------ inner interfaces
546    /** Callback interface for processing binary request files. */
547    public interface Processor {
548        /**
549         * Processes a request and returns a response.
550         * @param name request file name
551         * @param request request image bytes
552         * @return response bytes (or null if none)
553         * @throws DirPollException on processing errors
554         */
555        byte[] process(String name, byte[] request)
556            throws DirPollException;
557    }
558    /** Callback interface for processing request {@link File} objects directly. */
559    public interface FileProcessor {
560        /**
561         * Processes a request file.
562         * @param name the request file
563         * @throws DirPollException on errors
564         */
565        void process(File name) throws DirPollException;
566    }
567    /** Runnable that moves a request to the run directory and dispatches it to the processor. */
568    public class ProcessorRunner implements Runnable {
569        File request;
570        LogEvent logEvent;
571        /**
572         * Creates a ProcessorRunner for the given request file.
573         * @param request the request file
574         * @throws IOException on I/O failure while moving the file
575         */
576        public ProcessorRunner (File request) throws IOException {
577            this.request = moveTo (request, runDir);
578            this.logEvent = null;
579        }
580        public void run() {
581            LogEvent evt = 
582                new LogEvent (
583                    DirPoll.this, "dirpoll", request.getName()
584                );
585            try {
586                if (processor == null) 
587                    throw new DirPollException 
588                        ("null processor - nothing to do");
589                else if (processor instanceof Processor) {
590                    byte[] resp = ((Processor) processor).process (
591                        request.getName(), readRequest (request)
592                    );
593                    if (resp != null) 
594                        writeResponse (request.getName(), resp);
595                } else if (processor instanceof FileProcessor) 
596                    ((FileProcessor) processor).process (request);
597
598                if (shouldArchive) {
599                    File archivedFile = store(request, archiveDir);
600                    if (shouldCompressArchive) {
601                        compress(archivedFile);
602                    }
603                } else {
604                    if (!request.delete ())
605                        throw new DirPollException 
606                            ("error: can't unlink request " + request.getName());                    
607                }
608
609            } catch (Throwable e) {
610                logEvent = evt;
611                evt.addMessage (e);
612                try {
613                    if (e instanceof DirPollException && ((DirPollException)e).isRetry()) {
614                        synchronized (shutdownMonitor) {
615                            if (!shutdown) {
616                                try {
617                                    if (pollInterval > 0L)
618                                        shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms)
619                                } catch (InterruptedException ie) {
620                                }
621                            }
622                        }
623                        evt.addMessage("retrying");
624                        moveTo(request, requestDir);
625                    } else {
626                        store(request, badDir);
627                    }
628                } catch (IOException _e) {
629                    evt.addMessage ("Can't move to "+badDir.getPath());
630                    evt.addMessage (_e);
631                }
632            } finally {
633                if (logEvent != null) 
634                    Logger.log (logEvent);
635            }
636        }
637    }
638    /** Exception thrown by {@link Processor} or {@link FileProcessor} to signal a processing error. */
639    public static class DirPollException extends ISOException {
640        /** When {@code true}, the request should be retried rather than moved to bad. */
641        boolean retry;
642        /** Default constructor. */
643        public DirPollException () {
644            super();
645        }
646        /**
647         * Constructs a DirPollException with the given message.
648         * @param detail the error message
649         */
650        public DirPollException (String detail) {
651            super(detail);
652        }
653        /**
654         * Constructs a DirPollException wrapping the given exception.
655         * @param nested the nested exception
656         */
657        public DirPollException (Exception nested) {
658            super(nested);
659        }
660        /**
661         * Constructs a DirPollException with a message and nested exception.
662         * @param detail the error message
663         * @param nested the nested exception
664         */
665        public DirPollException (String detail, Exception nested) {
666            super(detail, nested);
667        }
668        /**
669         * Returns whether the failed request should be retried.
670         * @return true if retry is requested
671         */
672        public boolean isRetry() {
673            return retry;
674        }
675        /**
676         * Sets whether the failed request should be retried.
677         * @param retry true to retry, false to move to bad directory
678         */
679        public void setRetry(boolean retry) {
680            this.retry = retry;
681        }
682    }
683
684    /** Pauses the poll loop. */
685    public void pause() {
686        synchronized (this) {
687            if (!paused) {
688                paused = true;
689                // Wake up the run() method from sleeping and tell it to pause
690                notify();
691            }
692        }
693    }
694
695    /** Resumes a paused poll loop. */
696    public void unpause() {
697        synchronized (this) {
698            if (paused) {
699                paused = false;
700                // Wake up the wait()ing thread from being paused
701                notify();
702                // The run() method will reset the paused flag
703            }
704        }
705    }
706    
707    /**
708     * Returns whether the poll loop is currently paused.
709     * @return true if paused
710     */
711    public boolean isPaused() {
712        synchronized (this) {
713            return paused;
714        }
715    }
716}