001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.iso; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.core.handlers.exception.ExceptionHandler; 025import org.jpos.core.handlers.exception.ExceptionHandlerAware; 026import org.jpos.iso.ISOFilter.VetoException; 027import org.jpos.iso.header.BaseHeader; 028import org.jpos.jfr.ChannelEvent; 029import org.jpos.log.evt.Connect; 030import org.jpos.log.evt.Disconnect; 031import org.jpos.metrics.MeterInfo; 032import org.jpos.metrics.iso.ISOMsgMetrics; 033import org.jpos.util.*; 034 035import javax.net.ssl.SSLSocket; 036import java.io.*; 037import java.net.*; 038import java.util.*; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.locks.Lock; 041import java.util.concurrent.locks.ReentrantLock; 042 043/* 044 * BaseChannel was ISOChannel. Now ISOChannel is an interface 045 * Revision: 1.34 Date: 2000/04/08 23:54:55 046 */ 047 048/** 049 * ISOChannel is an abstract class that provides functionality that 050 * allows the transmission and reception of ISO 8583 Messages 051 * over a TCP/IP session. 052 * <p> 053 * This class is not thread-safe. 
054 * <p> 055 * ISOChannel is Observable in order to support GUI components 056 * such as ISOChannelPanel. 057 * <br> 058 * It now support the new Logger architecture so we will 059 * probably setup ISOChannelPanel to be a LogListener instead 060 * of being an Observer in future releases. 061 * 062 * @author Alejandro P. Revilla 063 * @author Bharavi Gade 064 * @version $Revision$ $Date$ 065 * @see ISOMsg 066 * @see MUX 067 * @see ISOException 068 * @see org.jpos.iso.channel.CSChannel 069 * @see Logger 070 * 071 */ 072@SuppressWarnings("unchecked") 073public abstract class BaseChannel extends Observable 074 implements FilteredChannel, ClientChannel, ServerChannel, FactoryChannel, ISOMsgMetrics.Source, 075 LogSource, Configurable, BaseChannelMBean, Cloneable, ExceptionHandlerAware 076{ 077 private Socket socket; 078 private String host, localIface; 079 private String[] hosts; 080 private int[] ports; 081 private int port, timeout, connectTimeout, localPort; 082 private int maxPacketLength = 100000; 083 private boolean keepAlive; 084 private boolean expectKeepAlive; 085 private boolean soLingerOn = true; 086 private int soLingerSeconds = 5; 087 private Configuration cfg; 088 protected boolean usable; 089 protected boolean overrideHeader; 090 private String name; 091 private long sendTimeout = 15000L; 092 // private int serverPort = -1; 093 protected DataInputStream serverIn; 094 protected DataOutputStream serverOut; 095 // The lock objects should be final, and never changed, but due to the clone() method, they must be set there. 
096 protected Lock serverInLock = new ReentrantLock(); 097 protected Lock serverOutLock = new ReentrantLock(); 098 protected ISOPackager packager; 099 protected ServerSocket serverSocket = null; 100 protected List<ISOFilter> incomingFilters, outgoingFilters; 101 protected ISOClientSocketFactory socketFactory = null; 102 103 protected int[] cnt; 104 105 protected Logger logger = null; 106 protected String realm = null; 107 protected String originalRealm = null; 108 protected byte[] header = null; 109 private static final int DEFAULT_TIMEOUT = 300000; 110 private int nextHostPort = 0; 111 private boolean roundRobin = false; 112 private boolean debugIsoError = true; 113 114 private ISOMsgMetrics isoMsgMetrics; 115 116 private final UUID uuid; 117 118 private final Map<Class<? extends Exception>, List<ExceptionHandler>> exceptionHandlers = new HashMap<>(); 119 120 /** 121 * constructor shared by server and client 122 * ISOChannels (which have different signatures) 123 */ 124 public BaseChannel () { 125 super(); 126 uuid = UUID.randomUUID(); 127 cnt = new int[SIZEOF_CNT]; 128 name = ""; 129 incomingFilters = new ArrayList<>(); 130 outgoingFilters = new ArrayList<>(); 131 setHost(null, 0); 132 } 133 134 /** 135 * constructs a client ISOChannel 136 * @param host server TCP Address 137 * @param port server port number 138 * @param p an ISOPackager 139 * @see ISOPackager 140 */ 141 public BaseChannel (String host, int port, ISOPackager p) { 142 this(); 143 setHost(host, port); 144 setPackager(p); 145 } 146 /** 147 * constructs a server ISOChannel 148 * @param p an ISOPackager 149 * @exception IOException on error 150 * @see ISOPackager 151 */ 152 public BaseChannel (ISOPackager p) throws IOException { 153 this(); 154 setPackager (p); 155 } 156 157 /** 158 * constructs a server ISOChannel associated with a Server Socket 159 * @param p an ISOPackager 160 * @param serverSocket where to accept a connection 161 * @exception IOException on error 162 * @see ISOPackager 163 */ 164 
public BaseChannel (ISOPackager p, ServerSocket serverSocket) throws IOException { 165 this(); 166 setPackager (p); 167 setServerSocket (serverSocket); 168 } 169 170 /** 171 * initialize an ISOChannel 172 * @param host server TCP Address 173 * @param port server port number 174 */ 175 public void setHost(String host, int port) { 176 this.host = host; 177 this.port = port; 178 this.hosts = new String[] { host }; 179 this.ports = new int[] { port }; 180 } 181 182 /** 183 * initialize an ISOChannel 184 * @param iface server TCP Address 185 * @param port server port number 186 */ 187 public void setLocalAddress (String iface, int port) { 188 this.localIface = iface; 189 this.localPort = port; 190 } 191 192 193 /** 194 * @param host to connect (client ISOChannel) 195 */ 196 public void setHost (String host) { 197 this.host = host; 198 this.hosts = new String[] { host }; 199 } 200 /** 201 * @param port to connect (client ISOChannel) 202 */ 203 public void setPort (int port) { 204 this.port = port; 205 this.ports = new int[] { port }; 206 } 207 /** 208 * @return hostname (may be null) 209 */ 210 public String getHost() { 211 return host; 212 } 213 /** 214 * @return port number 215 */ 216 public int getPort() { 217 return port; 218 } 219 /** 220 * set Packager for channel 221 * @param p an ISOPackager 222 * @see ISOPackager 223 */ 224 public void setPackager(ISOPackager p) { 225 this.packager = p; 226 } 227 228 /** 229 * @return current packager 230 */ 231 public ISOPackager getPackager() { 232 return packager; 233 } 234 235 /** 236 * Associates this ISOChannel with a server socket 237 * @param sock where to accept a connection 238 */ 239 public void setServerSocket (ServerSocket sock) { 240 setHost (null, 0); 241 this.serverSocket = sock; 242 name = ""; 243 } 244 245 @Override 246 public Map<Class<? 
extends Exception>, List<ExceptionHandler>> getExceptionHandlers() { 247 return exceptionHandlers; 248 } 249 250 /** 251 * reset stat info 252 */ 253 public void resetCounters() { 254 for (int i=0; i<SIZEOF_CNT; i++) 255 cnt[i] = 0; 256 } 257 /** 258 * @return counters 259 */ 260 public int[] getCounters() { 261 return cnt; 262 } 263 /** 264 * @return the connection state 265 */ 266 public boolean isConnected() { 267 return socket != null && usable; 268 } 269 270 @Override 271 public void setISOMsgMetrics(ISOMsgMetrics metrics) { 272 isoMsgMetrics = metrics; 273 } 274 @Override 275 public ISOMsgMetrics getISOMsgMetrics() { 276 return isoMsgMetrics; 277 } 278 279 /** 280 * setup I/O Streams from socket 281 * @param socket a Socket (client or server) 282 * @exception IOException on error 283 */ 284 protected void connect (Socket socket) throws IOException { 285 this.socket = socket; 286 applyTimeout(); 287 InetAddress inetAddress = socket.getInetAddress(); 288 String remoteAddress = (inetAddress != null) ? "/" + inetAddress.getHostAddress() + ":" + socket.getPort() : ""; 289 setLogger(getLogger(), getOriginalRealm() + 290 remoteAddress 291 ); 292 serverInLock.lock(); 293 try { 294 serverIn = new DataInputStream ( 295 new BufferedInputStream (socket.getInputStream ()) 296 ); 297 } finally { 298 serverInLock.unlock(); 299 } 300 serverOutLock.lock(); 301 try { 302 serverOut = new DataOutputStream( 303 new BufferedOutputStream(socket.getOutputStream(), 2048) 304 ); 305 } finally { 306 serverOutLock.unlock(); 307 } 308 postConnectHook(); 309 usable = true; 310 cnt[CONNECT]++; 311 setChanged(); 312 notifyObservers(); 313 } 314 protected void postConnectHook() throws IOException { 315 // do nothing 316 } 317 /** 318 * factory method pattern (as suggested by Vincent.Greene@amo.com) 319 * @param host remote host 320 * @param port remote port 321 * @throws IOException on error 322 * 323 * Use Socket factory if exists. 
If it is missing create a normal socket 324 * @see ISOClientSocketFactory 325 * @return newly created socket 326 */ 327 protected Socket newSocket(String host, int port) throws IOException { 328 try { 329 if (socketFactory != null) 330 return socketFactory.createSocket (host, port); 331 else { 332 Socket s = new Socket(); 333 if (localIface != null || localPort != 0) { 334 InetAddress addr = localIface == null ? 335 InetAddress.getLocalHost() : 336 InetAddress.getByName(localIface); 337 s.bind(new InetSocketAddress(addr, localPort)); 338 } 339 s.connect ( 340 new InetSocketAddress (host, port), 341 connectTimeout 342 ); 343 return s; 344 } 345 } catch (ISOException e) { 346 throw new IOException (e.getMessage()); 347 } 348 } 349 protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt) 350 throws IOException 351 { 352 Socket s = null; 353 if (!roundRobin) 354 nextHostPort = 0; 355 356 IOException lastIOException = null; 357 String h=null; 358 int p=0; 359 for (int i=0; i<hosts.length; i++) { 360 try { 361 int ii = nextHostPort++ % hosts.length; 362 h = hosts[ii]; 363 p = ports[ii]; 364 s = newSocket (h, p); 365 evt.addMessage(new Connect( 366 s.getInetAddress().getHostAddress(), 367 s.getPort(), 368 s.getLocalPort(), null) 369 ); 370 break; 371 } catch (IOException e) { 372 lastIOException = e; 373 evt.addMessage( 374 new Connect(h, p, 0, "%s:%d %s (%s)".formatted( 375 h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1) 376 ) 377 ) 378 ); 379 } 380 } 381 if (s == null) 382 throw new IOException ("%s:%d %s (%s)".formatted(h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1))); 383 return s; 384 } 385 /** 386 * @return current socket 387 */ 388 public Socket getSocket() { 389 return socket; 390 } 391 /** 392 * @return current serverSocket 393 */ 394 public ServerSocket getServerSocket() { 395 return serverSocket; 396 } 397 398 /** 399 * sets socket timeout (as suggested by 400 * Leonard Thomas 
<leonard@rhinosystemsinc.com>) 401 * @param timeout in milliseconds 402 * @throws SocketException on error 403 */ 404 public void setTimeout (int timeout) throws SocketException { 405 this.timeout = timeout; 406 applyTimeout(); 407 } 408 public int getTimeout () { 409 return timeout; 410 } 411 412 /** 413 * sets timeout, and also keep alive 414 * @throws SocketException 415 */ 416 protected void applyTimeout () throws SocketException { 417 if (socket != null && socket.isConnected()) { 418 if (keepAlive) 419 socket.setKeepAlive(keepAlive); 420 if (timeout >= 0) 421 socket.setSoTimeout(timeout); 422 } 423 } 424 /** 425 * Socket SO_LINGER option to use when closing the socket. 426 * @see java.net.Socket#setSoLinger(boolean, int) 427 */ 428 public void setSoLinger(boolean on, int linger) { 429 this.soLingerOn = on; 430 this.soLingerSeconds = linger; 431 } 432 public boolean isSoLingerOn() { 433 return soLingerOn; 434 } 435 public int getSoLingerSeconds() { 436 return soLingerSeconds; 437 } 438 /** 439 * Connects client ISOChannel to server 440 * @exception IOException 441 */ 442 public void connect () throws IOException { 443 ChannelEvent jfr = new ChannelEvent.Connect(); 444 jfr.begin(); 445 LogEvent evt = new LogEvent (this, "connect").withTraceId(uuid); 446 try { 447 socket = newSocket (hosts, ports, evt); 448 if (getHost() != null) 449 jfr.setDetail("%s:%d".formatted(getHost(), getPort())); 450 connect(socket); 451 jfr.append("%d".formatted(socket.getLocalPort())); 452 evt.withTraceId(getSocketUUID()); 453 applyTimeout(); 454 } catch (IOException e) { 455 jfr = new ChannelEvent.ConnectionException(jfr.getDetail()); 456 jfr.begin(); 457 jfr.append (e.getMessage()); 458 throw e; 459 } finally { 460 Logger.log (evt); 461 jfr.commit(); 462 } 463 } 464 465 /** 466 * Accepts connection 467 * @exception IOException 468 */ 469 public void accept(ServerSocket s) throws IOException { 470 ChannelEvent jfr = new ChannelEvent.Accept(); 471 jfr.begin(); 472 try { 473 Socket ss = 
s.accept(); 474 this.name = "%d %s:%d".formatted( 475 ss.getLocalPort(), 476 ss.getInetAddress().getHostAddress(), 477 ss.getPort() 478 ); 479 jfr.setDetail(name); 480 connect(ss); 481 } catch (IOException e) { 482 jfr = new ChannelEvent.AcceptException(e.getMessage()); 483 jfr.begin(); 484 throw e; 485 } finally { 486 jfr.commit(); 487 } 488 489 // Warning - closing here breaks ISOServer, we need an 490 // accept that keep ServerSocket open. 491 // s.close(); 492 } 493 494 /** 495 * @param b - new Usable state (used by ISOMUX internals to 496 * flag as unusable in order to force a reconnection) 497 */ 498 public void setUsable(boolean b) { 499 Logger.log (new LogEvent (this, "usable", b)); 500 usable = b; 501 } 502 /** 503 * allow subclasses to override default packager 504 * on outgoing messages 505 * @param m outgoing ISOMsg 506 * @return ISOPackager 507 */ 508 protected ISOPackager getDynamicPackager (ISOMsg m) { 509 return packager; 510 } 511 512 /** 513 * allow subclasses to override default packager 514 * on incoming messages 515 * @param image incoming message image 516 * @return ISOPackager 517 */ 518 protected ISOPackager getDynamicPackager (byte[] image) { 519 return packager; 520 } 521 /** 522 * allow subclasses to override default packager 523 * on incoming messages 524 * @param header message header 525 * @param image incoming message image 526 * @return ISOPackager 527 */ 528 protected ISOPackager getDynamicPackager (byte[] header, byte[] image) { 529 return getDynamicPackager(image); 530 } 531 532 533 /** 534 * Allow subclasses to override the Default header on 535 * incoming messages. 536 * @param image message image 537 * @return ISOHeader instance 538 */ 539 protected ISOHeader getDynamicHeader (byte[] image) { 540 return image != null ? 
541 new BaseHeader (image) : null; 542 } 543 protected void sendMessageLength(int len) throws IOException { } 544 protected void sendMessageHeader(ISOMsg m, int len) throws IOException { 545 if (!isOverrideHeader() && m.getHeader() != null) 546 serverOut.write(m.getHeader()); 547 else if (header != null) 548 serverOut.write(header); 549 } 550 /** 551 * @deprecated use sendMessageTrailer(ISOMsg m, byte[] b) instead. 552 * @param m a reference to the ISOMsg 553 * @param len the packed image length 554 * @throws IOException on error 555 */ 556 @Deprecated 557 protected void sendMessageTrailler(ISOMsg m, int len) throws IOException 558 { 559 } 560 561 /** 562 * @deprecated use sendMessageTrailer(ISOMsg m, byte[] b instead. 563 */ 564 @SuppressWarnings ("deprecation") 565 @Deprecated 566 protected void sendMessageTrailler(ISOMsg m, byte[] b) throws IOException { 567 sendMessageTrailler (m, b.length); 568 } 569 570 /** 571 * Send a Message trailer. 572 * 573 * @param m The unpacked ISOMsg. 574 * @param b The packed ISOMsg image. 575 */ 576 @SuppressWarnings("deprecation") 577 protected void sendMessageTrailer(ISOMsg m, byte[] b) throws IOException { 578 sendMessageTrailler(m, b); 579 } 580 581 582 /** 583 * @deprecated use getMessageTrailer(ISOMsg m) instead. 584 */ 585 @Deprecated 586 protected void getMessageTrailler() throws IOException { 587 } 588 589 /** 590 * Read some trailer data from this channel and optionally store it in the incoming ISOMsg. 591 * 592 * @param m The ISOMessage to store the trailer data. 593 * @see ISOMsg#setTrailer(byte[]). 
594 */ 595 @SuppressWarnings("deprecation") 596 protected void getMessageTrailer(ISOMsg m) throws IOException { 597 getMessageTrailler(); 598 } 599 600 protected void getMessage (byte[] b, int offset, int len) throws IOException, ISOException { 601 serverIn.readFully(b, offset, len); 602 } 603 protected int getMessageLength() throws IOException, ISOException { 604 return -1; 605 } 606 protected int getHeaderLength() { 607 return header != null ? header.length : 0; 608 } 609 protected int getHeaderLength(byte[] b) { return 0; } 610 611 protected int getHeaderLength(ISOMsg m) { 612 return !overrideHeader && m.getHeader() != null ? 613 m.getHeader().length : getHeaderLength(); 614 } 615 616 protected byte[] streamReceive() throws IOException { 617 return new byte[0]; 618 } 619 protected void sendMessage (byte[] b, int offset, int len) 620 throws IOException 621 { 622 serverOut.write(b, offset, len); 623 } 624 /** 625 * Sends the specified {@link ISOMsg} over this ISOChannel. 626 * <p> 627 * This method performs the following steps: 628 * <ul> 629 * <li>Verifies the channel is connected.</li> 630 * <li>Sets the message direction to {@link ISOMsg#OUTGOING}.</li> 631 * <li>Retrieves and sets a dynamic packager for the message.</li> 632 * <li>Applies all registered outgoing filters, allowing them to modify or veto the message.</li> 633 * <li>Packs the message and writes its length, header, body, and trailer to the underlying stream, 634 * protected by a locking mechanism to ensure thread safety.</li> 635 * <li>Flushes the output stream and increments message counters.</li> 636 * <li>Notifies observers of the sent message.</li> 637 * <li>Logs both the message and the send operation through {@link ChannelEvent} and {@link LogEvent}.</li> 638 * </ul> 639 * 640 * If a {@link VetoException} is thrown by a filter, the message is not sent, and the exception is logged. 641 * 642 * @param m the ISO message to be sent. 
The message will be modified in-place: its direction and packager 643 * will be updated, and filters may alter its content. 644 * 645 * @throws IOException if the channel is not connected, if the output stream fails, if locking times out, 646 * or if an unexpected I/O error occurs. 647 * @throws ISOException if packing the message fails or other ISO-specific issues occur. 648 * @throws VetoException if an outgoing filter vetoes the message. 649 * 650 * @see ISOMsg 651 * @see ISOPackager 652 * @see ISOFilter 653 * @see ChannelEvent 654 * @see LogEvent 655 */ 656 public void send (ISOMsg m) 657 throws IOException, ISOException 658 { 659 ChannelEvent jfr = new ChannelEvent.Send(); 660 jfr.begin(); 661 LogEvent evt = new LogEvent (this, "send").withTraceId(getSocketUUID()); 662 try { 663 if (!isConnected()) 664 throw new IOException ("unconnected ISOChannel"); 665 m.setDirection(ISOMsg.OUTGOING); 666 ISOPackager p = getDynamicPackager(m); 667 m.setPackager (p); 668 m = applyOutgoingFilters (m, evt); 669 evt.addMessage (m); 670 m.setDirection(ISOMsg.OUTGOING); // filter may have dropped this info 671 m.setPackager (p); // and could have dropped packager as well 672 byte[] b = pack(m); 673 674 if (serverOutLock.tryLock(sendTimeout, TimeUnit.MILLISECONDS)) { 675 try { 676 sendMessageLength(b.length + getHeaderLength(m)); 677 sendMessageHeader(m, b.length); 678 sendMessage (b, 0, b.length); 679 sendMessageTrailer(m, b); 680 serverOut.flush (); 681 cnt[TX]++; 682 } finally { 683 serverOutLock.unlock(); 684 } 685 incrementMsgOutCounter(m); 686 } else { 687 disconnect(); 688 } 689 setChanged(); 690 notifyObservers(m); 691 jfr.setDetail(m.toString()); 692 } catch (VetoException e) { 693 //if a filter vets the message it was not added to the event 694 evt.addMessage (m); 695 evt.addMessage (e); 696 jfr.append (e.getMessage()); 697 throw e; 698 } catch (ISOException | IOException e) { 699 evt.addMessage (e); 700 jfr = new ChannelEvent.SendException(e.getMessage()); 701 throw e; 
702 } catch (Exception e) { 703 evt.addMessage (e); 704 jfr = new ChannelEvent.SendException(e.getMessage()); 705 throw new IOException ("unexpected exception", e); 706 } finally { 707 Logger.log (evt); 708 jfr.commit(); 709 } 710 } 711 /** 712 * sends a byte[] over the TCP/IP session 713 * @param b the byte array to be sent 714 * @exception IOException 715 * @exception ISOException 716 * @exception ISOFilter.VetoException; 717 */ 718 public void send (byte[] b) throws IOException, ISOException { 719 var jfr = new ChannelEvent.Send(); 720 jfr.begin(); 721 LogEvent evt = new LogEvent (this, "send"); 722 try { 723 if (!isConnected()) 724 throw new ISOException ("unconnected ISOChannel"); 725 serverOutLock.lock(); 726 try { 727 serverOut.write(b); 728 serverOut.flush(); 729 } finally { 730 serverOutLock.unlock(); 731 } 732 cnt[TX]++; 733 setChanged(); 734 } catch (Exception e) { 735 evt.addMessage (e); 736 throw new ISOException ("unexpected exception", e); 737 } finally { 738 Logger.log (evt); 739 jfr.commit(); 740 } 741 } 742 /** 743 * Sends a high-level keep-alive message (zero length) 744 * @throws IOException on exception 745 */ 746 public void sendKeepAlive () throws IOException { 747 serverOutLock.lock(); 748 try { 749 sendMessageLength(0); 750 serverOut.flush (); 751 } finally { 752 serverOutLock.unlock(); 753 } 754 } 755 756 public boolean isExpectKeepAlive() { 757 return expectKeepAlive; 758 } 759 760 protected boolean isRejected(byte[] b) { 761 // VAP Header support - see VAPChannel 762 return false; 763 } 764 protected boolean shouldIgnore (byte[] b) { 765 // VAP Header support - see VAPChannel 766 return false; 767 } 768 /** 769 * support old factory method name for backward compatibility 770 * @return newly created ISOMsg 771 */ 772 protected ISOMsg createMsg () { 773 return createISOMsg(); 774 } 775 protected ISOMsg createISOMsg () { 776 return packager.createISOMsg (); 777 } 778 779 /** 780 * Reads in a message header. 
781 * 782 * @param hLen The Length og the reader to read 783 * @return The header bytes that were read in 784 * @throws IOException on error 785 */ 786 protected byte[] readHeader(int hLen) throws IOException { 787 byte[] header = new byte[hLen]; 788 serverIn.readFully(header, 0, hLen); 789 return header; 790 } 791 /** 792 * Waits and receive an ISOMsg over the TCP/IP session 793 * @return the Message received 794 * @throws IOException 795 * @throws ISOException 796 */ 797 public ISOMsg receive() throws IOException, ISOException { 798 var jfr = new ChannelEvent.Receive(); 799 jfr.begin(); 800 801 byte[] b=null; 802 byte[] header=null; 803 LogEvent evt = new LogEvent (this, "receive").withTraceId(getSocketUUID()); 804 ISOMsg m = createMsg (); // call createMsg instead of createISOMsg for backward compatibility 805 806 m.setSource (this); 807 try { 808 if (!isConnected()) 809 throw new IOException ("unconnected ISOChannel"); 810 811 serverInLock.lock(); 812 try { 813 int len = getMessageLength(); 814 if (expectKeepAlive) { 815 while (len == 0) { 816 //If zero length, this is a keep alive msg 817 len = getMessageLength(); 818 } 819 } 820 int hLen = getHeaderLength(); 821 822 if (len == -1) { 823 if (hLen > 0) { 824 header = readHeader(hLen); 825 } 826 b = streamReceive(); 827 } 828 else if (len > 0 && len <= getMaxPacketLength()) { 829 if (hLen > 0) { 830 // ignore message header (TPDU) 831 // Note header length is not necessarily equal to hLen (see VAPChannel) 832 header = readHeader(hLen); 833 len -= header.length; 834 } 835 b = new byte[len]; 836 getMessage (b, 0, len); 837 getMessageTrailer(m); 838 } 839 else 840 throw new ISOException( 841 "receive length " +len + " seems strange - maxPacketLength = " + getMaxPacketLength()); 842 } finally { 843 serverInLock.unlock(); 844 } 845 m.setPackager (getDynamicPackager(header, b)); 846 m.setHeader (getDynamicHeader(header)); 847 if (b.length > 0 && !shouldIgnore (header)) // Ignore NULL messages 848 unpack (m, b); 849 
m.setDirection(ISOMsg.INCOMING); 850 evt.addMessage (m); 851 m = applyIncomingFilters (m, header, b, evt); 852 m.setDirection(ISOMsg.INCOMING); 853 cnt[RX]++; 854 incrementMsgInCounter(m); 855 setChanged(); 856 notifyObservers(m); 857 } catch (ISOException e) { 858 evt.addMessage (e); 859 if (header != null) { 860 evt.addMessage ("--- header ---"); 861 evt.addMessage (ISOUtil.hexdump (header)); 862 } 863 if (b != null && debugIsoError) { 864 evt.addMessage ("--- data ---"); 865 evt.addMessage (ISOUtil.hexdump (b)); 866 } 867 throw e; 868 } catch (IOException e) { 869 evt.addMessage ( 870 new Disconnect(socket.getInetAddress().getHostAddress(), socket.getPort(), socket.getLocalPort(), 871 "%s (%s)".formatted(Caller.shortClassName(e.getClass().getName()), Caller.info()), e.getMessage()) 872 ); 873 closeSocket(); 874 throw e; 875 } catch (Exception e) { 876 closeSocket(); 877 evt.addMessage (m); 878 evt.addMessage (e); 879 throw new IOException ("unexpected exception", e); 880 } finally { 881 Logger.log (evt); 882 } 883 jfr.setDetail(m.toString()); 884 jfr.commit(); 885 return m; 886 } 887 /** 888 * Low level receive 889 * @param b byte array 890 * @throws IOException on error 891 * @return the total number of bytes read into the buffer, 892 * or -1 if there is no more data because the end of the stream has been reached. 893 */ 894 public int getBytes (byte[] b) throws IOException { 895 return serverIn.read (b); 896 } 897 /** 898 * disconnects the TCP/IP session. The instance is ready for 899 * a reconnection. 
There is no need to create a new ISOChannel<br> 900 * @exception IOException 901 */ 902 public void disconnect () throws IOException { 903 var jfr = new ChannelEvent.Disconnect(); 904 jfr.begin(); 905 LogEvent evt = new LogEvent (this, "disconnect"); 906 if (socket != null) { 907 String detail = socket.getRemoteSocketAddress().toString(); 908 jfr.setDetail(detail); 909 evt.addMessage(detail); 910 } 911 912 try { 913 usable = false; 914 setChanged(); 915 notifyObservers(); 916 closeSocket(); 917 if (serverIn != null) { 918 try { 919 serverIn.close(); 920 } catch (IOException ex) { evt.addMessage (ex); } 921 serverIn = null; 922 } 923 if (serverOut != null) { 924 try { 925 serverOut.close(); 926 } catch (IOException ex) { evt.addMessage (ex); } 927 serverOut = null; 928 } 929 } catch (IOException e) { 930 evt.addMessage (e); 931 jfr.append (e.getMessage()); 932 Logger.log (evt); 933 throw e; 934 } finally { 935 jfr.commit(); 936 } 937 socket = null; 938 } 939 /** 940 * Issues a disconnect followed by a connect 941 * @exception IOException 942 */ 943 public void reconnect() throws IOException { 944 disconnect(); 945 connect(); 946 } 947 public void setLogger (Logger logger, String realm) { 948 this.logger = logger; 949 this.realm = realm; 950 if (originalRealm == null) 951 originalRealm = realm; 952 } 953 public String getRealm () { 954 return realm; 955 } 956 public Logger getLogger() { 957 return logger; 958 } 959 public String getOriginalRealm() { 960 return originalRealm == null ? 961 this.getClass().getName() : originalRealm; 962 } 963 /** 964 * associates this ISOChannel with a name using NameRegistrar 965 * @param name name to register 966 * @see NameRegistrar 967 */ 968 public void setName (String name) { 969 this.name = name; 970 NameRegistrar.register("channel." 
+ name, this); 971 } 972 /** 973 * @return this ISOChannel's name ("" if no name was set) 974 */ 975 public String getName() { 976 return this.name; 977 } 978 /** 979 * @param filter filter to add 980 * @param direction ISOMsg.INCOMING, ISOMsg.OUTGOING, 0 for both 981 */ 982 @SuppressWarnings ("unchecked") 983 public void addFilter (ISOFilter filter, int direction) { 984 switch (direction) { 985 case ISOMsg.INCOMING : 986 incomingFilters.add (filter); 987 break; 988 case ISOMsg.OUTGOING : 989 outgoingFilters.add (filter); 990 break; 991 case 0 : 992 incomingFilters.add (filter); 993 outgoingFilters.add (filter); 994 break; 995 } 996 } 997 /** 998 * @param filter incoming filter to add 999 */ 1000 public void addIncomingFilter (ISOFilter filter) { 1001 addFilter(filter, ISOMsg.INCOMING); 1002 } 1003 /** 1004 * @param filter outgoing filter to add 1005 */ 1006 public void addOutgoingFilter (ISOFilter filter) { 1007 addFilter(filter, ISOMsg.OUTGOING); 1008 } 1009 1010 /** 1011 * @param filter filter to add (both directions, incoming/outgoing) 1012 */ 1013 public void addFilter (ISOFilter filter) { 1014 addFilter (filter, 0); 1015 } 1016 /** 1017 * @param filter filter to remove 1018 * @param direction ISOMsg.INCOMING, ISOMsg.OUTGOING, 0 for both 1019 */ 1020 public void removeFilter (ISOFilter filter, int direction) { 1021 switch (direction) { 1022 case ISOMsg.INCOMING : 1023 incomingFilters.remove (filter); 1024 break; 1025 case ISOMsg.OUTGOING : 1026 outgoingFilters.remove (filter); 1027 break; 1028 case 0 : 1029 incomingFilters.remove (filter); 1030 outgoingFilters.remove (filter); 1031 break; 1032 } 1033 } 1034 /** 1035 * @param filter filter to remove (both directions) 1036 */ 1037 public void removeFilter (ISOFilter filter) { 1038 removeFilter (filter, 0); 1039 } 1040 /** 1041 * @param filter incoming filter to remove 1042 */ 1043 public void removeIncomingFilter (ISOFilter filter) { 1044 removeFilter(filter, ISOMsg.INCOMING); 1045 } 1046 /** 1047 * @param 
filter outgoing filter to remove
     */
    public void removeOutgoingFilter (ISOFilter filter) {
        removeFilter (filter, ISOMsg.OUTGOING);
    }
    /**
     * Runs the message through every outgoing filter, in registration order.
     * @param m outgoing message
     * @param evt log event handed to the filters
     * @return the message returned by the last filter
     * @throws VetoException if a filter vetoes the message
     */
    protected ISOMsg applyOutgoingFilters (ISOMsg m, LogEvent evt)
        throws VetoException
    {
        for (ISOFilter f :outgoingFilters)
            m = f.filter (this, m, evt);
        return m;
    }
    protected ISOMsg applyIncomingFilters (ISOMsg m, LogEvent evt)
        throws VetoException
    {
        return applyIncomingFilters (m, null, null, evt);
    }
    /**
     * Runs the message through every incoming filter, in registration order.
     * Filters implementing {@link RawIncomingFilter} also receive the raw
     * header and image, provided an image is available.
     * @param m incoming message
     * @param header raw header (may be null)
     * @param image raw message image (may be null)
     * @param evt log event handed to the filters
     * @return the message returned by the last filter
     * @throws VetoException if a filter vetoes the message
     */
    protected ISOMsg applyIncomingFilters (ISOMsg m, byte[] header, byte[] image, LogEvent evt)
        throws VetoException
    {
        for (ISOFilter f :incomingFilters) {
            if (image != null && f instanceof RawIncomingFilter)
                m = ((RawIncomingFilter)f).filter (this, m, header, image, evt);
            else
                m = f.filter (this, m, evt);
        }
        return m;
    }
    protected void unpack (ISOMsg m, byte[] b) throws ISOException {
        m.unpack (b);
    }
    protected byte[] pack (ISOMsg m) throws ISOException {
        return m.pack();
    }
    /**
     * Implements Configurable<br>
     * Properties:<br>
     * <ul>
     * <li>host - destination host (if ClientChannel)
     * <li>port - port number (if ClientChannel)
     * <li>local-iface - local interface to use (if ClientChannel)
     * <li>local-port - local port to bind (if ClientChannel)
     * <li>alternate-host / alternate-port - additional host/port pairs (same count of each)
     * <li>max-packet-length, send-timeout, timeout, connect-timeout
     * <li>override-header, keep-alive, expect-keep-alive, round-robin, debug-iso-error
     * </ul>
     * (host not present indicates a ServerChannel)
     * <p>
     * The host settings are validated before any of them is applied, so a
     * rejected host/port configuration leaves the current endpoints untouched.
     *
     * @param cfg Configuration
     * @throws ConfigurationException if a host is given without a port, if
     *         the number of alternate hosts and ports differ, or if the
     *         timeout cannot be applied
     */
    public void setConfiguration (Configuration cfg)
        throws ConfigurationException
    {
        this.cfg = cfg;
        String h    = cfg.get    ("host");
        int port    = cfg.getInt ("port");
        maxPacketLength = cfg.getInt ("max-packet-length", 100000);
        sendTimeout = cfg.getLong ("send-timeout", sendTimeout);
        if (h != null && h.length() > 0) {
            if (port == 0)
                throw new ConfigurationException
                    ("invalid port for host '"+h+"'");
            String[] altHosts = cfg.getAll  ("alternate-host");
            int[] altPorts = cfg.getInts ("alternate-port");
            // validate before touching hosts/ports, so a bad configuration
            // cannot leave the two parallel arrays with different lengths
            if (altHosts.length != altPorts.length) {
                throw new ConfigurationException (
                    "alternate host/port misconfiguration"
                );
            }
            setHost (h, port);
            setLocalAddress (cfg.get("local-iface", null),cfg.getInt("local-port"));
            String[] allHosts = new String[altHosts.length + 1];
            int[] allPorts = new int[altPorts.length + 1];
            allHosts[0] = host;
            allPorts[0] = port;
            System.arraycopy (altHosts, 0, allHosts, 1, altHosts.length);
            System.arraycopy (altPorts, 0, allPorts, 1, altPorts.length);
            hosts = allHosts;
            ports = allPorts;
        }
        setOverrideHeader(cfg.getBoolean ("override-header", false));
        keepAlive = cfg.getBoolean ("keep-alive", false);
        expectKeepAlive = cfg.getBoolean ("expect-keep-alive", false);
        roundRobin = cfg.getBoolean ("round-robin", false);
        debugIsoError = cfg.getBoolean ("debug-iso-error", true);
        if (socketFactory != this && socketFactory instanceof Configurable)
            ((Configurable)socketFactory).setConfiguration (cfg);
        try {
            setTimeout (cfg.getInt ("timeout", DEFAULT_TIMEOUT));
            connectTimeout = cfg.getInt ("connect-timeout", timeout);
        } catch (SocketException e) {
            throw new ConfigurationException (e);
        }
    }
    public Configuration getConfiguration() {
        return cfg;
    }
    /**
     * @return incoming filters (the internal list, not a copy)
     */
    public Collection<ISOFilter> getIncomingFilters() {
        return incomingFilters;
    }
    /**
     * @return outgoing filters (the internal list, not a copy)
     */
    public Collection<ISOFilter> getOutgoingFilters() {
        return outgoingFilters;
    }
    /**
     * Replaces the incoming filters with a copy of the given collection.
     * @param filters new incoming filters
     */
    public void setIncomingFilters (Collection filters) {
        incomingFilters = new ArrayList (filters);
    }
    /**
     * Replaces the outgoing filters with a copy of the given collection.
     * @param filters new outgoing filters
     */
    public void setOutgoingFilters (Collection filters) {
        outgoingFilters = new ArrayList (filters);
    }
    public void setHeader (byte[] header) {
        this.header = header;
    }
    /**
     * Sets the channel header from a String, encoded with the platform
     * default charset ({@link String#getBytes()}).
     * @param header the header
     */
    public void setHeader (String header) {
        setHeader (header.getBytes());
    }
    public byte[] getHeader () {
        return header;
    }
    public void setOverrideHeader (boolean overrideHeader) {
        this.overrideHeader = overrideHeader;
    }
    public boolean isOverrideHeader () {
        return overrideHeader;
    }
    /**
     * @param name the Channel's name (without the "channel." prefix)
     * @return ISOChannel instance with given name.
     * @throws NameRegistrar.NotFoundException if no such channel is registered
     * @see NameRegistrar
     */
    public static ISOChannel getChannel (String name)
        throws NameRegistrar.NotFoundException
    {
        return (ISOChannel) NameRegistrar.get ("channel."+name);
    }
   /**
    * Gets the ISOClientSocketFactory (may be null)
    * @see     ISOClientSocketFactory
    * @since 1.3.3
    * @return ISOClientSocketFactory
    */
    public ISOClientSocketFactory getSocketFactory() {
        return socketFactory;
    }
   /**
    * Sets the specified Socket Factory to create sockets
    * @param         socketFactory the ISOClientSocketFactory
    * @see           ISOClientSocketFactory
    * @since 1.3.3
    */
    public void setSocketFactory(ISOClientSocketFactory socketFactory) {
        this.socketFactory = socketFactory;
    }
    public int getMaxPacketLength() {
        return maxPacketLength;
    }
    public void setMaxPacketLength(int maxPacketLength) {
        this.maxPacketLength = maxPacketLength;
    }
    /**
     * Detaches and closes the current socket, if any. The socket field is
     * cleared under the channel's monitor so only one thread closes it;
     * SO_LINGER is applied and, where supported, output is shut down first.
     * Socket exceptions raised while preparing the close are ignored.
     * @throws IOException if closing the socket fails
     */
    protected void closeSocket() throws IOException {
        Socket s = null;
        synchronized (this) {
            if (socket != null) {
                s = socket; // we don't want more than one thread
                socket = null;  // attempting to close the socket
            }
        }
        if (s != null) {
            try {
                s.setSoLinger (soLingerOn, soLingerSeconds);
                if (shutdownSupportedBySocket(s) && !isSoLingerForcingImmediateTcpReset())
                    s.shutdownOutput();  // This will force a TCP FIN to be sent on regular sockets,
            } catch (SocketException e) {
                // NOPMD
                // safe to ignore - can be closed already
                // e.printStackTrace();
            }
            s.close ();
        }
    }
    private boolean
