/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.q2.iso;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.core.annotation.Config;
import org.jpos.core.handlers.exception.ExceptionHandlerAware;
import org.jpos.core.handlers.exception.ExceptionHandlerConfigAware;
import org.jpos.iso.*;
import org.jpos.metrics.MeterFactory;
import org.jpos.metrics.MeterInfo;
import org.jpos.metrics.iso.ISOMsgCounter;
import org.jpos.metrics.iso.ISOMsgMetrics;
import org.jpos.q2.QBeanSupport;
import org.jpos.q2.QFactory;
import org.jpos.space.Space;
import org.jpos.space.SpaceFactory;
import org.jpos.space.SpaceUtil;
import org.jpos.util.LogSource;
import org.jpos.util.Loggeable;
import org.jpos.util.NameRegistrar;

import java.io.EOFException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * QBean that connects an {@link ISOChannel} to a pair of {@link Space} queues.
 *
 * <p>A {@link Sender} task takes messages from the {@code in} queue and sends them
 * through the channel; unless the adaptor is configured as write-only, a
 * {@link Receiver} task reads messages from the channel and places them in the
 * {@code out} queue. While the channel is connected a marker entry is kept in the
 * space under the key {@code <name>.ready}.</p>
 *
 * @author Alejandro Revilla
 */
