/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.Gauge;
022import io.micrometer.core.instrument.Tags;
023import io.micrometer.core.instrument.binder.BaseUnits;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.core.annotation.Config;
028import org.jpos.core.handlers.exception.ExceptionHandlerAware;
029import org.jpos.core.handlers.exception.ExceptionHandlerConfigAware;
030import org.jpos.iso.*;
031import org.jpos.metrics.MeterFactory;
032import org.jpos.metrics.MeterInfo;
033import org.jpos.metrics.iso.ISOMsgCounter;
034import org.jpos.metrics.iso.ISOMsgMetrics;
035import org.jpos.q2.QBeanSupport;
036import org.jpos.q2.QFactory;
037import org.jpos.space.Space;
038import org.jpos.space.SpaceFactory;
039import org.jpos.space.SpaceUtil;
040import org.jpos.util.LogSource;
041import org.jpos.util.Loggeable;
042import org.jpos.util.NameRegistrar;
043import org.jpos.util.Realm;
044
045import java.io.EOFException;
046import java.io.IOException;
047import java.io.PrintStream;
048import java.net.SocketTimeoutException;
049import java.time.Duration;
050import java.time.Instant;
051import java.time.ZoneId;
052import java.util.Date;
053import java.util.concurrent.ExecutorService;
054import java.util.concurrent.Executors;
055import java.util.concurrent.ScheduledExecutorService;
056import java.util.concurrent.TimeUnit;
057
058/**
059 * A Q2 adaptor that wraps an {@link org.jpos.iso.ISOChannel} for use within the Q2 container.
060 * @author Alejandro Revilla
061 */
062@SuppressWarnings("unchecked")
063public class ChannelAdaptor
064    extends QBeanSupport
065    implements ChannelAdaptorMBean, Channel, Loggeable, ExceptionHandlerConfigAware
066{
067    /** The space used for inter-process message passing. */
068    protected Space sp;
069    private ISOChannel channel;
070    String in, out, ready, reconnect;
071    long delay;
072    boolean keepAlive = false;
073    boolean ignoreISOExceptions = false;
074    boolean writeOnly = false;
075    int rx, tx, connects;
076    long lastTxn = 0L;
077    long timeout = 0L;
078    boolean waitForWorkersOnStop;
079    private Thread receiver;
080    private Thread sender;
081    private final Object disconnectLock = Boolean.TRUE;
082
083    private ExecutorService executor;
084    private ScheduledExecutorService scheduledExecutor;
085
086    private Gauge connectionsGauge;
087
088    @Config("soft-stop") private long softStop;
089
090    /** Default constructor. */
091    public ChannelAdaptor () {
092        super ();
093        resetCounters();
094    }
095
096    @Override
097    protected String defaultRealm() {
098        return Realm.COMM_CHANNEL;
099    }
100
101    public void initService() throws ConfigurationException {
102        if (softStop < 0)
103            throw new ConfigurationException ("Invalid soft-stop %d".formatted(Long.valueOf(softStop)));
104        initSpaceAndQueues();
105        NameRegistrar.register (getName(), this);
106        executor = QFactory.executorService(cfg.getBoolean("virtual-threads", false));
107        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
108            Thread.ofVirtual().factory()
109        );
110    }
111    public void startService () {
112        try {
113            channel = initChannel ();
114            executor.submit(new Sender());
115            if (!writeOnly) { // fixes #426 && jPOS-20
116                executor.submit (new Receiver());
117            }
118            initMeters();
119        } catch (Exception e) {
120            getLog().warn ("error starting service", e);
121        }
122    }
123    public void stopService () {
124        try {
125            sp.out (in, Boolean.TRUE);
126            if (channel != null) {
127                if (softStop > 0L)
128                    disconnectLater(softStop);
129                else
130                    disconnect();
131            }
132            if (waitForWorkersOnStop)
133                executor.awaitTermination(Math.max(5000L, softStop), TimeUnit.MILLISECONDS);
134            sender = null;
135            receiver = null;
136            removeMeters();
137        } catch (Exception e) {
138            getLog().warn ("error disconnecting from remote host", e);
139        }
140    }
141    public void destroyService () {
142        NameRegistrar.unregister (getName ());
143        NameRegistrar.unregister ("channel." + getName ());
144    }
145
146    public synchronized void setReconnectDelay (long delay) {
147        getPersist().getChild ("reconnect-delay")
148            .setText (Long.toString (delay));
149        this.delay = delay;
150        setModified (true);
151    }
152    public long getReconnectDelay () {
153        return delay;
154    }
155    public synchronized void setInQueue (String in) {
156        String old = this.in;
157        this.in = in;
158        if (old != null)
159            sp.out (old, Boolean.TRUE);
160
161        getPersist().getChild("in").setText (in);
162        setModified (true);
163    }
164    public String getInQueue () {
165        return in;
166    }
167    public synchronized void setOutQueue (String out) {
168        this.out = out;
169        getPersist().getChild("out").setText (out);
170        setModified (true);
171    }
172
173    /**
174     * Queue a message to be transmitted by this adaptor
175     * @param m message to send
176     */
177    public void send (ISOMsg m) {
178        sp.out (in, m);
179    }
180    /**
181     * Queue a message to be transmitted by this adaptor
182     * @param m message to send
183     * @param timeout timeout in millis
184     */
185    public void send (ISOMsg m, long timeout) {
186        sp.out (in, m, timeout);
187    }
188
189    /**
190     * Receive message
191     */
192    public ISOMsg receive () {
193        return (ISOMsg) sp.in (out);
194    }
195
196    /**
197     * Receive message
198     * @param timeout time to wait for an incoming message
199     */
200    public ISOMsg receive (long timeout) {
201        return (ISOMsg) sp.in (out, timeout);
202    }
203    /**
204     * @return true if channel is connected
205     */
206    public boolean isConnected () {
207        return sp != null && sp.rdp (ready) != null;
208    }
209
210    public String getOutQueue () {
211        return out;
212    }
213
214    /**
215     * Parses a {@code <channel>} element, returning an {@link ISOChannel}.
216     * @param e the configuration element
217     * @param f the QFactory
218     * @return a configured ISOChannel
219     * @throws ConfigurationException on configuration error
220     */
221    public ISOChannel newChannel (Element e, QFactory f) throws ConfigurationException {
222        return newChannel(e, f, getRealm());
223    }
224
225    /**
226     * Parses a {@code <channel>} element, using the provided fallback realm when none is configured.
227     * @param e             the configuration element
228     * @param f             the QFactory
229     * @param fallbackRealm realm to use if none is configured
230     * @return a configured ISOChannel
231     * @throws ConfigurationException on configuration error
232     */
233    public ISOChannel newChannel (Element e, QFactory f, String fallbackRealm) throws ConfigurationException {
234        String channelName  = QFactory.getAttributeValue (e, "class");
235        String packagerName = QFactory.getAttributeValue (e, "packager");
236
237        ISOChannel channel = f.newInstance(channelName);
238        if (packagerName != null) {
239            ISOPackager packager = f.newInstance(packagerName);
240            channel.setPackager (packager);
241            f.setConfiguration (packager, e);
242        }
243        QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header"));
244        f.setLogger       (channel, e, fallbackRealm);
245        f.setConfiguration (channel, e);
246
247        if (channel instanceof FilteredChannel) {
248            addFilters ((FilteredChannel) channel, e, f);
249        }
250
251        if (channel instanceof ExceptionHandlerAware) {
252            addExceptionHandlers((ExceptionHandlerAware) channel, e, f);
253        }
254
255        if (channel instanceof ISOMsgMetrics.Source metricsChannel) {
256            String type = "default";                                    // default alias, in case metrics not defined
257            String clazz = null;
258
259            Element met = e.getChild("metrics");
260            if (met != null) {
261                if (QFactory.isEnabled(met)) {
262                    clazz = QFactory.getAttributeValue(met, "class");
263                    String typeAttr = QFactory.getAttributeValue(met, "type");
264                    type =  (clazz != null)     ? "class"  :            // class attribute has precedence over type
265                            (typeAttr != null)  ? typeAttr :
266                            type;
267                } else {
268                    type = "none";                                      // <metrics enabled="false" /> equivalent to type="none"
269                }
270            }
271
272            ISOMsgMetrics m = switch (type) {
273                case "none" -> null;
274
275                case "default" -> {
276                    var mc = new ISOMsgCounter();
277                    if (met != null)
278                        f.setLogger(mc, met);
279                    else
280                        mc.setLogger(this.getLog().getLogger(), this.getRealm());
281                    yield mc;
282                }
283
284                case "counter" -> {
285                    var mc = new ISOMsgCounter();
286                    f.setLogger(mc, met);
287                    f.setConfiguration(mc, met);
288                    yield mc;
289                }
290
291                case "class" -> {
292                    ISOMsgMetrics mc = f.newInstance(clazz);
293                    f.setLogger(mc, met);
294                    f.setConfiguration(mc, met);
295                    yield mc;
296                }
297
298                default -> throw new ConfigurationException("Unknown metric type '"+type+"'");
299            };
300
301            metricsChannel.setISOMsgMetrics(m);
302        } // metrics config
303
304        if (getName () != null)
305            channel.setName (getName ());
306
307        return channel;
308    }
309
310    /**
311     * Registers filters defined in the configuration element with the channel.
312     * @param channel the channel to configure
313     * @param e       the configuration element
314     * @param fact    the QFactory
315     * @throws ConfigurationException on error
316     */
317    protected void addFilters (FilteredChannel channel, Element e, QFactory fact)
318        throws ConfigurationException
319    {
320        for (Element f : e.getChildren("filter")) {
321            ISOFilter filter = fact.newInstance(f);
322            if (filter == null) continue;
323            String direction = QFactory.getAttributeValue(f, "direction");
324            if (direction == null)
325                channel.addFilter(filter);
326            else if ("incoming".equalsIgnoreCase(direction))
327                channel.addIncomingFilter(filter);
328            else if ("outgoing".equalsIgnoreCase(direction))
329                channel.addOutgoingFilter(filter);
330            else if ("both".equalsIgnoreCase(direction)) {
331                channel.addIncomingFilter(filter);
332                channel.addOutgoingFilter(filter);
333            }
334        }
335    }
336
337
338    /**
339     * Initialises and returns the ISOChannel from the current configuration.
340     * @return the initialised channel
341     * @throws ConfigurationException on error
342     */
343    protected ISOChannel initChannel () throws ConfigurationException {
344        Element persist = getPersist ();
345        Element e = persist.getChild ("channel");
346        if (e == null)
347            throw new ConfigurationException ("channel element missing");
348
349        ISOChannel c = newChannel (e, getFactory());
350
351        String socketFactoryString = getSocketFactory();
352        if (socketFactoryString != null && c instanceof FactoryChannel) {
353            ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString);
354            if (sFac instanceof LogSource) {
355                ((LogSource) sFac).setLogger(log.getLogger(), getRealm());
356            }
357            getFactory().setConfiguration (sFac, e);
358            ((FactoryChannel)c).setSocketFactory(sFac);
359        }
360
361        return c;
362    }
363
364    /**
365     * Initialises the Space and in/out queues used by this adaptor.
366     * @throws ConfigurationException on error
367     */
368    protected void initSpaceAndQueues () throws ConfigurationException {
369        Element persist = getPersist ();
370        sp = grabSpace (persist.getChild ("space"));
371        in      = Environment.get(persist.getChildTextTrim ("in"));
372        out     = Environment.get(persist.getChildTextTrim ("out"));
373        writeOnly = "yes".equalsIgnoreCase (getPersist().getChildTextTrim ("write-only"));
374        if (in == null || (out == null && !writeOnly)) {
375            throw new ConfigurationException ("Misconfigured channel. Please verify in/out queues");
376        }
377        String s = Environment.get(persist.getChildTextTrim ("reconnect-delay"));
378        delay    = s != null ? Long.parseLong (s) : 10000; // reasonable default
379        keepAlive = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("keep-alive")));
380        ignoreISOExceptions = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("ignore-iso-exceptions")));
381        String t = Environment.get(persist.getChildTextTrim("timeout"));
382        timeout = t != null && t.length() > 0 ? Long.parseLong(t) : 0L;
383        ready   = getName() + ".ready";
384        reconnect = getName() + ".reconnect";
385        waitForWorkersOnStop = "yes".equalsIgnoreCase(Environment.get(persist.getChildTextTrim ("wait-for-workers-on-stop")));
386    }
387
388    /** Background thread that forwards outgoing messages to the channel. */
389    @SuppressWarnings("unchecked")
390    public class Sender implements Runnable {
391        /** Default constructor. */
392        public Sender () {
393            super ();
394        }
395        public void run () {
396            Thread.currentThread().setName ("channel-sender-" + in);
397
398            while (running()){
399                try {
400                    checkConnection ();
401                    if (!running())
402                        break;
403                    Object o = sp.in (in, delay);
404                    if (o instanceof ISOMsg m) {
405                        if (!channel.isConnected()) {
406                            // push back the message so it can be handled by another channel adaptor
407                            sp.push(in, o);
408                            continue;
409                        }
410                        channel.send(m);
411                        tx++;
412                    } else if (o instanceof Integer) {
413                        if ((int)o != hashCode()) {
414                            // STOP indicator seems to be for another channel adaptor
415                            // sharing the same queue push it back and allow the companion
416                            // channel to get it
417                            sp.push (in, o, 500L);
418                            ISOUtil.sleep (1000L); // larger sleep so that the indicator has time to timeout
419                        }
420                    }
421                    else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) {
422                        ((BaseChannel)channel).sendKeepAlive();
423                    }
424                } catch (ISOFilter.VetoException e) {
425                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
426                } catch (ISOException e) {
427                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
428                    if (!ignoreISOExceptions) {
429                        disconnect ();
430                    }
431                    ISOUtil.sleep (1000); // slow down on errors
432                } catch (Exception e) {
433                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
434                    disconnect ();
435                    ISOUtil.sleep (1000);
436                }
437            }
438        }
439    }
440    /** Background thread that reads incoming messages from the channel. */
441    @SuppressWarnings("unchecked")
442    public class Receiver implements Runnable {
443        /** Default constructor. */
444        public Receiver () {
445            super ();
446        }
447        public void run () {
448            Thread.currentThread().setName ("channel-receiver-"+out);
449            boolean shuttingDown = false;
450            Instant shutdownDeadline = null;
451            final Duration gracePeriod = Duration.ofMillis(softStop);
452            while (true) {
453                if (!shuttingDown && !running()) {
454                    if (gracePeriod.isZero())
455                        break;
456                    shuttingDown = true;
457                    shutdownDeadline = Instant.now().plus(gracePeriod);
458                    getLog().info("soft-stop (%s)".formatted(shutdownDeadline.atZone(ZoneId.systemDefault())));
459                }
460                final boolean shouldExit = shuttingDown
461                  ? Instant.now().isAfter(shutdownDeadline)
462                  : !running();
463
464                if (shouldExit) {
465                    getLog().info ("stop");
466                    break;
467                }
468                try {
469                    Object r = sp.rd (ready, 5000L);
470                    if (r == null) {
471                        continue;
472                    }
473                    ISOMsg m = channel.receive ();
474                    rx++;
475                    lastTxn = System.currentTimeMillis();
476                    if (timeout > 0)
477                        sp.out (out, m, timeout);
478                    else
479                        sp.out (out, m);
480                } catch (ISOFilter.VetoException e) {
481                    // getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage());
482                } catch (ISOException e) {
483                    if (running()) {
484                        // getLog().warn ("channel-receiver-"+out, e);
485                        if (!ignoreISOExceptions) {
486                            sp.out (reconnect, Boolean.TRUE, delay);
487                            disconnect ();
488                            sp.push (in, hashCode()); // wake-up Sender
489                        }
490                        ISOUtil.sleep(1000);
491                    }
492                } catch (SocketTimeoutException | EOFException e) {
493                    if (running()) {
494                        // getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting");
495                        sp.out (reconnect, Boolean.TRUE, delay);
496                        disconnect ();
497                        sp.push (in, hashCode()); // wake-up Sender
498                        ISOUtil.sleep(1000);
499                    }
500                } catch (Exception e) {
501                    if (running()) {
502                        // getLog().warn ("channel-receiver-"+out, e);
503                        sp.out (reconnect, Boolean.TRUE, delay);
504                        disconnect ();
505                        sp.push (in, hashCode()); // wake-up Sender
506                        ISOUtil.sleep(1000);
507                    }
508                }
509            }
510            disconnect();
511        }
512    }
513    /**
514     * Waits until the reconnect token clears, then attempts to reconnect.
515     */
516    protected void checkConnection () {
517        while (running() && sp.rdp (reconnect) != null) {
518            ISOUtil.sleep(1000);
519        }
520        while (running() && !channel.isConnected ()) {
521            SpaceUtil.wipe(sp, ready);
522            try {
523                channel.connect ();
524            } catch (IOException ignored) {
525                // channel.connect already logs - no need for more warnings
526            }
527            if (!channel.isConnected ())
528                ISOUtil.sleep (delay);
529            else
530                connects++;
531        }
532        if (running() && sp.rdp (ready) == null)
533            sp.out (ready, new Date());
534    }
535    /**
536     * Disconnects the channel and releases associated resources.
537     */
538    protected void disconnect () {
539        // do not synchronize on this as both Sender and Receiver can deadlock against a thread calling stop()
540        synchronized (disconnectLock) {
541            try {
542                SpaceUtil.wipe(sp, ready);
543                channel.disconnect();
544            } catch (Exception e) {
545                getLog().warn("disconnect", e);
546            }
547        }
548    }
549    private void disconnectLater(long delayInMillis) {
550         SpaceUtil.wipe(sp, ready);
551         scheduledExecutor.schedule(this::disconnect, delayInMillis, TimeUnit.MILLISECONDS);
552    }
553    public synchronized void setHost (String host) {
554        setProperty (getProperties ("channel"), "host", host);
555        setModified (true);
556    }
557    public String getHost () {
558        return getProperty (getProperties ("channel"), "host");
559    }
560    public synchronized void setPort (int port) {
561        setProperty (
562            getProperties ("channel"), "port", Integer.toString (port)
563        );
564        setModified (true);
565    }
566    public int getPort () {
567        int port = 0;
568        try {
569            port = Integer.parseInt (
570                getProperty (getProperties ("channel"), "port")
571            );
572        } catch (NumberFormatException e) {
573            getLog().error(e);
574        }
575        return port;
576    }
577    public synchronized void setSocketFactory (String sFac) {
578        setProperty(getProperties("channel"), "socketFactory", sFac);
579        setModified(true);
580    }
581
582    public void resetCounters () {
583        rx = tx = connects = 0;
584        lastTxn = 0L;
585    }
586    public String getCountersAsString () {
587        StringBuilder sb = new StringBuilder();
588        append (sb, "tx=", tx);
589        append (sb, ", rx=", rx);
590        append (sb, ", connects=", connects);
591        sb.append (", last=");
592        sb.append(lastTxn);
593        if (lastTxn > 0) {
594            sb.append (", idle=");
595            sb.append(System.currentTimeMillis() - lastTxn);
596            sb.append ("ms");
597        }
598        return sb.toString();
599    }
600    public int getTXCounter() {
601        return tx;
602    }
603    public int getRXCounter() {
604        return rx;
605    }
606    public int getConnectsCounter () {
607        return connects;
608    }
609    public long getLastTxnTimestampInMillis() {
610        return lastTxn;
611    }
612    public long getIdleTimeInMillis() {
613        return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L;
614    }
615    public String getSocketFactory() {
616        return getProperty(getProperties ("channel"), "socketFactory");
617    }
618    public void dump (PrintStream p, String indent) {
619        p.println (indent + getCountersAsString());
620    }
621    /**
622     * Returns the Space referenced by the given element, or the default space if null.
623     * @param e the element whose text names the space
624     * @return the resolved Space
625     */
626    protected Space grabSpace (Element e) {
627        return SpaceFactory.getSpace (e != null ? e.getText() : "");
628    }
629    /**
630     * Appends a name=value counter entry to the string builder.
631     * @param sb    the builder to append to
632     * @param name  the counter name
633     * @param value the counter value
634     */
635    protected void append (StringBuilder sb, String name, int value) {
636        sb.append (name);
637        sb.append (value);
638    }
639
640    private void initMeters() {
641        var tags = Tags.of("name", getName(),
642                            "type", "client");
643        var registry = getServer().getMeterRegistry();
644
645        connectionsGauge =
646          MeterFactory.gauge
647            (registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT,
648              tags,
649              BaseUnits.SESSIONS,
650              () -> isConnected() ? 1 : 0
651            );
652
653        if (channel instanceof ISOMsgMetrics.Source ms) {
654            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
655            if (mtr != null) {
656                mtr.addTags(tags);
657                mtr.register(registry);
658            }
659        }
660    }
661
662    private void removeMeters() {
663        var registry = getServer().getMeterRegistry();
664        registry.remove(connectionsGauge);
665
666        if (channel instanceof ISOMsgMetrics.Source ms) {
667            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
668            if (mtr != null)
669                mtr.removeMeters();
670        }
671    }
672}