001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.BaseChannel;
025import org.jpos.iso.Channel;
026import org.jpos.iso.FactoryChannel;
027import org.jpos.iso.FilteredChannel;
028import org.jpos.iso.ISOChannel;
029import org.jpos.iso.ISOClientSocketFactory;
030import org.jpos.iso.ISOFilter;
031import org.jpos.iso.ISOMsg;
032import org.jpos.iso.ISOPackager;
033import org.jpos.q2.QBeanSupport;
034import org.jpos.q2.QFactory;
035import org.jpos.space.Space;
036import org.jpos.space.SpaceFactory;
037import org.jpos.space.SpaceUtil;
038import org.jpos.util.LogEvent;
039import org.jpos.util.LogSource;
040import org.jpos.util.Logger;
041import org.jpos.util.NameRegistrar;
042import org.jpos.util.Realm;
043
044import java.io.IOException;
045import java.util.Date;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.SynchronousQueue;
049import java.util.concurrent.ThreadPoolExecutor;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.atomic.AtomicInteger;
052
053/**
054 * OneShotChannelAdaptorMK2 connects and disconnects a channel for every message
055 * exchange. It is similar to OneShotChannelAdaptor but uses a thread pool instead
056 * of opening threads statically and supports mux pooling by exposing channel readiness.
057 *
058 * @author Alejandro Revilla
059 * @author Thomas L. Kjeldsen
060 * @author Victor Salaman
061 */
062@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
063public class OneShotChannelAdaptorMK2
064        extends QBeanSupport
065        implements OneShotChannelAdaptorMK2MBean, Channel, Runnable
066{
067    Space<String, Object> sp;
068    String in, out, ready;
069    long delay;
070    long checkInterval;
071    int maxConnections;
072    int[] handbackFields;
073    ThreadPoolExecutor threadPool = null;
074    AtomicInteger cnt;
075    Element channelElement;
076
077    ScheduledExecutorService checkTimer;
078
079    /** Default constructor. */
080    public OneShotChannelAdaptorMK2()
081    {
082        super();
083    }
084
085    @Override
086    protected String defaultRealm() {
087        return Realm.COMM_CLIENT;
088    }
089
090    @SuppressWarnings("unchecked")
091    private Space<String, Object> grabSpace(Element e)
092    {
093        return (Space<String, Object>) SpaceFactory.getSpace(e != null ? e.getText() : "");
094    }
095
096    @Override
097    protected void initService() throws Exception
098    {
099        Element persist = getPersist();
100        channelElement = persist.getChild("channel");
101        if (channelElement == null)
102        {
103            throw new ConfigurationException("channel element missing");
104        }
105        sp = grabSpace(persist.getChild("space"));
106        in = Environment.get(persist.getChildTextTrim ("in"));
107        out = Environment.get(persist.getChildTextTrim ("out"));
108        ready = getName() + ".ready";
109
110        String s = Environment.get(persist.getChildTextTrim ("max-connections"));
111        maxConnections = s != null ? Integer.parseInt(s) : 1;
112        handbackFields = cfg.getInts("handback-field");
113
114        s = Environment.get(persist.getChildTextTrim ("delay"));
115        delay = s != null ? Integer.parseInt(s) : 2500;
116
117        s = Environment.get(persist.getChildTextTrim("check-interval"));
118        checkInterval = s != null ? Integer.parseInt(s) : 60000;
119
120        NameRegistrar.register(getName(), this);
121    }
122
123    public void startService()
124    {
125        cnt = new AtomicInteger(0);
126        threadPool = new ThreadPoolExecutor(1,
127                                            maxConnections,
128                                            10,
129                                            TimeUnit.SECONDS,
130                                            new SynchronousQueue<Runnable>());
131        new Thread(this).start();
132
133        checkTimer=Executors.newScheduledThreadPool(1);
134        checkTimer.scheduleAtFixedRate(new CheckChannelTask(), 0L, checkInterval,TimeUnit.MILLISECONDS);
135    }
136
137    public void stopService()
138    {
139        if(checkTimer!=null)
140        {
141            checkTimer.shutdown();
142            checkTimer=null;
143        }
144
145        takeOffline();
146        sp.out(in, new Object());
147        threadPool.shutdown();
148        while (!threadPool.isTerminated())
149        {
150            try
151            {
152                Thread.sleep(1000L);
153            }
154            catch (InterruptedException e)
155            {
156            }
157        }
158
159        int c=0;
160        while(running())
161        {
162            try
163            {
164                Thread.sleep(500);
165            }
166            catch (InterruptedException e)
167            {
168            }
169            c++;
170            if(c>10) break;
171        }
172    }
173
174    public void destroyService()
175    {
176        NameRegistrar.unregister(getName());
177    }
178
179    public boolean isConnected()
180    {
181        return sp != null && sp.rdp(ready) != null;
182    }
183
184    @Override
185    @SuppressWarnings({"StatementWithEmptyBody", "ConstantConditions"})
186    public void run()
187    {
188        while (running())
189        {
190            try
191            {
192                Object o = sp.in(in, delay);
193                if (o instanceof ISOMsg)
194                {
195                    if(!isConnected())
196                    {
197                        continue;
198                    }
199                    ISOMsg m = (ISOMsg) o;
200                    int i = cnt.incrementAndGet();
201                    if (i > 9999)
202                    {
203                        cnt.set(0);
204                        i = cnt.incrementAndGet();
205                    }
206                    threadPool.execute(new Worker(m, i));
207                }
208            }
209            catch (Exception e)
210            {
211                getLog().warn(getName(), e.getMessage());
212            }
213        }
214    }
215
216    private class CheckChannelTask implements Runnable
217    {
218        @Override
219        public void run()
220        {
221            try
222            {
223                Date lastOnline = (Date) sp.rdp(ready);
224                final LogEvent ev = getLog().createLogEvent("status");
225                if (isChannelConnectable(true))
226                {
227                    if (lastOnline == null)
228                    {
229                        ev.addMessage("Channel is now online");
230                        Logger.log(ev);
231                        flushInput();
232                    }
233                    takeOnline();
234                }
235                else
236                {
237                    takeOffline();
238                    if (lastOnline != null)
239                    {
240                        ev.addMessage("Channel is now offline");
241                        Logger.log(ev);
242                    }
243                }
244            }
245            catch (Throwable e)
246            {
247                getLog().warn(getName(), e.getMessage());
248            }
249        }
250
251        private boolean isChannelConnectable(boolean showExceptions)
252        {
253            boolean res = false;
254
255            ISOChannel channel = null;
256            try
257            {
258                channel = newChannel(channelElement, getFactory());
259                if (channel instanceof BaseChannel)
260                {
261                    BaseChannel bc = (BaseChannel) channel;
262                    bc.setLogger(null, null);
263                }
264                channel.connect();
265                res = true;
266            }
267            catch (Exception e)
268            {
269                if (showExceptions)
270                {
271                    getLog().error(e.getMessage());
272                }
273            }
274            finally
275            {
276                if (channel != null && channel.isConnected())
277                {
278                    try
279                    {
280                        channel.disconnect();
281                    }
282                    catch (IOException e)
283                    {
284                        getLog().error(e);
285                    }
286                    NameRegistrar.unregister("channel."+channel.getName());
287                }
288            }
289
290            return res;
291        }
292    }
293
294    private void flushInput()
295    {
296        SpaceUtil.wipe(sp,in);
297    }
298
299    private void takeOffline()
300    {
301        SpaceUtil.wipe(sp, ready);
302    }
303
304    private void takeOnline()
305    {
306        sp.put(ready, new Date());
307    }
308
309    /**
310     * Sends a message via the inbound queue with no expiration.
311     *
312     * @param m message to send
313     */
314    public void send(ISOMsg m)
315    {
316        sp.out(in, m);
317    }
318
319    /**
320     * Sends a message via the inbound queue with a per-entry lease.
321     *
322     * @param m message to send
323     * @param timeout entry lease in milliseconds
324     */
325    public void send(ISOMsg m, long timeout)
326    {
327        sp.out(in, m, timeout);
328    }
329
330    public ISOMsg receive()
331    {
332        return (ISOMsg) sp.in(out);
333    }
334
335    public ISOMsg receive(long timeout)
336    {
337        return (ISOMsg) sp.in(out, timeout);
338    }
339
340    private ISOChannel newChannel(Element e, QFactory f)
341            throws ConfigurationException
342    {
343        String channelName = QFactory.getAttributeValue(e, "class");
344        if (channelName == null)
345        {
346            throw new ConfigurationException("class attribute missing from channel element.");
347        }
348
349        String packagerName = QFactory.getAttributeValue(e, "packager");
350
351        ISOChannel channel = (ISOChannel) f.newInstance(channelName);
352        ISOPackager packager;
353        if (packagerName != null)
354        {
355            packager = (ISOPackager) f.newInstance(packagerName);
356            channel.setPackager(packager);
357            f.setConfiguration(packager, e);
358        }
359        QFactory.invoke(channel, "setHeader", QFactory.getAttributeValue(e, "header"));
360        f.setLogger(channel, e, getRealm());
361        f.setConfiguration(channel, e);
362
363        if (channel instanceof FilteredChannel)
364        {
365            addFilters((FilteredChannel) channel, e, f);
366        }
367
368        String socketFactoryString = getSocketFactory();
369        if (socketFactoryString != null && channel instanceof FactoryChannel)
370        {
371            ISOClientSocketFactory sFac = (ISOClientSocketFactory) getFactory().newInstance(socketFactoryString);
372            if (sFac != null && sFac instanceof LogSource)
373            {
374                ((LogSource) sFac).setLogger(log.getLogger(), getRealm());
375            }
376            getFactory().setConfiguration(sFac, e);
377            ((FactoryChannel) channel).setSocketFactory(sFac);
378        }
379
380        return channel;
381    }
382
383    private void addFilters(FilteredChannel channel, Element e, QFactory fact)
384            throws ConfigurationException
385    {
386        for (Object o : e.getChildren("filter"))
387        {
388            Element f = (Element) o;
389            String clazz = QFactory.getAttributeValue(f, "class");
390            ISOFilter filter = (ISOFilter) fact.newInstance(clazz);
391            fact.setLogger(filter, f);
392            fact.setConfiguration(filter, f);
393            String direction = QFactory.getAttributeValue(f, "direction");
394            if (direction == null)
395            {
396                channel.addFilter(filter);
397            }
398            else if ("incoming".equalsIgnoreCase(direction))
399            {
400                channel.addIncomingFilter(filter);
401            }
402            else if ("outgoing".equalsIgnoreCase(direction))
403            {
404                channel.addOutgoingFilter(filter);
405            }
406            else if ("both".equalsIgnoreCase(direction))
407            {
408                channel.addIncomingFilter(filter);
409                channel.addOutgoingFilter(filter);
410            }
411        }
412    }
413
414    public String getInQueue()
415    {
416        return in;
417    }
418
419    public synchronized void setInQueue(String in)
420    {
421        String old = this.in;
422        this.in = in;
423        if (old != null)
424        {
425            sp.out(old, new Object());
426        }
427
428        getPersist().getChild("in").setText(in);
429        setModified(true);
430    }
431
432    public String getOutQueue()
433    {
434        return out;
435    }
436
437    public synchronized void setOutQueue(String out)
438    {
439        this.out = out;
440        getPersist().getChild("out").setText(out);
441        setModified(true);
442    }
443
444    public String getHost()
445    {
446        return getProperty(getProperties("channel"), "host");
447    }
448
449    public synchronized void setHost(String host)
450    {
451        setProperty(getProperties("channel"), "host", host);
452        setModified(true);
453    }
454
455    public int getPort()
456    {
457        int port = 0;
458        try
459        {
460            port = Integer.parseInt(
461                    getProperty(getProperties("channel"), "port")
462            );
463        }
464        catch (NumberFormatException e)
465        {
466            getLog().error(e);
467        }
468        return port;
469    }
470
471    public synchronized void setPort(int port)
472    {
473        setProperty(
474                getProperties("channel"), "port", Integer.toString(port)
475        );
476        setModified(true);
477    }
478
479    /**
480     * Returns the configured socket-factory class name.
481     *
482     * @return socket factory class name
483     */
484    public String getSocketFactory()
485    {
486        return getProperty(getProperties("channel"), "socketFactory");
487    }
488
489    /**
490     * Sets the socket-factory class name.
491     *
492     * @param sFac socket factory class name
493     */
494    public synchronized void setSocketFactory(String sFac)
495    {
496        setProperty(getProperties("channel"), "socketFactory", sFac);
497        setModified(true);
498    }
499
500    /** Per-message worker that opens an ad-hoc channel, sends a request, and forwards the response. */
501    public class Worker implements Runnable
502    {
503        ISOMsg req;
504        int id;
505
506        /**
507         * Constructs a Worker for the given request.
508         *
509         * @param req message to send
510         * @param id worker identifier (used in the thread name)
511         */
512        public Worker(ISOMsg req, int id)
513        {
514            this.req = req;
515            this.id = id;
516        }
517
518        public void run()
519        {
520            Thread.currentThread().setName("channel-worker-" + id);
521            ISOChannel channel = null;
522
523            try
524            {
525                channel = newChannel(channelElement, getFactory());
526                if (getName() != null)
527                {
528                    channel.setName(getName() + id);
529                }
530
531                ISOMsg handBack = null;
532                if (handbackFields.length > 0)
533                {
534                    handBack = (ISOMsg) req.clone(handbackFields);
535                }
536                try
537                {
538                    channel.connect();
539                }
540                catch (Throwable e)
541                {
542                    takeOffline();
543                }
544                if (channel.isConnected())
545                {
546                    takeOnline();
547                    channel.send(req);
548                    ISOMsg rsp = channel.receive();
549                    channel.disconnect();
550                    if (handBack != null)
551                    {
552                        rsp.merge(handBack);
553                    }
554                    sp.out(out, rsp);
555                }
556            }
557            catch (Exception e)
558            {
559                getLog().warn("channel-worker-" + id, e.getMessage());
560            }
561            finally
562            {
563                try
564                {
565                    if (channel != null)
566                    {
567                        channel.disconnect();
568                    }
569                }
570                catch (Exception e)
571                {
572                    getLog().warn("channel-worker-" + id, e.getMessage());
573                }
574                finally
575                {
576                    NameRegistrar.unregister("channel." + getName() + id);
577                }
578            }
579        }
580    }
581}