001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.core.Environment; 024import org.jpos.iso.*; 025import org.jpos.q2.QBeanSupport; 026import org.jpos.q2.QFactory; 027import org.jpos.space.Space; 028import org.jpos.space.SpaceFactory; 029import org.jpos.util.LogSource; 030import org.jpos.util.NameRegistrar; 031 032/** 033 * OneShotChannelAdaptor connects and disconnects a channel for every message 034 * exchange. 035 * 036 * <p>Example qbean:</p> 037 * <client class="org.jpos.q2.iso.OneShotChannelAdaptor" logger="Q2" name="channel-adaptor"><br> 038 * <channel ...<br> 039 * ...<br> 040 * ...<br> 041 * </channel><br> 042 * <max-connections>5</max-connections><br> 043 * <max-connect-attempts>15</max-connect-attempts><br> 044 * <in>send</in><br> 045 * <out>receive</out><br> 046 * </client><br> 047 * 048 * @author Alejandro Revilla 049 * @author Thomas L. Kjeldsen 050 * @version $Revision$ $Date$ 051 * 052 */ 053public class OneShotChannelAdaptor 054 extends QBeanSupport 055 implements OneShotChannelAdaptorMBean, Channel 056{ 057 Space<String,Object> sp; 058 String in, out; 059 long delay; 060 int maxConnections; 061 int maxConnectAttempts; 062 public OneShotChannelAdaptor () { 063 super (); 064 } 065 066 @SuppressWarnings("unchecked") 067 private Space<String,Object> grabSpace (Element e) { 068 return (Space<String,Object>) SpaceFactory.getSpace (e != null ? e.getText() : ""); 069 } 070 071 public void initAdaptor() { 072 Element persist = getPersist (); 073 sp = grabSpace (persist.getChild ("space")); 074 in = Environment.get(persist.getChildTextTrim ("in")); 075 out = Environment.get(persist.getChildTextTrim ("out")); 076 delay = 5000; 077 078 String s = Environment.get(persist.getChildTextTrim ("max-connections")); 079 maxConnections = s!=null ? Integer.parseInt(s) : 1; // reasonable default 080 s = Environment.get(persist.getChildTextTrim ("max-connect-attempts")); 081 maxConnectAttempts = s!=null ? Integer.parseInt(s) : 15; // reasonable default 082 } 083 public void startService () { 084 try { 085 initAdaptor(); 086 for (int i=0; i<maxConnections; i++) { 087 Worker w = new Worker(i); 088 w.initChannel(); 089 new Thread(w).start(); 090 } 091 NameRegistrar.register (getName(), this); 092 } catch (Exception e) { 093 getLog().warn ("error starting service", e); 094 } 095 } 096 public void stopService () { 097 try { 098 for (int i=0; i<maxConnections; i++) { 099 sp.out(in, new Object()); 100 } 101 } catch (Exception e) { 102 getLog().warn ("error stopping service", e); 103 } 104 } 105 public void destroyService () { 106 NameRegistrar.unregister (getName ()); 107 NameRegistrar.unregister ("channel." + getName ()); 108 } 109 110 111 /** 112 * Queue a message to be transmitted by this adaptor 113 * @param m message to send 114 */ 115 public void send (ISOMsg m) { 116 sp.out (in, m); 117 } 118 /** 119 * Queue a message to be transmitted by this adaptor 120 * @param m message to send 121 * @param timeout in millis 122 */ 123 public void send (ISOMsg m, long timeout) { 124 sp.out (in, m, timeout); 125 } 126 127 /** 128 * Receive message 129 */ 130 public ISOMsg receive () { 131 return (ISOMsg) sp.in (out); 132 } 133 134 /** 135 * Receive message 136 * @param timeout time to wait for an incoming message 137 */ 138 public ISOMsg receive (long timeout) { 139 return (ISOMsg) sp.in (out, timeout); 140 } 141 142 public class Worker implements Runnable { 143 ISOChannel channel; 144 int id; 145 public Worker (int i) { 146 super (); 147 id = i; 148 } 149 public void run () { 150 Thread.currentThread().setName ("channel-worker-" + id); 151 int[] handbackFields = cfg.getInts ("handback-field"); 152 while (running ()){ 153 try { 154 Object o = sp.in (in, delay); 155 if (o instanceof ISOMsg) { 156 ISOMsg m = (ISOMsg) o; 157 ISOMsg handBack = null; 158 if (handbackFields.length > 0) 159 handBack = (ISOMsg) m.clone (handbackFields); 160 for (int i=0; !channel.isConnected() 161 && i<maxConnectAttempts; i++) 162 { 163 channel.reconnect(); 164 if (!channel.isConnected()) 165 ISOUtil.sleep (1000L); 166 } 167 if (channel.isConnected()) { 168 channel.send (m); 169 m = channel.receive(); 170 channel.disconnect(); 171 if (handBack != null) 172 m.merge (handBack); 173 sp.out (out, m); 174 } 175 } 176 } catch (Exception e) { 177 getLog().warn ("channel-worker-"+id, e.getMessage ()); 178 ISOUtil.sleep (1000); 179 } finally { 180 try { 181 channel.disconnect(); 182 } catch (Exception e) { 183 getLog().warn ("channel-worker-"+id, e.getMessage ()); 184 } 185 } 186 } 187 } 188 189 public void initChannel () throws ConfigurationException { 190 Element persist = getPersist (); 191 Element e = persist.getChild ("channel"); 192 if (e == null) 193 throw new ConfigurationException ("channel element missing"); 194 195 channel = newChannel (e, getFactory()); 196 197 String socketFactoryString = getSocketFactory(); 198 if (socketFactoryString != null && channel instanceof FactoryChannel) { 199 ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString); 200 if (sFac instanceof LogSource) { 201 ((LogSource) sFac).setLogger(log.getLogger(),getName() + ".socket-factory"); 202 } 203 getFactory().setConfiguration (sFac, e); 204 ((FactoryChannel)channel).setSocketFactory(sFac); 205 } 206 207 } 208 private ISOChannel newChannel (Element e, QFactory f) 209 throws ConfigurationException 210 { 211 String channelName = QFactory.getAttributeValue (e, "class"); 212 if (channelName == null) 213 throw new ConfigurationException ("class attribute missing from channel element."); 214 215 String packagerName = QFactory.getAttributeValue (e, "packager"); 216 217 ISOChannel channel = f.newInstance (channelName); 218 if (packagerName != null) { 219 ISOPackager packager = f.newInstance (packagerName); 220 channel.setPackager (packager); 221 f.setConfiguration (packager, e); 222 } 223 QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header")); 224 f.setLogger (channel, e); 225 f.setConfiguration (channel, e); 226 227 if (channel instanceof FilteredChannel) { 228 addFilters ((FilteredChannel) channel, e, f); 229 } 230 if (getName () != null) 231 channel.setName (getName ()+id); 232 return channel; 233 } 234 235 private void addFilters (FilteredChannel channel, Element e, QFactory fact) 236 throws ConfigurationException 237 { 238 for (Object o : e.getChildren("filter")) { 239 Element f = (Element) o; 240 String clazz = QFactory.getAttributeValue(f, "class"); 241 ISOFilter filter = (ISOFilter) fact.newInstance(clazz); 242 fact.setLogger(filter, f); 243 fact.setConfiguration(filter, f); 244 String direction = QFactory.getAttributeValue(f, "direction"); 245 if (direction == null) 246 channel.addFilter(filter); 247 else if ("incoming".equalsIgnoreCase(direction)) 248 channel.addIncomingFilter(filter); 249 else if ("outgoing".equalsIgnoreCase(direction)) 250 channel.addOutgoingFilter(filter); 251 else if ("both".equalsIgnoreCase(direction)) { 252 channel.addIncomingFilter(filter); 253 channel.addOutgoingFilter(filter); 254 } 255 } 256 } 257 258 } 259 260 public synchronized void setInQueue (String in) { 261 String old = this.in; 262 this.in = in; 263 if (old != null) 264 sp.out (old, new Object()); 265 266 getPersist().getChild("in").setText (in); 267 setModified (true); 268 } 269 270 public String getInQueue () { 271 return in; 272 } 273 274 public synchronized void setOutQueue (String out) { 275 this.out = out; 276 getPersist().getChild("out").setText (out); 277 setModified (true); 278 } 279 280 public String getOutQueue () { 281 return out; 282 } 283 284 public synchronized void setHost (String host) { 285 setProperty (getProperties ("channel"), "host", host); 286 setModified (true); 287 } 288 289 public String getHost () { 290 return getProperty (getProperties ("channel"), "host"); 291 } 292 293 public synchronized void setPort (int port) { 294 setProperty ( 295 getProperties ("channel"), "port", Integer.toString (port) 296 ); 297 setModified (true); 298 } 299 public int getPort () { 300 int port = 0; 301 try { 302 port = Integer.parseInt ( 303 getProperty (getProperties ("channel"), "port") 304 ); 305 } catch (NumberFormatException e) { 306 getLog().error (e); 307 } 308 return port; 309 } 310 public synchronized void setSocketFactory (String sFac) { 311 setProperty(getProperties("channel"), "socketFactory", sFac); 312 setModified(true); 313 } 314 public String getSocketFactory() { 315 return getProperty(getProperties ("channel"), "socketFactory"); 316 } 317} 318