001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.*;
025import org.jpos.q2.QBeanSupport;
026import org.jpos.q2.QFactory;
027import org.jpos.space.Space;
028import org.jpos.space.SpaceFactory;
029import org.jpos.util.LogSource;
030import org.jpos.util.NameRegistrar;
031
032/**
033 * OneShotChannelAdaptor connects and disconnects a channel for every message
034 * exchange.
035 * 
036 * <p>Example qbean:</p>
037 * &lt;client class="org.jpos.q2.iso.OneShotChannelAdaptor" logger="Q2" name="channel-adaptor"&gt;<br>
038 *   &lt;channel ...<br>
039 *     ...<br>
040 *     ...<br>
041 *   &lt;/channel&gt;<br>
042 *   &lt;max-connections&gt;5&lt;/max-connections&gt;<br>
043 *   &lt;max-connect-attempts&gt;15&lt;/max-connect-attempts&gt;<br>
044 *   &lt;in&gt;send&lt;/in&gt;<br>
045 *   &lt;out&gt;receive&lt;/out&gt;<br>
046 * &lt;/client&gt;<br>
047 *
048 * @author Alejandro Revilla
049 * @author Thomas L. Kjeldsen
050 * @version $Revision$ $Date$
051 *
052 */
053public class OneShotChannelAdaptor 
054    extends QBeanSupport
055    implements OneShotChannelAdaptorMBean, Channel
056{
057    Space<String,Object> sp;
058    String in, out;
059    long delay;
060    int maxConnections;
061    int maxConnectAttempts;
062    public OneShotChannelAdaptor () {
063        super ();
064    }
065
066    @SuppressWarnings("unchecked")
067    private Space<String,Object> grabSpace (Element e) {
068        return (Space<String,Object>) SpaceFactory.getSpace (e != null ? e.getText() : "");
069    }
070
071    public void initAdaptor() {
072        Element persist = getPersist ();
073        sp = grabSpace (persist.getChild ("space"));
074        in = Environment.get(persist.getChildTextTrim ("in"));
075        out = Environment.get(persist.getChildTextTrim ("out"));
076        delay = 5000;
077
078        String s = Environment.get(persist.getChildTextTrim ("max-connections"));
079        maxConnections = s!=null ? Integer.parseInt(s) : 1;  // reasonable default
080        s = Environment.get(persist.getChildTextTrim ("max-connect-attempts"));
081        maxConnectAttempts = s!=null ? Integer.parseInt(s) : 15;  // reasonable default
082    }
083    public void startService () {
084        try {
085            initAdaptor();
086            for (int i=0; i<maxConnections; i++) {
087                Worker w = new Worker(i);
088                w.initChannel();
089                new Thread(w).start();
090            }
091            NameRegistrar.register (getName(), this);
092        } catch (Exception e) {
093            getLog().warn ("error starting service", e);
094        }
095    }
096    public void stopService () {
097        try {
098            for (int i=0; i<maxConnections; i++) {
099                sp.out(in, new Object());
100            }
101        } catch (Exception e) {
102            getLog().warn ("error stopping service", e);
103        }
104    }
105    public void destroyService () {
106        NameRegistrar.unregister (getName ());
107        NameRegistrar.unregister ("channel." + getName ());
108    }
109
110
111    /**
112     * Queue a message to be transmitted by this adaptor
113     * @param m message to send
114     */
115    public void send (ISOMsg m) {
116        sp.out (in, m);
117    }
118    /**
119     * Queue a message to be transmitted by this adaptor
120     * @param m message to send
121     * @param timeout in millis
122     */
123    public void send (ISOMsg m, long timeout) {
124        sp.out (in, m, timeout);
125    }
126
127    /**
128     * Receive message
129     */
130    public ISOMsg receive () {
131        return (ISOMsg) sp.in (out);
132    }
133
134    /**
135     * Receive message
136     * @param timeout time to wait for an incoming message
137     */
138    public ISOMsg receive (long timeout) {
139        return (ISOMsg) sp.in (out, timeout);
140    }
141
142    public class Worker implements Runnable {
143        ISOChannel channel;
144        int id;
145        public Worker (int i) {
146            super ();
147            id = i;
148        }
149        public void run () {
150            Thread.currentThread().setName ("channel-worker-" + id);
151            int[] handbackFields = cfg.getInts ("handback-field");
152            while (running ()){
153                try {
154                    Object o = sp.in (in, delay);
155                    if (o instanceof ISOMsg) {
156                        ISOMsg m = (ISOMsg) o;
157                        ISOMsg handBack = null;
158                        if (handbackFields.length > 0)
159                            handBack = (ISOMsg) m.clone (handbackFields);
160                        for (int i=0; !channel.isConnected() 
161                                && i<maxConnectAttempts; i++) 
162                        {
163                            channel.reconnect();
164                            if (!channel.isConnected())
165                                ISOUtil.sleep (1000L);
166                        }
167                        if (channel.isConnected()) {
168                            channel.send (m);
169                            m = channel.receive();
170                            channel.disconnect();
171                            if (handBack != null)
172                                m.merge (handBack);
173                            sp.out (out, m);
174                        }
175                    }
176                } catch (Exception e) { 
177                    getLog().warn ("channel-worker-"+id, e.getMessage ());
178                    ISOUtil.sleep (1000);
179                } finally {
180                    try {
181                        channel.disconnect();
182                    } catch (Exception e) {
183                        getLog().warn ("channel-worker-"+id, e.getMessage ());
184                    }
185                }
186            }
187        }
188
189        public void initChannel () throws ConfigurationException {
190            Element persist = getPersist ();
191            Element e = persist.getChild ("channel");
192            if (e == null)
193                throw new ConfigurationException ("channel element missing");
194
195            channel = newChannel (e, getFactory());
196            
197            String socketFactoryString = getSocketFactory();
198            if (socketFactoryString != null && channel instanceof FactoryChannel) {
199                ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString);
200                if (sFac instanceof LogSource) {
201                    ((LogSource) sFac).setLogger(log.getLogger(),getName() + ".socket-factory");
202                }
203                getFactory().setConfiguration (sFac, e);
204                ((FactoryChannel)channel).setSocketFactory(sFac);
205            }
206
207        }
208        private ISOChannel newChannel (Element e, QFactory f) 
209            throws ConfigurationException
210        {
211            String channelName  = QFactory.getAttributeValue (e, "class");
212            if (channelName == null)
213                throw new ConfigurationException ("class attribute missing from channel element.");
214            
215            String packagerName = QFactory.getAttributeValue (e, "packager");
216
217            ISOChannel channel = f.newInstance (channelName);
218            if (packagerName != null) {
219                ISOPackager packager = f.newInstance (packagerName);
220                channel.setPackager (packager);
221                f.setConfiguration (packager, e);
222            }
223            QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header"));
224            f.setLogger        (channel, e);
225            f.setConfiguration (channel, e);
226
227            if (channel instanceof FilteredChannel) {
228                addFilters ((FilteredChannel) channel, e, f);
229            }
230            if (getName () != null)
231                channel.setName (getName ()+id);
232            return channel;
233        }
234
235        private void addFilters (FilteredChannel channel, Element e, QFactory fact) 
236            throws ConfigurationException
237        {
238            for (Object o : e.getChildren("filter")) {
239                Element f = (Element) o;
240                String clazz = QFactory.getAttributeValue(f, "class");
241                ISOFilter filter = (ISOFilter) fact.newInstance(clazz);
242                fact.setLogger(filter, f);
243                fact.setConfiguration(filter, f);
244                String direction = QFactory.getAttributeValue(f, "direction");
245                if (direction == null)
246                    channel.addFilter(filter);
247                else if ("incoming".equalsIgnoreCase(direction))
248                    channel.addIncomingFilter(filter);
249                else if ("outgoing".equalsIgnoreCase(direction))
250                    channel.addOutgoingFilter(filter);
251                else if ("both".equalsIgnoreCase(direction)) {
252                    channel.addIncomingFilter(filter);
253                    channel.addOutgoingFilter(filter);
254                }
255            }
256        }
257    
258    }
259
260    public synchronized void setInQueue (String in) {
261        String old = this.in;
262        this.in = in;
263        if (old != null)
264            sp.out (old, new Object());
265
266        getPersist().getChild("in").setText (in);
267        setModified (true);
268    }
269
270    public String getInQueue () {
271        return in;
272    }
273
274    public synchronized void setOutQueue (String out) {
275        this.out = out;
276        getPersist().getChild("out").setText (out);
277        setModified (true);
278    }
279
280    public String getOutQueue () {
281        return out;
282    }
283
284    public synchronized void setHost (String host) {
285        setProperty (getProperties ("channel"), "host", host);
286        setModified (true);
287    }
288
289    public String getHost () {
290        return getProperty (getProperties ("channel"), "host");
291    }
292
293    public synchronized void setPort (int port) {
294        setProperty (
295            getProperties ("channel"), "port", Integer.toString (port)
296        );
297        setModified (true);
298    }
299    public int getPort () {
300        int port = 0;
301        try {
302            port = Integer.parseInt (
303                getProperty (getProperties ("channel"), "port")
304            );
305        } catch (NumberFormatException e) {
306            getLog().error (e);
307        }
308        return port;
309    }
310    public synchronized void setSocketFactory (String sFac) {
311        setProperty(getProperties("channel"), "socketFactory", sFac);
312        setModified(true);
313    }
314    public String getSocketFactory() {
315        return getProperty(getProperties ("channel"), "socketFactory");
316    }
317}
318