001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.core.Environment; 024import org.jpos.iso.*; 025import org.jpos.q2.QBeanSupport; 026import org.jpos.q2.QFactory; 027import org.jpos.space.Space; 028import org.jpos.space.SpaceFactory; 029import org.jpos.util.LogSource; 030import org.jpos.util.NameRegistrar; 031import org.jpos.util.Realm; 032 033/** 034 * OneShotChannelAdaptor connects and disconnects a channel for every message 035 * exchange. 036 * 037 * <p>Example qbean:</p> 038 * <client class="org.jpos.q2.iso.OneShotChannelAdaptor" logger="Q2" name="channel-adaptor"><br> 039 * <channel ...<br> 040 * ...<br> 041 * ...<br> 042 * </channel><br> 043 * <max-connections>5</max-connections><br> 044 * <max-connect-attempts>15</max-connect-attempts><br> 045 * <in>send</in><br> 046 * <out>receive</out><br> 047 * </client><br> 048 * 049 * @author Alejandro Revilla 050 * @author Thomas L. Kjeldsen 051 * @version $Revision$ $Date$ 052 * 053 */ 054public class OneShotChannelAdaptor 055 extends QBeanSupport 056 implements OneShotChannelAdaptorMBean, Channel 057{ 058 Space<String,Object> sp; 059 String in, out; 060 long delay; 061 int maxConnections; 062 int maxConnectAttempts; 063 /** Default constructor. */ 064 public OneShotChannelAdaptor () { 065 super (); 066 } 067 068 @Override 069 protected String defaultRealm() { 070 return Realm.COMM_CLIENT; 071 } 072 073 @SuppressWarnings("unchecked") 074 private Space<String,Object> grabSpace (Element e) { 075 return (Space<String,Object>) SpaceFactory.getSpace (e != null ? e.getText() : ""); 076 } 077 078 /** Reads the bean's persist element and initializes its in/out queue references. */ 079 public void initAdaptor() { 080 Element persist = getPersist (); 081 sp = grabSpace (persist.getChild ("space")); 082 in = Environment.get(persist.getChildTextTrim ("in")); 083 out = Environment.get(persist.getChildTextTrim ("out")); 084 delay = 5000; 085 086 String s = Environment.get(persist.getChildTextTrim ("max-connections")); 087 maxConnections = s!=null ? Integer.parseInt(s) : 1; // reasonable default 088 s = Environment.get(persist.getChildTextTrim ("max-connect-attempts")); 089 maxConnectAttempts = s!=null ? Integer.parseInt(s) : 15; // reasonable default 090 } 091 public void startService () { 092 try { 093 initAdaptor(); 094 for (int i=0; i<maxConnections; i++) { 095 Worker w = new Worker(i); 096 w.initChannel(); 097 new Thread(w).start(); 098 } 099 NameRegistrar.register (getName(), this); 100 } catch (Exception e) { 101 getLog().warn ("error starting service", e); 102 } 103 } 104 public void stopService () { 105 try { 106 for (int i=0; i<maxConnections; i++) { 107 sp.out(in, new Object()); 108 } 109 } catch (Exception e) { 110 getLog().warn ("error stopping service", e); 111 } 112 } 113 public void destroyService () { 114 NameRegistrar.unregister (getName ()); 115 NameRegistrar.unregister ("channel." + getName ()); 116 } 117 118 119 /** 120 * Queue a message to be transmitted by this adaptor 121 * @param m message to send 122 */ 123 public void send (ISOMsg m) { 124 sp.out (in, m); 125 } 126 /** 127 * Queue a message to be transmitted by this adaptor 128 * @param m message to send 129 * @param timeout in millis 130 */ 131 public void send (ISOMsg m, long timeout) { 132 sp.out (in, m, timeout); 133 } 134 135 /** 136 * Receive message 137 */ 138 public ISOMsg receive () { 139 return (ISOMsg) sp.in (out); 140 } 141 142 /** 143 * Receive message 144 * @param timeout time to wait for an incoming message 145 */ 146 public ISOMsg receive (long timeout) { 147 return (ISOMsg) sp.in (out, timeout); 148 } 149 150 /** Per-message worker that opens an ad-hoc channel, sends the request, and forwards any response. */ 151 public class Worker implements Runnable { 152 ISOChannel channel; 153 int id; 154 /** 155 * Constructs a Worker with the given identifier. 156 * 157 * @param i worker identifier (used in the thread name) 158 */ 159 public Worker (int i) { 160 super (); 161 id = i; 162 } 163 public void run () { 164 Thread.currentThread().setName ("channel-worker-" + id); 165 int[] handbackFields = cfg.getInts ("handback-field"); 166 while (running ()){ 167 try { 168 Object o = sp.in (in, delay); 169 if (o instanceof ISOMsg) { 170 ISOMsg m = (ISOMsg) o; 171 ISOMsg handBack = null; 172 if (handbackFields.length > 0) 173 handBack = (ISOMsg) m.clone (handbackFields); 174 for (int i=0; !channel.isConnected() 175 && i<maxConnectAttempts; i++) 176 { 177 channel.reconnect(); 178 if (!channel.isConnected()) 179 ISOUtil.sleep (1000L); 180 } 181 if (channel.isConnected()) { 182 channel.send (m); 183 m = channel.receive(); 184 channel.disconnect(); 185 if (handBack != null) 186 m.merge (handBack); 187 sp.out (out, m); 188 } 189 } 190 } catch (Exception e) { 191 getLog().warn ("channel-worker-"+id, e.getMessage ()); 192 ISOUtil.sleep (1000); 193 } finally { 194 try { 195 channel.disconnect(); 196 } catch (Exception e) { 197 getLog().warn ("channel-worker-"+id, e.getMessage ()); 198 } 199 } 200 } 201 } 202 203 /** 204 * Builds and configures the channel used by this worker for one request/response cycle. 205 * 206 * @throws ConfigurationException if the channel configuration is missing or invalid 207 */ 208 public void initChannel () throws ConfigurationException { 209 Element persist = getPersist (); 210 Element e = persist.getChild ("channel"); 211 if (e == null) 212 throw new ConfigurationException ("channel element missing"); 213 214 channel = newChannel (e, getFactory()); 215 216 String socketFactoryString = getSocketFactory(); 217 if (socketFactoryString != null && channel instanceof FactoryChannel) { 218 ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString); 219 if (sFac instanceof LogSource) { 220 ((LogSource) sFac).setLogger(log.getLogger(), getRealm()); 221 } 222 getFactory().setConfiguration (sFac, e); 223 ((FactoryChannel)channel).setSocketFactory(sFac); 224 } 225 226 } 227 private ISOChannel newChannel (Element e, QFactory f) 228 throws ConfigurationException 229 { 230 String channelName = QFactory.getAttributeValue (e, "class"); 231 if (channelName == null) 232 throw new ConfigurationException ("class attribute missing from channel element."); 233 234 String packagerName = QFactory.getAttributeValue (e, "packager"); 235 236 ISOChannel channel = f.newInstance (channelName); 237 if (packagerName != null) { 238 ISOPackager packager = f.newInstance (packagerName); 239 channel.setPackager (packager); 240 f.setConfiguration (packager, e); 241 } 242 QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header")); 243 f.setLogger (channel, e, getRealm()); 244 f.setConfiguration (channel, e); 245 246 if (channel instanceof FilteredChannel) { 247 addFilters ((FilteredChannel) channel, e, f); 248 } 249 if (getName () != null) 250 channel.setName (getName ()+id); 251 return channel; 252 } 253 254 private void addFilters (FilteredChannel channel, Element e, QFactory fact) 255 throws ConfigurationException 256 { 257 for (Object o : e.getChildren("filter")) { 258 Element f = (Element) o; 259 String clazz = QFactory.getAttributeValue(f, "class"); 260 ISOFilter filter = (ISOFilter) fact.newInstance(clazz); 261 fact.setLogger(filter, f); 262 fact.setConfiguration(filter, f); 263 String direction = QFactory.getAttributeValue(f, "direction"); 264 if (direction == null) 265 channel.addFilter(filter); 266 else if ("incoming".equalsIgnoreCase(direction)) 267 channel.addIncomingFilter(filter); 268 else if ("outgoing".equalsIgnoreCase(direction)) 269 channel.addOutgoingFilter(filter); 270 else if ("both".equalsIgnoreCase(direction)) { 271 channel.addIncomingFilter(filter); 272 channel.addOutgoingFilter(filter); 273 } 274 } 275 } 276 277 } 278 279 public synchronized void setInQueue (String in) { 280 String old = this.in; 281 this.in = in; 282 if (old != null) 283 sp.out (old, new Object()); 284 285 getPersist().getChild("in").setText (in); 286 setModified (true); 287 } 288 289 public String getInQueue () { 290 return in; 291 } 292 293 public synchronized void setOutQueue (String out) { 294 this.out = out; 295 getPersist().getChild("out").setText (out); 296 setModified (true); 297 } 298 299 public String getOutQueue () { 300 return out; 301 } 302 303 public synchronized void setHost (String host) { 304 setProperty (getProperties ("channel"), "host", host); 305 setModified (true); 306 } 307 308 public String getHost () { 309 return getProperty (getProperties ("channel"), "host"); 310 } 311 312 public synchronized void setPort (int port) { 313 setProperty ( 314 getProperties ("channel"), "port", Integer.toString (port) 315 ); 316 setModified (true); 317 } 318 public int getPort () { 319 int port = 0; 320 try { 321 port = Integer.parseInt ( 322 getProperty (getProperties ("channel"), "port") 323 ); 324 } catch (NumberFormatException e) { 325 getLog().error (e); 326 } 327 return port; 328 } 329 public synchronized void setSocketFactory (String sFac) { 330 setProperty(getProperties("channel"), "socketFactory", sFac); 331 setModified(true); 332 } 333 public String getSocketFactory() { 334 return getProperty(getProperties ("channel"), "socketFactory"); 335 } 336}