001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.iso.ISOException; 025 026import java.io.*; 027import java.text.SimpleDateFormat; 028import java.util.*; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.TimeUnit; 032 033/** 034 * DirPoll operates on a set of directories which defaults to 035 * <ul> 036 * <li>request 037 * <li>response 038 * <li>tmp 039 * <li>run 040 * <li>bad 041 * <li>archive 042 * </ul> 043 * scanning for incoming requests (of varying priorities) 044 * on the request directory and processing them by means of 045 * DirPoll.Processor or DirPoll.FileProcessor 046 * 047 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a> 048 * @author <a href="mailto:mmilliss@moneyswitch.net">Matthew Milliss</a> 049 * @since jPOS 1.2.7 050 * @version $Revision$ $Date$ 051 */ 052@SuppressWarnings("unchecked") 053public class DirPoll extends SimpleLogSource 054 implements Runnable, FilenameFilter, Configurable, Destroyable 055{ 056 private long pollInterval; 057 private File requestDir; 058 private File responseDir; 059 private File tmpDir; 060 private File badDir; 061 private File runDir; 062 private File archiveDir; 063 private Vector prio; 064 private int currentPriority; 065 private String basePath; 066 private String responseSuffix; 067 private ExecutorService executor; 068 private Object processor; 069 private final Object shutdownMonitor = new Object(); 070 071 private boolean shutdown; 072 private boolean paused = false; 073 private boolean shouldArchive; 074 private boolean shouldCompressArchive; 075 private boolean shouldTimestampArchive; 076 private String archiveDateFormat; 077 private boolean acceptZeroLength = false; 078 private boolean regexPriorityMatching = false; 079 private List<String> poolBatchFiles = new ArrayList<>(); 080 private static final long SHUTDOWN_WAIT = 15000; 081 protected Configuration cfg; 082 083 public DirPoll () { 084 prio = new Vector(); 085 setPollInterval(1000); 086 setPath ("."); 087 executor = null; 088 } 089 public synchronized void setPath(String base) { 090 this.basePath = base; 091 requestDir = new File(base, "request"); 092 responseDir = new File(base, "response"); 093 tmpDir = new File(base, "tmp"); 094 badDir = new File(base, "bad"); 095 runDir = new File(base, "run"); 096 archiveDir = new File(base, "archive"); 097 } 098 public void setShouldTimestampArchive(boolean shouldTimestampArchive) { 099 this.shouldTimestampArchive = shouldTimestampArchive; 100 } 101 public void setArchiveDateFormat(String dateFormat) { 102 this.archiveDateFormat = dateFormat; 103 } 104 public void setShouldArchive(boolean shouldArchive) { 105 this.shouldArchive = shouldArchive; 106 } 107 public void setShouldCompressArchive(boolean shouldCompressArchive) { 108 this.shouldCompressArchive = shouldCompressArchive; 109 } 110 public void setAcceptZeroLength(boolean acceptZeroLength) { 111 this.acceptZeroLength = acceptZeroLength; 112 } 113 public String getPath() { 114 return basePath; 115 } 116 public void setRequestDir (String dir) { 117 requestDir = new File (basePath, dir); 118 } 119 public void setResponseDir (String dir) { 120 responseDir = new File (basePath, dir); 121 } 122 public void setTmpDir (String dir) { 123 tmpDir = new File (basePath, dir); 124 } 125 public void setBadDir (String dir) { 126 badDir = new File (basePath, dir); 127 } 128 public void setRunDir (String dir) { 129 runDir = new File (basePath, dir); 130 } 131 public void setArchiveDir (String dir) { 132 archiveDir = new File (basePath, dir); 133 } 134 public void setPollInterval(long pollInterval) { 135 this.pollInterval = pollInterval; 136 } 137 public void setResponseSuffix (String suffix) { 138 this.responseSuffix = suffix; 139 } 140 public long getPollInterval() { 141 return pollInterval; 142 } 143 public void setProcessor (Object processor) { 144 this.processor = processor; 145 } 146 147 protected File getRequestDir() { 148 return requestDir; 149 } 150 151 protected File getResponseDir() { 152 return responseDir; 153 } 154 155 protected File getTmpDir() { 156 return tmpDir; 157 } 158 159 protected File getBadDir() { 160 return badDir; 161 } 162 163 protected File getRunDir() { 164 return runDir; 165 } 166 167 protected File getArchiveDir() { 168 return archiveDir; 169 } 170 171 public boolean isRegexPriorityMatching() { 172 return regexPriorityMatching; 173 } 174 175 public void setRegexPriorityMatching(boolean regexPriorityMatching) { 176 this.regexPriorityMatching = regexPriorityMatching; 177 } 178 179 /** 180 * Return instance implementing {@link FileProcessor} or {@link Processor} 181 * @return 182 * Object - need to be casted to {@link FileProcessor} or {@link Processor} 183 */ 184 public Object getProcessor() 185 { 186 return this.processor; 187 } 188 /** 189 * DirPool receives Configuration requests 190 * and pass along them to the underlying processor. 191 * @param cfg Configuration object 192 * @throws ConfigurationException on errors 193 */ 194 public void setConfiguration (Configuration cfg) 195 throws ConfigurationException 196 { 197 this.cfg = cfg; 198 if (cfg != null) { 199 if (processor instanceof Configurable) { 200 ((Configurable) processor).setConfiguration (cfg); 201 } 202 setRequestDir (cfg.get ("request.dir", "request")); 203 setResponseDir(cfg.get("response.dir", "response")); 204 setTmpDir(cfg.get("tmp.dir", "tmp")); 205 setRunDir(cfg.get("run.dir", "run")); 206 setBadDir(cfg.get("bad.dir", "bad")); 207 setArchiveDir(cfg.get("archive.dir", "archive")); 208 setResponseSuffix(cfg.get("response.suffix", null)); 209 setShouldArchive(cfg.getBoolean("archive", false)); 210 setShouldCompressArchive(cfg.getBoolean("archive.compress", false)); 211 setAcceptZeroLength (cfg.getBoolean ("zero-length", false)); 212 setArchiveDateFormat ( 213 cfg.get ("archive.dateformat", "yyyyMMddHHmmss") 214 ); 215 setShouldTimestampArchive (cfg.getBoolean ("archive.timestamp", false)); 216 setRegexPriorityMatching(cfg.getBoolean("priority.regex", false)); 217 } 218 } 219 /** 220 * @param priorities blank separated list of extensions 221 */ 222 public void setPriorities (String priorities) { 223 StringTokenizer st = new StringTokenizer (priorities); 224 Vector v = new Vector(); 225 while (st.hasMoreTokens()) { 226 String ext = st.nextToken(); 227 v.addElement (ext.equals ("*") ? "" : ext); 228 } 229 if (v.isEmpty()) 230 v.addElement (""); 231 synchronized (this) { 232 prio = v; 233 } 234 } 235 public synchronized void setThreadPool (ExecutorService executor) { 236 this.executor = executor; 237 } 238 239 //--------------------------------------- FilenameFilter implementation 240 public boolean accept(File dir, String name) { 241 boolean result; 242 String ext = currentPriority >= 0 ? 243 (String) prio.elementAt(currentPriority) : null; 244 if (ext != null) { 245 if (isRegexPriorityMatching()) { 246 if (!name.matches(ext)) 247 return false; 248 } else { 249 if (!name.endsWith(ext)) 250 return false; 251 } 252 } 253 File f = new File (dir, name); 254 if (acceptZeroLength){ 255 result = f.isFile(); 256 } else { 257 result = f.isFile() && f.length() > 0; 258 } 259 return result; 260 } 261 //--------------------------------------------- Runnable implementation 262 263 public void run() { 264 Thread.currentThread().setName ("DirPoll-"+basePath); 265 if (prio.isEmpty()) 266 addPriority(""); 267 while (!shutdown) { 268 synchronized (this) { 269 if (paused) { 270 try { 271 wait(); 272 paused = false; 273 } catch (InterruptedException e) { 274 } 275 } 276 } 277 278 try { 279 File f; 280 synchronized (this) { 281 f = scan(); 282 } 283 if (f != null) { 284 executor.submit(new ProcessorRunner (f)); 285 Thread.yield(); // be nice 286 } 287 else { 288 synchronized (shutdownMonitor) { 289 if (!shutdown && pollInterval > 0L) { 290 shutdownMonitor.wait(pollInterval); 291 } 292 } 293 } 294 } catch (InterruptedException e) { 295 } catch (Throwable e) { 296 Logger.log (new LogEvent (this, "dirpoll", e)); 297 try { 298 synchronized (shutdownMonitor) { 299 if (!shutdown && pollInterval > 0L) { 300 shutdownMonitor.wait(pollInterval * 10); 301 } 302 } 303 } catch (InterruptedException ex) { } 304 } 305 } 306 } 307 public void destroy () { 308 synchronized (shutdownMonitor) { 309 shutdown = true; 310 shutdownMonitor.notifyAll(); 311 312 if (executor != null) { 313 executor.shutdown(); 314 try { 315 if (!executor.awaitTermination(SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) { 316 executor.shutdownNow(); 317 } 318 } catch (InterruptedException e) { 319 Thread.currentThread().interrupt(); 320 } 321 } 322 } 323 } 324 325 //----------------------------------------------------- public helpers 326 327 public void createDirs() { 328 requestDir.mkdirs(); 329 responseDir.mkdirs(); 330 tmpDir.mkdirs(); 331 badDir.mkdirs(); 332 runDir.mkdirs(); 333 archiveDir.mkdirs(); 334 } 335 public void addPriority(String fileExtension) { 336 prio.addElement (fileExtension); 337 } 338 339 //----------------------------------------------------- private helpers 340 private byte[] readRequest (File f) throws IOException { 341 byte[] b = new byte[(int) f.length()]; 342 FileInputStream in = new FileInputStream(f); 343 in.read(b); 344 in.close(); 345 return b; 346 } 347 private void writeResponse (String requestName, byte[] b) 348 throws IOException 349 { 350 if (responseSuffix != null) { 351 int pos = requestName.lastIndexOf ('.'); 352 if (pos > 0) 353 requestName = requestName.substring (0, pos) + responseSuffix; 354 } 355 356 File tmp = new File(tmpDir, requestName); 357 FileOutputStream out = new FileOutputStream(tmp); 358 out.write(b); 359 out.close(); 360 moveTo (tmp, responseDir); 361 } 362 363 private File moveTo(File f, File dir) throws IOException { 364 File destination = new File(dir, f.getName()); 365 if (!f.renameTo(destination)) 366 throw new IOException("Unable to move "+f.getName()); 367 return destination; 368 } 369 370 private File store(File f, File destinationDirectory) throws IOException { 371 String storedFilename = f.getName(); 372 if (shouldTimestampArchive) 373 storedFilename = f.getName() + "." + new SimpleDateFormat(archiveDateFormat).format(new Date()); 374 File destination = new File(destinationDirectory, storedFilename); 375 if (!f.renameTo(destination)) 376 throw new IOException("Unable to archive " + "'" + f.getName() + "' in directory " + destinationDirectory); 377 return destination; 378 } 379 380 private void compress(File f) throws IOException { 381 ZipUtil.zipFile(f, new File(f.getAbsolutePath() + ".zip")); 382 f.delete(); 383 } 384 385 protected File scan() { 386 if (prio.size() > 1) { 387 for (currentPriority = 0; currentPriority < prio.size(); currentPriority++) { 388 if (poolBatchFiles.isEmpty()) { 389 String[] files = requestDir.list(this); 390 if (files != null && files.length > 0) { 391 poolBatchFiles = new ArrayList(Arrays.asList(files)); 392 return new File(requestDir, poolBatchFiles.remove(0)); 393 } 394 } else { 395 return new File(requestDir, poolBatchFiles.remove(0)); 396 } 397 } 398 } else { 399 if (poolBatchFiles.isEmpty()) { 400 String[] files = requestDir.list(); 401 if (files != null && files.length > 0) { 402 poolBatchFiles = new ArrayList(Arrays.asList(files)); 403 return new File(requestDir, poolBatchFiles.remove(0)); 404 } 405 } else { 406 return new File(requestDir, poolBatchFiles.remove(0)); 407 } 408 } 409 return null; 410 } 411 412 private synchronized ExecutorService getExecutor() { 413 if (executor == null) { 414 if (cfg.getBoolean("virtual-threads", true)) { 415 executor = Executors.newFixedThreadPool(10, Thread.ofVirtual().factory()); 416 } else { 417 executor = Executors.newFixedThreadPool(10, Thread.ofPlatform().inheritInheritableThreadLocals(true).factory()); 418 } 419 } 420 return executor; 421 } 422 423 // ------------------------------------------------ inner interfaces 424 public interface Processor { 425 /** 426 * @param name request name 427 * @param request request image 428 * @return response (or null) 429 */ 430 byte[] process(String name, byte[] request) 431 throws DirPollException; 432 } 433 public interface FileProcessor { 434 /** 435 * @param name request File 436 * @throws org.jpos.util.DirPoll.DirPollException on errors 437 */ 438 void process(File name) throws DirPollException; 439 } 440 public class ProcessorRunner implements Runnable { 441 File request; 442 LogEvent logEvent; 443 public ProcessorRunner (File request) throws IOException { 444 this.request = moveTo (request, runDir); 445 this.logEvent = null; 446 } 447 public void run() { 448 LogEvent evt = 449 new LogEvent ( 450 DirPoll.this, "dirpoll", request.getName() 451 ); 452 try { 453 if (processor == null) 454 throw new DirPollException 455 ("null processor - nothing to do"); 456 else if (processor instanceof Processor) { 457 byte[] resp = ((Processor) processor).process ( 458 request.getName(), readRequest (request) 459 ); 460 if (resp != null) 461 writeResponse (request.getName(), resp); 462 } else if (processor instanceof FileProcessor) 463 ((FileProcessor) processor).process (request); 464 465 if (shouldArchive) { 466 File archivedFile = store(request, archiveDir); 467 if (shouldCompressArchive) { 468 compress(archivedFile); 469 } 470 } else { 471 if (!request.delete ()) 472 throw new DirPollException 473 ("error: can't unlink request " + request.getName()); 474 } 475 476 } catch (Throwable e) { 477 logEvent = evt; 478 evt.addMessage (e); 479 try { 480 if (e instanceof DirPollException && ((DirPollException)e).isRetry()) { 481 synchronized (shutdownMonitor) { 482 if (!shutdown) { 483 try { 484 if (pollInterval > 0L) 485 shutdownMonitor.wait(pollInterval * 10); // retry delay (pollInterval defaults to 100ms) 486 } catch (InterruptedException ie) { 487 } 488 } 489 } 490 evt.addMessage("retrying"); 491 moveTo(request, requestDir); 492 } else { 493 store(request, badDir); 494 } 495 } catch (IOException _e) { 496 evt.addMessage ("Can't move to "+badDir.getPath()); 497 evt.addMessage (_e); 498 } 499 } finally { 500 if (logEvent != null) 501 Logger.log (logEvent); 502 } 503 } 504 } 505 public static class DirPollException extends ISOException { 506 boolean retry; 507 public DirPollException () { 508 super(); 509 } 510 public DirPollException (String detail) { 511 super(detail); 512 } 513 public DirPollException (Exception nested) { 514 super(nested); 515 } 516 public DirPollException (String detail, Exception nested) { 517 super(detail, nested); 518 } 519 public boolean isRetry() { 520 return retry; 521 } 522 public void setRetry(boolean retry) { 523 this.retry = retry; 524 } 525 } 526 527 public void pause() { 528 synchronized (this) { 529 if (!paused) { 530 paused = true; 531 // Wake up the run() method from sleeping and tell it to pause 532 notify(); 533 } 534 } 535 } 536 537 public void unpause() { 538 synchronized (this) { 539 if (paused) { 540 paused = false; 541 // Wake up the wait()ing thread from being paused 542 notify(); 543 // The run() method will reset the paused flag 544 } 545 } 546 } 547 548 public boolean isPaused() { 549 synchronized (this) { 550 return paused; 551 } 552 } 553}