/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.iso;

import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.util.*;
import org.jpos.util.NameRegistrar.NotFoundException;

import java.io.IOException;

/**
 * Connector implements ISORequestListener and forwards every incoming
 * message to a configured destination, which is either a MUX or a Channel
 * looked up by name in the {@link NameRegistrar}.
 * <p>
 * When the destination is a MUX, the request is issued with the configured
 * timeout and a non-null response is sent back to the originating
 * {@link ISOSource}. When the destination is a Channel, the message is
 * only sent; no response is relayed by this class.
 * <p>
 * Forwarding runs on a {@link ThreadPool} that is shared (static) by all
 * Connector instances.
 *
 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a>
 * @version $Revision$ $Date$
 * @see org.jpos.iso.ISORequestListener
 */
public class Connector
    implements ISORequestListener, LogSource, Configurable
{
    /** Guards lazy creation of the shared {@link #pool}. */
    private static final Object POOL_LOCK = new Object();

    private Logger logger;
    private String realm;
    private boolean preserveSourceHeader = true;
    protected String muxName;
    protected String channelName;
    protected int timeout = 0;
    protected static ThreadPool pool;

    public Connector () {
        super();
    }

    /**
     * Sets the logger and realm used as the source of the log events
     * produced by this connector.
     *
     * @param logger the logger
     * @param realm  the logging realm
     */
    public void setLogger (Logger logger, String realm) {
        this.logger = logger;
        this.realm  = realm;
    }
    /**
     * @return the realm given to {@link #setLogger}, or null if none was set
     */
    public String getRealm () {
        return realm;
    }
    /**
     * @return the logger given to {@link #setLogger}, or null if none was set
     */
    public Logger getLogger() {
        return logger;
    }
    /**
     * Destination can be a Channel or a MUX. If both are configured the
     * MUX takes precedence. The timeout is handed to the destination MUX
     * when requesting a response; it is not used for a Channel destination.
     * <ul>
     *  <li>destination-mux
     *  <li>destination-channel
     *  <li>timeout
     *  <li>poolsize (defaults to 10; only honored if the shared pool has
     *      not been created yet)
     *  <li>preserve-source-header (defaults to true)
     * </ul>
     * @param cfg Configuration
     * @throws ConfigurationException if neither destination-mux nor
     *         destination-channel is specified
     */
    public void setConfiguration (Configuration cfg)
        throws ConfigurationException
    {
        timeout = cfg.getInt ("timeout");
        ensurePool (cfg.getInt ("poolsize", 10));
        muxName     = cfg.get ("destination-mux", null);
        channelName = cfg.get ("destination-channel", null);
        preserveSourceHeader = cfg.getBoolean ("preserve-source-header", true);
        if (muxName == null && channelName == null) {
            throw new ConfigurationException("Neither destination mux nor channel were specified.");
        }
    }

    /**
     * Returns the shared pool, creating it on first use. Creation is done
     * under a lock so that concurrent callers never build (and leak) more
     * than one pool.
     *
     * @param poolSize value passed as the second ThreadPool constructor
     *                 argument when the pool has to be created; ignored if
     *                 the pool already exists
     * @return the shared pool, never null
     */
    private static ThreadPool ensurePool (int poolSize) {
        synchronized (POOL_LOCK) {
            if (pool == null)
                pool = new ThreadPool (1, poolSize);
            return pool;
        }
    }

    /**
     * Unit of work that forwards a single message to the configured
     * destination and logs the outcome.
     */
    protected class Process implements Runnable {
        ISOSource source;
        ISOMsg m;
        Process (ISOSource source, ISOMsg m) {
            super();
            this.source = source;
            this.m = m;
        }
        /**
         * Forwards a clone of the message. Checked failures (ISOException,
         * IOException, NotFoundException) are recorded in the log event and
         * swallowed; unchecked failures are recorded and re-thrown. The log
         * event is always emitted.
         */
        public void run () {
            LogEvent evt = new LogEvent (Connector.this,
                "connector-request-listener");
            try {
                // work on a copy so the caller's message is left untouched
                ISOMsg c = (ISOMsg) m.clone();
                evt.addMessage (c);
                if (muxName != null) {
                    MUX destMux = (MUX) NameRegistrar.get (muxName);
                    ISOMsg response = destMux.request (c, timeout);
                    // a null response is silently dropped
                    if (response != null) {
                        if (preserveSourceHeader)
                            response.setHeader (c.getISOHeader());
                        source.send(response);
                    }
                } else if (channelName != null) {
                    Channel destChannel = (Channel) NameRegistrar.get (channelName);
                    destChannel.send (c);
                }
            } catch (ISOException e) {
                evt.addMessage (e);
            } catch (IOException e) {
                evt.addMessage (e);
            } catch (NotFoundException e) {
                evt.addMessage (e);
            } catch (RuntimeException e) {
                // e.g. a registered object of the wrong type; keep a trace
                // of it in the log and let it propagate as it did before
                evt.addMessage (e);
                throw e;
            } finally {
                Logger.log (evt);
            }
        }

    }
    /**
     * Queues the message for asynchronous forwarding on the shared pool.
     *
     * @param source where a MUX response, if any, is sent back to
     * @param m      the incoming message
     * @return always true
     */
    public boolean process (ISOSource source, ISOMsg m) {
        // the connector may be used without setConfiguration having run
        ensurePool (10).execute (new Process (source, m));
        return true;
    }
}