001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.iso; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.io.PrintStream; 025import java.lang.ref.WeakReference; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.net.ServerSocket; 029import java.net.Socket; 030import java.net.BindException; 031import java.net.SocketException; 032import java.net.UnknownHostException; 033import java.time.Duration; 034import java.util.*; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Semaphore; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.concurrent.locks.LockSupport; 040 041import org.jpos.core.Configurable; 042import org.jpos.core.Configuration; 043import org.jpos.core.ConfigurationException; 044import org.jpos.jfr.ChannelEvent; 045import org.jpos.log.AuditLogEvent; 046import org.jpos.log.evt.*; 047import org.jpos.q2.QFactory; 048import org.jpos.util.*; 049 050/** 051 * Accept ServerChannel sessions and forwards them to ISORequestListeners 052 * @author Alejandro P. Revilla 053 * @author Bharavi Gade 054 * @version $Revision$ $Date$ 055 */ 056@SuppressWarnings("unchecked") 057public class ISOServer extends Observable 058 implements LogSource, Runnable, Observer, ISOServerMBean, Configurable, 059 Loggeable, ISOServerSocketFactory 060{ 061 private enum PermLogPolicy { 062 ALLOW_NOLOG, DENY_LOG, ALLOW_LOG, DENY_LOGWARNING 063 } 064 065 int port; 066 private InetAddress bindAddr; 067 068 private Map<String,Boolean> specificIPPerms= new HashMap<>(); // TRUE means allow; FALSE means deny 069 private List<String> wildcardAllow; 070 private List<String> wildcardDeny; 071 private PermLogPolicy ipPermLogPolicy= PermLogPolicy.ALLOW_NOLOG; 072 073 /** The channel template used for client-side connections. */ 074 protected ISOChannel clientSideChannel; 075 ISOPackager clientPackager; 076 /** Outgoing and incoming filter chains for client channels. */ 077 protected Collection clientOutgoingFilters, clientIncomingFilters; 078 /** Registered ISO request listeners. */ 079 protected List<ISORequestListener> listeners; 080 /** Default maximum number of concurrent sessions. */ 081 public static final int DEFAULT_MAX_SESSIONS = 100; 082 /** Realm suffix used for the last connected channel. */ 083 public static final String LAST = ":last"; 084 String name; 085 /** Timestamp of the last transaction processed. */ 086 protected long lastTxn = 0l; 087 /** The logger for this server. */ 088 protected Logger logger; 089 /** The realm string for this server. */ 090 protected String realm; 091 /** The realm string for channel sessions. */ 092 protected String realmChannel; 093 /** Optional factory for creating server sockets. */ 094 protected ISOServerSocketFactory socketFactory = null; 095 096 private AtomicInteger connectionCount = new AtomicInteger(); 097 098 private int backlog; 099 /** The server configuration. */ 100 protected Configuration cfg; 101 private volatile boolean shutdown = false; 102 private ServerSocket serverSocket; 103 private Map<String,WeakReference<ISOChannel>> channels; 104 /** If true, ISOExceptions are silently ignored. */ 105 protected boolean ignoreISOExceptions; 106 /** Registered server event listeners. */ 107 protected List<ISOServerEventListener> serverListeners = null; 108 private ExecutorService executor; 109 private Semaphore permits; 110 private int permitsCount = DEFAULT_MAX_SESSIONS; 111 private static final long SMALL_RELAX = 250; 112 private static final long LONG_RELAX = 5000; 113 private static final long SHUTDOWN_WAIT = 15000; 114 private final UUID uuid = UUID.randomUUID(); 115 116 /** 117 * Constructs an ISOServer on the given port. 118 * @param port port to listen 119 * @param clientSide client side ISOChannel, used as a "clonable template" to accept new connections 120 * @param maxSessions maximum number of concurrent sessions (0 = unlimited) 121 */ 122 public ISOServer(int port, ServerChannel clientSide, int maxSessions) { 123 super(); 124 this.port = port; 125 this.clientSideChannel = clientSide; 126 this.clientPackager = clientSide.getPackager(); 127 if (clientSide instanceof FilteredChannel fc) { 128 this.clientOutgoingFilters = fc.getOutgoingFilters(); 129 this.clientIncomingFilters = fc.getIncomingFilters(); 130 } 131 listeners = new ArrayList<>(); 132 name = ""; 133 channels = Collections.synchronizedMap(new HashMap<>()); 134 serverListeners = Collections.synchronizedList(new ArrayList<>()); 135 136 if (maxSessions > 0) 137 permitsCount = maxSessions; 138 permits = new Semaphore(permitsCount); 139 } 140 141 @Override 142 public void setConfiguration (Configuration cfg) throws ConfigurationException { 143 this.cfg = cfg; 144 configureConnectionPerms(); 145 backlog = cfg.getInt ("backlog", 5); 146 ignoreISOExceptions = cfg.getBoolean("ignore-iso-exceptions"); 147 String ip = cfg.get ("bind-address", null); 148 if (ip != null) { 149 try { 150 bindAddr = InetAddress.getByName (ip); 151 } catch (UnknownHostException e) { 152 throw new ConfigurationException ("Invalid bind-address " + ip, e); 153 } 154 } 155 if (socketFactory == null) { 156 socketFactory = this; 157 } 158 if (socketFactory != this && socketFactory instanceof Configurable) { 159 ((Configurable)socketFactory).setConfiguration (cfg); 160 } 161 executor = QFactory.executorService(cfg.getBoolean("virtual-threads", false)); 162 } 163 164 // Helper method to setConfiguration. Handles "allow" and "deny" params 165 private void configureConnectionPerms() throws ConfigurationException 166 { 167 boolean hasAllows= false, hasDenies= false; 168 169 String[] allows= cfg.getAll ("allow"); 170 if (allows != null && allows.length > 0) { 171 hasAllows= true; 172 173 for (String allowIP : allows) { 174 allowIP= allowIP.trim(); 175 176 if (allowIP.indexOf('*') == -1) { // specific IP with no wildcards 177 specificIPPerms.put(allowIP, true); 178 } else { // there's a wildcard 179 wildcardAllow= (wildcardAllow == null) ? new ArrayList<>() : wildcardAllow; 180 String[] parts= allowIP.split("[*]"); 181 wildcardAllow.add(parts[0]); // keep only the first part 182 } 183 } 184 } 185 186 String[] denies= cfg.getAll ("deny"); 187 if (denies != null && denies.length > 0) { 188 hasDenies= true; 189 190 for (String denyIP : denies) { 191 boolean conflict= false; // used for a little sanity check 192 193 denyIP= denyIP.trim(); 194 if (denyIP.indexOf('*') == -1) { // specific IP with no wildcards 195 Boolean oldVal= specificIPPerms.put(denyIP, false); 196 conflict= (oldVal == Boolean.TRUE); 197 } else { // there's a wildcard 198 wildcardDeny= (wildcardDeny == null) ? new ArrayList<>() : wildcardDeny; 199 String[] parts= denyIP.split("[*]"); 200 if (wildcardAllow != null && wildcardAllow.contains(parts[0])) 201 conflict= true; 202 else 203 wildcardDeny.add(parts[0]); // keep only the first part 204 } 205 206 if (conflict) { 207 throw new ConfigurationException( 208 "Conflicting IP permission in '"+getName()+"' configuration: 'deny' " 209 +denyIP+" while having an identical previous 'allow'."); 210 } 211 } 212 } 213 214 // sum up permission policy and logging type 215 ipPermLogPolicy= (!hasAllows && !hasDenies) ? PermLogPolicy.ALLOW_NOLOG : // default when no permissions specified 216 ( hasAllows && !hasDenies) ? PermLogPolicy.DENY_LOG : 217 (!hasAllows && hasDenies) ? PermLogPolicy.ALLOW_LOG : 218 PermLogPolicy.DENY_LOGWARNING; // mixed allows & denies, if nothing matches we'll DENY and log a warning 219 } 220 221 /** 222 * add an ISORequestListener 223 * @param l request listener to be added 224 * @see ISORequestListener 225 */ 226 public void addISORequestListener(ISORequestListener l) { 227 listeners.add (l); 228 } 229 /** 230 * remove an ISORequestListener 231 * @param l a request listener to be removed 232 * @see ISORequestListener 233 */ 234 public void removeISORequestListener(ISORequestListener l) { 235 listeners.remove (l); 236 } 237 238 /** 239 * Shutdown this server 240 */ 241 public void shutdown () { 242 shutdown = true; 243 executor.submit(() -> { 244 Thread.currentThread().setName("ISOServer-shutdown"); 245 shutdownServer(); 246 if (!cfg.getBoolean("keep-channels")) { 247 shutdownChannels(); 248 } 249 }); 250 executor.shutdown(); 251 try { 252 if (!executor.awaitTermination(SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) { 253 executor.shutdownNow(); 254 } 255 } catch (InterruptedException e) { 256 Thread.currentThread().interrupt(); 257 } 258 } 259 private void shutdownServer () { 260 try { 261 if (serverSocket != null) { 262 serverSocket.close (); 263 fireEvent(new ISOServerShutdownEvent(this)); 264 } 265 } catch (IOException e) { 266 fireEvent(new ISOServerShutdownEvent(this)); 267 Logger.log (new LogEvent (this, "shutdown", e)); 268 } 269 } 270 private void shutdownChannels () { 271 Iterator iter = channels.entrySet().iterator(); 272 while (iter.hasNext()) { 273 Map.Entry entry = (Map.Entry) iter.next(); 274 WeakReference ref = (WeakReference) entry.getValue(); 275 ISOChannel c = (ISOChannel) ref.get (); 276 if (c != null) { 277 try { 278 c.disconnect (); 279 fireEvent(new ISOServerClientDisconnectEvent(this, c)); 280 } catch (IOException e) { 281 Logger.log (new LogEvent (this, "shutdown", e)); 282 } 283 } 284 } 285 } 286 private void purgeChannels() { 287 channels.entrySet().removeIf(entry -> { 288 ISOChannel channel = entry.getValue().get(); 289 return channel == null || !channel.isConnected(); 290 }); 291 } 292 293 @Override 294 public ServerSocket createServerSocket(int port) throws IOException { 295 ServerSocket ss = new ServerSocket(); 296 try { 297 ss.setReuseAddress(true); 298 ss.bind(new InetSocketAddress(bindAddr, port), backlog); 299 } catch(SecurityException e) { 300 ss.close(); 301 fireEvent(new ISOServerShutdownEvent(this)); 302 throw e; 303 } catch(IOException e) { 304 ss.close(); 305 fireEvent(new ISOServerShutdownEvent(this)); 306 throw e; 307 } 308 return ss; 309 } 310 311 //----------------------------------------------------------------------------- 312 // -- Helper Session inner class. It's a Runnable, running in its own 313 // -- thread and handling a connection to this ISOServer 314 // -- 315 /** Creates a new server session for the given channel. 316 * @param channel the accepted server channel 317 * @return a new Session 318 */ 319 protected Session createSession (ServerChannel channel) { 320 return new Session (channel); 321 } 322 323 /** Handles the ISO 8583 exchange for a single accepted server connection. */ 324 protected class Session implements Runnable, LogSource { 325 ServerChannel channel; 326 String realm; 327 /** Creates a Session for the given accepted channel. 328 * @param channel the accepted server channel 329 */ 330 protected Session(ServerChannel channel) { 331 this.channel = channel; 332 realm = ISOServer.this.getRealm(); 333 } 334 @Override 335 public void run() { 336 setChanged (); 337 notifyObservers (); 338 UUID sessionUUID = uuid; 339 String sessionInfo = ""; 340 String endpoint = null; 341 if (channel instanceof BaseChannel baseChannel) { 342 Socket socket = baseChannel.getSocket (); 343 sessionInfo = socket.toString(); 344 sessionUUID = getSocketUUID(socket); 345 endpoint = baseChannel.toEndpoint(socket); 346 LogEvent ev = createSessionEvent(sessionUUID, endpoint) 347 .add(new SessionStart(getActiveConnections(), permitsCount, sessionInfo) 348 ); 349 if (!checkPermission (socket, ev)) 350 return; 351 } 352 try { 353 WeakReference<ISOChannel> wr = new WeakReference<> (channel); 354 channels.put (channel.getName(), wr); 355 channels.put (LAST, wr); // we are most likely the last one 356 while (true) try { 357 ISOMsg m = channel.receive(); 358 lastTxn = System.currentTimeMillis(); 359 for (ISORequestListener listener : listeners) { 360 if (listener.process(channel, m)) { 361 break; 362 } 363 } 364 } catch (ISOFilter.VetoException e) { 365 Logger.log(createSessionEvent("VetoException", sessionUUID, endpoint).add(e.getMessage())); 366 } catch (ISOException e) { 367 if (ignoreISOExceptions) { 368 Logger.log(createSessionEvent("ISOException", sessionUUID, endpoint).add(e.getMessage())); 369 } else { 370 throw e; 371 } 372 } 373 } catch (EOFException e) { 374 // Logger.log (new LogEvent (this, "session-warning", "<eof/>")); 375 } catch (SocketException e) { 376 if (!shutdown) 377 Logger.log (createSessionEvent("session-warning", sessionUUID, endpoint).add(e)); 378 } catch (InterruptedIOException e) { 379 // nothing to log 380 } catch (Throwable e) { 381 Logger.log (createSessionEvent("session-error", sessionUUID, endpoint).add(e)); 382 } 383 384 try { 385 channel.disconnect(); 386 fireEvent(new ISOServerClientDisconnectEvent(ISOServer.this, channel)); 387 } catch (IOException ex) { 388 Logger.log (createSessionEvent("session-error", sessionUUID, endpoint).add(ex)); 389 fireEvent(new ISOServerClientDisconnectEvent(ISOServer.this, channel)); 390 } 391 Logger.log(createSessionEvent(sessionUUID, endpoint) 392 .add(new SessionEnd(getActiveConnections(), permitsCount, sessionInfo) 393 ) 394 ); 395 } 396 @Override 397 public void setLogger (Logger logger, String realm) { 398 } 399 @Override 400 public String getRealm () { 401 return realm; 402 } 403 @Override 404 public Logger getLogger() { 405 return ISOServer.this.getLogger(); 406 } 407 408 private LogEvent createSessionEvent(UUID sessionUUID, String endpoint) { 409 LogEvent evt = new LogEvent().withSource(this).withTraceId(sessionUUID); 410 evt.withTag("session", sessionUUID.toString()); 411 if (endpoint != null) 412 evt.withTag("endpoint", endpoint); 413 return evt; 414 } 415 416 private LogEvent createSessionEvent(String tag, UUID sessionUUID, String endpoint) { 417 LogEvent evt = createSessionEvent(sessionUUID, endpoint); 418 evt.setTag(tag); 419 return evt; 420 } 421 422 private boolean checkPermission (Socket socket, LogEvent ev) { 423 try { 424 checkPermission0 (socket, ev); 425 return true; 426 } catch (ISOException e) { 427 try { 428 int delay = 1000 + new Random().nextInt(4000); 429 ev.addMessage(e.getMessage()); 430 ev.addMessage("delay=" + delay); 431 ISOUtil.sleep(delay); 432 socket.close(); 433 fireEvent(new ISOServerShutdownEvent(ISOServer.this)); 434 } catch (Throwable t) { 435 ev.addMessage (t); 436 } 437 } finally { 438 Logger.log (ev); 439 } 440 return false; 441 } 442 443 private void checkPermission0 (Socket socket, LogEvent evt) throws ISOException { 444 // if there are no allow/deny params, just return without doing any checks 445 // (i.e.: "silent allow policy", keeping backward compatibility) 446 if (specificIPPerms.isEmpty() && wildcardAllow == null && wildcardDeny == null) 447 return; 448 449 String ip= socket.getInetAddress().getHostAddress (); // The remote IP 450 451 // first, check allows or denies for specific/whole IPs (no wildcards) 452 boolean specificAllow = specificIPPerms.get(ip); 453 if (specificAllow == Boolean.TRUE) { // specific IP allow 454 evt.addMessage("access granted, ip=" + ip); 455 return; 456 } else if (specificAllow == Boolean.FALSE) { // specific IP deny 457 throw new ISOException("access denied, ip=" + ip); 458 } else { // no specific match under the specificIPPerms Map 459 // We check the wildcard lists, deny first 460 if (wildcardDeny != null) { 461 for (String wdeny : wildcardDeny) { 462 if (ip.startsWith(wdeny)) { 463 throw new ISOException ("access denied, ip=" + ip); 464 } 465 } 466 } 467 if (wildcardAllow != null) { 468 for (String wallow : wildcardAllow) { 469 if (ip.startsWith(wallow)) { 470 evt.addMessage("access granted, ip=" + ip); 471 return; 472 } 473 } 474 } 475 476 // Reaching this point means that nothing matched our specific or wildcard rules, so we fall 477 // back on the default permission policies and log type 478 switch (ipPermLogPolicy) { 479 case DENY_LOG: // only allows were specified, default policy is to deny non-matches and log the issue 480 throw new ISOException ("access denied, ip=" + ip); 481 // break; 482 483 case ALLOW_LOG: // only denies were specified, default policy is to allow non-matches and log the issue 484 evt.addMessage("access granted, ip=" + ip); 485 break; 486 487 case DENY_LOGWARNING: // mix of allows and denies were specified, but the IP matched no rules! 488 // so we adopt a deny policy but give a special warning 489 throw new ISOException ("access denied, ip=" + ip + " (WARNING: the IP did not match any rules!)"); 490 // break; 491 492 case ALLOW_NOLOG: // this is the default case when no allow/deny are specified 493 // the method will abort early on the first "if", so this is here just for completion 494 break; 495 } 496 497 } 498 // we should never reach this point!! :-) 499 } 500 } // inner class Session 501 502 //------------------------------------------------------------------------------- 503 //-- This is the main run for this ISOServer's Thread 504 @Override 505 public void run() { 506 if (socketFactory == null) { 507 socketFactory = this; 508 } 509 int round = 0; 510 serverLoop : while (!shutdown) { 511 round++; 512 try { 513 if (permits.availablePermits() <= 0) { 514 LockSupport.parkNanos(Duration.ofMillis(SMALL_RELAX).toNanos()); 515 if (round % 240 == 0 && cfg.getBoolean("permits-exhaustion-warning", true)) { 516 log(new Warning("permits exhausted " + serverSocket.toString())); 517 } 518 continue; 519 } 520 serverSocket = socketFactory.createServerSocket(port); 521 log (new Listen(port, bindAddr, permits.availablePermits(), backlog)); 522 while (!shutdown) { 523 try { 524 if (permits.availablePermits() <= 0) { 525 ChannelEvent jfr = new ChannelEvent.AcceptException( 526 "Available permits too low (%d)".formatted(permits.availablePermits()) 527 ); 528 jfr.begin(); 529 try { 530 serverSocket.close(); 531 fireEvent(new ISOServerShutdownEvent(this)); 532 } catch (IOException e){ 533 log (new ThrowableAuditLogEvent(e)); 534 } finally { 535 jfr.commit(); 536 } 537 continue serverLoop; 538 } 539 final ServerChannel channel = (ServerChannel) clientSideChannel.clone(); 540 channel.accept (serverSocket); 541 if (connectionCount.getAndIncrement() % 100 == 0) { 542 purgeChannels (); 543 } 544 executor.submit (() -> { 545 try { 546 permits.acquireUninterruptibly(); 547 createSession(channel).run(); 548 } finally { 549 permits.release(); 550 } 551 }); 552 setChanged (); 553 notifyObservers (this); 554 fireEvent(new ISOServerAcceptEvent(this, channel)); 555 if (channel instanceof Observable) { 556 ((Observable)channel).addObserver (this); 557 } 558 } catch (SocketException e) { 559 if (!shutdown) { 560 log (new ThrowableAuditLogEvent(e)); 561 relax(); 562 continue serverLoop; 563 } 564 } catch (IOException e) { 565 log (new ThrowableAuditLogEvent(e)); 566 relax(); 567 } 568 } // while !shutdown 569 } catch (BindException e) { 570 warn(new Listen(port, bindAddr, 571 permits.availablePermits(), backlog, 572 "(round "+round+") "+e)); 573 relax(); 574 } catch (Throwable e) { 575 log (new ThrowableAuditLogEvent(e)); 576 relax(); 577 } 578 } 579 } // ISOServer's run() 580 //------------------------------------------------------------------------------- 581 582 private void relax() { 583 LockSupport.parkNanos(Duration.ofMillis(LONG_RELAX).toNanos()); 584 } 585 586 /** 587 * associates this ISOServer with a name using NameRegistrar 588 * @param name name to register 589 * @see NameRegistrar 590 */ 591 public void setName (String name) { 592 this.name = name; 593 NameRegistrar.register ("server."+name, this); 594 } 595 /** 596 * Returns the ISOServer registered under the given name. 597 * @param name the server's registered name 598 * @return ISOServer instance with given name. 599 * @throws NameRegistrar.NotFoundException if not found in registry 600 * @see NameRegistrar 601 */ 602 public static ISOServer getServer (String name) 603 throws NameRegistrar.NotFoundException 604 { 605 return NameRegistrar.get ("server."+name); 606 } 607 /** 608 * Returns this server's registered name. 609 * @return this ISOServer's name ("" if no name was set) 610 */ 611 public String getName() { 612 return this.name; 613 } 614 @Override 615 public void setLogger (Logger logger, String realm) { 616 this.logger = logger; 617 this.realm = realm; 618 this.realmChannel = realm + ".channel"; 619 } 620 @Override 621 public String getRealm () { 622 return realm; 623 } 624 @Override 625 public Logger getLogger() { 626 return logger; 627 } 628 @Override 629 public void update(Observable o, Object arg) { 630 setChanged (); 631 notifyObservers (arg); 632 } 633 /** 634 * Gets the ISOClientSocketFactory (may be null) 635 * @return the configured server socket factory, or {@code null} 636 * @see ISOClientSocketFactory 637 * @since 1.3.3 638 */ 639 public ISOServerSocketFactory getSocketFactory() { 640 return socketFactory; 641 } 642 /** 643 * Sets the specified Socket Factory to create sockets 644 * @param socketFactory the ISOClientSocketFactory 645 * @see ISOClientSocketFactory 646 * @since 1.3.3 647 */ 648 public void setSocketFactory(ISOServerSocketFactory socketFactory) { 649 this.socketFactory = socketFactory; 650 } 651 @Override 652 public int getPort () { 653 return port; 654 } 655 @Override 656 public void resetCounters () { 657 connectionCount.set(0); 658 lastTxn = 0l; 659 } 660 661 /** 662 * Returns the cumulative number of connections accepted by this server. 663 * @return number of connections accepted by this server 664 */ 665 @Override 666 public int getConnectionCount () { 667 return connectionCount.get(); 668 } 669 670 /** 671 * Returns the most recently accepted ISOChannel. 672 * @return most recently connected ISOChannel or null 673 */ 674 public ISOChannel getLastConnectedISOChannel () { 675 return getISOChannel (LAST); 676 } 677 678 /** 679 * Returns the ISOChannel registered under the given name. 680 * @param name the channel name 681 * @return ISOChannel under the given name 682 */ 683 public ISOChannel getISOChannel (String name) { 684 WeakReference ref = channels.get (name); 685 if (ref != null) { 686 return (ISOChannel) ref.get (); 687 } 688 return null; 689 } 690 691 692 @Override 693 public String getISOChannelNames () { 694 StringBuilder sb = new StringBuilder (); 695 Iterator iter = channels.entrySet().iterator(); 696 for (int i=0; iter.hasNext(); i++) { 697 Map.Entry entry = (Map.Entry) iter.next(); 698 WeakReference ref = (WeakReference) entry.getValue(); 699 ISOChannel c = (ISOChannel) ref.get (); 700 if (c != null && !LAST.equals (entry.getKey()) && c.isConnected()) { 701 if (i > 0 && !sb.isEmpty()) { 702 sb.append (' '); 703 } 704 sb.append (entry.getKey()); 705 } 706 } 707 return sb.toString(); 708 } 709 /** Returns a human-readable string summarising RX/TX/connection counters. 710 * @return counters summary string 711 */ 712 public String getCountersAsString () { 713 StringBuilder sb = new StringBuilder (); 714 int cnt[] = getCounters(); 715 sb.append ("connected="); 716 sb.append (cnt[2]); 717 sb.append (", rx="); 718 sb.append (cnt[0]); 719 sb.append (", tx="); 720 sb.append (cnt[1]); 721 sb.append (", last="); 722 sb.append (lastTxn); 723 if (lastTxn > 0) { 724 sb.append (", idle="); 725 sb.append(System.currentTimeMillis() - lastTxn); 726 sb.append ("ms"); 727 } 728 return sb.toString(); 729 } 730 731 /** Returns an array of [rx, tx, connected] counters across all active channels. 732 * @return int array: [rx, tx, connected] 733 */ 734 public int[] getCounters() 735 { 736 Iterator iter = channels.entrySet().iterator(); 737 int[] cnt = new int[3]; 738 cnt[2] = 0; 739 for (int i=0; iter.hasNext(); i++) { 740 Map.Entry entry = (Map.Entry) iter.next(); 741 WeakReference ref = (WeakReference) entry.getValue(); 742 ISOChannel c = (ISOChannel) ref.get (); 743 if (c != null && !LAST.equals (entry.getKey()) && c.isConnected()) { 744 cnt[2]++; 745 if (c instanceof BaseChannel) { 746 int[] cc = ((BaseChannel)c).getCounters(); 747 cnt[0] += cc[ISOChannel.RX]; 748 cnt[1] += cc[ISOChannel.TX]; 749 } 750 } 751 } 752 return cnt; 753 } 754 755 @Override 756 public int getTXCounter() { 757 int cnt[] = getCounters(); 758 return cnt[1]; 759 } 760 @Override 761 public int getRXCounter() { 762 int cnt[] = getCounters(); 763 return cnt[0]; 764 } 765 /** 766 * Returns the current number of active accepted connections. 767 * 768 * @return active connection count 769 */ 770 public int getConnections () { 771 return connectionCount.get(); 772 } 773 @Override 774 public long getLastTxnTimestampInMillis() { 775 return lastTxn; 776 } 777 @Override 778 public long getIdleTimeInMillis() { 779 return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L; 780 } 781 782 783 @Override 784 public String getCountersAsString (String isoChannelName) { 785 ISOChannel channel = getISOChannel(isoChannelName); 786 StringBuffer sb = new StringBuffer(); 787 if (channel instanceof BaseChannel) { 788 int[] counters = ((BaseChannel)channel).getCounters(); 789 append (sb, "rx=", counters[ISOChannel.RX]); 790 append (sb, ", tx=", counters[ISOChannel.TX]); 791 append (sb, ", connects=", counters[ISOChannel.CONNECT]); 792 } 793 return sb.toString(); 794 } 795 @Override 796 public void dump (PrintStream p, String indent) { 797 p.println (indent + getCountersAsString()); 798 Iterator iter = channels.entrySet().iterator(); 799 String inner = indent + " "; 800 for (int i=0; iter.hasNext(); i++) { 801 Map.Entry entry = (Map.Entry) iter.next(); 802 WeakReference ref = (WeakReference) entry.getValue(); 803 ISOChannel c = (ISOChannel) ref.get (); 804 if (c != null && !LAST.equals (entry.getKey()) && c.isConnected() && c instanceof BaseChannel) { 805 StringBuilder sb = new StringBuilder (); 806 int[] cc = ((BaseChannel)c).getCounters(); 807 sb.append (inner); 808 sb.append (entry.getKey()); 809 sb.append (": rx="); 810 sb.append (cc[ISOChannel.RX]); 811 sb.append (", tx="); 812 sb.append (cc[ISOChannel.TX]); 813 sb.append (", last="); 814 sb.append (lastTxn); 815 p.println (sb); 816 } 817 } 818 } 819 private void append (StringBuffer sb, String name, int value) { 820 sb.append (name); 821 sb.append (value); 822 } 823 824 /** Registers a listener for server events. 825 * @param listener the listener to add 826 */ 827 public void addServerEventListener(ISOServerEventListener listener) { 828 serverListeners.add(listener); 829 } 830 /** Unregisters a previously added server event listener. 831 * @param listener the listener to remove 832 */ 833 public void removeServerEventListener(ISOServerEventListener listener) { 834 serverListeners.remove(listener); 835 } 836 837 /** Dispatches an event to all registered server event listeners. 838 * @param event the event to dispatch 839 */ 840 public void fireEvent(EventObject event) { 841 for (ISOServerEventListener l : serverListeners) { 842 try { 843 l.handleISOServerEvent(event); 844 } 845 catch (Throwable ignore) { 846 /* 847 * Don't want an exception from a handler to exit the loop or 848 * let it bubble up. 849 * If it bubbles up it can cause side effects like getting caught 850 * in the throwable catch leading to server trying to listen on 851 * the same port. 852 * We don't want a side effect in jpos caused by custom user 853 * handler code. 854 */ 855 } 856 857 } 858 } 859 860 private void warn (AuditLogEvent log) { 861 Logger.log(new LogEvent(Log.WARN) 862 .withSource(this) 863 .withTraceId(uuid) 864 .add(log) 865 ); 866 } 867 868 private void log (AuditLogEvent log) { 869 Logger.log(new LogEvent() 870 .withSource(this) 871 .withTraceId(uuid) 872 .add(log) 873 ); 874 } 875 876 private void log (String level, String message) { 877 LogEvent evt = new LogEvent (this, level).withTraceId(uuid); 878 evt.addMessage (message); 879 Logger.log (evt); 880 } 881 882 /** Returns the current number of active (in-use) connections. 883 * @return number of active connections 884 */ 885 public int getActiveConnections () { 886 return permitsCount - permits.availablePermits(); 887 } 888 889 private UUID getSocketUUID(Socket socket) { 890 return socket != null ? 891 new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ socket.hashCode()) : 892 uuid; 893 } 894}