001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.*;
025import org.jpos.q2.QBeanSupport;
026import org.jpos.q2.QFactory;
027import org.jpos.space.Space;
028import org.jpos.space.SpaceFactory;
029import org.jpos.util.LogSource;
030import org.jpos.util.NameRegistrar;
031import org.jpos.util.Realm;
032
033/**
034 * OneShotChannelAdaptor connects and disconnects a channel for every message
035 * exchange.
036 * 
037 * <p>Example qbean:</p>
038 * &lt;client class="org.jpos.q2.iso.OneShotChannelAdaptor" logger="Q2" name="channel-adaptor"&gt;<br>
039 *   &lt;channel ...<br>
040 *     ...<br>
041 *     ...<br>
042 *   &lt;/channel&gt;<br>
043 *   &lt;max-connections&gt;5&lt;/max-connections&gt;<br>
044 *   &lt;max-connect-attempts&gt;15&lt;/max-connect-attempts&gt;<br>
045 *   &lt;in&gt;send&lt;/in&gt;<br>
046 *   &lt;out&gt;receive&lt;/out&gt;<br>
047 * &lt;/client&gt;<br>
048 *
049 * @author Alejandro Revilla
050 * @author Thomas L. Kjeldsen
051 * @version $Revision$ $Date$
052 *
053 */
054public class OneShotChannelAdaptor 
055    extends QBeanSupport
056    implements OneShotChannelAdaptorMBean, Channel
057{
058    Space<String,Object> sp;
059    String in, out;
060    long delay;
061    int maxConnections;
062    int maxConnectAttempts;
063    /** Default constructor. */
064    public OneShotChannelAdaptor () {
065        super ();
066    }
067
068    @Override
069    protected String defaultRealm() {
070        return Realm.COMM_CLIENT;
071    }
072
073    @SuppressWarnings("unchecked")
074    private Space<String,Object> grabSpace (Element e) {
075        return (Space<String,Object>) SpaceFactory.getSpace (e != null ? e.getText() : "");
076    }
077
078    /** Reads the bean's persist element and initializes its in/out queue references. */
079    public void initAdaptor() {
080        Element persist = getPersist ();
081        sp = grabSpace (persist.getChild ("space"));
082        in = Environment.get(persist.getChildTextTrim ("in"));
083        out = Environment.get(persist.getChildTextTrim ("out"));
084        delay = 5000;
085
086        String s = Environment.get(persist.getChildTextTrim ("max-connections"));
087        maxConnections = s!=null ? Integer.parseInt(s) : 1;  // reasonable default
088        s = Environment.get(persist.getChildTextTrim ("max-connect-attempts"));
089        maxConnectAttempts = s!=null ? Integer.parseInt(s) : 15;  // reasonable default
090    }
091    public void startService () {
092        try {
093            initAdaptor();
094            for (int i=0; i<maxConnections; i++) {
095                Worker w = new Worker(i);
096                w.initChannel();
097                new Thread(w).start();
098            }
099            NameRegistrar.register (getName(), this);
100        } catch (Exception e) {
101            getLog().warn ("error starting service", e);
102        }
103    }
104    public void stopService () {
105        try {
106            for (int i=0; i<maxConnections; i++) {
107                sp.out(in, new Object());
108            }
109        } catch (Exception e) {
110            getLog().warn ("error stopping service", e);
111        }
112    }
113    public void destroyService () {
114        NameRegistrar.unregister (getName ());
115        NameRegistrar.unregister ("channel." + getName ());
116    }
117
118
119    /**
120     * Queue a message to be transmitted by this adaptor
121     * @param m message to send
122     */
123    public void send (ISOMsg m) {
124        sp.out (in, m);
125    }
126    /**
127     * Queue a message to be transmitted by this adaptor
128     * @param m message to send
129     * @param timeout in millis
130     */
131    public void send (ISOMsg m, long timeout) {
132        sp.out (in, m, timeout);
133    }
134
135    /**
136     * Receive message
137     */
138    public ISOMsg receive () {
139        return (ISOMsg) sp.in (out);
140    }
141
142    /**
143     * Receive message
144     * @param timeout time to wait for an incoming message
145     */
146    public ISOMsg receive (long timeout) {
147        return (ISOMsg) sp.in (out, timeout);
148    }
149
150    /** Per-message worker that opens an ad-hoc channel, sends the request, and forwards any response. */
151    public class Worker implements Runnable {
152        ISOChannel channel;
153        int id;
154        /**
155         * Constructs a Worker with the given identifier.
156         *
157         * @param i worker identifier (used in the thread name)
158         */
159        public Worker (int i) {
160            super ();
161            id = i;
162        }
163        public void run () {
164            Thread.currentThread().setName ("channel-worker-" + id);
165            int[] handbackFields = cfg.getInts ("handback-field");
166            while (running ()){
167                try {
168                    Object o = sp.in (in, delay);
169                    if (o instanceof ISOMsg) {
170                        ISOMsg m = (ISOMsg) o;
171                        ISOMsg handBack = null;
172                        if (handbackFields.length > 0)
173                            handBack = (ISOMsg) m.clone (handbackFields);
174                        for (int i=0; !channel.isConnected() 
175                                && i<maxConnectAttempts; i++) 
176                        {
177                            channel.reconnect();
178                            if (!channel.isConnected())
179                                ISOUtil.sleep (1000L);
180                        }
181                        if (channel.isConnected()) {
182                            channel.send (m);
183                            m = channel.receive();
184                            channel.disconnect();
185                            if (handBack != null)
186                                m.merge (handBack);
187                            sp.out (out, m);
188                        }
189                    }
190                } catch (Exception e) { 
191                    getLog().warn ("channel-worker-"+id, e.getMessage ());
192                    ISOUtil.sleep (1000);
193                } finally {
194                    try {
195                        channel.disconnect();
196                    } catch (Exception e) {
197                        getLog().warn ("channel-worker-"+id, e.getMessage ());
198                    }
199                }
200            }
201        }
202
203        /**
204         * Builds and configures the channel used by this worker for one request/response cycle.
205         *
206         * @throws ConfigurationException if the channel configuration is missing or invalid
207         */
208        public void initChannel () throws ConfigurationException {
209            Element persist = getPersist ();
210            Element e = persist.getChild ("channel");
211            if (e == null)
212                throw new ConfigurationException ("channel element missing");
213
214            channel = newChannel (e, getFactory());
215            
216            String socketFactoryString = getSocketFactory();
217            if (socketFactoryString != null && channel instanceof FactoryChannel) {
218                ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString);
219                if (sFac instanceof LogSource) {
220                    ((LogSource) sFac).setLogger(log.getLogger(), getRealm());
221                }
222                getFactory().setConfiguration (sFac, e);
223                ((FactoryChannel)channel).setSocketFactory(sFac);
224            }
225
226        }
227        private ISOChannel newChannel (Element e, QFactory f) 
228            throws ConfigurationException
229        {
230            String channelName  = QFactory.getAttributeValue (e, "class");
231            if (channelName == null)
232                throw new ConfigurationException ("class attribute missing from channel element.");
233            
234            String packagerName = QFactory.getAttributeValue (e, "packager");
235
236            ISOChannel channel = f.newInstance (channelName);
237            if (packagerName != null) {
238                ISOPackager packager = f.newInstance (packagerName);
239                channel.setPackager (packager);
240                f.setConfiguration (packager, e);
241            }
242            QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header"));
243            f.setLogger       (channel, e, getRealm());
244            f.setConfiguration (channel, e);
245
246            if (channel instanceof FilteredChannel) {
247                addFilters ((FilteredChannel) channel, e, f);
248            }
249            if (getName () != null)
250                channel.setName (getName ()+id);
251            return channel;
252        }
253
254        private void addFilters (FilteredChannel channel, Element e, QFactory fact) 
255            throws ConfigurationException
256        {
257            for (Object o : e.getChildren("filter")) {
258                Element f = (Element) o;
259                String clazz = QFactory.getAttributeValue(f, "class");
260                ISOFilter filter = (ISOFilter) fact.newInstance(clazz);
261                fact.setLogger(filter, f);
262                fact.setConfiguration(filter, f);
263                String direction = QFactory.getAttributeValue(f, "direction");
264                if (direction == null)
265                    channel.addFilter(filter);
266                else if ("incoming".equalsIgnoreCase(direction))
267                    channel.addIncomingFilter(filter);
268                else if ("outgoing".equalsIgnoreCase(direction))
269                    channel.addOutgoingFilter(filter);
270                else if ("both".equalsIgnoreCase(direction)) {
271                    channel.addIncomingFilter(filter);
272                    channel.addOutgoingFilter(filter);
273                }
274            }
275        }
276    
277    }
278
279    public synchronized void setInQueue (String in) {
280        String old = this.in;
281        this.in = in;
282        if (old != null)
283            sp.out (old, new Object());
284
285        getPersist().getChild("in").setText (in);
286        setModified (true);
287    }
288
289    public String getInQueue () {
290        return in;
291    }
292
293    public synchronized void setOutQueue (String out) {
294        this.out = out;
295        getPersist().getChild("out").setText (out);
296        setModified (true);
297    }
298
299    public String getOutQueue () {
300        return out;
301    }
302
303    public synchronized void setHost (String host) {
304        setProperty (getProperties ("channel"), "host", host);
305        setModified (true);
306    }
307
308    public String getHost () {
309        return getProperty (getProperties ("channel"), "host");
310    }
311
312    public synchronized void setPort (int port) {
313        setProperty (
314            getProperties ("channel"), "port", Integer.toString (port)
315        );
316        setModified (true);
317    }
318    public int getPort () {
319        int port = 0;
320        try {
321            port = Integer.parseInt (
322                getProperty (getProperties ("channel"), "port")
323            );
324        } catch (NumberFormatException e) {
325            getLog().error (e);
326        }
327        return port;
328    }
329    public synchronized void setSocketFactory (String sFac) {
330        setProperty(getProperties("channel"), "socketFactory", sFac);
331        setModified(true);
332    }
333    public String getSocketFactory() {
334        return getProperty(getProperties ("channel"), "socketFactory");
335    }
336}