001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.BaseChannel;
025import org.jpos.iso.Channel;
026import org.jpos.iso.FactoryChannel;
027import org.jpos.iso.FilteredChannel;
028import org.jpos.iso.ISOChannel;
029import org.jpos.iso.ISOClientSocketFactory;
030import org.jpos.iso.ISOFilter;
031import org.jpos.iso.ISOMsg;
032import org.jpos.iso.ISOPackager;
033import org.jpos.q2.QBeanSupport;
034import org.jpos.q2.QFactory;
035import org.jpos.space.Space;
036import org.jpos.space.SpaceFactory;
037import org.jpos.space.SpaceUtil;
038import org.jpos.util.LogEvent;
039import org.jpos.util.LogSource;
040import org.jpos.util.Logger;
041import org.jpos.util.NameRegistrar;
042
043import java.io.IOException;
044import java.util.Date;
045import java.util.concurrent.Executors;
046import java.util.concurrent.ScheduledExecutorService;
047import java.util.concurrent.SynchronousQueue;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicInteger;
051
052/**
053 * OneShotChannelAdaptorMK2 connects and disconnects a channel for every message
054 * exchange. It is similar to OneShotChannelAdaptor but uses a thread pool instead
055 * of opening threads statically and supports mux pooling by exposing channel readiness.
056 *
057 * @author Alejandro Revilla
058 * @author Thomas L. Kjeldsen
059 * @author Victor Salaman
060 */
061@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
062public class OneShotChannelAdaptorMK2
063        extends QBeanSupport
064        implements OneShotChannelAdaptorMK2MBean, Channel, Runnable
065{
066    Space<String, Object> sp;
067    String in, out, ready;
068    long delay;
069    long checkInterval;
070    int maxConnections;
071    int[] handbackFields;
072    ThreadPoolExecutor threadPool = null;
073    AtomicInteger cnt;
074    Element channelElement;
075
076    ScheduledExecutorService checkTimer;
077
078    public OneShotChannelAdaptorMK2()
079    {
080        super();
081    }
082
083    @SuppressWarnings("unchecked")
084    private Space<String, Object> grabSpace(Element e)
085    {
086        return (Space<String, Object>) SpaceFactory.getSpace(e != null ? e.getText() : "");
087    }
088
089    @Override
090    protected void initService() throws Exception
091    {
092        Element persist = getPersist();
093        channelElement = persist.getChild("channel");
094        if (channelElement == null)
095        {
096            throw new ConfigurationException("channel element missing");
097        }
098        sp = grabSpace(persist.getChild("space"));
099        in = Environment.get(persist.getChildTextTrim ("in"));
100        out = Environment.get(persist.getChildTextTrim ("out"));
101        ready = getName() + ".ready";
102
103        String s = Environment.get(persist.getChildTextTrim ("max-connections"));
104        maxConnections = s != null ? Integer.parseInt(s) : 1;
105        handbackFields = cfg.getInts("handback-field");
106
107        s = Environment.get(persist.getChildTextTrim ("delay"));
108        delay = s != null ? Integer.parseInt(s) : 2500;
109
110        s = Environment.get(persist.getChildTextTrim("check-interval"));
111        checkInterval = s != null ? Integer.parseInt(s) : 60000;
112
113        NameRegistrar.register(getName(), this);
114    }
115
116    public void startService()
117    {
118        setRealm(getName());
119        cnt = new AtomicInteger(0);
120        threadPool = new ThreadPoolExecutor(1,
121                                            maxConnections,
122                                            10,
123                                            TimeUnit.SECONDS,
124                                            new SynchronousQueue<Runnable>());
125        new Thread(this).start();
126
127        checkTimer=Executors.newScheduledThreadPool(1);
128        checkTimer.scheduleAtFixedRate(new CheckChannelTask(), 0L, checkInterval,TimeUnit.MILLISECONDS);
129    }
130
131    public void stopService()
132    {
133        if(checkTimer!=null)
134        {
135            checkTimer.shutdown();
136            checkTimer=null;
137        }
138
139        takeOffline();
140        sp.out(in, new Object());
141        threadPool.shutdown();
142        while (!threadPool.isTerminated())
143        {
144            try
145            {
146                Thread.sleep(1000L);
147            }
148            catch (InterruptedException e)
149            {
150            }
151        }
152
153        int c=0;
154        while(running())
155        {
156            try
157            {
158                Thread.sleep(500);
159            }
160            catch (InterruptedException e)
161            {
162            }
163            c++;
164            if(c>10) break;
165        }
166    }
167
168    public void destroyService()
169    {
170        NameRegistrar.unregister(getName());
171    }
172
173    public boolean isConnected()
174    {
175        return sp != null && sp.rdp(ready) != null;
176    }
177
178    @Override
179    @SuppressWarnings({"StatementWithEmptyBody", "ConstantConditions"})
180    public void run()
181    {
182        while (running())
183        {
184            try
185            {
186                Object o = sp.in(in, delay);
187                if (o instanceof ISOMsg)
188                {
189                    if(!isConnected())
190                    {
191                        continue;
192                    }
193                    ISOMsg m = (ISOMsg) o;
194                    int i = cnt.incrementAndGet();
195                    if (i > 9999)
196                    {
197                        cnt.set(0);
198                        i = cnt.incrementAndGet();
199                    }
200                    threadPool.execute(new Worker(m, i));
201                }
202            }
203            catch (Exception e)
204            {
205                getLog().warn(getName(), e.getMessage());
206            }
207        }
208    }
209
210    private class CheckChannelTask implements Runnable
211    {
212        @Override
213        public void run()
214        {
215            try
216            {
217                Date lastOnline = (Date) sp.rdp(ready);
218                final LogEvent ev = getLog().createLogEvent("status");
219                if (isChannelConnectable(true))
220                {
221                    if (lastOnline == null)
222                    {
223                        ev.addMessage("Channel is now online");
224                        Logger.log(ev);
225                        flushInput();
226                    }
227                    takeOnline();
228                }
229                else
230                {
231                    takeOffline();
232                    if (lastOnline != null)
233                    {
234                        ev.addMessage("Channel is now offline");
235                        Logger.log(ev);
236                    }
237                }
238            }
239            catch (Throwable e)
240            {
241                getLog().warn(getName(), e.getMessage());
242            }
243        }
244
245        private boolean isChannelConnectable(boolean showExceptions)
246        {
247            boolean res = false;
248
249            ISOChannel channel = null;
250            try
251            {
252                channel = newChannel(channelElement, getFactory());
253                if (channel instanceof BaseChannel)
254                {
255                    BaseChannel bc = (BaseChannel) channel;
256                    bc.setLogger(null, null);
257                }
258                channel.connect();
259                res = true;
260            }
261            catch (Exception e)
262            {
263                if (showExceptions)
264                {
265                    getLog().error(e.getMessage());
266                }
267            }
268            finally
269            {
270                if (channel != null && channel.isConnected())
271                {
272                    try
273                    {
274                        channel.disconnect();
275                    }
276                    catch (IOException e)
277                    {
278                        getLog().error(e);
279                    }
280                    NameRegistrar.unregister("channel."+channel.getName());
281                }
282            }
283
284            return res;
285        }
286    }
287
288    private void flushInput()
289    {
290        SpaceUtil.wipe(sp,in);
291    }
292
293    private void takeOffline()
294    {
295        SpaceUtil.wipe(sp, ready);
296    }
297
298    private void takeOnline()
299    {
300        sp.put(ready, new Date());
301    }
302
303    public void send(ISOMsg m)
304    {
305        sp.out(in, m);
306    }
307
308    public void send(ISOMsg m, long timeout)
309    {
310        sp.out(in, m, timeout);
311    }
312
313    public ISOMsg receive()
314    {
315        return (ISOMsg) sp.in(out);
316    }
317
318    public ISOMsg receive(long timeout)
319    {
320        return (ISOMsg) sp.in(out, timeout);
321    }
322
323    private ISOChannel newChannel(Element e, QFactory f)
324            throws ConfigurationException
325    {
326        String channelName = QFactory.getAttributeValue(e, "class");
327        if (channelName == null)
328        {
329            throw new ConfigurationException("class attribute missing from channel element.");
330        }
331
332        String packagerName = QFactory.getAttributeValue(e, "packager");
333
334        ISOChannel channel = (ISOChannel) f.newInstance(channelName);
335        ISOPackager packager;
336        if (packagerName != null)
337        {
338            packager = (ISOPackager) f.newInstance(packagerName);
339            channel.setPackager(packager);
340            f.setConfiguration(packager, e);
341        }
342        QFactory.invoke(channel, "setHeader", QFactory.getAttributeValue(e, "header"));
343        f.setLogger(channel, e);
344        f.setConfiguration(channel, e);
345
346        if (channel instanceof FilteredChannel)
347        {
348            addFilters((FilteredChannel) channel, e, f);
349        }
350
351        String socketFactoryString = getSocketFactory();
352        if (socketFactoryString != null && channel instanceof FactoryChannel)
353        {
354            ISOClientSocketFactory sFac = (ISOClientSocketFactory) getFactory().newInstance(socketFactoryString);
355            if (sFac != null && sFac instanceof LogSource)
356            {
357                ((LogSource) sFac).setLogger(log.getLogger(), getName() + ".socket-factory");
358            }
359            getFactory().setConfiguration(sFac, e);
360            ((FactoryChannel) channel).setSocketFactory(sFac);
361        }
362
363        return channel;
364    }
365
366    private void addFilters(FilteredChannel channel, Element e, QFactory fact)
367            throws ConfigurationException
368    {
369        for (Object o : e.getChildren("filter"))
370        {
371            Element f = (Element) o;
372            String clazz = QFactory.getAttributeValue(f, "class");
373            ISOFilter filter = (ISOFilter) fact.newInstance(clazz);
374            fact.setLogger(filter, f);
375            fact.setConfiguration(filter, f);
376            String direction = QFactory.getAttributeValue(f, "direction");
377            if (direction == null)
378            {
379                channel.addFilter(filter);
380            }
381            else if ("incoming".equalsIgnoreCase(direction))
382            {
383                channel.addIncomingFilter(filter);
384            }
385            else if ("outgoing".equalsIgnoreCase(direction))
386            {
387                channel.addOutgoingFilter(filter);
388            }
389            else if ("both".equalsIgnoreCase(direction))
390            {
391                channel.addIncomingFilter(filter);
392                channel.addOutgoingFilter(filter);
393            }
394        }
395    }
396
397    public String getInQueue()
398    {
399        return in;
400    }
401
402    public synchronized void setInQueue(String in)
403    {
404        String old = this.in;
405        this.in = in;
406        if (old != null)
407        {
408            sp.out(old, new Object());
409        }
410
411        getPersist().getChild("in").setText(in);
412        setModified(true);
413    }
414
415    public String getOutQueue()
416    {
417        return out;
418    }
419
420    public synchronized void setOutQueue(String out)
421    {
422        this.out = out;
423        getPersist().getChild("out").setText(out);
424        setModified(true);
425    }
426
427    public String getHost()
428    {
429        return getProperty(getProperties("channel"), "host");
430    }
431
432    public synchronized void setHost(String host)
433    {
434        setProperty(getProperties("channel"), "host", host);
435        setModified(true);
436    }
437
438    public int getPort()
439    {
440        int port = 0;
441        try
442        {
443            port = Integer.parseInt(
444                    getProperty(getProperties("channel"), "port")
445            );
446        }
447        catch (NumberFormatException e)
448        {
449            getLog().error(e);
450        }
451        return port;
452    }
453
454    public synchronized void setPort(int port)
455    {
456        setProperty(
457                getProperties("channel"), "port", Integer.toString(port)
458        );
459        setModified(true);
460    }
461
462    public String getSocketFactory()
463    {
464        return getProperty(getProperties("channel"), "socketFactory");
465    }
466
467    public synchronized void setSocketFactory(String sFac)
468    {
469        setProperty(getProperties("channel"), "socketFactory", sFac);
470        setModified(true);
471    }
472
473    public class Worker implements Runnable
474    {
475        ISOMsg req;
476        int id;
477
478        public Worker(ISOMsg req, int id)
479        {
480            this.req = req;
481            this.id = id;
482        }
483
484        public void run()
485        {
486            Thread.currentThread().setName("channel-worker-" + id);
487            ISOChannel channel = null;
488
489            try
490            {
491                channel = newChannel(channelElement, getFactory());
492                if (getName() != null)
493                {
494                    channel.setName(getName() + id);
495                }
496
497                ISOMsg handBack = null;
498                if (handbackFields.length > 0)
499                {
500                    handBack = (ISOMsg) req.clone(handbackFields);
501                }
502                try
503                {
504                    channel.connect();
505                }
506                catch (Throwable e)
507                {
508                    takeOffline();
509                }
510                if (channel.isConnected())
511                {
512                    takeOnline();
513                    channel.send(req);
514                    ISOMsg rsp = channel.receive();
515                    channel.disconnect();
516                    if (handBack != null)
517                    {
518                        rsp.merge(handBack);
519                    }
520                    sp.out(out, rsp);
521                }
522            }
523            catch (Exception e)
524            {
525                getLog().warn("channel-worker-" + id, e.getMessage());
526            }
527            finally
528            {
529                try
530                {
531                    if (channel != null)
532                    {
533                        channel.disconnect();
534                    }
535                }
536                catch (Exception e)
537                {
538                    getLog().warn("channel-worker-" + id, e.getMessage());
539                }
540                finally
541                {
542                    NameRegistrar.unregister("channel." + getName() + id);
543                }
544            }
545        }
546    }
547}