@SuppressWarnings("unchecked")
public class ChannelAdaptor
    extends QBeanSupport
    implements ChannelAdaptorMBean, Channel, Loggeable, ExceptionHandlerConfigAware
{
    protected Space sp;
    private ISOChannel channel;
    /** Space keys: outbound queue, inbound queue, "connected" marker and "reconnect pending" marker. */
    String in, out, ready, reconnect;
    /** Reconnect delay in millis; also used as the Sender's poll timeout on the {@code in} queue. */
    long delay;
    boolean keepAlive = false;
    boolean ignoreISOExceptions = false;
    boolean writeOnly = false;
    int rx, tx, connects;
    long lastTxn = 0L;
    /** Lifetime in millis of entries written to the {@code out} queue; 0 means no timeout. */
    long timeout = 0L;
    boolean waitForWorkersOnStop;
    private Thread receiver;
    private Thread sender;
    // Dedicated per-instance monitor. Locking on a shared constant such as Boolean.TRUE
    // would make every adaptor (and any unrelated code using the same constant) contend
    // on one JVM-wide lock.
    private final Object disconnectLock = new Object();

    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;

    private Gauge connectionsGauge;

    /** Grace period in millis granted to the Receiver before the channel is disconnected on stop. */
    @Config("soft-stop") private long softStop;

    public ChannelAdaptor () {
        super ();
        resetCounters();
    }

    /**
     * Validates {@code soft-stop}, reads the queue configuration, registers this adaptor
     * in the {@link NameRegistrar} and creates the executors used by the worker tasks.
     *
     * @throws ConfigurationException if {@code soft-stop} is negative or the queues are misconfigured
     */
    public void initService() throws ConfigurationException {
        if (softStop < 0)
            throw new ConfigurationException ("Invalid soft-stop %d".formatted(Long.valueOf(softStop)));
        initSpaceAndQueues();
        NameRegistrar.register (getName(), this);
        executor = QFactory.executorService(cfg.getBoolean("virtual-threads", false));
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
            Thread.ofVirtual().factory()
        );
    }

    /**
     * Creates the channel, submits the {@link Sender} (and, unless write-only, the
     * {@link Receiver}) and registers the meters. Failures are logged, not propagated.
     */
    public void startService () {
        try {
            channel = initChannel ();
            executor.submit(new Sender());
            if (!writeOnly) { // fixes #426 && jPOS-20
                executor.submit (new Receiver());
            }
            initMeters();
        } catch (Exception e) {
            getLog().warn ("error starting service", e);
        }
    }

    /**
     * Wakes up the Sender, disconnects the channel (immediately, or after the
     * {@code soft-stop} grace period), optionally waits for the workers and removes the meters.
     * Failures are logged, not propagated.
     */
    public void stopService () {
        try {
            sp.out (in, Boolean.TRUE); // unblocks a Sender waiting on the in queue
            if (channel != null) {
                if (softStop > 0L)
                    disconnectLater(softStop);
                else
                    disconnect();
            }
            if (waitForWorkersOnStop)
                executor.awaitTermination(Math.max(5000L, softStop), TimeUnit.MILLISECONDS);
            sender = null;
            receiver = null;
            removeMeters();
        } catch (Exception e) {
            getLog().warn ("error disconnecting from remote host", e);
        }
    }

    /**
     * Unregisters this adaptor from the {@link NameRegistrar} and releases the scheduler
     * created by {@link #initService()}.
     */
    public void destroyService () {
        NameRegistrar.unregister (getName ());
        NameRegistrar.unregister ("channel." + getName ());
        if (scheduledExecutor != null) {
            // Orderly shutdown: with the default ScheduledThreadPoolExecutor policy an
            // already scheduled soft-stop disconnect still runs before the pool terminates.
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Updates the reconnect delay both in memory and in the persistent configuration.
     * @param delay delay in millis
     */
    public synchronized void setReconnectDelay (long delay) {
        getPersist().getChild ("reconnect-delay")
            .setText (Long.toString (delay));
        this.delay = delay;
        setModified (true);
    }
    public long getReconnectDelay () {
        return delay;
    }

    /**
     * Switches the outbound ({@code in}) queue and persists the change.
     * @param in new queue name
     */
    public synchronized void setInQueue (String in) {
        String old = this.in;
        this.in = in;
        if (old != null)
            sp.out (old, Boolean.TRUE); // wake up a Sender blocked on the previous queue

        getPersist().getChild("in").setText (in);
        setModified (true);
    }
    public String getInQueue () {
        return in;
    }

    /**
     * Switches the inbound ({@code out}) queue and persists the change.
     * @param out new queue name
     */
    public synchronized void setOutQueue (String out) {
        this.out = out;
        getPersist().getChild("out").setText (out);
        setModified (true);
    }

    /**
     * Queue a message to be transmitted by this adaptor
     * @param m message to send
     */
    public void send (ISOMsg m) {
        sp.out (in, m);
    }
    /**
     * Queue a message to be transmitted by this adaptor
     * @param m message to send
     * @param timeout timeout in millis
     */
    public void send (ISOMsg m, long timeout) {
        sp.out (in, m, timeout);
    }

    /**
     * Receive message
     * @return next message taken from the {@code out} queue
     */
    public ISOMsg receive () {
        return (ISOMsg) sp.in (out);
    }

    /**
     * Receive message
     * @param timeout time to wait for an incoming message
     * @return the value returned by the space for the {@code out} queue, cast to {@link ISOMsg}
     */
    public ISOMsg receive (long timeout) {
        return (ISOMsg) sp.in (out, timeout);
    }
    /**
     * @return true if channel is connected, i.e. the {@code ready} marker is present in the space
     */
    public boolean isConnected () {
        return sp != null && sp.rdp (ready) != null;
    }

    public String getOutQueue () {
        return out;
    }

    /**
     * Parses a {@code <channel>} element, returning an {@link ISOChannel}
     *
     * @param e the {@code <channel>} element
     * @param f factory used to instantiate and configure the channel and its collaborators
     * @return the configured channel
     * @throws ConfigurationException on instantiation/configuration errors or unknown metric type
     */
    public ISOChannel newChannel (Element e, QFactory f) throws ConfigurationException {
        String channelName = QFactory.getAttributeValue (e, "class");
        String packagerName = QFactory.getAttributeValue (e, "packager");

        ISOChannel channel = f.newInstance(channelName);
        if (packagerName != null) {
            ISOPackager packager = f.newInstance(packagerName);
            channel.setPackager (packager);
            f.setConfiguration (packager, e);
        }
        QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header"));
        f.setLogger (channel, e);
        f.setConfiguration (channel, e);

        if (channel instanceof FilteredChannel) {
            addFilters ((FilteredChannel) channel, e, f);
        }

        if (channel instanceof ExceptionHandlerAware) {
            addExceptionHandlers((ExceptionHandlerAware) channel, e, f);
        }

        if (channel instanceof ISOMsgMetrics.Source metricsChannel) {
            String type = "default";  // default alias, in case metrics not defined
            String clazz = null;

            Element met = e.getChild("metrics");
            if (met != null) {
                if (QFactory.isEnabled(met)) {
                    clazz = QFactory.getAttributeValue(met, "class");
                    String typeAttr = QFactory.getAttributeValue(met, "type");
                    type =  (clazz != null)    ? "class" :      // class attribute has precedence over type
                            (typeAttr != null) ? typeAttr :
                            type;
                } else {
                    type = "none";  // <metrics enabled="false" /> equivalent to type="none"
                }
            }

            ISOMsgMetrics m = switch (type) {
                case "none" -> null;

                case "default" -> {
                    var mc = new ISOMsgCounter();
                    if (met != null)
                        f.setLogger(mc, met);
                    else
                        mc.setLogger(this.getLog().getLogger(), this.getRealm()+"/metrics");
                    yield mc;
                }

                case "counter" -> {
                    var mc = new ISOMsgCounter();
                    f.setLogger(mc, met);
                    f.setConfiguration(mc, met);
                    yield mc;
                }

                case "class" -> {
                    ISOMsgMetrics mc = f.newInstance(clazz);
                    f.setLogger(mc, met);
                    f.setConfiguration(mc, met);
                    yield mc;
                }

                default -> throw new ConfigurationException("Unknown metric type '"+type+"'");
            };

            metricsChannel.setISOMsgMetrics(m);
        } // metrics config

        if (getName () != null)
            channel.setName (getName ());

        return channel;
    }

    /**
     * Instantiates every {@code <filter>} child of {@code e} and attaches it to the channel
     * according to its {@code direction} attribute ({@code incoming}, {@code outgoing},
     * {@code both}, or absent). Any other direction value results in the filter not being added.
     */
    protected void addFilters (FilteredChannel channel, Element e, QFactory fact)
        throws ConfigurationException
    {
        for (Element f : e.getChildren("filter")) {
            ISOFilter filter = fact.newInstance(f);
            if (filter == null) continue;
            String direction = QFactory.getAttributeValue(f, "direction");
            if (direction == null)
                channel.addFilter(filter);
            else if ("incoming".equalsIgnoreCase(direction))
                channel.addIncomingFilter(filter);
            else if ("outgoing".equalsIgnoreCase(direction))
                channel.addOutgoingFilter(filter);
            else if ("both".equalsIgnoreCase(direction)) {
                channel.addIncomingFilter(filter);
                channel.addOutgoingFilter(filter);
            }
        }
    }


    /**
     * Builds the channel described by the persistent {@code <channel>} element and,
     * when a socket factory is configured and the channel is a {@link FactoryChannel},
     * installs that socket factory.
     *
     * @throws ConfigurationException if the {@code <channel>} element is missing or invalid
     */
    protected ISOChannel initChannel () throws ConfigurationException {
        Element persist = getPersist ();
        Element e = persist.getChild ("channel");
        if (e == null)
            throw new ConfigurationException ("channel element missing");

        ISOChannel c = newChannel (e, getFactory());

        String socketFactoryString = getSocketFactory();
        if (socketFactoryString != null && c instanceof FactoryChannel) {
            ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString);
            if (sFac instanceof LogSource) {
                ((LogSource) sFac).setLogger(log.getLogger(),getName() + ".socket-factory");
            }
            getFactory().setConfiguration (sFac, e);
            ((FactoryChannel)c).setSocketFactory(sFac);
        }

        return c;
    }

    /**
     * Reads space, queue names and behaviour flags from the persistent configuration.
     *
     * @throws ConfigurationException if {@code in} is missing, or {@code out} is missing
     *         on an adaptor that is not write-only
     */
    protected void initSpaceAndQueues () throws ConfigurationException {
        Element persist = getPersist ();
        sp = grabSpace (persist.getChild ("space"));
        in = Environment.get(persist.getChildTextTrim ("in"));
        out = Environment.get(persist.getChildTextTrim ("out"));
        writeOnly = "yes".equalsIgnoreCase (getPersist().getChildTextTrim ("write-only"));
        if (in == null || (out == null && !writeOnly)) {
            throw new ConfigurationException ("Misconfigured channel. Please verify in/out queues");
        }
        String s = Environment.get(persist.getChildTextTrim ("reconnect-delay"));
        delay = s != null ? Long.parseLong (s) : 10000; // reasonable default
        keepAlive = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("keep-alive")));
        ignoreISOExceptions = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("ignore-iso-exceptions")));
        String t = Environment.get(persist.getChildTextTrim("timeout"));
        timeout = t != null && t.length() > 0 ? Long.parseLong(t) : 0L;
        ready = getName() + ".ready";
        reconnect = getName() + ".reconnect";
        waitForWorkersOnStop = "yes".equalsIgnoreCase(Environment.get(persist.getChildTextTrim ("wait-for-workers-on-stop")));
    }

    /**
     * Worker that takes entries from the {@code in} queue and sends {@link ISOMsg}s
     * through the channel, (re)connecting as needed.
     */
    @SuppressWarnings("unchecked")
    public class Sender implements Runnable {
        public Sender () {
            super ();
        }
        public void run () {
            Thread.currentThread().setName ("channel-sender-" + in);

            while (running()){
                try {
                    checkConnection ();
                    if (!running())
                        break;
                    Object o = sp.in (in, delay);
                    if (o instanceof ISOMsg m) {
                        if (!channel.isConnected()) {
                            // push back the message so it can be handled by another channel adaptor
                            sp.push(in, o);
                            continue;
                        }
                        channel.send(m);
                        tx++;
                    } else if (o instanceof Integer) {
                        // The indicator carries the hash code of the owning adaptor. It must be
                        // compared against the enclosing ChannelAdaptor: an unqualified hashCode()
                        // here would be the Sender's own and would never match what Receiver pushes.
                        if ((int)o != ChannelAdaptor.this.hashCode()) {
                            // STOP indicator seems to be for another channel adaptor
                            // sharing the same queue push it back and allow the companion
                            // channel to get it
                            sp.push (in, o, 500L);
                            ISOUtil.sleep (1000L); // larger sleep so that the indicator has time to timeout
                        }
                    }
                    else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) {
                        ((BaseChannel)channel).sendKeepAlive();
                    }
                } catch (ISOFilter.VetoException e) {
                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
                } catch (ISOException e) {
                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
                    if (!ignoreISOExceptions) {
                        disconnect ();
                    }
                    ISOUtil.sleep (1000); // slow down on errors
                } catch (Exception e) {
                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
                    disconnect ();
                    ISOUtil.sleep (1000);
                }
            }
        }
    }

    /**
     * Worker that reads messages from the channel and places them in the {@code out} queue.
     * When the service stops and {@code soft-stop} is positive, it keeps receiving until the
     * grace period expires.
     */
    @SuppressWarnings("unchecked")
    public class Receiver implements Runnable {
        public Receiver () {
            super ();
        }
        public void run () {
            Thread.currentThread().setName ("channel-receiver-"+out);
            boolean shuttingDown = false;
            Instant shutdownDeadline = null;
            final Duration gracePeriod = Duration.ofMillis(softStop);
            while (true) {
                if (!shuttingDown && !running()) {
                    if (gracePeriod.isZero())
                        break;
                    shuttingDown = true;
                    shutdownDeadline = Instant.now().plus(gracePeriod);
                    getLog().info("soft-stop (%s)".formatted(shutdownDeadline.atZone(ZoneId.systemDefault())));
                }
                final boolean shouldExit = shuttingDown
                    ? Instant.now().isAfter(shutdownDeadline)
                    : !running();

                if (shouldExit) {
                    getLog().info ("stop");
                    break;
                }
                try {
                    // wait (up to 5s per iteration) until the channel is flagged as ready
                    Object r = sp.rd (ready, 5000L);
                    if (r == null) {
                        continue;
                    }
                    ISOMsg m = channel.receive ();
                    rx++;
                    lastTxn = System.currentTimeMillis();
                    if (timeout > 0)
                        sp.out (out, m, timeout);
                    else
                        sp.out (out, m);
                } catch (ISOFilter.VetoException e) {
                    // getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage());
                } catch (ISOException e) {
                    if (running()) {
                        // getLog().warn ("channel-receiver-"+out, e);
                        if (!ignoreISOExceptions) {
                            sp.out (reconnect, Boolean.TRUE, delay);
                            disconnect ();
                            // wake-up Sender; tag with the adaptor's (not the Receiver's) hash code
                            sp.push (in, ChannelAdaptor.this.hashCode());
                        }
                        ISOUtil.sleep(1000);
                    }
                } catch (SocketTimeoutException | EOFException e) {
                    if (running()) {
                        // getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting");
                        sp.out (reconnect, Boolean.TRUE, delay);
                        disconnect ();
                        // wake-up Sender; tag with the adaptor's (not the Receiver's) hash code
                        sp.push (in, ChannelAdaptor.this.hashCode());
                        ISOUtil.sleep(1000);
                    }
                } catch (Exception e) {
                    if (running()) {
                        // getLog().warn ("channel-receiver-"+out, e);
                        sp.out (reconnect, Boolean.TRUE, delay);
                        disconnect ();
                        // wake-up Sender; tag with the adaptor's (not the Receiver's) hash code
                        sp.push (in, ChannelAdaptor.this.hashCode());
                        ISOUtil.sleep(1000);
                    }
                }
            }
            disconnect();
        }
    }

    /**
     * Blocks while a reconnect marker is present, then loops until the channel is
     * connected (sleeping {@code delay} millis between attempts) and finally publishes
     * the {@code ready} marker. Returns early once the service is no longer running.
     */
    protected void checkConnection () {
        while (running() && sp.rdp (reconnect) != null) {
            ISOUtil.sleep(1000);
        }
        while (running() && !channel.isConnected ()) {
            SpaceUtil.wipe(sp, ready);
            try {
                channel.connect ();
            } catch (IOException ignored) {
                // channel.connect already logs - no need for more warnings
            }
            if (!channel.isConnected ())
                ISOUtil.sleep (delay);
            else
                connects++;
        }
        if (running() && sp.rdp (ready) == null)
            sp.out (ready, new Date());
    }

    /**
     * Removes the {@code ready} marker and disconnects the channel; errors are logged.
     */
    protected void disconnect () {
        // do not synchronize on this as both Sender and Receiver can deadlock against a thread calling stop()
        synchronized (disconnectLock) {
            try {
                SpaceUtil.wipe(sp, ready);
                channel.disconnect();
            } catch (Exception e) {
                getLog().warn("disconnect", e);
            }
        }
    }

    /**
     * Removes the {@code ready} marker right away and schedules the actual disconnect.
     * @param delayInMillis delay before {@link #disconnect()} is invoked
     */
    private void disconnectLater(long delayInMillis) {
        SpaceUtil.wipe(sp, ready);
        scheduledExecutor.schedule(this::disconnect, delayInMillis, TimeUnit.MILLISECONDS);
    }
    public synchronized void setHost (String host) {
        setProperty (getProperties ("channel"), "host", host);
        setModified (true);
    }
    public String getHost () {
        return getProperty (getProperties ("channel"), "host");
    }
    public synchronized void setPort (int port) {
        setProperty (
            getProperties ("channel"), "port", Integer.toString (port)
        );
        setModified (true);
    }

    /**
     * @return the configured channel port, or 0 (after logging the error) if it is not a valid integer
     */
    public int getPort () {
        int port = 0;
        try {
            port = Integer.parseInt (
                getProperty (getProperties ("channel"), "port")
            );
        } catch (NumberFormatException e) {
            getLog().error(e);
        }
        return port;
    }
    public synchronized void setSocketFactory (String sFac) {
        setProperty(getProperties("channel"), "socketFactory", sFac);
        setModified(true);
    }

    /** Resets the tx/rx/connects counters and the last-transaction timestamp to zero. */
    public void resetCounters () {
        rx = tx = connects = 0;
        lastTxn = 0L;
    }

    /**
     * @return counters formatted as {@code tx=…, rx=…, connects=…, last=…}, followed by
     *         {@code , idle=…ms} when at least one message has been received
     */
    public String getCountersAsString () {
        StringBuilder sb = new StringBuilder();
        append (sb, "tx=", tx);
        append (sb, ", rx=", rx);
        append (sb, ", connects=", connects);
        sb.append (", last=");
        sb.append(lastTxn);
        if (lastTxn > 0) {
            sb.append (", idle=");
            sb.append(System.currentTimeMillis() - lastTxn);
            sb.append ("ms");
        }
        return sb.toString();
    }
    public int getTXCounter() {
        return tx;
    }
    public int getRXCounter() {
        return rx;
    }
    public int getConnectsCounter () {
        return connects;
    }
    public long getLastTxnTimestampInMillis() {
        return lastTxn;
    }

    /**
     * @return millis elapsed since the last received message, or -1 if none has been received
     */
    public long getIdleTimeInMillis() {
        return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L;
    }
    public String getSocketFactory() {
        return getProperty(getProperties ("channel"), "socketFactory");
    }
    public void dump (PrintStream p, String indent) {
        p.println (indent + getCountersAsString());
    }

    /**
     * @param e optional {@code <space>} element
     * @return the space named by the element text, or the one the factory returns for an empty name
     */
    protected Space grabSpace (Element e) {
        return SpaceFactory.getSpace (e != null ? e.getText() : "");
    }
    protected void append (StringBuilder sb, String name, int value) {
        sb.append (name);
        sb.append (value);
    }

    /**
     * Registers the connection gauge (1 while connected, 0 otherwise) and, if the channel
     * exposes {@link ISOMsgMetrics}, tags and registers those as well.
     */
    private void initMeters() {
        var tags = Tags.of("name", getName(),
                           "type", "client");
        var registry = getServer().getMeterRegistry();

        connectionsGauge =
          MeterFactory.gauge
            (registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT,
            tags,
            BaseUnits.SESSIONS,
            () -> isConnected() ? 1 : 0
          );

        if (channel instanceof ISOMsgMetrics.Source ms) {
            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
            if (mtr != null) {
                mtr.addTags(tags);
                mtr.register(registry);
            }
        }
    }

    /**
     * Removes the meters registered by {@link #initMeters()}.
     */
    private void removeMeters() {
        var registry = getServer().getMeterRegistry();
        // The gauge is still null when startService failed before initMeters ran.
        if (connectionsGauge != null) {
            registry.remove(connectionsGauge);
            connectionsGauge = null;
        }

        if (channel instanceof ISOMsgMetrics.Source ms) {
            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
            if (mtr != null)
                mtr.removeMeters();
        }
    }
}