001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import io.micrometer.core.instrument.Gauge; 022import io.micrometer.core.instrument.Tags; 023import io.micrometer.core.instrument.binder.BaseUnits; 024import org.jdom2.Element; 025import org.jpos.core.ConfigurationException; 026import org.jpos.core.Environment; 027import org.jpos.core.annotation.Config; 028import org.jpos.core.handlers.exception.ExceptionHandlerAware; 029import org.jpos.core.handlers.exception.ExceptionHandlerConfigAware; 030import org.jpos.iso.*; 031import org.jpos.metrics.MeterFactory; 032import org.jpos.metrics.MeterInfo; 033import org.jpos.metrics.iso.ISOMsgCounter; 034import org.jpos.metrics.iso.ISOMsgMetrics; 035import org.jpos.q2.QBeanSupport; 036import org.jpos.q2.QFactory; 037import org.jpos.space.Space; 038import org.jpos.space.SpaceFactory; 039import org.jpos.space.SpaceUtil; 040import org.jpos.util.LogSource; 041import org.jpos.util.Loggeable; 042import org.jpos.util.NameRegistrar; 043import org.jpos.util.Realm; 044 045import java.io.EOFException; 046import java.io.IOException; 047import java.io.PrintStream; 048import java.net.SocketTimeoutException; 049import java.time.Duration; 050import java.time.Instant; 051import java.time.ZoneId; 052import java.util.Date; 053import java.util.concurrent.ExecutorService; 054import java.util.concurrent.Executors; 055import java.util.concurrent.ScheduledExecutorService; 056import java.util.concurrent.TimeUnit; 057 058/** 059 * A Q2 adaptor that wraps an {@link org.jpos.iso.ISOChannel} for use within the Q2 container. 060 * @author Alejandro Revilla 061 */ 062@SuppressWarnings("unchecked") 063public class ChannelAdaptor 064 extends QBeanSupport 065 implements ChannelAdaptorMBean, Channel, Loggeable, ExceptionHandlerConfigAware 066{ 067 /** The space used for inter-process message passing. */ 068 protected Space sp; 069 private ISOChannel channel; 070 String in, out, ready, reconnect; 071 long delay; 072 boolean keepAlive = false; 073 boolean ignoreISOExceptions = false; 074 boolean writeOnly = false; 075 int rx, tx, connects; 076 long lastTxn = 0L; 077 long timeout = 0L; 078 boolean waitForWorkersOnStop; 079 private Thread receiver; 080 private Thread sender; 081 private final Object disconnectLock = Boolean.TRUE; 082 083 private ExecutorService executor; 084 private ScheduledExecutorService scheduledExecutor; 085 086 private Gauge connectionsGauge; 087 088 @Config("soft-stop") private long softStop; 089 090 /** Default constructor. */ 091 public ChannelAdaptor () { 092 super (); 093 resetCounters(); 094 } 095 096 @Override 097 protected String defaultRealm() { 098 return Realm.COMM_CHANNEL; 099 } 100 101 public void initService() throws ConfigurationException { 102 if (softStop < 0) 103 throw new ConfigurationException ("Invalid soft-stop %d".formatted(Long.valueOf(softStop))); 104 initSpaceAndQueues(); 105 NameRegistrar.register (getName(), this); 106 executor = QFactory.executorService(cfg.getBoolean("virtual-threads", false)); 107 scheduledExecutor = Executors.newSingleThreadScheduledExecutor( 108 Thread.ofVirtual().factory() 109 ); 110 } 111 public void startService () { 112 try { 113 channel = initChannel (); 114 executor.submit(new Sender()); 115 if (!writeOnly) { // fixes #426 && jPOS-20 116 executor.submit (new Receiver()); 117 } 118 initMeters(); 119 } catch (Exception e) { 120 getLog().warn ("error starting service", e); 121 } 122 } 123 public void stopService () { 124 try { 125 sp.out (in, Boolean.TRUE); 126 if (channel != null) { 127 if (softStop > 0L) 128 disconnectLater(softStop); 129 else 130 disconnect(); 131 } 132 if (waitForWorkersOnStop) 133 executor.awaitTermination(Math.max(5000L, softStop), TimeUnit.MILLISECONDS); 134 sender = null; 135 receiver = null; 136 removeMeters(); 137 } catch (Exception e) { 138 getLog().warn ("error disconnecting from remote host", e); 139 } 140 } 141 public void destroyService () { 142 NameRegistrar.unregister (getName ()); 143 NameRegistrar.unregister ("channel." + getName ()); 144 } 145 146 public synchronized void setReconnectDelay (long delay) { 147 getPersist().getChild ("reconnect-delay") 148 .setText (Long.toString (delay)); 149 this.delay = delay; 150 setModified (true); 151 } 152 public long getReconnectDelay () { 153 return delay; 154 } 155 public synchronized void setInQueue (String in) { 156 String old = this.in; 157 this.in = in; 158 if (old != null) 159 sp.out (old, Boolean.TRUE); 160 161 getPersist().getChild("in").setText (in); 162 setModified (true); 163 } 164 public String getInQueue () { 165 return in; 166 } 167 public synchronized void setOutQueue (String out) { 168 this.out = out; 169 getPersist().getChild("out").setText (out); 170 setModified (true); 171 } 172 173 /** 174 * Queue a message to be transmitted by this adaptor 175 * @param m message to send 176 */ 177 public void send (ISOMsg m) { 178 sp.out (in, m); 179 } 180 /** 181 * Queue a message to be transmitted by this adaptor 182 * @param m message to send 183 * @param timeout timeout in millis 184 */ 185 public void send (ISOMsg m, long timeout) { 186 sp.out (in, m, timeout); 187 } 188 189 /** 190 * Receive message 191 */ 192 public ISOMsg receive () { 193 return (ISOMsg) sp.in (out); 194 } 195 196 /** 197 * Receive message 198 * @param timeout time to wait for an incoming message 199 */ 200 public ISOMsg receive (long timeout) { 201 return (ISOMsg) sp.in (out, timeout); 202 } 203 /** 204 * @return true if channel is connected 205 */ 206 public boolean isConnected () { 207 return sp != null && sp.rdp (ready) != null; 208 } 209 210 public String getOutQueue () { 211 return out; 212 } 213 214 /** 215 * Parses a {@code <channel>} element, returning an {@link ISOChannel}. 216 * @param e the configuration element 217 * @param f the QFactory 218 * @return a configured ISOChannel 219 * @throws ConfigurationException on configuration error 220 */ 221 public ISOChannel newChannel (Element e, QFactory f) throws ConfigurationException { 222 return newChannel(e, f, getRealm()); 223 } 224 225 /** 226 * Parses a {@code <channel>} element, using the provided fallback realm when none is configured. 227 * @param e the configuration element 228 * @param f the QFactory 229 * @param fallbackRealm realm to use if none is configured 230 * @return a configured ISOChannel 231 * @throws ConfigurationException on configuration error 232 */ 233 public ISOChannel newChannel (Element e, QFactory f, String fallbackRealm) throws ConfigurationException { 234 String channelName = QFactory.getAttributeValue (e, "class"); 235 String packagerName = QFactory.getAttributeValue (e, "packager"); 236 237 ISOChannel channel = f.newInstance(channelName); 238 if (packagerName != null) { 239 ISOPackager packager = f.newInstance(packagerName); 240 channel.setPackager (packager); 241 f.setConfiguration (packager, e); 242 } 243 QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header")); 244 f.setLogger (channel, e, fallbackRealm); 245 f.setConfiguration (channel, e); 246 247 if (channel instanceof FilteredChannel) { 248 addFilters ((FilteredChannel) channel, e, f); 249 } 250 251 if (channel instanceof ExceptionHandlerAware) { 252 addExceptionHandlers((ExceptionHandlerAware) channel, e, f); 253 } 254 255 if (channel instanceof ISOMsgMetrics.Source metricsChannel) { 256 String type = "default"; // default alias, in case metrics not defined 257 String clazz = null; 258 259 Element met = e.getChild("metrics"); 260 if (met != null) { 261 if (QFactory.isEnabled(met)) { 262 clazz = QFactory.getAttributeValue(met, "class"); 263 String typeAttr = QFactory.getAttributeValue(met, "type"); 264 type = (clazz != null) ? "class" : // class attribute has precedence over type 265 (typeAttr != null) ? typeAttr : 266 type; 267 } else { 268 type = "none"; // <metrics enabled="false" /> equivalent to type="none" 269 } 270 } 271 272 ISOMsgMetrics m = switch (type) { 273 case "none" -> null; 274 275 case "default" -> { 276 var mc = new ISOMsgCounter(); 277 if (met != null) 278 f.setLogger(mc, met); 279 else 280 mc.setLogger(this.getLog().getLogger(), this.getRealm()); 281 yield mc; 282 } 283 284 case "counter" -> { 285 var mc = new ISOMsgCounter(); 286 f.setLogger(mc, met); 287 f.setConfiguration(mc, met); 288 yield mc; 289 } 290 291 case "class" -> { 292 ISOMsgMetrics mc = f.newInstance(clazz); 293 f.setLogger(mc, met); 294 f.setConfiguration(mc, met); 295 yield mc; 296 } 297 298 default -> throw new ConfigurationException("Unknown metric type '"+type+"'"); 299 }; 300 301 metricsChannel.setISOMsgMetrics(m); 302 } // metrics config 303 304 if (getName () != null) 305 channel.setName (getName ()); 306 307 return channel; 308 } 309 310 /** 311 * Registers filters defined in the configuration element with the channel. 312 * @param channel the channel to configure 313 * @param e the configuration element 314 * @param fact the QFactory 315 * @throws ConfigurationException on error 316 */ 317 protected void addFilters (FilteredChannel channel, Element e, QFactory fact) 318 throws ConfigurationException 319 { 320 for (Element f : e.getChildren("filter")) { 321 ISOFilter filter = fact.newInstance(f); 322 if (filter == null) continue; 323 String direction = QFactory.getAttributeValue(f, "direction"); 324 if (direction == null) 325 channel.addFilter(filter); 326 else if ("incoming".equalsIgnoreCase(direction)) 327 channel.addIncomingFilter(filter); 328 else if ("outgoing".equalsIgnoreCase(direction)) 329 channel.addOutgoingFilter(filter); 330 else if ("both".equalsIgnoreCase(direction)) { 331 channel.addIncomingFilter(filter); 332 channel.addOutgoingFilter(filter); 333 } 334 } 335 } 336 337 338 /** 339 * Initialises and returns the ISOChannel from the current configuration. 340 * @return the initialised channel 341 * @throws ConfigurationException on error 342 */ 343 protected ISOChannel initChannel () throws ConfigurationException { 344 Element persist = getPersist (); 345 Element e = persist.getChild ("channel"); 346 if (e == null) 347 throw new ConfigurationException ("channel element missing"); 348 349 ISOChannel c = newChannel (e, getFactory()); 350 351 String socketFactoryString = getSocketFactory(); 352 if (socketFactoryString != null && c instanceof FactoryChannel) { 353 ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString); 354 if (sFac instanceof LogSource) { 355 ((LogSource) sFac).setLogger(log.getLogger(), getRealm()); 356 } 357 getFactory().setConfiguration (sFac, e); 358 ((FactoryChannel)c).setSocketFactory(sFac); 359 } 360 361 return c; 362 } 363 364 /** 365 * Initialises the Space and in/out queues used by this adaptor. 366 * @throws ConfigurationException on error 367 */ 368 protected void initSpaceAndQueues () throws ConfigurationException { 369 Element persist = getPersist (); 370 sp = grabSpace (persist.getChild ("space")); 371 in = Environment.get(persist.getChildTextTrim ("in")); 372 out = Environment.get(persist.getChildTextTrim ("out")); 373 writeOnly = "yes".equalsIgnoreCase (getPersist().getChildTextTrim ("write-only")); 374 if (in == null || (out == null && !writeOnly)) { 375 throw new ConfigurationException ("Misconfigured channel. Please verify in/out queues"); 376 } 377 String s = Environment.get(persist.getChildTextTrim ("reconnect-delay")); 378 delay = s != null ? Long.parseLong (s) : 10000; // reasonable default 379 keepAlive = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("keep-alive"))); 380 ignoreISOExceptions = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("ignore-iso-exceptions"))); 381 String t = Environment.get(persist.getChildTextTrim("timeout")); 382 timeout = t != null && t.length() > 0 ? Long.parseLong(t) : 0L; 383 ready = getName() + ".ready"; 384 reconnect = getName() + ".reconnect"; 385 waitForWorkersOnStop = "yes".equalsIgnoreCase(Environment.get(persist.getChildTextTrim ("wait-for-workers-on-stop"))); 386 } 387 388 /** Background thread that forwards outgoing messages to the channel. */ 389 @SuppressWarnings("unchecked") 390 public class Sender implements Runnable { 391 /** Default constructor. */ 392 public Sender () { 393 super (); 394 } 395 public void run () { 396 Thread.currentThread().setName ("channel-sender-" + in); 397 398 while (running()){ 399 try { 400 checkConnection (); 401 if (!running()) 402 break; 403 Object o = sp.in (in, delay); 404 if (o instanceof ISOMsg m) { 405 if (!channel.isConnected()) { 406 // push back the message so it can be handled by another channel adaptor 407 sp.push(in, o); 408 continue; 409 } 410 channel.send(m); 411 tx++; 412 } else if (o instanceof Integer) { 413 if ((int)o != hashCode()) { 414 // STOP indicator seems to be for another channel adaptor 415 // sharing the same queue push it back and allow the companion 416 // channel to get it 417 sp.push (in, o, 500L); 418 ISOUtil.sleep (1000L); // larger sleep so that the indicator has time to timeout 419 } 420 } 421 else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) { 422 ((BaseChannel)channel).sendKeepAlive(); 423 } 424 } catch (ISOFilter.VetoException e) { 425 // getLog().warn ("channel-sender-"+in, e.getMessage ()); 426 } catch (ISOException e) { 427 // getLog().warn ("channel-sender-"+in, e.getMessage ()); 428 if (!ignoreISOExceptions) { 429 disconnect (); 430 } 431 ISOUtil.sleep (1000); // slow down on errors 432 } catch (Exception e) { 433 // getLog().warn ("channel-sender-"+in, e.getMessage ()); 434 disconnect (); 435 ISOUtil.sleep (1000); 436 } 437 } 438 } 439 } 440 /** Background thread that reads incoming messages from the channel. */ 441 @SuppressWarnings("unchecked") 442 public class Receiver implements Runnable { 443 /** Default constructor. */ 444 public Receiver () { 445 super (); 446 } 447 public void run () { 448 Thread.currentThread().setName ("channel-receiver-"+out); 449 boolean shuttingDown = false; 450 Instant shutdownDeadline = null; 451 final Duration gracePeriod = Duration.ofMillis(softStop); 452 while (true) { 453 if (!shuttingDown && !running()) { 454 if (gracePeriod.isZero()) 455 break; 456 shuttingDown = true; 457 shutdownDeadline = Instant.now().plus(gracePeriod); 458 getLog().info("soft-stop (%s)".formatted(shutdownDeadline.atZone(ZoneId.systemDefault()))); 459 } 460 final boolean shouldExit = shuttingDown 461 ? Instant.now().isAfter(shutdownDeadline) 462 : !running(); 463 464 if (shouldExit) { 465 getLog().info ("stop"); 466 break; 467 } 468 try { 469 Object r = sp.rd (ready, 5000L); 470 if (r == null) { 471 continue; 472 } 473 ISOMsg m = channel.receive (); 474 rx++; 475 lastTxn = System.currentTimeMillis(); 476 if (timeout > 0) 477 sp.out (out, m, timeout); 478 else 479 sp.out (out, m); 480 } catch (ISOFilter.VetoException e) { 481 // getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage()); 482 } catch (ISOException e) { 483 if (running()) { 484 // getLog().warn ("channel-receiver-"+out, e); 485 if (!ignoreISOExceptions) { 486 sp.out (reconnect, Boolean.TRUE, delay); 487 disconnect (); 488 sp.push (in, hashCode()); // wake-up Sender 489 } 490 ISOUtil.sleep(1000); 491 } 492 } catch (SocketTimeoutException | EOFException e) { 493 if (running()) { 494 // getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting"); 495 sp.out (reconnect, Boolean.TRUE, delay); 496 disconnect (); 497 sp.push (in, hashCode()); // wake-up Sender 498 ISOUtil.sleep(1000); 499 } 500 } catch (Exception e) { 501 if (running()) { 502 // getLog().warn ("channel-receiver-"+out, e); 503 sp.out (reconnect, Boolean.TRUE, delay); 504 disconnect (); 505 sp.push (in, hashCode()); // wake-up Sender 506 ISOUtil.sleep(1000); 507 } 508 } 509 } 510 disconnect(); 511 } 512 } 513 /** 514 * Waits until the reconnect token clears, then attempts to reconnect. 515 */ 516 protected void checkConnection () { 517 while (running() && sp.rdp (reconnect) != null) { 518 ISOUtil.sleep(1000); 519 } 520 while (running() && !channel.isConnected ()) { 521 SpaceUtil.wipe(sp, ready); 522 try { 523 channel.connect (); 524 } catch (IOException ignored) { 525 // channel.connect already logs - no need for more warnings 526 } 527 if (!channel.isConnected ()) 528 ISOUtil.sleep (delay); 529 else 530 connects++; 531 } 532 if (running() && sp.rdp (ready) == null) 533 sp.out (ready, new Date()); 534 } 535 /** 536 * Disconnects the channel and releases associated resources. 537 */ 538 protected void disconnect () { 539 // do not synchronize on this as both Sender and Receiver can deadlock against a thread calling stop() 540 synchronized (disconnectLock) { 541 try { 542 SpaceUtil.wipe(sp, ready); 543 channel.disconnect(); 544 } catch (Exception e) { 545 getLog().warn("disconnect", e); 546 } 547 } 548 } 549 private void disconnectLater(long delayInMillis) { 550 SpaceUtil.wipe(sp, ready); 551 scheduledExecutor.schedule(this::disconnect, delayInMillis, TimeUnit.MILLISECONDS); 552 } 553 public synchronized void setHost (String host) { 554 setProperty (getProperties ("channel"), "host", host); 555 setModified (true); 556 } 557 public String getHost () { 558 return getProperty (getProperties ("channel"), "host"); 559 } 560 public synchronized void setPort (int port) { 561 setProperty ( 562 getProperties ("channel"), "port", Integer.toString (port) 563 ); 564 setModified (true); 565 } 566 public int getPort () { 567 int port = 0; 568 try { 569 port = Integer.parseInt ( 570 getProperty (getProperties ("channel"), "port") 571 ); 572 } catch (NumberFormatException e) { 573 getLog().error(e); 574 } 575 return port; 576 } 577 public synchronized void setSocketFactory (String sFac) { 578 setProperty(getProperties("channel"), "socketFactory", sFac); 579 setModified(true); 580 } 581 582 public void resetCounters () { 583 rx = tx = connects = 0; 584 lastTxn = 0L; 585 } 586 public String getCountersAsString () { 587 StringBuilder sb = new StringBuilder(); 588 append (sb, "tx=", tx); 589 append (sb, ", rx=", rx); 590 append (sb, ", connects=", connects); 591 sb.append (", last="); 592 sb.append(lastTxn); 593 if (lastTxn > 0) { 594 sb.append (", idle="); 595 sb.append(System.currentTimeMillis() - lastTxn); 596 sb.append ("ms"); 597 } 598 return sb.toString(); 599 } 600 public int getTXCounter() { 601 return tx; 602 } 603 public int getRXCounter() { 604 return rx; 605 } 606 public int getConnectsCounter () { 607 return connects; 608 } 609 public long getLastTxnTimestampInMillis() { 610 return lastTxn; 611 } 612 public long getIdleTimeInMillis() { 613 return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L; 614 } 615 public String getSocketFactory() { 616 return getProperty(getProperties ("channel"), "socketFactory"); 617 } 618 public void dump (PrintStream p, String indent) { 619 p.println (indent + getCountersAsString()); 620 } 621 /** 622 * Returns the Space referenced by the given element, or the default space if null. 623 * @param e the element whose text names the space 624 * @return the resolved Space 625 */ 626 protected Space grabSpace (Element e) { 627 return SpaceFactory.getSpace (e != null ? e.getText() : ""); 628 } 629 /** 630 * Appends a name=value counter entry to the string builder. 631 * @param sb the builder to append to 632 * @param name the counter name 633 * @param value the counter value 634 */ 635 protected void append (StringBuilder sb, String name, int value) { 636 sb.append (name); 637 sb.append (value); 638 } 639 640 private void initMeters() { 641 var tags = Tags.of("name", getName(), 642 "type", "client"); 643 var registry = getServer().getMeterRegistry(); 644 645 connectionsGauge = 646 MeterFactory.gauge 647 (registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT, 648 tags, 649 BaseUnits.SESSIONS, 650 () -> isConnected() ? 1 : 0 651 ); 652 653 if (channel instanceof ISOMsgMetrics.Source ms) { 654 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 655 if (mtr != null) { 656 mtr.addTags(tags); 657 mtr.register(registry); 658 } 659 } 660 } 661 662 private void removeMeters() { 663 var registry = getServer().getMeterRegistry(); 664 registry.remove(connectionsGauge); 665 666 if (channel instanceof ISOMsgMetrics.Source ms) { 667 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 668 if (mtr != null) 669 mtr.removeMeters(); 670 } 671 } 672}