shutdownSupportedBySocket(Socket s) { 1224 return !(s instanceof SSLSocket); // we can't close output on SSL connections, see [jPOS-85] 1225 } 1226 private boolean isSoLingerForcingImmediateTcpReset() { 1227 return soLingerOn && soLingerSeconds == 0; 1228 } 1229 public Object clone(){ 1230 try { 1231 BaseChannel channel = (BaseChannel) super.clone(); 1232 channel.cnt = cnt.clone(); 1233 // The lock objects must also be cloned, and the DataStreams nullified, as it makes no sense 1234 // to use the new lock objects to protect the old DataStreams. 1235 // This should be safe as the only code that calls BaseChannel.clone() is ISOServer.run(), 1236 // and it immediately calls accept(ServerSocket) which does a connect(), and that sets the stream objects. 1237 channel.serverInLock = new ReentrantLock(); 1238 channel.serverOutLock = new ReentrantLock(); 1239 channel.serverIn = null; 1240 channel.serverOut = null; 1241 channel.usable = false; 1242 channel.socket = null; 1243 return channel; 1244 } catch (CloneNotSupportedException e) { 1245 throw new InternalError(); 1246 } 1247 } 1248 1249 private UUID getSocketUUID() { 1250 return socket != null ? 1251 new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ socket.hashCode()) : 1252 uuid; 1253 } 1254 1255 protected void incrementMsgInCounter(ISOMsg m) throws ISOException { 1256 if (isoMsgMetrics != null) { 1257 isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_IN); 1258 } 1259 } 1260 protected void incrementMsgOutCounter(ISOMsg m) throws ISOException { 1261 if (isoMsgMetrics != null) { 1262 isoMsgMetrics.recordMessage(m, MeterInfo.ISOMSG_OUT); 1263 } 1264 } 1265}