001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.iso; 020 021import org.jpos.core.Configurable; 022import org.jpos.core.Configuration; 023import org.jpos.core.ConfigurationException; 024import org.jpos.util.*; 025import org.jpos.util.NameRegistrar.NotFoundException; 026 027import java.io.IOException; 028 029/** 030 * Connector implements ISORequestListener 031 * and forward all incoming messages to a given 032 * destination MUX, or Channel handling back responses 033 * 034 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a> 035 * @version $Revision$ $Date$ 036 * @see org.jpos.iso.ISORequestListener 037 */ 038public class Connector 039 implements ISORequestListener, LogSource, Configurable 040{ 041 private Logger logger; 042 private String realm; 043 private boolean preserveSourceHeader = true; 044 /** Name of the QMUX to look up. */ 045 protected String muxName; 046 /** Name of the channel to look up. */ 047 protected String channelName; 048 /** Response timeout in milliseconds (0 = no timeout). */ 049 protected int timeout = 0; 050 /** Shared thread pool for processing incoming messages. */ 051 protected static ThreadPool pool; 052 053 /** Default constructor. */ 054 public Connector () { 055 super(); 056 } 057 058 public void setLogger (Logger logger, String realm) { 059 this.logger = logger; 060 this.realm = realm; 061 } 062 public String getRealm () { 063 return realm; 064 } 065 public Logger getLogger() { 066 return logger; 067 } 068 /** 069 * Destination can be a Channel or a MUX. If Destination is a Channel 070 * then timeout applies (used on ISORequest to get a Response). 071 * <ul> 072 * <li>destination-mux 073 * <li>destination-channel 074 * <li>timeout 075 * <li>poolsize 076 * </ul> 077 * @param cfg Configuration 078 */ 079 public void setConfiguration (Configuration cfg) 080 throws ConfigurationException 081 { 082 timeout = cfg.getInt ("timeout"); 083 if (pool == null) 084 pool = new ThreadPool (1, cfg.getInt ("poolsize", 10)); 085 muxName = cfg.get ("destination-mux", null); 086 channelName = cfg.get ("destination-channel", null); 087 preserveSourceHeader = cfg.getBoolean ("preserve-source-header", true); 088 if (muxName == null && channelName == null) { 089 throw new ConfigurationException("Neither destination mux nor channel were specified."); 090 } 091 } 092 093 /** Runnable that processes a single incoming message through the MUX. */ 094 protected class Process implements Runnable { 095 ISOSource source; 096 ISOMsg m; 097 Process (ISOSource source, ISOMsg m) { 098 super(); 099 this.source = source; 100 this.m = m; 101 } 102 public void run () { 103 LogEvent evt = new LogEvent (Connector.this, 104 "connector-request-listener"); 105 try { 106 ISOMsg c = (ISOMsg) m.clone(); 107 evt.addMessage (c); 108 if (muxName != null) { 109 MUX destMux = (MUX) NameRegistrar.get (muxName); 110 ISOMsg response = destMux.request (c, timeout); 111 if (response != null) { 112 if (preserveSourceHeader) 113 response.setHeader (c.getISOHeader()); 114 source.send(response); 115 } 116 } else if (channelName != null) { 117 Channel destChannel = (Channel) NameRegistrar.get (channelName); 118 destChannel.send (c); 119 } 120 } catch (ISOException e) { 121 evt.addMessage (e); 122 } catch (IOException e) { 123 evt.addMessage (e); 124 } catch (NotFoundException e) { 125 evt.addMessage(e); 126 } 127 Logger.log (evt); 128 } 129 130 } 131 public boolean process (ISOSource source, ISOMsg m) { 132 if (pool == null) 133 pool = new ThreadPool (1, 10); 134 135 pool.execute (new Process (source, m)); 136 return true; 137 } 138}