001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import io.micrometer.core.instrument.*; 022import io.micrometer.core.instrument.Timer; 023import org.HdrHistogram.AtomicHistogram; 024import org.jdom2.Element; 025import org.jpos.core.ConfigurationException; 026import org.jpos.core.Environment; 027import org.jpos.iso.*; 028import org.jpos.metrics.MeterFactory; 029import org.jpos.metrics.MeterInfo; 030import org.jpos.q2.QBeanSupport; 031import org.jpos.q2.QFactory; 032import org.jpos.space.*; 033import org.jpos.util.*; 034import org.jpos.util.Metrics; 035 036import java.io.IOException; 037import java.io.PrintStream; 038import java.util.*; 039import java.util.concurrent.ScheduledFuture; 040import java.util.concurrent.TimeUnit; 041 042/** 043 * A Q2-managed multiplexer that routes ISO messages between channels and listeners. 044 * @author Alejandro Revilla 045 */ 046@SuppressWarnings("unchecked") 047public class QMUX 048 extends QBeanSupport 049 implements SpaceListener, MUX, QMUXMBean, Loggeable, MetricsProvider 050{ 051 static final String nomap = "0123456789"; 052 static final String DEFAULT_KEY = "41, 11"; 053 /** Local space backing the in/out queues. */ 054 protected LocalSpace sp; 055 /** Queue names: inbound responses, outbound requests, and unhandled messages. */ 056 protected String in, out, unhandled; 057 /** Optional list of "ready indicator" keys polled to determine connectivity. */ 058 protected String[] ready; 059 /** Default field-number key components used to correlate responses. */ 060 protected String[] key; 061 /** Comma-separated response codes to ignore (treat as if no response was received). */ 062 protected String ignorerc; 063 /** Three-character MTI mapping table used by {@link #mapMTI(String)}. */ 064 protected String[] mtiMapping; 065 private boolean headerIsKey; 066 private boolean returnRejects; 067 private LocalSpace isp; // internal space 068 private Map<String,String[]> mtiKey = new HashMap<>(); 069 private Metrics metrics = new Metrics(new AtomicHistogram(60000, 2)); 070 071 List<ISORequestListener> listeners; 072 private volatile int rx, tx, rxExpired, txExpired, rxPending, rxUnhandled, rxForwarded; 073 private volatile long lastTxn = 0L; 074 private boolean listenerRegistered; 075 076 private Gauge statusGauge; 077 private Gauge rxPendingGauge; 078 private Timer responseTimer; 079 080 private Counter txCounter; 081 private Counter rxCounter; 082 private Counter rxMatchCounter; 083 private Counter rxUnhandledCounter; 084 085 /** Default constructor. */ 086 public QMUX () { 087 super (); 088 listeners = new ArrayList<>(); 089 } 090 091 @Override 092 protected String defaultRealm() { 093 return Realm.COMM_MUX; 094 } 095 096 public void initService () throws ConfigurationException { 097 Element e = getPersist (); 098 sp = grabSpace (e.getChild ("space")); 099 isp = cfg.getBoolean("reuse-space", false) ? sp : new TSpace(); 100 in = Environment.get(e.getChildTextTrim ("in")); 101 out = Environment.get(e.getChildTextTrim ("out")); 102 103 if (in == null || out == null) { 104 throw new ConfigurationException ("Misconfigured QMUX. Please verify in/out queues"); 105 } 106 ignorerc = Environment.get(e.getChildTextTrim ("ignore-rc")); 107 key = toStringArray(DEFAULT_KEY, ", ", null); 108 returnRejects = cfg.getBoolean("return-rejects", false); 109 for (Element keyElement : e.getChildren("key")) { 110 String mtiOverride = QFactory.getAttributeValue(keyElement, "mti"); 111 if (mtiOverride != null && mtiOverride.length() >= 2) { 112 String pcode = sanitizePcode(QFactory.getAttributeValue(keyElement, "pcode")); 113 String mapKey = buildMtiKey(mtiOverride.substring(0,2), pcode); 114 mtiKey.put (mapKey, toStringArray(keyElement.getTextTrim(), ", ", null)); 115 } else { 116 key = toStringArray(e.getChildTextTrim("key"), ", ", DEFAULT_KEY); 117 } 118 } 119 ready = toStringArray(Environment.get(e.getChildTextTrim ("ready"))); 120 mtiMapping = toStringArray(Environment.get(e.getChildTextTrim ("mtimapping"))); 121 if (mtiMapping == null || mtiMapping.length != 3) 122 mtiMapping = new String[] { nomap, nomap, "0022446689" }; 123 addListeners (); 124 unhandled = Environment.get(e.getChildTextTrim ("unhandled")); 125 initMeters(); 126 NameRegistrar.register ("mux."+getName (), this); 127 } 128 public void startService () { 129 if (!listenerRegistered) { 130 listenerRegistered = true; 131 // Handle messages that could be in the in queue at start time 132 synchronized (sp) { 133 Object[] pending = SpaceUtil.inpAll(sp, in); 134 sp.addListener (in, this); 135 for (Object o : pending) 136 sp.out(in, o); 137 } 138 } 139 } 140 public void stopService () { 141 listenerRegistered = false; 142 sp.removeListener (in, this); 143 removeMeters(); 144 } 145 public void destroyService () { 146 NameRegistrar.unregister ("mux."+getName ()); 147 } 148 149 /** 150 * Returns the MUX registered under the given name. 151 * 152 * @param name MUX name (without the {@code mux.} prefix) 153 * @return MUX with name using NameRegistrar 154 * @throws NameRegistrar.NotFoundException if not found in registry 155 * @see NameRegistrar 156 */ 157 public static MUX getMUX (String name) 158 throws NameRegistrar.NotFoundException 159 { 160 return (MUX) NameRegistrar.get ("mux."+name); 161 } 162 163 /** 164 * @param m message to send 165 * @param timeout amount of time in millis to wait for a response 166 * @return response or null 167 */ 168 public ISOMsg request (ISOMsg m, long timeout) throws ISOException { 169 String key = getKey (m); 170 String req = key + ".req"; 171 synchronized (isp) { 172 if (isp.rdp (req) != null) 173 throw new ISOException ("Duplicate key '" + req + "' detected"); 174 isp.out (req, m); 175 } 176 m.setDirection(0); 177 Chronometer c = new Chronometer(); 178 if (timeout > 0) 179 sp.out (out, m, timeout); 180 else 181 sp.out (out, m); 182 183 txCounter.increment(); 184 ISOMsg resp; 185 try { 186 synchronized (this) { tx++; rxPending++; } 187 188 for (;;) { 189 resp = (ISOMsg) isp.in (key, timeout); 190 if (!shouldIgnore (resp)) 191 break; 192 } 193 if (resp == null && isp.inp (req) == null) { 194 // possible race condition, retry for a few extra seconds 195 resp = (ISOMsg) isp.in (key, 10000); 196 } 197 synchronized (this) { 198 if (resp != null) { 199 rx++; 200 lastTxn = System.currentTimeMillis(); 201 } else { 202 rxExpired++; 203 if (m.getDirection() != ISOMsg.OUTGOING) 204 txExpired++; 205 } 206 } 207 } finally { 208 synchronized (this) { rxPending--; } 209 } 210 long elapsed = c.elapsed(); 211 metrics.record("all", elapsed); 212 if (resp != null) { 213 responseTimer.record(elapsed, TimeUnit.MILLISECONDS); 214 metrics.record("ok", elapsed); 215 } 216 return resp; 217 } 218 public void request (ISOMsg m, long timeout, ISOResponseListener rl, Object handBack) 219 throws ISOException 220 { 221 String key = getKey (m); 222 String req = key + ".req"; 223 synchronized (isp) { 224 if (isp.rdp (req) != null) 225 throw new ISOException ("Duplicate key '" + req + "' detected."); 226 m.setDirection(0); 227 AsyncRequest ar = new AsyncRequest (rl, handBack); 228 synchronized (ar) { 229 if (timeout > 0) 230 ar.setFuture(getScheduledThreadPoolExecutor().schedule(ar, timeout, TimeUnit.MILLISECONDS)); 231 } 232 isp.out (req, ar, timeout); 233 } 234 if (timeout > 0) 235 sp.out (out, m, timeout); 236 else 237 sp.out (out, m); 238 synchronized (this) { tx++; rxPending++; } 239 } 240 241 /** 242 * Returns whether {@code msg} should be considered for response-matching by {@link #notify(Object, Object)}. 243 * 244 * @param msg message just dequeued from the inbound space 245 * @return {@code true} when the message should be matched against pending requests 246 */ 247 protected boolean isNotifyEligible(ISOMsg msg) { 248 if (returnRejects) 249 return true; 250 251 try { 252 return msg.isResponse(); 253 } catch (RuntimeException | ISOException ex) { 254 // * ArrayIndexOutOfBoundsException - It may occur for messages where 255 // MTI is not standard 4 characters (eg. FSDISOMsg), then notification is expected. 256 // * ISOException: When there is no field 0, the error should be logged 257 return true; 258 } 259 } 260 261 @Override 262 public void notify (Object k, Object value) { 263 Object obj = sp.inp (k); 264 if (obj instanceof ISOMsg) { 265 ISOMsg m = (ISOMsg) obj; 266 rxCounter.increment(); 267 try { 268 if (isNotifyEligible(m)) { 269 String key = getKey (m); 270 String req = key + ".req"; 271 Object r = isp.inp (req); 272 if (r != null) { 273 if (r instanceof AsyncRequest ar) { 274 ar.responseReceived (m); 275 } else { 276 isp.out (key, m); 277 } 278 rxMatchCounter.increment(); 279 return; 280 } 281 } 282 } catch (ISOException e) { 283 LogEvent evt = getLog().createLogEvent("notify"); 284 evt.addMessage(e); 285 evt.addMessage(obj); 286 Logger.log(evt); 287 } 288 processUnhandled (m); 289 } 290 } 291 292 /** 293 * Builds the correlation key used to pair a response with its request, 294 * applying any per-MTI/per-PCODE overrides and special-cases for STAN/PAN fields. 295 * 296 * @param m message whose key should be derived 297 * @return the computed key (queue prefix + MTI + selected field values) 298 * @throws ISOException if no key fields are present in {@code m} 299 */ 300 public String getKey (ISOMsg m) throws ISOException { 301 if (out == null) 302 throw new NullPointerException ("Misconfigured QMUX. Please verify out queue is not null."); 303 StringBuilder sb = new StringBuilder (out); 304 sb.append ('.'); 305 sb.append (mapMTI(m.getMTI())); 306 if (headerIsKey && m.getHeader()!=null) { 307 sb.append ('.'); 308 sb.append(ISOUtil.hexString(m.getHeader())); 309 sb.append ('.'); 310 } 311 boolean hasFields = false; 312 String mti = m.getMTI(); 313 String mtiPrefix = mti.substring(0,2); 314 String[] k = null; 315 String pcode = m.hasField(3) ? sanitizePcode(m.getString(3)) : null; 316 if (pcode != null) { 317 k = mtiKey.get(buildMtiKey(mtiPrefix, pcode)); 318 } 319 if (k == null) { 320 k = mtiKey.getOrDefault(mtiPrefix, key); 321 } 322 for (String f : k) { 323 String v = m.getString(f); 324 if (v != null) { 325 if ("11".equals(f)) { 326 String vt = v.trim(); 327 int l = m.getMTI().charAt(0) == '2' ? 12 : 6; 328 if (vt.length() < l) 329 v = ISOUtil.zeropad(vt, l); 330 } 331 if ("41".equals(f)) { 332 v = ISOUtil.zeropad(v.trim(), 16); // BIC ANSI to ISO hack 333 } 334 hasFields = true; 335 sb.append(v); 336 } 337 } 338 if (!hasFields) 339 throw new ISOException ("Key fields not found - not sending " + sb.toString()); 340 return sb.toString(); 341 } 342 343 private String sanitizePcode(String pcode) { 344 if (pcode == null) 345 return null; 346 String trimmed = pcode.trim(); 347 return trimmed.isEmpty() ? null : trimmed; 348 } 349 350 private String buildMtiKey(String mtiPrefix, String pcode) { 351 return pcode == null ? mtiPrefix : mtiPrefix + ':' + pcode; 352 } 353 354 @Override 355 public Metrics getMetrics() { 356 return metrics; 357 } 358 359 private String mapMTI (String mti) throws ISOException { 360 StringBuilder sb = new StringBuilder(); 361 if (mti != null) { 362 if (mti.length() < 4) 363 mti = ISOUtil.zeropad(mti, 4); // #jPOS-55 364 if (mti.length() == 4) { 365 for (int i=0; i<mtiMapping.length; i++) { 366 int c = mti.charAt (i) - '0'; 367 if (c >= 0 && c < 10) 368 sb.append (mtiMapping[i].charAt(c)); 369 } 370 } 371 } 372 return sb.toString(); 373 } 374 public synchronized void setInQueue (String in) { 375 this.in = in; 376 getPersist().getChild("in").setText (in); 377 setModified (true); 378 } 379 public String getInQueue () { 380 return in; 381 } 382 public synchronized void setOutQueue (String out) { 383 this.out = out; 384 getPersist().getChild("out").setText (out); 385 setModified (true); 386 } 387 public String getOutQueue () { 388 return out; 389 } 390 /** 391 * Returns the {@link Space} backing this MUX's queues. 392 * 393 * @return the local space 394 */ 395 public Space getSpace() { 396 return sp; 397 } 398 public synchronized void setUnhandledQueue (String unhandled) { 399 this.unhandled = unhandled; 400 getPersist().getChild("unhandled").setText (unhandled); 401 setModified (true); 402 } 403 public String getUnhandledQueue () { 404 return unhandled; 405 } 406 /** 407 * Returns the configured ready-indicator key names polled to determine connectivity. 408 * 409 * @return the ready indicator names, or {@code null} if none were configured 410 */ 411 @SuppressWarnings("unused") 412 public String[] getReadyIndicatorNames() { 413 return ready; 414 } 415 416 private void addListeners() throws ConfigurationException { 417 QFactory factory = getFactory (); 418 for (Element l : getPersist().getChildren("request-listener")) { 419 ISORequestListener listener = factory.newInstance(l); 420 if (listener != null) 421 addISORequestListener (listener); 422 } 423 } 424 /** 425 * Registers a request listener invoked for messages that don't match a pending request. 426 * 427 * @param l listener to add 428 */ 429 public void addISORequestListener(ISORequestListener l) { 430 listeners.add (l); 431 } 432 /** 433 * Removes a previously registered request listener. 434 * 435 * @param l listener to remove 436 * @return {@code true} if the listener was registered, {@code false} otherwise 437 */ 438 public boolean removeISORequestListener(ISORequestListener l) { 439 return listeners.remove(l); 440 } 441 /** 442 * Resets all in-memory transaction counters and the last-transaction timestamp. 443 */ 444 public synchronized void resetCounters() { 445 rx = tx = rxExpired = txExpired = rxPending = rxUnhandled = rxForwarded = 0; 446 lastTxn = 0l; 447 } 448 /** 449 * Returns the current counters formatted as a single human-readable string. 450 * 451 * @return a comma-separated counter snapshot suitable for diagnostics 452 */ 453 public String getCountersAsString () { 454 StringBuffer sb = new StringBuffer(); 455 append (sb, "tx=", tx); 456 append (sb, ", rx=", rx); 457 append (sb, ", tx_expired=", getTXExpired()); 458 append (sb, ", tx_pending=", getTXPending()); 459 append (sb, ", rx_expired=", getRXExpired()); 460 append (sb, ", rx_pending=", getRXPending()); 461 append (sb, ", rx_unhandled=", getRXUnhandled()); 462 append (sb, ", rx_forwarded=", getRXForwarded()); 463 sb.append (", connected="); 464 sb.append (Boolean.toString(isConnected())); 465 sb.append (", last="); 466 sb.append (lastTxn); 467 if (lastTxn > 0) { 468 sb.append (", idle="); 469 sb.append(System.currentTimeMillis() - lastTxn); 470 sb.append ("ms"); 471 } 472 return sb.toString(); 473 } 474 475 public int getTXCounter() { 476 return tx; 477 } 478 public int getRXCounter() { 479 return rx; 480 } 481 482 @Override 483 public int getTXExpired() { 484 return txExpired; 485 } 486 487 @Override 488 public int getTXPending() { 489 return sp.size(out); 490 } 491 492 @Override 493 public int getRXExpired() { 494 return rxExpired; 495 } 496 497 @Override 498 public int getRXPending() { 499 return rxPending; 500 } 501 502 @Override 503 public int getRXUnhandled() { 504 return rxUnhandled; 505 } 506 507 @Override 508 public int getRXForwarded() { 509 return rxForwarded; 510 } 511 512 public long getLastTxnTimestampInMillis() { 513 return lastTxn; 514 } 515 public long getIdleTimeInMillis() { 516 return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L; 517 } 518 519 /** 520 * Dispatches an inbound message that did not match any pending request, 521 * giving registered request listeners a chance to handle it before falling 522 * back to the configured {@link #unhandled} queue. 523 * 524 * @param m the unmatched inbound message 525 */ 526 protected void processUnhandled (ISOMsg m) { 527 ISOSource source = m.getSource(); 528 source = source != null ? source : this; 529 rxUnhandledCounter.increment(); 530 Iterator<ISORequestListener> iter = listeners.iterator(); 531 if (iter.hasNext()) 532 synchronized (this) { rxForwarded++; } 533 while (iter.hasNext()) 534 if (iter.next().process (source, m)) 535 return; 536 if (unhandled != null) { 537 synchronized (this) { rxUnhandled++; } 538 sp.out (unhandled, m, 120000); 539 } 540 } 541 private LocalSpace grabSpace (Element e) 542 throws ConfigurationException 543 { 544 String uri = e != null ? e.getText() : ""; 545 Space sp = SpaceFactory.getSpace (uri); 546 if (sp instanceof LocalSpace) { 547 return (LocalSpace) sp; 548 } 549 throw new ConfigurationException ("Invalid space " + uri); 550 } 551 552 /** 553 * sends (or hands back) an ISOMsg 554 * 555 * @param m the Message to be sent 556 * @throws java.io.IOException if the underlying space cannot accept the message 557 * @throws org.jpos.iso.ISOException on pack/unpack error 558 * @throws org.jpos.iso.ISOFilter.VetoException if a filter vetoes the message 559 */ 560 public void send(ISOMsg m) throws IOException, ISOException { 561 if (!isConnected()) 562 throw new ISOException ("MUX is not connected"); 563 sp.out (out, m); 564 txCounter.increment(); 565 } 566 567 public boolean isConnected() { 568 if (running() && ready != null && ready.length > 0) { 569 for (String aReady : ready) 570 if (sp.rdp(aReady) != null) 571 return true; 572 return false; 573 } 574 return running(); 575 } 576 public void dump (PrintStream p, String indent) { 577 p.println (indent + getCountersAsString()); 578 metrics.dump (p, indent); 579 } 580 private String[] toStringArray(String s, String delimiter, String def) { 581 if (s == null) 582 s = def; 583 String[] arr = null; 584 if (s != null && s.length() > 0) { 585 StringTokenizer st; 586 if (delimiter != null) 587 st = new StringTokenizer(s, delimiter); 588 else 589 st = new StringTokenizer(s); 590 591 List<String> l = new ArrayList<String>(); 592 while (st.hasMoreTokens()) { 593 String t = st.nextToken(); 594 if ("header".equalsIgnoreCase(t)) { 595 headerIsKey = true; 596 } else { 597 l.add (t); 598 } 599 } 600 arr = l.toArray(new String[l.size()]); 601 } 602 return arr; 603 } 604 private String[] toStringArray(String s) { 605 return toStringArray(s, null,null); 606 } 607 private boolean shouldIgnore (ISOMsg m) { 608 if (m != null && ignorerc != null 609 && ignorerc.length() > 0 && m.hasField(39)) 610 { 611 return ignorerc.contains(m.getString(39)); 612 } 613 return false; 614 } 615 private void append (StringBuffer sb, String name, int value) { 616 sb.append (name); 617 sb.append (value); 618 } 619 /** 620 * Tracks an asynchronous request awaiting a response, with optional timeout 621 * scheduling and elapsed-time measurement. 622 */ 623 public class AsyncRequest implements Runnable { 624 ISOResponseListener rl; 625 Object handBack; 626 ScheduledFuture future; 627 Chronometer chrono; 628 /** 629 * Constructs an async request paired with the given listener and hand-back token. 630 * 631 * @param rl listener to invoke on response/expiration 632 * @param handBack opaque token relayed back to {@code rl} 633 */ 634 public AsyncRequest (ISOResponseListener rl, Object handBack) { 635 super(); 636 this.rl = rl; 637 this.handBack = handBack; 638 this.chrono = new Chronometer(); 639 } 640 /** 641 * Sets the scheduled future used to enforce the request timeout. 642 * 643 * @param future timeout future, or {@code null} for no timeout 644 */ 645 public void setFuture(ScheduledFuture future) { 646 this.future = future; 647 } 648 /** 649 * Notifies the listener that a response has been received, cancelling the timeout future. 650 * 651 * @param response inbound response message 652 */ 653 public void responseReceived (ISOMsg response) { 654 if (future == null || future.cancel(false)) { 655 synchronized (QMUX.this) { 656 rx++; 657 rxPending--; 658 lastTxn = System.currentTimeMillis(); 659 } 660 long elapsed = chrono.elapsed(); 661 metrics.record("all", elapsed); 662 metrics.record("ok", elapsed); 663 rl.responseReceived(response, handBack); 664 } 665 } 666 public void run() { 667 synchronized(QMUX.this) { 668 rxPending--; 669 } 670 metrics.record("all", chrono.elapsed()); 671 rl.expired(handBack); 672 } 673 } 674 675 private void initMeters() { 676 var tags = io.micrometer.core.instrument.Tags.of("name", getName()); 677 var registry = getServer().getMeterRegistry(); 678 statusGauge = 679 MeterFactory.gauge 680 (registry, MeterInfo.MUX_STATUS, 681 tags, 682 null, 683 () -> isConnected() ? 1 : 0 684 ); 685 686 rxPendingGauge = 687 MeterFactory.gauge 688 (registry, MeterInfo.MUX_RX_PENDING, 689 tags, 690 null, 691 () -> rxPending 692 ); 693 694 txCounter = MeterFactory.counter(registry, MeterInfo.MUX_TX, tags); 695 rxCounter = MeterFactory.counter(registry, MeterInfo.MUX_RX, tags); 696 rxMatchCounter = MeterFactory.counter(registry, MeterInfo.MUX_MATCH, tags.and("type", "match")); 697 rxUnhandledCounter = MeterFactory.counter(registry, MeterInfo.MUX_UNHANDLED, tags.and("type", "unhandled")); 698 responseTimer = MeterFactory.timer(registry, MeterInfo.MUX_RESPONSE_TIMER, tags); 699 } 700 701 private void removeMeters() { 702 MeterFactory.remove (getServer().getMeterRegistry(), 703 statusGauge, rxPendingGauge, txCounter, rxCounter, rxMatchCounter, rxUnhandledCounter, responseTimer 704 ); 705 } 706}