001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.iso; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.core.handlers.exception.ExceptionHandler; 025import org.jpos.core.handlers.exception.ExceptionHandlerAware; 026import org.jpos.iso.ISOFilter.VetoException; 027import org.jpos.iso.header.BaseHeader; 028import org.jpos.jfr.ChannelEvent; 029import org.jpos.log.evt.Connect; 030import org.jpos.log.evt.Disconnect; 031import org.jpos.metrics.MeterInfo; 032import org.jpos.metrics.iso.ISOMsgMetrics; 033import org.jpos.util.*; 034 035import javax.net.ssl.SSLSocket; 036import java.io.*; 037import java.net.*; 038import java.util.*; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.locks.Lock; 041import java.util.concurrent.locks.ReentrantLock; 042 043/* 044 * BaseChannel was ISOChannel. Now ISOChannel is an interface 045 * Revision: 1.34 Date: 2000/04/08 23:54:55 046 */ 047 048/** 049 * ISOChannel is an abstract class that provides functionality that 050 * allows the transmission and reception of ISO 8583 Messages 051 * over a TCP/IP session. 052 * <p> 053 * This class is not thread-safe. 054 * <p> 055 * ISOChannel is Observable in order to support GUI components 056 * such as ISOChannelPanel. 057 * <br> 058 * It now support the new Logger architecture so we will 059 * probably setup ISOChannelPanel to be a LogListener instead 060 * of being an Observer in future releases. 061 * 062 * @author Alejandro P. Revilla 063 * @author Bharavi Gade 064 * @version $Revision$ $Date$ 065 * @see ISOMsg 066 * @see MUX 067 * @see ISOException 068 * @see org.jpos.iso.channel.CSChannel 069 * @see Logger 070 * 071 */ 072@SuppressWarnings("unchecked") 073public abstract class BaseChannel extends Observable 074 implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel, ISOMsgMetrics.Source, 075 LogSource, Configurable, BaseChannelMBean, Cloneable, ExceptionHandlerAware 076{ 077 private Socket socket; 078 private String host, localIface; 079 private String[] hosts; 080 private int[] ports; 081 private int port, timeout, connectTimeout, localPort; 082 private int maxPacketLength = 100000; 083 private boolean keepAlive; 084 private boolean expectKeepAlive; 085 private boolean soLingerOn = true; 086 private int soLingerSeconds = 5; 087 private Configuration cfg; 088 /** Whether this channel is in a usable state. */ 089 protected boolean usable; 090 /** When true, this channel's own header overrides the message's header. */ 091 protected boolean overrideHeader; 092 private String name; 093 private long sendTimeout = 15000L; 094 // private int serverPort = -1; 095 /** Input stream for reading from the connected socket. */ 096 protected DataInputStream serverIn; 097 /** Output stream for writing to the connected socket. */ 098 protected DataOutputStream serverOut; 099 // The lock objects should be final, and never changed, but due to the clone() method, they must be set there. 100 /** Lock guarding {@link #serverIn} and read operations. */ 101 protected Lock serverInLock = new ReentrantLock(); 102 /** Lock guarding {@link #serverOut} and write operations. */ 103 protected Lock serverOutLock = new ReentrantLock(); 104 /** The packager used to pack/unpack ISO messages. */ 105 protected ISOPackager packager; 106 /** Server socket used when this channel acts as a server. */ 107 protected ServerSocket serverSocket = null; 108 /** Incoming and outgoing filter chains. */ 109 protected List<ISOFilter> incomingFilters, outgoingFilters; 110 /** Optional factory for creating client sockets. */ 111 protected ISOClientSocketFactory socketFactory = null; 112 113 /** Message counters array. */ 114 protected int[] cnt; 115 116 /** The logger for this channel. */ 117 protected Logger logger = null; 118 /** The realm string for this channel. */ 119 protected String realm = null; 120 /** The original realm before any override. */ 121 protected String originalRealm = null; 122 /** Message header bytes. */ 123 protected byte[] header = null; 124 private static final int DEFAULT_TIMEOUT = 300000; 125 private int nextHostPort = 0; 126 private boolean roundRobin = false; 127 private boolean debugIsoError = true; 128 129 private ISOMsgMetrics isoMsgMetrics; 130 131 private final UUID uuid; 132 133 private final Map<Class<? extends Exception>, List<ExceptionHandler>> exceptionHandlers = new HashMap<>(); 134 135 /** 136 * constructor shared by server and client 137 * ISOChannels (which have different signatures) 138 */ 139 public BaseChannel () { 140 super(); 141 uuid = UUID.randomUUID(); 142 cnt = new int[SIZEOF_CNT]; 143 name = ""; 144 incomingFilters = new ArrayList<>(); 145 outgoingFilters = new ArrayList<>(); 146 setHost(null, 0); 147 } 148 149 /** 150 * constructs a client ISOChannel 151 * @param host server TCP Address 152 * @param port server port number 153 * @param p an ISOPackager 154 * @see ISOPackager 155 */ 156 public BaseChannel (String host, int port, ISOPackager p) { 157 this(); 158 setHost(host, port); 159 setPackager(p); 160 } 161 /** 162 * constructs a server ISOChannel 163 * @param p an ISOPackager 164 * @exception IOException on error on I/O error on I/O error 165 * @see ISOPackager 166 */ 167 public BaseChannel (ISOPackager p) throws IOException { 168 this(); 169 setPackager (p); 170 } 171 172 /** 173 * constructs a server ISOChannel associated with a Server Socket 174 * @param p an ISOPackager 175 * @param serverSocket where to accept a connection 176 * @exception IOException on error on I/O error on I/O error 177 * @see ISOPackager 178 */ 179 public BaseChannel (ISOPackager p, ServerSocket serverSocket) throws IOException { 180 this(); 181 setPackager (p); 182 setServerSocket (serverSocket); 183 } 184 185 /** 186 * initialize an ISOChannel 187 * @param host server TCP Address 188 * @param port server port number 189 */ 190 public void setHost(String host, int port) { 191 this.host = host; 192 this.port = port; 193 this.hosts = new String[] { host }; 194 this.ports = new int[] { port }; 195 } 196 197 /** 198 * initialize an ISOChannel 199 * @param iface server TCP Address 200 * @param port server port number 201 */ 202 public void setLocalAddress (String iface, int port) { 203 this.localIface = iface; 204 this.localPort = port; 205 } 206 207 208 /** 209 * @param host to connect (client ISOChannel) 210 */ 211 public void setHost (String host) { 212 this.host = host; 213 this.hosts = new String[] { host }; 214 } 215 /** 216 * @param port to connect (client ISOChannel) 217 */ 218 public void setPort (int port) { 219 this.port = port; 220 this.ports = new int[] { port }; 221 } 222 /** 223 * @return hostname (may be null) 224 */ 225 public String getHost() { 226 return host; 227 } 228 /** 229 * @return port number 230 */ 231 public int getPort() { 232 return port; 233 } 234 /** 235 * set Packager for channel 236 * @param p an ISOPackager 237 * @see ISOPackager 238 */ 239 public void setPackager(ISOPackager p) { 240 this.packager = p; 241 } 242 243 /** 244 * @return current packager 245 */ 246 public ISOPackager getPackager() { 247 return packager; 248 } 249 250 /** 251 * Associates this ISOChannel with a server socket 252 * @param sock where to accept a connection 253 */ 254 public void setServerSocket (ServerSocket sock) { 255 setHost (null, 0); 256 this.serverSocket = sock; 257 name = ""; 258 } 259 260 @Override 261 public Map<Class<? extends Exception>, List<ExceptionHandler>> getExceptionHandlers() { 262 return exceptionHandlers; 263 } 264 265 /** 266 * reset stat info 267 */ 268 public void resetCounters() { 269 for (int i=0; i<SIZEOF_CNT; i++) 270 cnt[i] = 0; 271 } 272 /** 273 * Returns the message counters array. 274 * @return counters 275 */ 276 public int[] getCounters() { 277 return cnt; 278 } 279 /** 280 * @return the connection state 281 */ 282 public boolean isConnected() { 283 return socket != null && usable; 284 } 285 286 @Override 287 public void setISOMsgMetrics(ISOMsgMetrics metrics) { 288 isoMsgMetrics = metrics; 289 } 290 @Override 291 public ISOMsgMetrics getISOMsgMetrics() { 292 return isoMsgMetrics; 293 } 294 295 /** 296 * setup I/O Streams from socket 297 * @param socket a Socket (client or server) 298 * @exception IOException on error on I/O error on I/O error 299 */ 300 protected void connect (Socket socket) throws IOException { 301 this.socket = socket; 302 applyTimeout(); 303 serverInLock.lock(); 304 try { 305 serverIn = new DataInputStream ( 306 new BufferedInputStream (socket.getInputStream ()) 307 ); 308 } finally { 309 serverInLock.unlock(); 310 } 311 serverOutLock.lock(); 312 try { 313 serverOut = new DataOutputStream( 314 new BufferedOutputStream(socket.getOutputStream(), 2048) 315 ); 316 } finally { 317 serverOutLock.unlock(); 318 } 319 postConnectHook(); 320 usable = true; 321 cnt[CONNECT]++; 322 setChanged(); 323 notifyObservers(); 324 } 325 /** Hook called after a successful connection. May be overridden by subclasses. 326 * @throws IOException on I/O error 327 */ 328 protected void postConnectHook() throws IOException { 329 // do nothing 330 } 331 /** 332 * factory method pattern (as suggested by Vincent.Greene@amo.com) 333 * @param host remote host 334 * @param port remote port 335 * @throws IOException on error 336 * 337 * Use Socket factory if exists. If it is missing create a normal socket 338 * @see ISOClientSocketFactory 339 * @return newly created socket 340 */ 341 protected Socket newSocket(String host, int port) throws IOException { 342 try { 343 if (socketFactory != null) 344 return socketFactory.createSocket (host, port); 345 else { 346 Socket s = new Socket(); 347 if (localIface != null || localPort != 0) { 348 InetAddress addr = localIface == null ? 349 InetAddress.getLocalHost() : 350 InetAddress.getByName(localIface); 351 s.bind(new InetSocketAddress(addr, localPort)); 352 } 353 s.connect ( 354 new InetSocketAddress (host, port), 355 connectTimeout 356 ); 357 return s; 358 } 359 } catch (ISOException e) { 360 throw new IOException (e.getMessage()); 361 } 362 } 363 /** Creates a new socket connected to one of the given hosts/ports. 364 * @param hosts array of host names to try 365 * @param ports corresponding array of port numbers 366 * @param evt log event for connection diagnostics 367 * @return the connected socket 368 * @throws IOException on connection failure 369 */ 370 protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt) 371 throws IOException 372 { 373 Socket s = null; 374 if (!roundRobin) 375 nextHostPort = 0; 376 377 IOException lastIOException = null; 378 String h=null; 379 int p=0; 380 for (int i=0; i<hosts.length; i++) { 381 try { 382 int ii = nextHostPort++ % hosts.length; 383 h = hosts[ii]; 384 p = ports[ii]; 385 s = newSocket (h, p); 386 evt.addMessage(new Connect( 387 s.getInetAddress().getHostAddress(), 388 s.getPort(), 389 s.getLocalPort(), null) 390 ); 391 break; 392 } catch (IOException e) { 393 lastIOException = e; 394 evt.addMessage( 395 new Connect(h, p, 0, "%s:%d %s (%s)".formatted( 396 h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1) 397 ) 398 ) 399 ); 400 } 401 } 402 if (s == null) 403 throw new IOException ("%s:%d %s (%s)".formatted(h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1))); 404 return s; 405 } 406 /** 407 * Returns the current connected socket. 408 * @return current socket 409 */ 410 public Socket getSocket() { 411 return socket; 412 } 413 /** 414 * Returns the current server socket. 415 * @return current serverSocket 416 */ 417 public ServerSocket getServerSocket() { 418 return serverSocket; 419 } 420 421 /** 422 * sets socket timeout (as suggested by 423 * Leonard Thomas {@literal <leonard@rhinosystemsinc.com>}) 424 * @param timeout in milliseconds 425 * @throws SocketException on error 426 */ 427 public void setTimeout (int timeout) throws SocketException { 428 this.timeout = timeout; 429 applyTimeout(); 430 } 431 /** Returns the socket read timeout in milliseconds. 432 * @return socket timeout in milliseconds 433 */ 434 public int getTimeout () { 435 return timeout; 436 } 437 438 /** 439 * sets timeout, and also keep alive 440 * @throws SocketException on error setting the timeout 441 */ 442 protected void applyTimeout () throws SocketException { 443 if (socket != null && socket.isConnected()) { 444 if (keepAlive) 445 socket.setKeepAlive(keepAlive); 446 if (timeout >= 0) 447 socket.setSoTimeout(timeout); 448 } 449 } 450 /** 451 * Socket SO_LINGER option to use when closing the socket. 452 * @see java.net.Socket#setSoLinger(boolean, int) 453 * @param on if true, enable SO_LINGER 454 * @param linger SO_LINGER timeout in seconds 455 */ 456 public void setSoLinger(boolean on, int linger) { 457 this.soLingerOn = on; 458 this.soLingerSeconds = linger; 459 } 460 /** Returns whether SO_LINGER is enabled on the socket. 461 * @return true if SO_LINGER is enabled 462 */ 463 public boolean isSoLingerOn() { 464 return soLingerOn; 465 } 466 /** Returns the SO_LINGER timeout in seconds. 467 * @return SO_LINGER timeout in seconds 468 */ 469 public int getSoLingerSeconds() { 470 return soLingerSeconds; 471 } 472 /** 473 * Connects client ISOChannel to server 474 * @exception IOException on I/O error on I/O error on I/O error 475 */ 476 public void connect () throws IOException { 477 ChannelEvent jfr = new ChannelEvent.Connect(); 478 jfr.begin(); 479 LogEvent evt = new LogEvent (this, "connect").withTraceId(uuid); 480 try { 481 socket = newSocket (hosts, ports, evt); 482 if (getHost() != null) 483 jfr.setDetail("%s:%d".formatted(getHost(), getPort())); 484 connect(socket); 485 jfr.append("%d".formatted(socket.getLocalPort())); 486 evt.withTraceId(getSocketUUID()); 487 evt.withTag("endpoint", toEndpoint(socket)); 488 applyTimeout(); 489 } catch (IOException e) { 490 jfr = new ChannelEvent.ConnectionException(jfr.getDetail()); 491 jfr.begin(); 492 jfr.append (e.getMessage()); 493 throw e; 494 } finally { 495 Logger.log (evt); 496 jfr.commit(); 497 } 498 } 499 500 /** 501 * Accepts connection 502 * @exception IOException on I/O error on I/O error on I/O error 503 */ 504 public void accept(ServerSocket s) throws IOException { 505 ChannelEvent jfr = new ChannelEvent.Accept(); 506 jfr.begin(); 507 try { 508 Socket ss = s.accept(); 509 this.name = "%d %s:%d".formatted( 510 ss.getLocalPort(), 511 ss.getInetAddress().getHostAddress(), 512 ss.getPort() 513 ); 514 jfr.setDetail(name); 515 connect(ss); 516 } catch (IOException e) { 517 jfr = new ChannelEvent.AcceptException(e.getMessage()); 518 jfr.begin(); 519 throw e; 520 } finally { 521 jfr.commit(); 522 } 523 524 // Warning - closing here breaks ISOServer, we need an 525 // accept that keep ServerSocket open. 526 // s.close(); 527 } 528 529 /** 530 * @param b - new Usable state (used by ISOMUX internals to 531 * flag as unusable in order to force a reconnection) 532 */ 533 public void setUsable(boolean b) { 534 Logger.log (new LogEvent (this, "usable", b)); 535 usable = b; 536 } 537 /** 538 * allow subclasses to override default packager 539 * on outgoing messages 540 * @param m outgoing ISOMsg 541 * @return ISOPackager 542 */ 543 protected ISOPackager getDynamicPackager (ISOMsg m) { 544 return packager; 545 } 546 547 /** 548 * allow subclasses to override default packager 549 * on incoming messages 550 * @param image incoming message image 551 * @return ISOPackager 552 */ 553 protected ISOPackager getDynamicPackager (byte[] image) { 554 return packager; 555 } 556 /** 557 * allow subclasses to override default packager 558 * on incoming messages 559 * @param header message header 560 * @param image incoming message image 561 * @return ISOPackager 562 */ 563 protected ISOPackager getDynamicPackager (byte[] header, byte[] image) { 564 return getDynamicPackager(image); 565 } 566 567 568 /** 569 * Allow subclasses to override the Default header on 570 * incoming messages. 571 * @param image message image 572 * @return ISOHeader instance 573 */ 574 protected ISOHeader getDynamicHeader (byte[] image) { 575 return image != null ? 576 new BaseHeader (image) : null; 577 } 578 /** Sends the message length prefix; default implementation is a no-op. 579 * @param len the message length to send 580 * @throws IOException on I/O error 581 */ 582 protected void sendMessageLength(int len) throws IOException { } 583 /** Sends the message header bytes. 584 * @param m the ISO message being sent 585 * @param len the message length 586 * @throws IOException on I/O error 587 */ 588 protected void sendMessageHeader(ISOMsg m, int len) throws IOException { 589 if (!isOverrideHeader() && m.getHeader() != null) 590 serverOut.write(m.getHeader()); 591 else if (header != null) 592 serverOut.write(header); 593 } 594 /** 595 * @deprecated use sendMessageTrailer(ISOMsg m, byte[] b) instead. 596 * @param m a reference to the ISOMsg 597 * @param len the packed image length 598 * @throws IOException on error 599 */ 600 @Deprecated 601 protected void sendMessageTrailler(ISOMsg m, int len) throws IOException 602 { 603 } 604 605 /** 606 * @deprecated use {@link #sendMessageTrailer(ISOMsg, byte[])} instead. 607 * @param m the ISO message being sent 608 * @param b the trailer bytes to send 609 * @throws IOException on I/O error 610 */ 611 @SuppressWarnings ("deprecation") 612 @Deprecated 613 protected void sendMessageTrailler(ISOMsg m, byte[] b) throws IOException { 614 sendMessageTrailler (m, b.length); 615 } 616 617 /** 618 * Sends the message trailer bytes. 619 * @param m the unpacked ISO message 620 * @param b the packed message image 621 * @throws IOException on I/O error 622 */ 623 @SuppressWarnings("deprecation") 624 protected void sendMessageTrailer(ISOMsg m, byte[] b) throws IOException { 625 sendMessageTrailler(m, b); 626 } 627 628 629 /** 630 * @deprecated use {@link #getMessageTrailer(ISOMsg)} instead. 631 * @throws IOException on I/O error 632 */ 633 @Deprecated 634 protected void getMessageTrailler() throws IOException { 635 } 636 637 /** 638 * Reads the message trailer from the stream and optionally stores it in the given message. 639 * @param m the ISOMessage to store the trailer data in 640 * @throws IOException on I/O error 641 * @see ISOMsg#setTrailer(byte[]) 642 */ 643 @SuppressWarnings("deprecation") 644 protected void getMessageTrailer(ISOMsg m) throws IOException { 645 getMessageTrailler(); 646 } 647 648 /** Reads raw message bytes from the stream into the given buffer. 649 * @param b the destination buffer 650 * @param offset starting offset in the buffer 651 * @param len number of bytes to read 652 * @throws IOException on I/O error 653 * @throws ISOException on protocol error 654 */ 655 protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException { 656 serverIn.readFully(b, offset, len); 657 } 658 /** Reads and returns the message length prefix from the stream. 659 * @return the message length 660 * @throws IOException on I/O error 661 * @throws ISOException on protocol error 662 */ 663 protected int getMessageLength() throws IOException, ISOException { 664 return -1; 665 } 666 /** Returns the fixed header length for this channel. 667 * @return header length in bytes 668 */ 669 protected int getHeaderLength() { 670 return header != null ? header.length : 0; 671 } 672 /** Returns the header length for the given raw message bytes. 673 * @param b the raw message bytes 674 * @return header length in bytes 675 */ 676 protected int getHeaderLength(byte[] b) { return 0; } 677 678 /** Returns the header length for the given ISO message. 679 * @param m the ISO message 680 * @return header length in bytes 681 */ 682 protected int getHeaderLength(ISOMsg m) { 683 return !overrideHeader && m.getHeader() != null ? 684 m.getHeader().length : getHeaderLength(); 685 } 686 687 /** Reads raw bytes from the stream for protocols without explicit length framing. 688 * @return the received bytes 689 * @throws IOException on I/O error 690 */ 691 protected byte[] streamReceive() throws IOException { 692 return new byte[0]; 693 } 694 /** Writes the given bytes to the output stream. 695 * @param b the byte array to send 696 * @param offset starting offset 697 * @param len number of bytes to send 698 * @throws IOException on I/O error 699 */ 700 protected void sendMessage (byte[] b, int offset, int len) 701 throws IOException 702 { 703 serverOut.write(b, offset, len); 704 } 705 /** 706 * Sends the specified {@link ISOMsg} over this ISOChannel. 707 * <p> 708 * This method performs the following steps: 709 * <ul> 710 * <li>Verifies the channel is connected.</li> 711 * <li>Sets the message direction to {@link ISOMsg#OUTGOING}.</li> 712 * <li>Retrieves and sets a dynamic packager for the message.</li> 713 * <li>Applies all registered outgoing filters, allowing them to modify or veto the message.</li> 714 * <li>Packs the message and writes its length, header, body, and trailer to the underlying stream, 715 * protected by a locking mechanism to ensure thread safety.</li> 716 * <li>Flushes the output stream and increments message counters.</li> 717 * <li>Notifies observers of the sent message.</li> 718 * <li>Logs both the message and the send operation through {@link ChannelEvent} and {@link LogEvent}.</li> 719 * </ul> 720 * 721 * If a {@link VetoException} is thrown by a filter, the message is not sent, and the exception is logged. 722 * 723 * @param m the ISO message to be sent. The message will be modified in-place: its direction and packager 724 * will be updated, and filters may alter its content. 725 * 726 * @throws IOException if the channel is not connected, if the output stream fails, if locking times out, 727 * or if an unexpected I/O error occurs. 728 * @throws ISOException if packing the message fails or other ISO-specific issues occur. 729 * @throws VetoException if an outgoing filter vetoes the message. 730 * 731 * @see ISOMsg 732 * @see ISOPackager 733 * @see ISOFilter 734 * @see ChannelEvent 735 * @see LogEvent 736 */ 737 public void send (ISOMsg m) 738 throws IOException, ISOException 739 { 740 ChannelEvent jfr = new ChannelEvent.Send(); 741 jfr.begin(); 742 LogEvent evt = new LogEvent (this, "send").withTraceId(getSocketUUID()); 743 try { 744 if (!isConnected()) 745 throw new IOException ("unconnected ISOChannel"); 746 m.setDirection(ISOMsg.OUTGOING); 747 ISOPackager p = getDynamicPackager(m); 748 m.setPackager (p); 749 m = applyOutgoingFilters (m, evt); 750 evt.addMessage (m); 751 applyTags (evt, m); 752 m.setDirection(ISOMsg.OUTGOING); // filter may have dropped this info 753 m.setPackager (p); // and could have dropped packager as well 754 byte[] b = pack(m); 755 756 if (serverOutLock.tryLock(sendTimeout, TimeUnit.MILLISECONDS)) { 757 try { 758 sendMessageLength(b.length + getHeaderLength(m)); 759 sendMessageHeader(m, b.length); 760 sendMessage (b, 0, b.length); 761 sendMessageTrailer(m, b); 762 serverOut.flush (); 763 cnt[TX]++; 764 } finally { 765 serverOutLock.unlock(); 766 } 767 incrementMsgOutCounter(m); 768 } else { 769 disconnect(); 770 } 771 setChanged(); 772 notifyObservers(m); 773 jfr.setDetail(m.toString()); 774 } catch (VetoException e) { 775 //if a filter vets the message it was not added to the event 776 evt.addMessage (m); 777 evt.addMessage (e); 778 jfr.append (e.getMessage()); 779 throw e; 780 } catch (ISOException | IOException e) { 781 evt.addMessage (e); 782 jfr = new ChannelEvent.SendException(e.getMessage()); 783 throw e; 784 } catch (Exception e) { 785 evt.addMessage (e); 786 jfr = new ChannelEvent.SendException(e.getMessage()); 787 throw new IOException ("unexpected exception", e); 788 } finally { 789 Logger.log (evt); 790 jfr.commit(); 791 } 792 } 793 /** 794 * sends a byte[] over the TCP/IP session 795 * @param b the byte array to be sent 796 * @exception IOException on I/O error on I/O error on I/O error 797 * @exception ISOException on ISO processing error 798 * @exception ISOFilter.VetoException if a filter vetoes the message 799 */ 800 public void send (byte[] b) throws IOException, ISOException { 801 var jfr = new ChannelEvent.Send(); 802 jfr.begin(); 803 LogEvent evt = new LogEvent (this, "send"); 804 try { 805 if (!isConnected()) 806 throw new ISOException ("unconnected ISOChannel"); 807 serverOutLock.lock(); 808 try { 809 serverOut.write(b); 810 serverOut.flush(); 811 } finally { 812 serverOutLock.unlock(); 813 } 814 cnt[TX]++; 815 setChanged(); 816 } catch (Exception e) { 817 evt.addMessage (e); 818 throw new ISOException ("unexpected exception", e); 819 } finally { 820 Logger.log (evt); 821 jfr.commit(); 822 } 823 } 824 /** 825 * Sends a high-level keep-alive message (zero length) 826 * @throws IOException on exception 827 */ 828 public void sendKeepAlive () throws IOException { 829 serverOutLock.lock(); 830 try { 831 sendMessageLength(0); 832 serverOut.flush (); 833 } finally { 834 serverOutLock.unlock(); 835 } 836 } 837 838 /** Returns whether this channel expects keep-alive messages. 839 * @return true if keep-alive is expected 840 */ 841 public boolean isExpectKeepAlive() { 842 return expectKeepAlive; 843 } 844 845 /** Returns whether the given raw message should be rejected. 846 * @param b the raw message bytes 847 * @return true if the message should be rejected 848 */ 849 protected boolean isRejected(byte[] b) { 850 // VAP Header support - see VAPChannel 851 return false; 852 } 853 /** Returns whether the given raw message should be silently ignored. 854 * @param b the raw message bytes 855 * @return true if the message should be ignored 856 */ 857 protected boolean shouldIgnore (byte[] b) { 858 // VAP Header support - see VAPChannel 859 return false; 860 } 861 /** 862 * support old factory method name for backward compatibility 863 * @return newly created ISOMsg 864 */ 865 protected ISOMsg createMsg () { 866 return createISOMsg(); 867 } 868 /** Creates a new ISOMsg instance; may be overridden to return a subclass. 869 * @return a new ISOMsg 870 */ 871 protected ISOMsg createISOMsg () { 872 return packager.createISOMsg (); 873 } 874 875 /** 876 * Reads in a message header. 877 * 878 * @param hLen The Length og the reader to read 879 * @return The header bytes that were read in 880 * @throws IOException on error 881 */ 882 protected byte[] readHeader(int hLen) throws IOException { 883 byte[] header = new byte[hLen]; 884 serverIn.readFully(header, 0, hLen); 885 return header; 886 } 887 /** 888 * Waits and receive an ISOMsg over the TCP/IP session 889 * @return the Message received 890 * @throws IOException on I/O error 891 * @throws ISOException on ISO processing error 892 */ 893 public ISOMsg receive() throws IOException, ISOException { 894 var jfr = new ChannelEvent.Receive(); 895 jfr.begin(); 896 897 byte[] b=null; 898 byte[] header=null; 899 LogEvent evt = new LogEvent (this, "receive").withTraceId(getSocketUUID()); 900 ISOMsg m = createMsg (); // call createMsg instead of createISOMsg for backward compatibility 901 902 m.setSource (this); 903 try { 904 if (!isConnected()) 905 throw new IOException ("unconnected ISOChannel"); 906 907 serverInLock.lock(); 908 try { 909 int len = getMessageLength(); 910 if (expectKeepAlive) { 911 while (len == 0) { 912 //If zero length, this is a keep alive msg 913 len = getMessageLength(); 914 } 915 } 916 int hLen = getHeaderLength(); 917 918 if (len == -1) { 919 if (hLen > 0) { 920 header = readHeader(hLen); 921 } 922 b = streamReceive(); 923 } 924 else if (len > 0 && len <= getMaxPacketLength()) { 925 if (hLen > 0) { 926 // ignore message header (TPDU) 927 // Note header length is not necessarily equal to hLen (see VAPChannel) 928 header = readHeader(hLen); 929 len -= header.length; 930 } 931 b = new byte[len]; 932 getMessage (b, 0, len); 933 getMessageTrailer(m); 934 } 935 else 936 throw new ISOException( 937 "receive length " +len + " seems strange - maxPacketLength = " + getMaxPacketLength()); 938 } finally { 939 serverInLock.unlock(); 940 } 941 m.setPackager (getDynamicPackager(header, b)); 942 m.setHeader (getDynamicHeader(header)); 943 if (b.length > 0 && !shouldIgnore (header)) // Ignore NULL messages 944 unpack (m, b); 945 m.setDirection(ISOMsg.INCOMING); 946 evt.addMessage (m); 947 applyTags (evt, m); 948 m = applyIncomingFilters (m, header, b, evt); 949 m.setDirection(ISOMsg.INCOMING); 950 cnt[RX]++; 951 incrementMsgInCounter(m); 952 setChanged(); 953 notifyObservers(m); 954 } catch (ISOException e) { 955 evt.addMessage (e); 956 if (header != null) { 957 evt.addMessage ("--- header ---"); 958 evt.addMessage (ISOUtil.hexdump (header)); 959 } 960 if (b != null && debugIsoError) { 961 evt.addMessage ("--- data ---"); 962 evt.addMessage (ISOUtil.hexdump (b)); 963 } 964 throw e; 965 } catch (IOException e) { 966 evt.addMessage ( 967 new Disconnect(socket.getInetAddress().getHostAddress(), socket.getPort(), socket.getLocalPort(), 968 "%s (%s)".formatted(Caller.shortClassName(e.getClass().getName()), Caller.info()), e.getMessage()) 969 ); 970 closeSocket(); 971 throw e; 972 } catch (Exception e) { 973 closeSocket(); 974 evt.addMessage (m); 975 evt.addMessage (e); 976 throw new IOException ("unexpected exception", e); 977 } finally { 978 Logger.log (evt); 979 } 980 jfr.setDetail(m.toString()); 981 jfr.commit(); 982 return m; 983 } 984 /** 985 * Low level receive 986 * @param b byte array 987 * @throws IOException on error 988 * @return the total number of bytes read into the buffer, 989 * or -1 if there is no more data because the end of the stream has been reached. 990 */ 991 public int getBytes (byte[] b) throws IOException { 992 return serverIn.read (b); 993 } 994 /** 995 * disconnects the TCP/IP session. The instance is ready for 996 * a reconnection. There is no need to create a new ISOChannel<br> 997 * @exception IOException on I/O error on I/O error on I/O error 998 */ 999 public void disconnect () throws IOException { 1000 var jfr = new ChannelEvent.Disconnect(); 1001 jfr.begin(); 1002 LogEvent evt = new LogEvent (this, "disconnect"); 1003 if (socket != null) { 1004 String detail = socket.getRemoteSocketAddress().toString(); 1005 jfr.setDetail(detail); 1006 evt.withTag("endpoint", toEndpoint(socket)); 1007 evt.addMessage(detail); 1008 } 1009 1010 try { 1011 usable = false; 1012 setChanged(); 1013 notifyObservers(); 1014 closeSocket(); 1015 if (serverIn != null) { 1016 try { 1017 serverIn.close(); 1018 } catch (IOException ex) { evt.addMessage (ex); } 1019 serverIn = null; 1020 } 1021 if (serverOut != null) { 1022 try { 1023 serverOut.close(); 1024 } catch (IOException ex) { evt.addMessage (ex); } 1025 serverOut = null; 1026 } 1027 } catch (IOException e) { 1028 evt.addMessage (e); 1029 jfr.append (e.getMessage()); 1030 Logger.log (evt); 1031 throw e; 1032 } finally { 1033 jfr.commit(); 1034 } 1035 socket = null; 1036 } 1037 /** 1038 * Issues a disconnect followed by a connect 1039 * @exception IOException on I/O error on I/O error on I/O error 1040 */ 1041 public void reconnect() throws IOException { 1042 disconnect(); 1043 connect(); 1044 } 1045 public void setLogger (Logger logger, String realm) { 1046 this.logger = logger; 1047 this.realm = realm; 1048 if (originalRealm == null) 1049 originalRealm = realm; 1050 } 1051 public String getRealm () { 1052 return realm; 1053 } 1054 public Logger getLogger() { 1055 return logger; 1056 } 1057 /** Returns a human-readable endpoint string for the given socket. 1058 * @param socket the connected socket 1059 * @return endpoint string in the form host:port 1060 */ 1061 protected String toEndpoint(Socket socket) { 1062 if (socket == null || socket.getInetAddress() == null) 1063 return null; 1064 return "%s:%d".formatted(socket.getInetAddress().getHostAddress(), socket.getPort()); 1065 } 1066 /** Returns the original realm before any override was applied. 1067 * @return the original realm string 1068 */ 1069 public String getOriginalRealm() { 1070 return originalRealm == null ? 1071 this.getClass().getName() : originalRealm; 1072 } 1073 /** 1074 * associates this ISOChannel with a name using NameRegistrar 1075 * @param name name to register 1076 * @see NameRegistrar 1077 */ 1078 public void setName (String name) { 1079 this.name = name; 1080 NameRegistrar.register("channel." + name, this); 1081 } 1082 /** 1083 * @return this ISOChannel's name ("" if no name was set) 1084 */ 1085 public String getName() { 1086 return this.name; 1087 } 1088 /** 1089 * Adds a filter to this channel. 1090 * @param filter filter to add 1091 * @param direction ISOMsg.INCOMING, ISOMsg.OUTGOING, 0 for both 1092 */ 1093 @SuppressWarnings ("unchecked") 1094 public void addFilter (ISOFilter filter, int direction) { 1095 switch (direction) { 1096 case ISOMsg.INCOMING : 1097 incomingFilters.add (filter); 1098 break; 1099 case ISOMsg.OUTGOING : 1100 outgoingFilters.add (filter); 1101 break; 1102 case 0 : 1103 incomingFilters.add (filter); 1104 outgoingFilters.add (filter); 1105 break; 1106 } 1107 } 1108 /** 1109 * @param filter incoming filter to add 1110 */ 1111 public void addIncomingFilter (ISOFilter filter) { 1112 addFilter(filter, ISOMsg.INCOMING); 1113 } 1114 /** 1115 * @param filter outgoing filter to add 1116 */ 1117 public void addOutgoingFilter (ISOFilter filter) { 1118 addFilter(filter, ISOMsg.OUTGOING); 1119 } 1120 1121 /** 1122 * Adds a filter to this channel. 1123 * @param filter filter to add (both directions, incoming/outgoing) 1124 */ 1125 public void addFilter (ISOFilter filter) { 1126 addFilter (filter, 0); 1127 } 1128 /** 1129 * Removes a filter from this channel. 1130 * @param filter filter to remove 1131 * @param direction ISOMsg.INCOMING, ISOMsg.OUTGOING, 0 for both 1132 */ 1133 public void removeFilter (ISOFilter filter, int direction) { 1134 switch (direction) { 1135 case ISOMsg.INCOMING : 1136 incomingFilters.remove (filter); 1137 break; 1138 case ISOMsg.OUTGOING : 1139 outgoingFilters.remove (filter); 1140 break; 1141 case 0 : 1142 incomingFilters.remove (filter); 1143 outgoingFilters.remove (filter); 1144 break; 1145 } 1146 } 1147 /** 1148 * Removes a filter from this channel. 1149 * @param filter filter to remove (both directions) 1150 */ 1151 public void removeFilter (ISOFilter filter) { 1152 removeFilter (filter, 0); 1153 } 1154 /** 1155 * @param filter incoming filter to remove 1156 */ 1157 public void removeIncomingFilter (ISOFilter filter) { 1158 removeFilter(filter, ISOMsg.INCOMING); 1159 } 1160 /** 1161 * @param filter outgoing filter to remove 1162 */ 1163 public void removeOutgoingFilter (ISOFilter filter) { 1164 removeFilter (filter, ISOMsg.OUTGOING); 1165 } 1166 /** Applies all registered outgoing filters to the given message. 1167 * @param m the message to filter 1168 * @param evt the log event for diagnostics 1169 * @return the filtered message 1170 * @throws VetoException if a filter vetoes the message 1171 */ 1172 protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt) 1173 throws VetoException 1174 { 1175 for (ISOFilter f :outgoingFilters) 1176 m = f.filter (this, m, evt); 1177 return m; 1178 } 1179 /** Applies all registered incoming filters to the given message. 1180 * @param m the message to filter 1181 * @param evt the log event for diagnostics 1182 * @return the filtered message 1183 * @throws VetoException if a filter vetoes the message 1184 */ 1185 protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt) 1186 throws VetoException 1187 { 1188 return applyIncomingFilters (m, null, null, evt); 1189 } 1190 /** 1191 * Applies all registered incoming filters with the raw message header and image. 1192 * @param m the decoded message 1193 * @param header the message header bytes 1194 * @param image the raw message bytes 1195 * @param evt the log event for diagnostics 1196 * @return the filtered message 1197 * @throws VetoException if a filter vetoes the message 1198 */ 1199 protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt) 1200 throws VetoException 1201 { 1202 for (ISOFilter f :incomingFilters) { 1203 if (image != null && f instanceof RawIncomingFilter) 1204 m = ((RawIncomingFilter)f).filter (this, m, header, image, evt); 1205 else 1206 m = f.filter (this, m, evt); 1207 } 1208 return m; 1209 } 1210 /** Unpacks raw bytes into the given ISOMsg using this channel's packager. 1211 * @param m the message to populate 1212 * @param b the raw bytes to unpack 1213 * @throws ISOException on unpack error 1214 */ 1215 protected void unpack (ISOMsg m, byte[] b) throws ISOException { 1216 m.unpack (b); 1217 } 1218 /** Packs the given ISOMsg into raw bytes using this channel's packager. 1219 * @param m the message to pack 1220 * @return the packed byte array 1221 * @throws ISOException on pack error 1222 */ 1223 protected byte[] pack (ISOMsg m) throws ISOException { 1224 return m.pack(); 1225 } 1226 /** 1227 * Implements Configurable<br> 1228 * Properties:<br> 1229 * <ul> 1230 * <li>host - destination host (if ClientChannel) 1231 * <li>port - port number (if ClientChannel) 1232 * <li>local-iface - local interfase to use (if ClientChannel) 1233 * <li>local-port - local port to bind (if ClientChannel) 1234 * </ul> 1235 * (host not present indicates a ServerChannel) 1236 * 1237 * @param cfg Configuration 1238 * @throws ConfigurationException if configuration is invalid 1239 */ 1240 public void setConfiguration (Configuration cfg) 1241 throws ConfigurationException 1242 { 1243 this.cfg = cfg; 1244 String h = cfg.get ("host"); 1245 int port = cfg.getInt ("port"); 1246 maxPacketLength = cfg.getInt ("max-packet-length", 100000); 1247 sendTimeout = cfg.getLong ("send-timeout", sendTimeout); 1248 if (h != null && h.length() > 0) { 1249 if (port == 0) 1250 throw new ConfigurationException 1251 ("invalid port for host '"+h+"'"); 1252 setHost (h, port); 1253 setLocalAddress (cfg.get("local-iface", null),cfg.getInt("local-port")); 1254 String[] altHosts = cfg.getAll ("alternate-host"); 1255 int[] altPorts = cfg.getInts ("alternate-port"); 1256 hosts = new String[altHosts.length + 1]; 1257 ports = new int[altPorts.length + 1]; 1258 if (hosts.length != ports.length) { 1259 throw new ConfigurationException ( 1260 "alternate host/port misconfiguration" 1261 ); 1262 } 1263 hosts[0] = host; 1264 ports[0] = port; 1265 System.arraycopy (altHosts, 0, hosts, 1, altHosts.length); 1266 System.arraycopy (altPorts, 0, ports, 1, altPorts.length); 1267 } 1268 setOverrideHeader(cfg.getBoolean ("override-header", false)); 1269 keepAlive = cfg.getBoolean ("keep-alive", false); 1270 expectKeepAlive = cfg.getBoolean ("expect-keep-alive", false); 1271 roundRobin = cfg.getBoolean ("round-robin", false); 1272 debugIsoError = cfg.getBoolean ("debug-iso-error", true); 1273 if (socketFactory != this && socketFactory instanceof Configurable) 1274 ((Configurable)socketFactory).setConfiguration (cfg); 1275 try { 1276 setTimeout (cfg.getInt ("timeout", DEFAULT_TIMEOUT)); 1277 connectTimeout = cfg.getInt ("connect-timeout", timeout); 1278 } catch (SocketException e) { 1279 throw new ConfigurationException (e); 1280 } 1281 } 1282 /** Returns the current configuration for this channel. 1283 * @return the current {@link Configuration} 1284 */ 1285 public Configuration getConfiguration() { 1286 return cfg; 1287 } 1288 public Collection<ISOFilter> getIncomingFilters() { 1289 return incomingFilters; 1290 } 1291 public Collection<ISOFilter> getOutgoingFilters() { 1292 return outgoingFilters; 1293 } 1294 public void setIncomingFilters (Collection filters) { 1295 incomingFilters = new ArrayList (filters); 1296 } 1297 public void setOutgoingFilters (Collection filters) { 1298 outgoingFilters = new ArrayList (filters); 1299 } 1300 /** Sets the message header bytes for this channel. 1301 * @param header the header bytes 1302 */ 1303 public void setHeader (byte[] header) { 1304 this.header = header; 1305 } 1306 /** Sets the message header from a hex string. 1307 * @param header hex-encoded header string 1308 */ 1309 public void setHeader (String header) { 1310 setHeader (header.getBytes()); 1311 } 1312 /** Returns the message header bytes. 1313 * @return the header bytes, or null if not set 1314 */ 1315 public byte[] getHeader () { 1316 return header; 1317 } 1318 /** Controls whether this channel's own header overrides the message header. 1319 * @param overrideHeader if true, the channel header takes precedence 1320 */ 1321 public void setOverrideHeader (boolean overrideHeader) { 1322 this.overrideHeader = overrideHeader; 1323 } 1324 /** Returns whether the channel header overrides the message header. 1325 * @return true if override is active 1326 */ 1327 public boolean isOverrideHeader () { 1328 return overrideHeader; 1329 } 1330 /** 1331 * Retrieves a channel instance by name from the NameRegistrar. 1332 * @param name the Channel's name (without the "channel." prefix) 1333 * @return ISOChannel instance with given name. 1334 * @throws NameRegistrar.NotFoundException if no channel with that name is registered 1335 * @see NameRegistrar 1336 */ 1337 public static ISOChannel getChannel (String name) 1338 throws NameRegistrar.NotFoundException 1339 { 1340 return (ISOChannel) NameRegistrar.get ("channel."+name); 1341 } 1342 /** 1343 * Gets the ISOClientSocketFactory (may be null) 1344 * @see ISOClientSocketFactory 1345 * @since 1.3.3 \ 1346 * @return ISOClientSocketFactory 1347 */ 1348 public ISOClientSocketFactory getSocketFactory() { 1349 return socketFactory; 1350 } 1351 /** 1352 * Sets the specified Socket Factory to create sockets 1353 * @param socketFactory the ISOClientSocketFactory 1354 * @see ISOClientSocketFactory 1355 * @since 1.3.3 1356 */ 1357 public void setSocketFactory(ISOClientSocketFactory socketFactory) { 1358 this.socketFactory = socketFactory; 1359 } 1360 /** Returns the maximum packet length accepted by this channel. 1361 * @return maximum packet length in bytes 1362 */ 1363 public int getMaxPacketLength() { 1364 return maxPacketLength; 1365 } 1366 /** Sets the maximum packet length accepted by this channel. 1367 * @param maxPacketLength maximum packet length in bytes 1368 */ 1369 public void setMaxPacketLength(int maxPacketLength) { 1370 this.maxPacketLength = maxPacketLength; 1371 } 1372 /** Closes the underlying socket. 1373 * @throws IOException on I/O error 1374 */ 1375 protected void closeSocket() throws IOException { 1376 Socket s = null; 1377 synchronized (this) { 1378 if (socket != null) { 1379 s = socket; // we don't want more than one thread 1380 socket = null; // attempting to close the socket 1381 } 1382 } 1383 if (s != null) { 1384 try { 1385 s.setSoLinger (soLingerOn, soLingerSeconds); 1386 if (shutdownSupportedBySocket(s) && !isSoLingerForcingImmediateTcpReset()) 1387 s.shutdownOutput(); // This will force a TCP FIN to be sent on regular sockets, 1388 } catch (SocketException e) { 1389 // NOPMD 1390 // safe to ignore - can be closed already 1391 // e.printStackTrace(); 1392 } 1393 s.close (); 1394 } 1395 } 1396 private boolean shutdownSupportedBySocket(Socket s) { 1397 return !(s instanceof SSLSocket); // we can't close output on SSL connections, see [jPOS-85] 1398 } 1399 private boolean isSoLingerForcingImmediateTcpReset() { 1400 return soLingerOn && soLingerSeconds == 0; 1401 } 1402 public Object clone(){ 1403 try { 1404 BaseChannel channel = (BaseChannel) super.clone(); 1405 channel.cnt = cnt.clone(); 1406 // The lock objects must also be cloned, and the DataStreams nullified, as it makes no sense 1407 // to use the new lock objects to protect the old DataStreams. 1408 // This should be safe as the only code that calls BaseChannel.clone() is ISOServer.run(), 1409 // and it immediately calls accept(ServerSocket) which does a connect(), and that sets the stream objects. 1410 channel.serverInLock = new ReentrantLock(); 1411 channel.serverOutLock = new ReentrantLock(); 1412 channel.serverIn = null; 1413 channel.serverOut = null; 1414 channel.usable = false; 1415 channel.socket = null; 1416 return channel; 1417 } catch (CloneNotSupportedException e) { 1418 throw new InternalError(); 1419 } 1420 } 1421 1422 private UUID getSocketUUID() { 1423 return socket != null ? 1424 new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ socket.hashCode()) : 1425 uuid; 1426 } 1427 1428 /** Increments the incoming message counter for the given message. 1429 * @param m the received message 1430 * @throws ISOException on error 1431 */ 1432 protected void incrementMsgInCounter(ISOMsg m) throws ISOException { 1433 if (isoMsgMetrics != null) { 1434 isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_IN); 1435 } 1436 } 1437 /** Increments the outgoing message counter for the given message. 1438 * @param m the sent message 1439 * @throws ISOException on error 1440 */ 1441 protected void incrementMsgOutCounter(ISOMsg m) throws ISOException { 1442 if (isoMsgMetrics != null) { 1443 isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_OUT); 1444 } 1445 } 1446 private void applyTags (LogEvent evt, ISOMsg m) { 1447 if (m.hasField(3)) { 1448 String f3 = m.getString(3); 1449 if (f3 != null && f3.length() >= 2) 1450 evt.withTag("pcode", f3.substring(0, 2)); 1451 } 1452 if (m.hasField(41)) 1453 evt.withTag("tid", m.getString(41)); 1454 if (m.hasField(42)) 1455 evt.withTag("mid", m.getString(42)); 1456 try { 1457 String mti = m.getMTI(); 1458 if (mti != null) 1459 evt.withTag("mti", mti); 1460 } catch (ISOException ignored) { } 1461 } 1462}