001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import io.micrometer.core.instrument.*; 022import io.micrometer.core.instrument.Timer; 023import org.HdrHistogram.AtomicHistogram; 024import org.jdom2.Element; 025import org.jpos.core.ConfigurationException; 026import org.jpos.core.Environment; 027import org.jpos.iso.*; 028import org.jpos.metrics.MeterFactory; 029import org.jpos.metrics.MeterInfo; 030import org.jpos.q2.QBeanSupport; 031import org.jpos.q2.QFactory; 032import org.jpos.space.*; 033import org.jpos.util.*; 034import org.jpos.util.Metrics; 035 036import java.io.IOException; 037import java.io.PrintStream; 038import java.util.*; 039import java.util.concurrent.ScheduledFuture; 040import java.util.concurrent.TimeUnit; 041 042/** 043 * @author Alejandro Revilla 044 */ 045@SuppressWarnings("unchecked") 046public class QMUX 047 extends QBeanSupport 048 implements SpaceListener, MUX, QMUXMBean, Loggeable, MetricsProvider 049{ 050 static final String nomap = "0123456789"; 051 static final String DEFAULT_KEY = "41, 11"; 052 protected LocalSpace sp; 053 protected String in, out, unhandled; 054 protected String[] ready; 055 protected String[] key; 056 protected String ignorerc; 057 protected String[] mtiMapping; 058 private boolean headerIsKey; 059 private boolean returnRejects; 060 private LocalSpace isp; // internal space 061 private Map<String,String[]> mtiKey = new HashMap<>(); 062 private Metrics metrics = new Metrics(new AtomicHistogram(60000, 2)); 063 064 List<ISORequestListener> listeners; 065 private volatile int rx, tx, rxExpired, txExpired, rxPending, rxUnhandled, rxForwarded; 066 private volatile long lastTxn = 0L; 067 private boolean listenerRegistered; 068 069 private Gauge statusGauge; 070 private Gauge rxPendingGauge; 071 private Timer responseTimer; 072 073 private Counter txCounter; 074 private Counter rxCounter; 075 private Counter rxMatchCounter; 076 private Counter rxUnhandledCounter; 077 078 public QMUX () { 079 super (); 080 listeners = new ArrayList<>(); 081 } 082 public void initService () throws ConfigurationException { 083 Element e = getPersist (); 084 sp = grabSpace (e.getChild ("space")); 085 isp = cfg.getBoolean("reuse-space", false) ? sp : new TSpace(); 086 in = Environment.get(e.getChildTextTrim ("in")); 087 out = Environment.get(e.getChildTextTrim ("out")); 088 089 if (in == null || out == null) { 090 throw new ConfigurationException ("Misconfigured QMUX. Please verify in/out queues"); 091 } 092 ignorerc = Environment.get(e.getChildTextTrim ("ignore-rc")); 093 key = toStringArray(DEFAULT_KEY, ", ", null); 094 returnRejects = cfg.getBoolean("return-rejects", false); 095 for (Element keyElement : e.getChildren("key")) { 096 String mtiOverride = QFactory.getAttributeValue(keyElement, "mti"); 097 if (mtiOverride != null && mtiOverride.length() >= 2) { 098 String pcode = sanitizePcode(QFactory.getAttributeValue(keyElement, "pcode")); 099 String mapKey = buildMtiKey(mtiOverride.substring(0,2), pcode); 100 mtiKey.put (mapKey, toStringArray(keyElement.getTextTrim(), ", ", null)); 101 } else { 102 key = toStringArray(e.getChildTextTrim("key"), ", ", DEFAULT_KEY); 103 } 104 } 105 ready = toStringArray(Environment.get(e.getChildTextTrim ("ready"))); 106 mtiMapping = toStringArray(Environment.get(e.getChildTextTrim ("mtimapping"))); 107 if (mtiMapping == null || mtiMapping.length != 3) 108 mtiMapping = new String[] { nomap, nomap, "0022446689" }; 109 addListeners (); 110 unhandled = Environment.get(e.getChildTextTrim ("unhandled")); 111 initMeters(); 112 NameRegistrar.register ("mux."+getName (), this); 113 } 114 public void startService () { 115 if (!listenerRegistered) { 116 listenerRegistered = true; 117 // Handle messages that could be in the in queue at start time 118 synchronized (sp) { 119 Object[] pending = SpaceUtil.inpAll(sp, in); 120 sp.addListener (in, this); 121 for (Object o : pending) 122 sp.out(in, o); 123 } 124 } 125 } 126 public void stopService () { 127 listenerRegistered = false; 128 sp.removeListener (in, this); 129 removeMeters(); 130 } 131 public void destroyService () { 132 NameRegistrar.unregister ("mux."+getName ()); 133 } 134 135 /** 136 * @return MUX with name using NameRegistrar 137 * @throws NameRegistrar.NotFoundException 138 * @see NameRegistrar 139 */ 140 public static MUX getMUX (String name) 141 throws NameRegistrar.NotFoundException 142 { 143 return (MUX) NameRegistrar.get ("mux."+name); 144 } 145 146 /** 147 * @param m message to send 148 * @param timeout amount of time in millis to wait for a response 149 * @return response or null 150 */ 151 public ISOMsg request (ISOMsg m, long timeout) throws ISOException { 152 String key = getKey (m); 153 String req = key + ".req"; 154 synchronized (isp) { 155 if (isp.rdp (req) != null) 156 throw new ISOException ("Duplicate key '" + req + "' detected"); 157 isp.out (req, m); 158 } 159 m.setDirection(0); 160 Chronometer c = new Chronometer(); 161 if (timeout > 0) 162 sp.out (out, m, timeout); 163 else 164 sp.out (out, m); 165 166 txCounter.increment(); 167 ISOMsg resp; 168 try { 169 synchronized (this) { tx++; rxPending++; } 170 171 for (;;) { 172 resp = (ISOMsg) isp.in (key, timeout); 173 if (!shouldIgnore (resp)) 174 break; 175 } 176 if (resp == null && isp.inp (req) == null) { 177 // possible race condition, retry for a few extra seconds 178 resp = (ISOMsg) isp.in (key, 10000); 179 } 180 synchronized (this) { 181 if (resp != null) { 182 rx++; 183 lastTxn = System.currentTimeMillis(); 184 } else { 185 rxExpired++; 186 if (m.getDirection() != ISOMsg.OUTGOING) 187 txExpired++; 188 } 189 } 190 } finally { 191 synchronized (this) { rxPending--; } 192 } 193 long elapsed = c.elapsed(); 194 metrics.record("all", elapsed); 195 if (resp != null) { 196 responseTimer.record(elapsed, TimeUnit.MILLISECONDS); 197 metrics.record("ok", elapsed); 198 } 199 return resp; 200 } 201 public void request (ISOMsg m, long timeout, ISOResponseListener rl, Object handBack) 202 throws ISOException 203 { 204 String key = getKey (m); 205 String req = key + ".req"; 206 synchronized (isp) { 207 if (isp.rdp (req) != null) 208 throw new ISOException ("Duplicate key '" + req + "' detected."); 209 m.setDirection(0); 210 AsyncRequest ar = new AsyncRequest (rl, handBack); 211 synchronized (ar) { 212 if (timeout > 0) 213 ar.setFuture(getScheduledThreadPoolExecutor().schedule(ar, timeout, TimeUnit.MILLISECONDS)); 214 } 215 isp.out (req, ar, timeout); 216 } 217 if (timeout > 0) 218 sp.out (out, m, timeout); 219 else 220 sp.out (out, m); 221 synchronized (this) { tx++; rxPending++; } 222 } 223 224 protected boolean isNotifyEligible(ISOMsg msg) { 225 if (returnRejects) 226 return true; 227 228 try { 229 return msg.isResponse(); 230 } catch (RuntimeException | ISOException ex) { 231 // * ArrayIndexOutOfBoundsException - It may occur for messages where 232 // MTI is not standard 4 characters (eg. FSDISOMsg), then notification is expected. 233 // * ISOException: When there is no field 0, the error should be logged 234 return true; 235 } 236 } 237 238 @Override 239 public void notify (Object k, Object value) { 240 Object obj = sp.inp (k); 241 if (obj instanceof ISOMsg) { 242 ISOMsg m = (ISOMsg) obj; 243 rxCounter.increment(); 244 try { 245 if (isNotifyEligible(m)) { 246 String key = getKey (m); 247 String req = key + ".req"; 248 Object r = isp.inp (req); 249 if (r != null) { 250 if (r instanceof AsyncRequest ar) { 251 ar.responseReceived (m); 252 } else { 253 isp.out (key, m); 254 } 255 rxMatchCounter.increment(); 256 return; 257 } 258 } 259 } catch (ISOException e) { 260 LogEvent evt = getLog().createLogEvent("notify"); 261 evt.addMessage(e); 262 evt.addMessage(obj); 263 Logger.log(evt); 264 } 265 processUnhandled (m); 266 } 267 } 268 269 public String getKey (ISOMsg m) throws ISOException { 270 if (out == null) 271 throw new NullPointerException ("Misconfigured QMUX. Please verify out queue is not null."); 272 StringBuilder sb = new StringBuilder (out); 273 sb.append ('.'); 274 sb.append (mapMTI(m.getMTI())); 275 if (headerIsKey && m.getHeader()!=null) { 276 sb.append ('.'); 277 sb.append(ISOUtil.hexString(m.getHeader())); 278 sb.append ('.'); 279 } 280 boolean hasFields = false; 281 String mti = m.getMTI(); 282 String mtiPrefix = mti.substring(0,2); 283 String[] k = null; 284 String pcode = m.hasField(3) ? sanitizePcode(m.getString(3)) : null; 285 if (pcode != null) { 286 k = mtiKey.get(buildMtiKey(mtiPrefix, pcode)); 287 } 288 if (k == null) { 289 k = mtiKey.getOrDefault(mtiPrefix, key); 290 } 291 for (String f : k) { 292 String v = m.getString(f); 293 if (v != null) { 294 if ("11".equals(f)) { 295 String vt = v.trim(); 296 int l = m.getMTI().charAt(0) == '2' ? 12 : 6; 297 if (vt.length() < l) 298 v = ISOUtil.zeropad(vt, l); 299 } 300 if ("41".equals(f)) { 301 v = ISOUtil.zeropad(v.trim(), 16); // BIC ANSI to ISO hack 302 } 303 hasFields = true; 304 sb.append(v); 305 } 306 } 307 if (!hasFields) 308 throw new ISOException ("Key fields not found - not sending " + sb.toString()); 309 return sb.toString(); 310 } 311 312 private String sanitizePcode(String pcode) { 313 if (pcode == null) 314 return null; 315 String trimmed = pcode.trim(); 316 return trimmed.isEmpty() ? null : trimmed; 317 } 318 319 private String buildMtiKey(String mtiPrefix, String pcode) { 320 return pcode == null ? mtiPrefix : mtiPrefix + ':' + pcode; 321 } 322 323 @Override 324 public Metrics getMetrics() { 325 return metrics; 326 } 327 328 private String mapMTI (String mti) throws ISOException { 329 StringBuilder sb = new StringBuilder(); 330 if (mti != null) { 331 if (mti.length() < 4) 332 mti = ISOUtil.zeropad(mti, 4); // #jPOS-55 333 if (mti.length() == 4) { 334 for (int i=0; i<mtiMapping.length; i++) { 335 int c = mti.charAt (i) - '0'; 336 if (c >= 0 && c < 10) 337 sb.append (mtiMapping[i].charAt(c)); 338 } 339 } 340 } 341 return sb.toString(); 342 } 343 public synchronized void setInQueue (String in) { 344 this.in = in; 345 getPersist().getChild("in").setText (in); 346 setModified (true); 347 } 348 public String getInQueue () { 349 return in; 350 } 351 public synchronized void setOutQueue (String out) { 352 this.out = out; 353 getPersist().getChild("out").setText (out); 354 setModified (true); 355 } 356 public String getOutQueue () { 357 return out; 358 } 359 public Space getSpace() { 360 return sp; 361 } 362 public synchronized void setUnhandledQueue (String unhandled) { 363 this.unhandled = unhandled; 364 getPersist().getChild("unhandled").setText (unhandled); 365 setModified (true); 366 } 367 public String getUnhandledQueue () { 368 return unhandled; 369 } 370 @SuppressWarnings("unused") 371 public String[] getReadyIndicatorNames() { 372 return ready; 373 } 374 375 private void addListeners() throws ConfigurationException { 376 QFactory factory = getFactory (); 377 for (Element l : getPersist().getChildren("request-listener")) { 378 ISORequestListener listener = factory.newInstance(l); 379 if (listener != null) 380 addISORequestListener (listener); 381 } 382 } 383 public void addISORequestListener(ISORequestListener l) { 384 listeners.add (l); 385 } 386 public boolean removeISORequestListener(ISORequestListener l) { 387 return listeners.remove(l); 388 } 389 public synchronized void resetCounters() { 390 rx = tx = rxExpired = txExpired = rxPending = rxUnhandled = rxForwarded = 0; 391 lastTxn = 0l; 392 } 393 public String getCountersAsString () { 394 StringBuffer sb = new StringBuffer(); 395 append (sb, "tx=", tx); 396 append (sb, ", rx=", rx); 397 append (sb, ", tx_expired=", getTXExpired()); 398 append (sb, ", tx_pending=", getTXPending()); 399 append (sb, ", rx_expired=", getRXExpired()); 400 append (sb, ", rx_pending=", getRXPending()); 401 append (sb, ", rx_unhandled=", getRXUnhandled()); 402 append (sb, ", rx_forwarded=", getRXForwarded()); 403 sb.append (", connected="); 404 sb.append (Boolean.toString(isConnected())); 405 sb.append (", last="); 406 sb.append (lastTxn); 407 if (lastTxn > 0) { 408 sb.append (", idle="); 409 sb.append(System.currentTimeMillis() - lastTxn); 410 sb.append ("ms"); 411 } 412 return sb.toString(); 413 } 414 415 public int getTXCounter() { 416 return tx; 417 } 418 public int getRXCounter() { 419 return rx; 420 } 421 422 @Override 423 public int getTXExpired() { 424 return txExpired; 425 } 426 427 @Override 428 public int getTXPending() { 429 return sp.size(out); 430 } 431 432 @Override 433 public int getRXExpired() { 434 return rxExpired; 435 } 436 437 @Override 438 public int getRXPending() { 439 return rxPending; 440 } 441 442 @Override 443 public int getRXUnhandled() { 444 return rxUnhandled; 445 } 446 447 @Override 448 public int getRXForwarded() { 449 return rxForwarded; 450 } 451 452 public long getLastTxnTimestampInMillis() { 453 return lastTxn; 454 } 455 public long getIdleTimeInMillis() { 456 return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L; 457 } 458 459 protected void processUnhandled (ISOMsg m) { 460 ISOSource source = m.getSource(); 461 source = source != null ? source : this; 462 rxUnhandledCounter.increment(); 463 Iterator<ISORequestListener> iter = listeners.iterator(); 464 if (iter.hasNext()) 465 synchronized (this) { rxForwarded++; } 466 while (iter.hasNext()) 467 if (iter.next().process (source, m)) 468 return; 469 if (unhandled != null) { 470 synchronized (this) { rxUnhandled++; } 471 sp.out (unhandled, m, 120000); 472 } 473 } 474 private LocalSpace grabSpace (Element e) 475 throws ConfigurationException 476 { 477 String uri = e != null ? e.getText() : ""; 478 Space sp = SpaceFactory.getSpace (uri); 479 if (sp instanceof LocalSpace) { 480 return (LocalSpace) sp; 481 } 482 throw new ConfigurationException ("Invalid space " + uri); 483 } 484 485 /** 486 * sends (or hands back) an ISOMsg 487 * 488 * @param m the Message to be sent 489 * @throws java.io.IOException 490 * @throws org.jpos.iso.ISOException 491 * @throws org.jpos.iso.ISOFilter.VetoException; 492 */ 493 public void send(ISOMsg m) throws IOException, ISOException { 494 if (!isConnected()) 495 throw new ISOException ("MUX is not connected"); 496 sp.out (out, m); 497 txCounter.increment(); 498 } 499 500 public boolean isConnected() { 501 if (running() && ready != null && ready.length > 0) { 502 for (String aReady : ready) 503 if (sp.rdp(aReady) != null) 504 return true; 505 return false; 506 } 507 return running(); 508 } 509 public void dump (PrintStream p, String indent) { 510 p.println (indent + getCountersAsString()); 511 metrics.dump (p, indent); 512 } 513 private String[] toStringArray(String s, String delimiter, String def) { 514 if (s == null) 515 s = def; 516 String[] arr = null; 517 if (s != null && s.length() > 0) { 518 StringTokenizer st; 519 if (delimiter != null) 520 st = new StringTokenizer(s, delimiter); 521 else 522 st = new StringTokenizer(s); 523 524 List<String> l = new ArrayList<String>(); 525 while (st.hasMoreTokens()) { 526 String t = st.nextToken(); 527 if ("header".equalsIgnoreCase(t)) { 528 headerIsKey = true; 529 } else { 530 l.add (t); 531 } 532 } 533 arr = l.toArray(new String[l.size()]); 534 } 535 return arr; 536 } 537 private String[] toStringArray(String s) { 538 return toStringArray(s, null,null); 539 } 540 private boolean shouldIgnore (ISOMsg m) { 541 if (m != null && ignorerc != null 542 && ignorerc.length() > 0 && m.hasField(39)) 543 { 544 return ignorerc.contains(m.getString(39)); 545 } 546 return false; 547 } 548 private void append (StringBuffer sb, String name, int value) { 549 sb.append (name); 550 sb.append (value); 551 } 552 public class AsyncRequest implements Runnable { 553 ISOResponseListener rl; 554 Object handBack; 555 ScheduledFuture future; 556 Chronometer chrono; 557 public AsyncRequest (ISOResponseListener rl, Object handBack) { 558 super(); 559 this.rl = rl; 560 this.handBack = handBack; 561 this.chrono = new Chronometer(); 562 } 563 public void setFuture(ScheduledFuture future) { 564 this.future = future; 565 } 566 public void responseReceived (ISOMsg response) { 567 if (future == null || future.cancel(false)) { 568 synchronized (QMUX.this) { 569 rx++; 570 rxPending--; 571 lastTxn = System.currentTimeMillis(); 572 } 573 long elapsed = chrono.elapsed(); 574 metrics.record("all", elapsed); 575 metrics.record("ok", elapsed); 576 rl.responseReceived(response, handBack); 577 } 578 } 579 public void run() { 580 synchronized(QMUX.this) { 581 rxPending--; 582 } 583 metrics.record("all", chrono.elapsed()); 584 rl.expired(handBack); 585 } 586 } 587 588 private void initMeters() { 589 var tags = io.micrometer.core.instrument.Tags.of("name", getName()); 590 var registry = getServer().getMeterRegistry(); 591 statusGauge = 592 MeterFactory.gauge 593 (registry, MeterInfo.MUX_STATUS, 594 tags, 595 null, 596 () -> isConnected() ? 1 : 0 597 ); 598 599 rxPendingGauge = 600 MeterFactory.gauge 601 (registry, MeterInfo.MUX_RX_PENDING, 602 tags, 603 null, 604 () -> rxPending 605 ); 606 607 txCounter = MeterFactory.counter(registry, MeterInfo.MUX_TX, tags); 608 rxCounter = MeterFactory.counter(registry, MeterInfo.MUX_RX, tags); 609 rxMatchCounter = MeterFactory.counter(registry, MeterInfo.MUX_MATCH, tags.and("type", "match")); 610 rxUnhandledCounter = MeterFactory.counter(registry, MeterInfo.MUX_UNHANDLED, tags.and("type", "unhandled")); 611 responseTimer = MeterFactory.timer(registry, MeterInfo.MUX_RESPONSE_TIMER, tags); 612 } 613 614 private void removeMeters() { 615 MeterFactory.remove (getServer().getMeterRegistry(), 616 statusGauge, rxPendingGauge, txCounter, rxCounter, rxMatchCounter, rxUnhandledCounter, responseTimer 617 ); 618 } 619}