001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.Gauge;
022import io.micrometer.core.instrument.Tags;
023import io.micrometer.core.instrument.binder.BaseUnits;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.core.annotation.Config;
028import org.jpos.core.handlers.exception.ExceptionHandlerAware;
029import org.jpos.core.handlers.exception.ExceptionHandlerConfigAware;
030import org.jpos.iso.*;
031import org.jpos.metrics.MeterFactory;
032import org.jpos.metrics.MeterInfo;
033import org.jpos.metrics.iso.ISOMsgCounter;
034import org.jpos.metrics.iso.ISOMsgMetrics;
035import org.jpos.q2.QBeanSupport;
036import org.jpos.q2.QFactory;
037import org.jpos.space.Space;
038import org.jpos.space.SpaceFactory;
039import org.jpos.space.SpaceUtil;
040import org.jpos.util.LogSource;
041import org.jpos.util.Loggeable;
042import org.jpos.util.NameRegistrar;
043
044import java.io.EOFException;
045import java.io.IOException;
046import java.io.PrintStream;
047import java.net.SocketTimeoutException;
048import java.time.Duration;
049import java.time.Instant;
050import java.time.ZoneId;
051import java.util.Date;
052import java.util.concurrent.ExecutorService;
053import java.util.concurrent.Executors;
054import java.util.concurrent.ScheduledExecutorService;
055import java.util.concurrent.TimeUnit;
056
057/**
058 * @author Alejandro Revilla
059 */
060@SuppressWarnings("unchecked")
061public class ChannelAdaptor
062    extends QBeanSupport
063    implements ChannelAdaptorMBean, Channel, Loggeable, ExceptionHandlerConfigAware
064{
065    protected Space sp;
066    private ISOChannel channel;
067    String in, out, ready, reconnect;
068    long delay;
069    boolean keepAlive = false;
070    boolean ignoreISOExceptions = false;
071    boolean writeOnly = false;
072    int rx, tx, connects;
073    long lastTxn = 0L;
074    long timeout = 0L;
075    boolean waitForWorkersOnStop;
076    private Thread receiver;
077    private Thread sender;
078    private final Object disconnectLock = Boolean.TRUE;
079
080    private ExecutorService executor;
081    private ScheduledExecutorService scheduledExecutor;
082
083    private Gauge connectionsGauge;
084
085    @Config("soft-stop") private long softStop;
086
087    public ChannelAdaptor () {
088        super ();
089        resetCounters();
090    }
091    public void initService() throws ConfigurationException {
092        if (softStop < 0)
093            throw new ConfigurationException ("Invalid soft-stop %d".formatted(Long.valueOf(softStop)));
094        initSpaceAndQueues();
095        NameRegistrar.register (getName(), this);
096        executor = QFactory.executorService(cfg.getBoolean("virtual-threads", false));
097        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
098            Thread.ofVirtual().factory()
099        );
100    }
101    public void startService () {
102        try {
103            channel = initChannel ();
104            executor.submit(new Sender());
105            if (!writeOnly) { // fixes #426 && jPOS-20
106                executor.submit (new Receiver());
107            }
108            initMeters();
109        } catch (Exception e) {
110            getLog().warn ("error starting service", e);
111        }
112    }
113    public void stopService () {
114        try {
115            sp.out (in, Boolean.TRUE);
116            if (channel != null) {
117                if (softStop > 0L)
118                    disconnectLater(softStop);
119                else
120                    disconnect();
121            }
122            if (waitForWorkersOnStop)
123                executor.awaitTermination(Math.max(5000L, softStop), TimeUnit.MILLISECONDS);
124            sender = null;
125            receiver = null;
126            removeMeters();
127        } catch (Exception e) {
128            getLog().warn ("error disconnecting from remote host", e);
129        }
130    }
131    public void destroyService () {
132        NameRegistrar.unregister (getName ());
133        NameRegistrar.unregister ("channel." + getName ());
134    }
135
136    public synchronized void setReconnectDelay (long delay) {
137        getPersist().getChild ("reconnect-delay")
138            .setText (Long.toString (delay));
139        this.delay = delay;
140        setModified (true);
141    }
142    public long getReconnectDelay () {
143        return delay;
144    }
145    public synchronized void setInQueue (String in) {
146        String old = this.in;
147        this.in = in;
148        if (old != null)
149            sp.out (old, Boolean.TRUE);
150
151        getPersist().getChild("in").setText (in);
152        setModified (true);
153    }
154    public String getInQueue () {
155        return in;
156    }
157    public synchronized void setOutQueue (String out) {
158        this.out = out;
159        getPersist().getChild("out").setText (out);
160        setModified (true);
161    }
162
163    /**
164     * Queue a message to be transmitted by this adaptor
165     * @param m message to send
166     */
167    public void send (ISOMsg m) {
168        sp.out (in, m);
169    }
170    /**
171     * Queue a message to be transmitted by this adaptor
172     * @param m message to send
173     * @param timeout timeout in millis
174     */
175    public void send (ISOMsg m, long timeout) {
176        sp.out (in, m, timeout);
177    }
178
179    /**
180     * Receive message
181     */
182    public ISOMsg receive () {
183        return (ISOMsg) sp.in (out);
184    }
185
186    /**
187     * Receive message
188     * @param timeout time to wait for an incoming message
189     */
190    public ISOMsg receive (long timeout) {
191        return (ISOMsg) sp.in (out, timeout);
192    }
193    /**
194     * @return true if channel is connected
195     */
196    public boolean isConnected () {
197        return sp != null && sp.rdp (ready) != null;
198    }
199
200    public String getOutQueue () {
201        return out;
202    }
203
204    /** Parses a {@code <channel>}  element, returning an {@link ISOChannel} */
205    public ISOChannel newChannel (Element e, QFactory f) throws ConfigurationException {
206        String channelName  = QFactory.getAttributeValue (e, "class");
207        String packagerName = QFactory.getAttributeValue (e, "packager");
208
209        ISOChannel channel = f.newInstance(channelName);
210        if (packagerName != null) {
211            ISOPackager packager = f.newInstance(packagerName);
212            channel.setPackager (packager);
213            f.setConfiguration (packager, e);
214        }
215        QFactory.invoke (channel, "setHeader", QFactory.getAttributeValue (e, "header"));
216        f.setLogger        (channel, e);
217        f.setConfiguration (channel, e);
218
219        if (channel instanceof FilteredChannel) {
220            addFilters ((FilteredChannel) channel, e, f);
221        }
222
223        if (channel instanceof ExceptionHandlerAware) {
224            addExceptionHandlers((ExceptionHandlerAware) channel, e, f);
225        }
226
227        if (channel instanceof ISOMsgMetrics.Source metricsChannel) {
228            String type = "default";                                    // default alias, in case metrics not defined
229            String clazz = null;
230
231            Element met = e.getChild("metrics");
232            if (met != null) {
233                if (QFactory.isEnabled(met)) {
234                    clazz = QFactory.getAttributeValue(met, "class");
235                    String typeAttr = QFactory.getAttributeValue(met, "type");
236                    type =  (clazz != null)     ? "class"  :            // class attribute has precedence over type
237                            (typeAttr != null)  ? typeAttr :
238                            type;
239                } else {
240                    type = "none";                                      // <metrics enabled="false" /> equivalent to type="none"
241                }
242            }
243
244            ISOMsgMetrics m = switch (type) {
245                case "none" -> null;
246
247                case "default" -> {
248                    var mc = new ISOMsgCounter();
249                    if (met != null)
250                        f.setLogger(mc, met);
251                    else
252                        mc.setLogger(this.getLog().getLogger(), this.getRealm()+"/metrics");
253                    yield mc;
254                }
255
256                case "counter" -> {
257                    var mc = new ISOMsgCounter();
258                    f.setLogger(mc, met);
259                    f.setConfiguration(mc, met);
260                    yield mc;
261                }
262
263                case "class" -> {
264                    ISOMsgMetrics mc = f.newInstance(clazz);
265                    f.setLogger(mc, met);
266                    f.setConfiguration(mc, met);
267                    yield mc;
268                }
269
270                default -> throw new ConfigurationException("Unknown metric type '"+type+"'");
271            };
272
273            metricsChannel.setISOMsgMetrics(m);
274        } // metrics config
275
276        if (getName () != null)
277            channel.setName (getName ());
278
279        return channel;
280    }
281
282    protected void addFilters (FilteredChannel channel, Element e, QFactory fact)
283        throws ConfigurationException
284    {
285        for (Element f : e.getChildren("filter")) {
286            ISOFilter filter = fact.newInstance(f);
287            if (filter == null) continue;
288            String direction = QFactory.getAttributeValue(f, "direction");
289            if (direction == null)
290                channel.addFilter(filter);
291            else if ("incoming".equalsIgnoreCase(direction))
292                channel.addIncomingFilter(filter);
293            else if ("outgoing".equalsIgnoreCase(direction))
294                channel.addOutgoingFilter(filter);
295            else if ("both".equalsIgnoreCase(direction)) {
296                channel.addIncomingFilter(filter);
297                channel.addOutgoingFilter(filter);
298            }
299        }
300    }
301
302
303    protected ISOChannel initChannel () throws ConfigurationException {
304        Element persist = getPersist ();
305        Element e = persist.getChild ("channel");
306        if (e == null)
307            throw new ConfigurationException ("channel element missing");
308
309        ISOChannel c = newChannel (e, getFactory());
310
311        String socketFactoryString = getSocketFactory();
312        if (socketFactoryString != null && c instanceof FactoryChannel) {
313            ISOClientSocketFactory sFac = getFactory().newInstance(socketFactoryString);
314            if (sFac instanceof LogSource) {
315                ((LogSource) sFac).setLogger(log.getLogger(),getName() + ".socket-factory");
316            }
317            getFactory().setConfiguration (sFac, e);
318            ((FactoryChannel)c).setSocketFactory(sFac);
319        }
320
321        return c;
322    }
323
324    protected void initSpaceAndQueues () throws ConfigurationException {
325        Element persist = getPersist ();
326        sp = grabSpace (persist.getChild ("space"));
327        in      = Environment.get(persist.getChildTextTrim ("in"));
328        out     = Environment.get(persist.getChildTextTrim ("out"));
329        writeOnly = "yes".equalsIgnoreCase (getPersist().getChildTextTrim ("write-only"));
330        if (in == null || (out == null && !writeOnly)) {
331            throw new ConfigurationException ("Misconfigured channel. Please verify in/out queues");
332        }
333        String s = Environment.get(persist.getChildTextTrim ("reconnect-delay"));
334        delay    = s != null ? Long.parseLong (s) : 10000; // reasonable default
335        keepAlive = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("keep-alive")));
336        ignoreISOExceptions = "yes".equalsIgnoreCase (Environment.get(persist.getChildTextTrim ("ignore-iso-exceptions")));
337        String t = Environment.get(persist.getChildTextTrim("timeout"));
338        timeout = t != null && t.length() > 0 ? Long.parseLong(t) : 0L;
339        ready   = getName() + ".ready";
340        reconnect = getName() + ".reconnect";
341        waitForWorkersOnStop = "yes".equalsIgnoreCase(Environment.get(persist.getChildTextTrim ("wait-for-workers-on-stop")));
342    }
343
344    @SuppressWarnings("unchecked")
345    public class Sender implements Runnable {
346        public Sender () {
347            super ();
348        }
349        public void run () {
350            Thread.currentThread().setName ("channel-sender-" + in);
351
352            while (running()){
353                try {
354                    checkConnection ();
355                    if (!running())
356                        break;
357                    Object o = sp.in (in, delay);
358                    if (o instanceof ISOMsg m) {
359                        if (!channel.isConnected()) {
360                            // push back the message so it can be handled by another channel adaptor
361                            sp.push(in, o);
362                            continue;
363                        }
364                        channel.send(m);
365                        tx++;
366                    } else if (o instanceof Integer) {
367                        if ((int)o != hashCode()) {
368                            // STOP indicator seems to be for another channel adaptor
369                            // sharing the same queue push it back and allow the companion
370                            // channel to get it
371                            sp.push (in, o, 500L);
372                            ISOUtil.sleep (1000L); // larger sleep so that the indicator has time to timeout
373                        }
374                    }
375                    else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) {
376                        ((BaseChannel)channel).sendKeepAlive();
377                    }
378                } catch (ISOFilter.VetoException e) {
379                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
380                } catch (ISOException e) {
381                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
382                    if (!ignoreISOExceptions) {
383                        disconnect ();
384                    }
385                    ISOUtil.sleep (1000); // slow down on errors
386                } catch (Exception e) {
387                    // getLog().warn ("channel-sender-"+in, e.getMessage ());
388                    disconnect ();
389                    ISOUtil.sleep (1000);
390                }
391            }
392        }
393    }
394    @SuppressWarnings("unchecked")
395    public class Receiver implements Runnable {
396        public Receiver () {
397            super ();
398        }
399        public void run () {
400            Thread.currentThread().setName ("channel-receiver-"+out);
401            boolean shuttingDown = false;
402            Instant shutdownDeadline = null;
403            final Duration gracePeriod = Duration.ofMillis(softStop);
404            while (true) {
405                if (!shuttingDown && !running()) {
406                    if (gracePeriod.isZero())
407                        break;
408                    shuttingDown = true;
409                    shutdownDeadline = Instant.now().plus(gracePeriod);
410                    getLog().info("soft-stop (%s)".formatted(shutdownDeadline.atZone(ZoneId.systemDefault())));
411                }
412                final boolean shouldExit = shuttingDown
413                  ? Instant.now().isAfter(shutdownDeadline)
414                  : !running();
415
416                if (shouldExit) {
417                    getLog().info ("stop");
418                    break;
419                }
420                try {
421                    Object r = sp.rd (ready, 5000L);
422                    if (r == null) {
423                        continue;
424                    }
425                    ISOMsg m = channel.receive ();
426                    rx++;
427                    lastTxn = System.currentTimeMillis();
428                    if (timeout > 0)
429                        sp.out (out, m, timeout);
430                    else
431                        sp.out (out, m);
432                } catch (ISOFilter.VetoException e) {
433                    // getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage());
434                } catch (ISOException e) {
435                    if (running()) {
436                        // getLog().warn ("channel-receiver-"+out, e);
437                        if (!ignoreISOExceptions) {
438                            sp.out (reconnect, Boolean.TRUE, delay);
439                            disconnect ();
440                            sp.push (in, hashCode()); // wake-up Sender
441                        }
442                        ISOUtil.sleep(1000);
443                    }
444                } catch (SocketTimeoutException | EOFException e) {
445                    if (running()) {
446                        // getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting");
447                        sp.out (reconnect, Boolean.TRUE, delay);
448                        disconnect ();
449                        sp.push (in, hashCode()); // wake-up Sender
450                        ISOUtil.sleep(1000);
451                    }
452                } catch (Exception e) {
453                    if (running()) {
454                        // getLog().warn ("channel-receiver-"+out, e);
455                        sp.out (reconnect, Boolean.TRUE, delay);
456                        disconnect ();
457                        sp.push (in, hashCode()); // wake-up Sender
458                        ISOUtil.sleep(1000);
459                    }
460                }
461            }
462            disconnect();
463        }
464    }
465    protected void checkConnection () {
466        while (running() && sp.rdp (reconnect) != null) {
467            ISOUtil.sleep(1000);
468        }
469        while (running() && !channel.isConnected ()) {
470            SpaceUtil.wipe(sp, ready);
471            try {
472                channel.connect ();
473            } catch (IOException ignored) {
474                // channel.connect already logs - no need for more warnings
475            }
476            if (!channel.isConnected ())
477                ISOUtil.sleep (delay);
478            else
479                connects++;
480        }
481        if (running() && sp.rdp (ready) == null)
482            sp.out (ready, new Date());
483    }
484    protected void disconnect () {
485        // do not synchronize on this as both Sender and Receiver can deadlock against a thread calling stop()
486        synchronized (disconnectLock) {
487            try {
488                SpaceUtil.wipe(sp, ready);
489                channel.disconnect();
490            } catch (Exception e) {
491                getLog().warn("disconnect", e);
492            }
493        }
494    }
495    private void disconnectLater(long delayInMillis) {
496         SpaceUtil.wipe(sp, ready);
497         scheduledExecutor.schedule(this::disconnect, delayInMillis, TimeUnit.MILLISECONDS);
498    }
499    public synchronized void setHost (String host) {
500        setProperty (getProperties ("channel"), "host", host);
501        setModified (true);
502    }
503    public String getHost () {
504        return getProperty (getProperties ("channel"), "host");
505    }
506    public synchronized void setPort (int port) {
507        setProperty (
508            getProperties ("channel"), "port", Integer.toString (port)
509        );
510        setModified (true);
511    }
512    public int getPort () {
513        int port = 0;
514        try {
515            port = Integer.parseInt (
516                getProperty (getProperties ("channel"), "port")
517            );
518        } catch (NumberFormatException e) {
519            getLog().error(e);
520        }
521        return port;
522    }
523    public synchronized void setSocketFactory (String sFac) {
524        setProperty(getProperties("channel"), "socketFactory", sFac);
525        setModified(true);
526    }
527
528    public void resetCounters () {
529        rx = tx = connects = 0;
530        lastTxn = 0L;
531    }
532    public String getCountersAsString () {
533        StringBuilder sb = new StringBuilder();
534        append (sb, "tx=", tx);
535        append (sb, ", rx=", rx);
536        append (sb, ", connects=", connects);
537        sb.append (", last=");
538        sb.append(lastTxn);
539        if (lastTxn > 0) {
540            sb.append (", idle=");
541            sb.append(System.currentTimeMillis() - lastTxn);
542            sb.append ("ms");
543        }
544        return sb.toString();
545    }
546    public int getTXCounter() {
547        return tx;
548    }
549    public int getRXCounter() {
550        return rx;
551    }
552    public int getConnectsCounter () {
553        return connects;
554    }
555    public long getLastTxnTimestampInMillis() {
556        return lastTxn;
557    }
558    public long getIdleTimeInMillis() {
559        return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L;
560    }
561    public String getSocketFactory() {
562        return getProperty(getProperties ("channel"), "socketFactory");
563    }
564    public void dump (PrintStream p, String indent) {
565        p.println (indent + getCountersAsString());
566    }
567    protected Space grabSpace (Element e) {
568        return SpaceFactory.getSpace (e != null ? e.getText() : "");
569    }
570    protected void append (StringBuilder sb, String name, int value) {
571        sb.append (name);
572        sb.append (value);
573    }
574
575    private void initMeters() {
576        var tags = Tags.of("name", getName(),
577                            "type", "client");
578        var registry = getServer().getMeterRegistry();
579
580        connectionsGauge =
581          MeterFactory.gauge
582            (registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT,
583              tags,
584              BaseUnits.SESSIONS,
585              () -> isConnected() ? 1 : 0
586            );
587
588        if (channel instanceof ISOMsgMetrics.Source ms) {
589            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
590            if (mtr != null) {
591                mtr.addTags(tags);
592                mtr.register(registry);
593            }
594        }
595    }
596
597    private void removeMeters() {
598        var registry = getServer().getMeterRegistry();
599        registry.remove(connectionsGauge);
600
601        if (channel instanceof ISOMsgMetrics.Source ms) {
602            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
603            if (mtr != null)
604                mtr.removeMeters();
605        }
606    }
607}