001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.iso.ISOException; 025 026import java.io.*; 027import java.text.SimpleDateFormat; 028import java.util.*; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.TimeUnit; 032 033/** 034 * DirPoll operates on a set of directories which defaults to 035 * <ul> 036 * <li>request 037 * <li>response 038 * <li>tmp 039 * <li>run 040 * <li>bad 041 * <li>archive 042 * </ul> 043 * scanning for incoming requests (of varying priorities) 044 * on the request directory and processing them by means of 045 * DirPoll.Processor or DirPoll.FileProcessor 046 * 047 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a> 048 * @author <a href="mailto:mmilliss@moneyswitch.net">Matthew Milliss</a> 049 * @since jPOS 1.2.7 050 * @version $Revision$ $Date$ 051 */ 052@SuppressWarnings("unchecked") 053public class DirPoll extends SimpleLogSource 054 implements Runnable, FilenameFilter, Configurable, Destroyable 055{ 056 private long pollInterval; 057 private File requestDir; 058 private File responseDir; 059 private File tmpDir; 060 private File badDir; 061 private File runDir; 062 private File archiveDir; 063 private Vector prio; 064 private int currentPriority; 065 private String basePath; 066 private String responseSuffix; 067 private ExecutorService executor; 068 private Object processor; 069 private final Object shutdownMonitor = new Object(); 070 071 private boolean shutdown; 072 private boolean paused = false; 073 private boolean shouldArchive; 074 private boolean shouldCompressArchive; 075 private boolean shouldTimestampArchive; 076 private String archiveDateFormat; 077 private boolean acceptZeroLength = false; 078 private boolean regexPriorityMatching = false; 079 private List<String> poolBatchFiles = new ArrayList<>(); 080 private static final long SHUTDOWN_WAIT = 15000; 081 /** Configuration for this DirPoll instance. */ 082 protected Configuration cfg; 083 084 /** Default constructor. */ 085 public DirPoll () { 086 prio = new Vector(); 087 setPollInterval(1000); 088 setPath ("."); 089 executor = null; 090 } 091 /** 092 * Sets the base poll directory and derives the standard subdirectories from it. 093 * @param base the base directory path 094 */ 095 public synchronized void setPath(String base) { 096 this.basePath = base; 097 requestDir = new File(base, "request"); 098 responseDir = new File(base, "response"); 099 tmpDir = new File(base, "tmp"); 100 badDir = new File(base, "bad"); 101 runDir = new File(base, "run"); 102 archiveDir = new File(base, "archive"); 103 } 104 /** 105 * When {@code true}, archive files are timestamped. 106 * @param shouldTimestampArchive true to timestamp archived files 107 */ 108 public void setShouldTimestampArchive(boolean shouldTimestampArchive) { 109 this.shouldTimestampArchive = shouldTimestampArchive; 110 } 111 /** 112 * Sets the date format pattern used when timestamping archived files. 113 * @param dateFormat the date format pattern (see {@link java.text.SimpleDateFormat}) 114 */ 115 public void setArchiveDateFormat(String dateFormat) { 116 this.archiveDateFormat = dateFormat; 117 } 118 /** 119 * When {@code true}, processed request files are moved to the archive directory. 120 * @param shouldArchive true to archive processed files 121 */ 122 public void setShouldArchive(boolean shouldArchive) { 123 this.shouldArchive = shouldArchive; 124 } 125 /** 126 * When {@code true}, archived files are compressed. 127 * @param shouldCompressArchive true to compress archived files 128 */ 129 public void setShouldCompressArchive(boolean shouldCompressArchive) { 130 this.shouldCompressArchive = shouldCompressArchive; 131 } 132 /** 133 * When {@code true}, zero-length request files are accepted. 134 * @param acceptZeroLength true to accept zero-length files 135 */ 136 public void setAcceptZeroLength(boolean acceptZeroLength) { 137 this.acceptZeroLength = acceptZeroLength; 138 } 139 /** 140 * Returns the base poll directory path. 141 * @return the base path 142 */ 143 public String getPath() { 144 return basePath; 145 } 146 /** 147 * Sets the request sub-directory name relative to the base path. 148 * @param dir the sub-directory name 149 */ 150 public void setRequestDir (String dir) { 151 requestDir = new File (basePath, dir); 152 } 153 /** 154 * Sets the response sub-directory name relative to the base path. 155 * @param dir the sub-directory name 156 */ 157 public void setResponseDir (String dir) { 158 responseDir = new File (basePath, dir); 159 } 160 /** 161 * Sets the tmp sub-directory name relative to the base path. 162 * @param dir the sub-directory name 163 */ 164 public void setTmpDir (String dir) { 165 tmpDir = new File (basePath, dir); 166 } 167 /** 168 * Sets the bad (failed) sub-directory name relative to the base path. 169 * @param dir the sub-directory name 170 */ 171 public void setBadDir (String dir) { 172 badDir = new File (basePath, dir); 173 } 174 /** 175 * Sets the run (in-progress) sub-directory name relative to the base path. 176 * @param dir the sub-directory name 177 */ 178 public void setRunDir (String dir) { 179 runDir = new File (basePath, dir); 180 } 181 /** 182 * Sets the archive sub-directory name relative to the base path. 183 * @param dir the sub-directory name 184 */ 185 public void setArchiveDir (String dir) { 186 archiveDir = new File (basePath, dir); 187 } 188 /** 189 * Sets the polling interval in milliseconds. 190 * @param pollInterval the interval in milliseconds 191 */ 192 public void setPollInterval(long pollInterval) { 193 this.pollInterval = pollInterval; 194 } 195 /** 196 * Sets the suffix appended to response file names. 197 * @param suffix the response file suffix 198 */ 199 public void setResponseSuffix (String suffix) { 200 this.responseSuffix = suffix; 201 } 202 /** 203 * Returns the polling interval in milliseconds. 204 * @return poll interval 205 */ 206 public long getPollInterval() { 207 return pollInterval; 208 } 209 /** 210 * Sets the processor used to handle request files. 211 * @param processor a {@link Processor} or {@link FileProcessor} instance 212 */ 213 public void setProcessor (Object processor) { 214 this.processor = processor; 215 } 216 217 /** 218 * Returns the request directory. 219 * @return request directory 220 */ 221 protected File getRequestDir() { 222 return requestDir; 223 } 224 225 /** 226 * Returns the response directory. 227 * @return response directory 228 */ 229 protected File getResponseDir() { 230 return responseDir; 231 } 232 233 /** 234 * Returns the tmp directory. 235 * @return tmp directory 236 */ 237 protected File getTmpDir() { 238 return tmpDir; 239 } 240 241 /** 242 * Returns the bad (failed) directory. 243 * @return bad directory 244 */ 245 protected File getBadDir() { 246 return badDir; 247 } 248 249 /** 250 * Returns the run (in-progress) directory. 251 * @return run directory 252 */ 253 protected File getRunDir() { 254 return runDir; 255 } 256 257 /** 258 * Returns the archive directory. 259 * @return archive directory 260 */ 261 protected File getArchiveDir() { 262 return archiveDir; 263 } 264 265 /** 266 * Returns whether regex-based priority matching is enabled. 267 * @return true if regex priority matching is active 268 */ 269 public boolean isRegexPriorityMatching() { 270 return regexPriorityMatching; 271 } 272 273 /** 274 * Enables or disables regex-based file extension priority matching. 275 * @param regexPriorityMatching true to enable regex priority matching 276 */ 277 public void setRegexPriorityMatching(boolean regexPriorityMatching) { 278 this.regexPriorityMatching = regexPriorityMatching; 279 } 280 281 /** 282 * Return instance implementing {@link FileProcessor} or {@link Processor} 283 * @return 284 * Object - need to be casted to {@link FileProcessor} or {@link Processor} 285 */ 286 public Object getProcessor() 287 { 288 return this.processor; 289 } 290 /** 291 * DirPool receives Configuration requests 292 * and pass along them to the underlying processor. 293 * @param cfg Configuration object 294 * @throws ConfigurationException on errors 295 */ 296 public void setConfiguration (Configuration cfg) 297 throws ConfigurationException 298 { 299 this.cfg = cfg; 300 if (cfg != null) { 301 if (processor instanceof Configurable) { 302 ((Configurable) processor).setConfiguration (cfg); 303 } 304 setRequestDir (cfg.get ("request.dir", "request")); 305 setResponseDir(cfg.get("response.dir", "response")); 306 setTmpDir(cfg.get("tmp.dir", "tmp")); 307 setRunDir(cfg.get("run.dir", "run")); 308 setBadDir(cfg.get("bad.dir", "bad")); 309 setArchiveDir(cfg.get("archive.dir", "archive")); 310 setResponseSuffix(cfg.get("response.suffix", null)); 311 setShouldArchive(cfg.getBoolean("archive", false)); 312 setShouldCompressArchive(cfg.getBoolean("archive.compress", false)); 313 setAcceptZeroLength (cfg.getBoolean ("zero-length", false)); 314 setArchiveDateFormat ( 315 cfg.get ("archive.dateformat", "yyyyMMddHHmmss") 316 ); 317 setShouldTimestampArchive (cfg.getBoolean ("archive.timestamp", false)); 318 setRegexPriorityMatching(cfg.getBoolean("priority.regex", false)); 319 } 320 } 321 /** 322 * Sets the file extension priority order for polling. 323 * @param priorities blank-separated list of file extensions in priority order 324 */ 325 public void setPriorities (String priorities) { 326 StringTokenizer st = new StringTokenizer (priorities); 327 Vector v = new Vector(); 328 while (st.hasMoreTokens()) { 329 String ext = st.nextToken(); 330 v.addElement (ext.equals ("*") ? "" : ext); 331 } 332 if (v.isEmpty()) 333 v.addElement (""); 334 synchronized (this) { 335 prio = v; 336 } 337 } 338 /** 339 * Sets the thread pool used to execute processor tasks. 340 * @param executor the executor service to use 341 */ 342 public synchronized void setThreadPool (ExecutorService executor) { 343 this.executor = executor; 344 } 345 346 //--------------------------------------- FilenameFilter implementation 347 /** 348 * {@link java.io.FilenameFilter} implementation that selects files matching the current priority extension. 349 * @param dir the directory 350 * @param name the file name 351 * @return true if the file should be accepted 352 */ 353 public boolean accept(File dir, String name) { 354 boolean result; 355 String ext = currentPriority >= 0 ? 356 (String) prio.elementAt(currentPriority) : null; 357 if (ext != null) { 358 if (isRegexPriorityMatching()) { 359 if (!name.matches(ext)) 360 return false; 361 } else { 362 if (!name.endsWith(ext)) 363 return false; 364 } 365 } 366 File f = new File (dir, name); 367 if (acceptZeroLength){ 368 result = f.isFile(); 369 } else { 370 result = f.isFile() && f.length() > 0; 371 } 372 return result; 373 } 374 //--------------------------------------------- Runnable implementation 375 376 public void run() { 377 Thread.currentThread().setName ("DirPoll-"+basePath); 378 if (prio.isEmpty()) 379 addPriority(""); 380 while (!shutdown) { 381 synchronized (this) { 382 if (paused) { 383 try { 384 wait(); 385 paused = false; 386 } catch (InterruptedException e) { 387 } 388 } 389 } 390 391 try { 392 File f; 393 synchronized (this) { 394 f = scan(); 395 } 396 if (f != null) { 397 executor.submit(new ProcessorRunner (f)); 398 Thread.yield(); // be nice 399 } 400 else { 401 synchronized (shutdownMonitor) { 402 if (!shutdown && pollInterval > 0L) { 403 shutdownMonitor.wait(pollInterval); 404 } 405 } 406 } 407 } catch (InterruptedException e) { 408 } catch (Throwable e) { 409 Logger.log (new LogEvent (this, "dirpoll", e)); 410 try { 411 synchronized (shutdownMonitor) { 412 if (!shutdown && pollInterval > 0L) { 413 shutdownMonitor.wait(pollInterval * 10); 414 } 415 } 416 } catch (InterruptedException ex) { } 417 } 418 } 419 } 420 public void destroy () { 421 synchronized (shutdownMonitor) { 422 shutdown = true; 423 shutdownMonitor.notifyAll(); 424 425 if (executor != null) { 426 executor.shutdown(); 427 try { 428 if (!executor.awaitTermination(SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) { 429 executor.shutdownNow(); 430 } 431 } catch (InterruptedException e) { 432 Thread.currentThread().interrupt(); 433 } 434 } 435 } 436 } 437 438 //----------------------------------------------------- public helpers 439 440 /** Creates all required poll directories (request, response, tmp, bad, run, archive). */ 441 public void createDirs() { 442 requestDir.mkdirs(); 443 responseDir.mkdirs(); 444 tmpDir.mkdirs(); 445 badDir.mkdirs(); 446 runDir.mkdirs(); 447 archiveDir.mkdirs(); 448 } 449 /** 450 * Adds a file extension to the priority list. 451 * @param fileExtension the extension to add (e.g. {@code "xml"}) 452 */ 453 public void addPriority(String fileExtension) { 454 prio.addElement (fileExtension); 455 } 456 457 //----------------------------------------------------- private helpers 458 private byte[] readRequest (File f) throws IOException { 459 byte[] b = new byte[(int) f.length()]; 460 try (FileInputStream in = new FileInputStream(f)) { 461 in.read(b); 462 } 463 return b; 464 } 465 private void writeResponse (String requestName, byte[] b) 466 throws IOException 467 { 468 if (responseSuffix != null) { 469 int pos = requestName.lastIndexOf ('.'); 470 if (pos > 0) 471 requestName = requestName.substring (0, pos) + responseSuffix; 472 } 473 474 File tmp = new File(tmpDir, requestName); 475 try (FileOutputStream out = new FileOutputStream(tmp)) { 476 out.write(b); 477 } 478 moveTo (tmp, responseDir); 479 } 480 481 private File moveTo(File f, File dir) throws IOException { 482 File destination = new File(dir, f.getName()); 483 if (!f.renameTo(destination)) 484 throw new IOException("Unable to move "+f.getName()); 485 return destination; 486 } 487 488 private File store(File f, File destinationDirectory) throws IOException { 489 String storedFilename = f.getName(); 490 if (shouldTimestampArchive) 491 storedFilename = f.getName() + "." + new SimpleDateFormat(archiveDateFormat).format(new Date()); 492 File destination = new File(destinationDirectory, storedFilename); 493 if (!f.renameTo(destination)) 494 throw new IOException("Unable to archive " + "'" + f.getName() + "' in directory " + destinationDirectory); 495 return destination; 496 } 497 498 private void compress(File f) throws IOException { 499 ZipUtil.zipFile(f, new File(f.getAbsolutePath() + ".zip")); 500 f.delete(); 501 } 502 503 /** 504 * Scans the request directory for the next file to process, respecting priority order. 505 * @return the next request {@link File}, or {@code null} if none is available 506 */ 507 protected File scan() { 508 if (prio.size() > 1) { 509 for (currentPriority = 0; currentPriority < prio.size(); currentPriority++) { 510 if (poolBatchFiles.isEmpty()) { 511 String[] files = requestDir.list(this); 512 if (files != null && files.length > 0) { 513 poolBatchFiles = new ArrayList(Arrays.asList(files)); 514 return new File(requestDir, poolBatchFiles.remove(0)); 515 } 516 } else { 517 return new File(requestDir, poolBatchFiles.remove(0)); 518 } 519 } 520 } else { 521 if (poolBatchFiles.isEmpty()) { 522 String[] files = requestDir.list(); 523 if (files != null && files.length > 0) { 524 poolBatchFiles = new ArrayList(Arrays.asList(files)); 525 return new File(requestDir, poolBatchFiles.remove(0)); 526 } 527 } else { 528 return new File(requestDir, poolBatchFiles.remove(0)); 529 } 530 } 531 return null; 532 } 533 534 private synchronized ExecutorService getExecutor() { 535 if (executor == null) { 536 if (cfg.getBoolean("virtual-threads", true)) { 537 executor = Executors.newFixedThreadPool(10, Thread.ofVirtual().factory()); 538 } else { 539 executor = Executors.newFixedThreadPool(10, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory()); 540 } 541 } 542 return executor; 543 } 544 545 // ------------------------------------------------ inner interfaces 546 /** Callback interface for processing binary request files. */ 547 public interface Processor { 548 /** 549 * Processes a request and returns a response. 550 * @param name request file name 551 * @param request request image bytes 552 * @return response bytes (or null if none) 553 * @throws DirPollException on processing errors 554 */ 555 byte[] process(String name, byte[] request) 556 throws DirPollException; 557 } 558 /** Callback interface for processing request {@link File} objects directly. */ 559 public interface FileProcessor { 560 /** 561 * Processes a request file. 562 * @param name the request file 563 * @throws DirPollException on errors 564 */ 565 void process(File name) throws DirPollException; 566 } 567 /** Runnable that moves a request to the run directory and dispatches it to the processor. */ 568 public class ProcessorRunner implements Runnable { 569 File request; 570 LogEvent logEvent; 571 /** 572 * Creates a ProcessorRunner for the given request file. 573 * @param request the request file 574 * @throws IOException on I/O failure while moving the file 575 */ 576 public ProcessorRunner (File request) throws IOException { 577 this.request = moveTo (request, runDir); 578 this.logEvent = null; 579 } 580 public void run() { 581 LogEvent evt = 582 new LogEvent ( 583 DirPoll.this, "dirpoll", request.getName() 584 ); 585 try { 586 if (processor == null) 587 throw new DirPollException 588 ("null processor - nothing to do"); 589 else if (processor instanceof Processor) { 590 byte[] resp = ((Processor) processor).process ( 591 request.getName(), readRequest (request) 592 ); 593 if (resp != null) 594 writeResponse (request.getName(), resp); 595 } else if (processor instanceof FileProcessor) 596 ((FileProcessor) processor).process (request); 597 598 if (shouldArchive) { 599 File archivedFile = store(request, archiveDir); 600 if (shouldCompressArchive) { 601 compress(archivedFile); 602 } 603 } else { 604 if (!request.delete ()) 605 throw new DirPollException 606 ("error: can't unlink request " + request.getName()); 607 } 608 609 } catch (Throwable e) { 610 logEvent = evt; 611 evt.addMessage (e); 612 try { 613 if (e instanceof DirPollException && ((DirPollException)e).isRetry()) { 614 synchronized (shutdownMonitor) { 615 if (!shutdown) { 616 try { 617 if (pollInterval > 0L) 618 shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms) 619 } catch (InterruptedException ie) { 620 } 621 } 622 } 623 evt.addMessage("retrying"); 624 moveTo(request, requestDir); 625 } else { 626 store(request, badDir); 627 } 628 } catch (IOException _e) { 629 evt.addMessage ("Can't move to "+badDir.getPath()); 630 evt.addMessage (_e); 631 } 632 } finally { 633 if (logEvent != null) 634 Logger.log (logEvent); 635 } 636 } 637 } 638 /** Exception thrown by {@link Processor} or {@link FileProcessor} to signal a processing error. */ 639 public static class DirPollException extends ISOException { 640 /** When {@code true}, the request should be retried rather than moved to bad. */ 641 boolean retry; 642 /** Default constructor. */ 643 public DirPollException () { 644 super(); 645 } 646 /** 647 * Constructs a DirPollException with the given message. 648 * @param detail the error message 649 */ 650 public DirPollException (String detail) { 651 super(detail); 652 } 653 /** 654 * Constructs a DirPollException wrapping the given exception. 655 * @param nested the nested exception 656 */ 657 public DirPollException (Exception nested) { 658 super(nested); 659 } 660 /** 661 * Constructs a DirPollException with a message and nested exception. 662 * @param detail the error message 663 * @param nested the nested exception 664 */ 665 public DirPollException (String detail, Exception nested) { 666 super(detail, nested); 667 } 668 /** 669 * Returns whether the failed request should be retried. 670 * @return true if retry is requested 671 */ 672 public boolean isRetry() { 673 return retry; 674 } 675 /** 676 * Sets whether the failed request should be retried. 677 * @param retry true to retry, false to move to bad directory 678 */ 679 public void setRetry(boolean retry) { 680 this.retry = retry; 681 } 682 } 683 684 /** Pauses the poll loop. */ 685 public void pause() { 686 synchronized (this) { 687 if (!paused) { 688 paused = true; 689 // Wake up the run() method from sleeping and tell it to pause 690 notify(); 691 } 692 } 693 } 694 695 /** Resumes a paused poll loop. */ 696 public void unpause() { 697 synchronized (this) { 698 if (paused) { 699 paused = false; 700 // Wake up the wait()ing thread from being paused 701 notify(); 702 // The run() method will reset the paused flag 703 } 704 } 705 } 706 707 /** 708 * Returns whether the poll loop is currently paused. 709 * @return true if paused 710 */ 711 public boolean isPaused() { 712 synchronized (this) { 713 return paused; 714 } 715 } 716}