001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.*;
022import io.micrometer.core.instrument.Timer;
023import org.HdrHistogram.AtomicHistogram;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.iso.*;
028import org.jpos.metrics.MeterFactory;
029import org.jpos.metrics.MeterInfo;
030import org.jpos.q2.QBeanSupport;
031import org.jpos.q2.QFactory;
032import org.jpos.space.*;
033import org.jpos.util.*;
034import org.jpos.util.Metrics;
035
036import java.io.IOException;
037import java.io.PrintStream;
038import java.util.*;
039import java.util.concurrent.ScheduledFuture;
040import java.util.concurrent.TimeUnit;
041
042/**
043 * A Q2-managed multiplexer that routes ISO messages between channels and listeners.
044 * @author Alejandro Revilla
045 */
046@SuppressWarnings("unchecked")
047public class QMUX
048    extends QBeanSupport
049    implements SpaceListener, MUX, QMUXMBean, Loggeable, MetricsProvider
050{
    /** Identity digit mapping; used for the first two MTI positions when no {@code mtimapping} is configured. */
    static final String nomap = "0123456789";
    /** Key field list applied when the configuration supplies no default {@code <key>}. */
    static final String DEFAULT_KEY = "41, 11";
    /** Local space backing the in/out queues. */
    protected LocalSpace sp;
    /** Queue names: inbound responses, outbound requests, and unhandled messages. */
    protected String in, out, unhandled;
    /** Optional list of "ready indicator" keys polled to determine connectivity. */
    protected String[] ready;
    /** Default field-number key components used to correlate responses. */
    protected String[] key;
    /** Comma-separated response codes to ignore (treat as if no response was received). */
    protected String ignorerc;
    /** Three-character MTI mapping table used by {@link #mapMTI(String)}. */
    protected String[] mtiMapping;
    /** Set when a key specification contains the {@code header} token; makes the message header part of the key. */
    private boolean headerIsKey;
    /** When {@code true}, every inbound message (not only responses) is matched against pending requests. */
    private boolean returnRejects;
    /** Space holding pending-request markers and matched responses; the same instance as {@link #sp} when {@code reuse-space} is set. */
    private LocalSpace isp; // internal space
    /** Key-field overrides indexed by two-character MTI prefix, optionally suffixed with {@code :pcode}. */
    private Map<String,String[]> mtiKey = new HashMap<>();
    /** Histogram-backed elapsed-time metrics, recorded under the names {@code all} and {@code ok}. */
    private Metrics metrics = new Metrics(new AtomicHistogram(60000, 2));

    /** Request listeners offered messages that did not match a pending request. */
    List<ISORequestListener> listeners;
    /** In-memory counters; updated while holding this object's monitor. */
    private volatile int rx, tx, rxExpired, txExpired, rxPending, rxUnhandled, rxForwarded;
    /** Wall-clock time (millis) at which the last response was matched; 0 when none has been seen. */
    private volatile long lastTxn = 0L;
    /** Guards against registering the space listener more than once. */
    private boolean listenerRegistered;

    // Meters created in initMeters() and removed in removeMeters()
    private Gauge statusGauge;
    private Gauge rxPendingGauge;
    private Timer responseTimer;

    private Counter txCounter;
    private Counter rxCounter;
    private Counter rxMatchCounter;
    private Counter rxUnhandledCounter;
085    /** Default constructor. */
086    public QMUX () {
087        super ();
088        listeners = new ArrayList<>();
089    }
090
    /**
     * Supplies the default realm for this bean.
     *
     * @return {@code Realm.COMM_MUX}
     */
    @Override
    protected String defaultRealm() {
        return Realm.COMM_MUX;
    }
095
096    public void initService () throws ConfigurationException {
097        Element e = getPersist ();
098        sp        = grabSpace (e.getChild ("space"));
099        isp       = cfg.getBoolean("reuse-space", false) ? sp : new TSpace();
100        in        = Environment.get(e.getChildTextTrim ("in"));
101        out       = Environment.get(e.getChildTextTrim ("out"));
102
103        if (in == null || out == null) {
104            throw new ConfigurationException ("Misconfigured QMUX. Please verify in/out queues");
105        }
106        ignorerc  = Environment.get(e.getChildTextTrim ("ignore-rc"));
107        key = toStringArray(DEFAULT_KEY, ", ", null);
108        returnRejects = cfg.getBoolean("return-rejects", false);
109        for (Element keyElement : e.getChildren("key")) {
110            String mtiOverride = QFactory.getAttributeValue(keyElement, "mti");
111            if (mtiOverride != null && mtiOverride.length() >= 2) {
112                String pcode = sanitizePcode(QFactory.getAttributeValue(keyElement, "pcode"));
113                String mapKey = buildMtiKey(mtiOverride.substring(0,2), pcode);
114                mtiKey.put (mapKey, toStringArray(keyElement.getTextTrim(), ", ", null));
115            } else {
116                key = toStringArray(e.getChildTextTrim("key"), ", ", DEFAULT_KEY);
117            }
118        }
119        ready     = toStringArray(Environment.get(e.getChildTextTrim ("ready")));
120        mtiMapping = toStringArray(Environment.get(e.getChildTextTrim ("mtimapping")));
121        if (mtiMapping == null || mtiMapping.length != 3)
122            mtiMapping = new String[] { nomap, nomap, "0022446689" };
123        addListeners ();
124        unhandled = Environment.get(e.getChildTextTrim ("unhandled"));
125        initMeters();
126        NameRegistrar.register ("mux."+getName (), this);
127    }
    /**
     * Registers this MUX as listener of the {@code in} queue (once) and
     * re-queues whatever was already waiting there so the listener sees it.
     */
    public void startService () {
        if (!listenerRegistered) {
            listenerRegistered = true;
            // Handle messages that could be in the in queue at start time
            synchronized (sp) {
                // Drain first, register, then put the entries back: the
                // re-inserted entries go through sp.out() after the listener
                // is in place.
                Object[] pending = SpaceUtil.inpAll(sp, in);
                sp.addListener (in, this);
                for (Object o : pending)
                    sp.out(in, o);
            }
        }
    }
    /**
     * Unregisters the space listener from the {@code in} queue and removes
     * this MUX's meters from the registry.
     */
    public void stopService () {
        listenerRegistered = false;
        sp.removeListener (in, this);
        // NOTE(review): meters are created in initService(), not startService();
        // confirm whether a stop/start cycle is expected to re-create them.
        removeMeters();
    }
    /** Removes the {@code mux.<name>} entry registered by {@link #initService()}. */
    public void destroyService () {
        NameRegistrar.unregister ("mux."+getName ());
    }
148
149    /**
150     * Returns the MUX registered under the given name.
151     *
152     * @param name MUX name (without the {@code mux.} prefix)
153     * @return MUX with name using NameRegistrar
154     * @throws NameRegistrar.NotFoundException if not found in registry
155     * @see NameRegistrar
156     */
157    public static MUX getMUX (String name)
158        throws NameRegistrar.NotFoundException
159    {
160        return (MUX) NameRegistrar.get ("mux."+name);
161    }
162
    /**
     * Sends {@code m} through the {@code out} queue and blocks until a
     * matching response shows up in the internal space or the timeout elapses.
     *
     * @param m message to send
     * @param timeout amount of time in millis to wait for a response
     * @return response or null
     * @throws ISOException if the key cannot be computed or a request with
     *         the same key is already pending
     */
    public ISOMsg request (ISOMsg m, long timeout) throws ISOException {
        String key = getKey (m);
        String req = key + ".req";
        // Check-then-insert of the pending marker must be atomic
        synchronized (isp) {
            if (isp.rdp (req) != null)
                throw new ISOException ("Duplicate key '" + req + "' detected");
            isp.out (req, m);
        }
        // Direction is reset to 0 here and checked again after the wait
        // (against ISOMsg.OUTGOING) to decide whether txExpired is bumped.
        m.setDirection(0);
        Chronometer c = new Chronometer();
        if (timeout > 0)
            sp.out (out, m, timeout);
        else
            sp.out (out, m);

        txCounter.increment();
        ISOMsg resp;
        try {
            synchronized (this) { tx++; rxPending++; }

            // Keep waiting while the response carries an ignorable response code
            for (;;) {
                resp = (ISOMsg) isp.in (key, timeout);
                if (!shouldIgnore (resp))
                    break;
            }
            // Marker already gone means notify() took it and is about to
            // publish the response under 'key'.
            if (resp == null && isp.inp (req) == null) {
                // possible race condition, retry for a few extra seconds
                resp = (ISOMsg) isp.in (key, 10000);
            }
            synchronized (this) {
                if (resp != null) {
                    rx++;
                    lastTxn = System.currentTimeMillis();
                } else {
                    rxExpired++;
                    if (m.getDirection() != ISOMsg.OUTGOING)
                        txExpired++;
                }
            }
        } finally {
            synchronized (this) { rxPending--; }
        }
        // "all" covers every request; "ok" and the response timer only answered ones
        long elapsed = c.elapsed();
        metrics.record("all", elapsed);
        if (resp != null) {
            responseTimer.record(elapsed, TimeUnit.MILLISECONDS);
            metrics.record("ok", elapsed);
        }
        return resp;
    }
218    public void request (ISOMsg m, long timeout, ISOResponseListener rl, Object handBack)
219      throws ISOException
220    {
221        String key = getKey (m);
222        String req = key + ".req";
223        synchronized (isp) {
224            if (isp.rdp (req) != null)
225                throw new ISOException ("Duplicate key '" + req + "' detected.");
226            m.setDirection(0);
227            AsyncRequest ar = new AsyncRequest (rl, handBack);
228            synchronized (ar) {
229                if (timeout > 0)
230                    ar.setFuture(getScheduledThreadPoolExecutor().schedule(ar, timeout, TimeUnit.MILLISECONDS));
231            }
232            isp.out (req, ar, timeout);
233        }
234        if (timeout > 0)
235            sp.out (out, m, timeout);
236        else
237            sp.out (out, m);
238        synchronized (this) { tx++; rxPending++; }
239    }
240
241    /**
242     * Returns whether {@code msg} should be considered for response-matching by {@link #notify(Object, Object)}.
243     *
244     * @param msg message just dequeued from the inbound space
245     * @return {@code true} when the message should be matched against pending requests
246     */
247    protected boolean isNotifyEligible(ISOMsg msg) {
248        if (returnRejects)
249            return true;
250
251        try {
252            return msg.isResponse();
253        } catch (RuntimeException | ISOException ex) {
254            // * ArrayIndexOutOfBoundsException - It may occur for messages where
255            // MTI is not standard 4 characters (eg. FSDISOMsg), then notification is expected.
256            // * ISOException: When there is no field 0, the error should be logged
257            return true;
258        }
259    }
260
261    @Override
262    public void notify (Object k, Object value) {
263        Object obj = sp.inp (k);
264        if (obj instanceof ISOMsg) {
265            ISOMsg m = (ISOMsg) obj;
266            rxCounter.increment();
267            try {
268                if (isNotifyEligible(m)) {
269                    String key = getKey (m);
270                    String req = key + ".req";
271                    Object r = isp.inp (req);
272                    if (r != null) {
273                        if (r instanceof AsyncRequest ar) {
274                            ar.responseReceived (m);
275                        } else {
276                            isp.out (key, m);
277                        }
278                        rxMatchCounter.increment();
279                        return;
280                    }
281                }
282            } catch (ISOException e) {
283                LogEvent evt = getLog().createLogEvent("notify");
284                evt.addMessage(e);
285                evt.addMessage(obj);
286                Logger.log(evt);
287            }
288            processUnhandled (m);
289        }
290    }
291
292    /**
293     * Builds the correlation key used to pair a response with its request,
294     * applying any per-MTI/per-PCODE overrides and special-cases for STAN/PAN fields.
295     *
296     * @param m message whose key should be derived
297     * @return the computed key (queue prefix + MTI + selected field values)
298     * @throws ISOException if no key fields are present in {@code m}
299     */
300    public String getKey (ISOMsg m) throws ISOException {
301        if (out == null)
302            throw new NullPointerException ("Misconfigured QMUX. Please verify out queue is not null.");
303        StringBuilder sb = new StringBuilder (out);
304        sb.append ('.');
305        sb.append (mapMTI(m.getMTI()));
306        if (headerIsKey && m.getHeader()!=null) {
307            sb.append ('.');
308            sb.append(ISOUtil.hexString(m.getHeader()));
309            sb.append ('.');
310        }
311        boolean hasFields = false;
312        String mti = m.getMTI();
313        String mtiPrefix = mti.substring(0,2);
314        String[] k = null;
315        String pcode = m.hasField(3) ? sanitizePcode(m.getString(3)) : null;
316        if (pcode != null) {
317            k = mtiKey.get(buildMtiKey(mtiPrefix, pcode));
318        }
319        if (k == null) {
320            k = mtiKey.getOrDefault(mtiPrefix, key);
321        }
322        for (String f : k) {
323            String v = m.getString(f);
324            if (v != null) {
325                if ("11".equals(f)) {
326                    String vt = v.trim();
327                    int l = m.getMTI().charAt(0) == '2' ? 12 : 6;
328                    if (vt.length() < l)
329                        v = ISOUtil.zeropad(vt, l);
330                }
331                if ("41".equals(f)) {
332                    v = ISOUtil.zeropad(v.trim(), 16); // BIC ANSI to ISO hack
333                }
334                hasFields = true;
335                sb.append(v);
336            }
337        }
338        if (!hasFields)
339            throw new ISOException ("Key fields not found - not sending " + sb.toString());
340        return sb.toString();
341    }
342
343    private String sanitizePcode(String pcode) {
344        if (pcode == null)
345            return null;
346        String trimmed = pcode.trim();
347        return trimmed.isEmpty() ? null : trimmed;
348    }
349
350    private String buildMtiKey(String mtiPrefix, String pcode) {
351        return pcode == null ? mtiPrefix : mtiPrefix + ':' + pcode;
352    }
353
    /**
     * Returns the metrics object in which request elapsed times are recorded.
     *
     * @return this MUX's {@link Metrics} instance
     */
    @Override
    public Metrics getMetrics() {
        return metrics;
    }
358
359    private String mapMTI (String mti) throws ISOException {
360        StringBuilder sb = new StringBuilder();
361        if (mti != null) {
362            if (mti.length() < 4)
363                mti = ISOUtil.zeropad(mti, 4); // #jPOS-55
364            if (mti.length() == 4) {
365                for (int i=0; i<mtiMapping.length; i++) {
366                    int c = mti.charAt (i) - '0';
367                    if (c >= 0 && c < 10)
368                        sb.append (mtiMapping[i].charAt(c));
369                }
370            }
371        }
372        return sb.toString();
373    }
374    public synchronized void setInQueue (String in) {
375        this.in = in;
376        getPersist().getChild("in").setText (in);
377        setModified (true);
378    }
    /**
     * Returns the name of the inbound queue.
     *
     * @return the {@code in} queue name
     */
    public String getInQueue () {
        return in;
    }
382    public synchronized void setOutQueue (String out) {
383        this.out = out;
384        getPersist().getChild("out").setText (out);
385        setModified (true);
386    }
    /**
     * Returns the name of the outbound queue.
     *
     * @return the {@code out} queue name
     */
    public String getOutQueue () {
        return out;
    }
    /**
     * Returns the {@link Space} backing this MUX's {@code in}, {@code out}
     * and {@code unhandled} queues.
     *
     * @return the local space resolved during {@link #initService()}
     */
    public Space getSpace() {
        return sp;
    }
398    public synchronized void setUnhandledQueue (String unhandled) {
399        this.unhandled = unhandled;
400        getPersist().getChild("unhandled").setText (unhandled);
401        setModified (true);
402    }
    /**
     * Returns the name of the queue that receives unhandled messages.
     *
     * @return the {@code unhandled} queue name, or {@code null} if none is set
     */
    public String getUnhandledQueue () {
        return unhandled;
    }
    /**
     * Returns the configured ready-indicator key names that
     * {@link #isConnected()} looks up in the space.
     *
     * @return the ready indicator names (the internal array, not a copy), or
     *         {@code null} if none were configured
     */
    @SuppressWarnings("unused")
    public String[] getReadyIndicatorNames() {
        return ready;
    }
415
416    private void addListeners() throws ConfigurationException {
417        QFactory factory = getFactory ();
418        for (Element l : getPersist().getChildren("request-listener")) {
419            ISORequestListener listener = factory.newInstance(l);
420            if (listener != null)
421                addISORequestListener (listener);
422        }
423    }
    /**
     * Registers a request listener invoked for messages that don't match a pending request.
     * Listeners are consulted in registration order.
     *
     * @param l listener to add
     */
    public void addISORequestListener(ISORequestListener l) {
        listeners.add (l);
    }
    /**
     * Removes a previously registered request listener.
     *
     * @param l listener to remove
     * @return {@code true} if the listener was registered, {@code false} otherwise
     */
    public boolean removeISORequestListener(ISORequestListener l) {
        return listeners.remove(l);
    }
441    /**
442     * Resets all in-memory transaction counters and the last-transaction timestamp.
443     */
444    public synchronized void resetCounters() {
445        rx = tx = rxExpired = txExpired = rxPending = rxUnhandled = rxForwarded = 0;
446        lastTxn = 0l;
447    }
448    /**
449     * Returns the current counters formatted as a single human-readable string.
450     *
451     * @return a comma-separated counter snapshot suitable for diagnostics
452     */
453    public String getCountersAsString () {
454        StringBuffer sb = new StringBuffer();
455        append (sb, "tx=", tx);
456        append (sb, ", rx=", rx);
457        append (sb, ", tx_expired=", getTXExpired());
458        append (sb, ", tx_pending=", getTXPending());
459        append (sb, ", rx_expired=", getRXExpired());
460        append (sb, ", rx_pending=", getRXPending());
461        append (sb, ", rx_unhandled=", getRXUnhandled());
462        append (sb, ", rx_forwarded=", getRXForwarded());
463        sb.append (", connected=");
464        sb.append (Boolean.toString(isConnected()));
465        sb.append (", last=");
466        sb.append (lastTxn);
467        if (lastTxn > 0) {
468            sb.append (", idle=");
469            sb.append(System.currentTimeMillis() - lastTxn);
470            sb.append ("ms");
471        }
472        return sb.toString();
473    }
474
    /**
     * @return number of requests sent through {@code request(...)}
     */
    public int getTXCounter() {
        return tx;
    }
    /**
     * @return number of responses matched to a request
     */
    public int getRXCounter() {
        return rx;
    }

    /**
     * @return number of timed-out synchronous requests whose direction was not
     *         {@code ISOMsg.OUTGOING} when the wait ended
     */
    @Override
    public int getTXExpired() {
        return txExpired;
    }

    /**
     * @return number of entries currently sitting in the {@code out} queue
     */
    @Override
    public int getTXPending() {
        return sp.size(out);
    }

    /**
     * @return number of requests that ended without a response
     */
    @Override
    public int getRXExpired() {
        return rxExpired;
    }

    /**
     * @return number of requests currently waiting for a response
     */
    @Override
    public int getRXPending() {
        return rxPending;
    }

    /**
     * @return number of messages placed in the {@code unhandled} queue
     */
    @Override
    public int getRXUnhandled() {
        return rxUnhandled;
    }

    /**
     * @return number of unmatched messages offered to request listeners
     */
    @Override
    public int getRXForwarded() {
        return rxForwarded;
    }

    /**
     * @return wall-clock millis of the last matched response, or 0 if none yet
     */
    public long getLastTxnTimestampInMillis() {
        return lastTxn;
    }
    /**
     * @return millis elapsed since the last matched response, or -1 if none yet
     */
    public long getIdleTimeInMillis() {
        return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L;
    }
518
519    /**
520     * Dispatches an inbound message that did not match any pending request,
521     * giving registered request listeners a chance to handle it before falling
522     * back to the configured {@link #unhandled} queue.
523     *
524     * @param m the unmatched inbound message
525     */
526    protected void processUnhandled (ISOMsg m) {
527        ISOSource source = m.getSource();
528        source = source != null ? source : this;
529        rxUnhandledCounter.increment();
530        Iterator<ISORequestListener> iter = listeners.iterator();
531        if (iter.hasNext())
532            synchronized (this) { rxForwarded++; }
533        while (iter.hasNext())
534            if (iter.next().process (source, m))
535                return;
536        if (unhandled != null) {
537            synchronized (this) { rxUnhandled++; }
538            sp.out (unhandled, m, 120000);
539        }
540    }
541    private LocalSpace grabSpace (Element e)
542        throws ConfigurationException
543    {
544        String uri = e != null ? e.getText() : "";
545        Space sp = SpaceFactory.getSpace (uri);
546        if (sp instanceof LocalSpace) {
547            return (LocalSpace) sp;
548        }
549        throw new ConfigurationException ("Invalid space " + uri);
550    }
551
552    /**
553     * sends (or hands back) an ISOMsg
554     *
555     * @param m the Message to be sent
556     * @throws java.io.IOException if the underlying space cannot accept the message
557     * @throws org.jpos.iso.ISOException on pack/unpack error
558     * @throws org.jpos.iso.ISOFilter.VetoException if a filter vetoes the message
559     */
560    public void send(ISOMsg m) throws IOException, ISOException {
561        if (!isConnected())
562            throw new ISOException ("MUX is not connected");
563        sp.out (out, m);
564        txCounter.increment();
565    }
566
567    public boolean isConnected() {
568        if (running() && ready != null && ready.length > 0) {
569            for (String aReady : ready)
570                if (sp.rdp(aReady) != null)
571                    return true;
572            return false;
573        }
574        return running();
575    }
    /**
     * Prints the counter snapshot followed by the metrics dump.
     *
     * @param p destination stream
     * @param indent prefix placed in front of the output
     */
    public void dump (PrintStream p, String indent) {
        p.println (indent + getCountersAsString());
        metrics.dump (p, indent);
    }
580    private String[] toStringArray(String s, String delimiter, String def) {
581        if (s == null)
582            s = def;
583        String[] arr = null;
584        if (s != null && s.length() > 0) {
585            StringTokenizer st;
586            if (delimiter != null)
587                st = new StringTokenizer(s, delimiter);
588            else
589                st = new StringTokenizer(s);
590
591            List<String> l = new ArrayList<String>();
592            while (st.hasMoreTokens()) {
593                String t = st.nextToken();
594                if ("header".equalsIgnoreCase(t)) {
595                    headerIsKey = true;
596                } else {
597                    l.add (t);
598                }
599            }
600            arr = l.toArray(new String[l.size()]);
601        }
602        return arr;
603    }
    /**
     * Splits {@code s} using the default {@link StringTokenizer} delimiters and no fallback value.
     *
     * @param s value to split, may be {@code null}
     * @return the tokens, or {@code null} when {@code s} is {@code null} or empty
     */
    private String[] toStringArray(String s) {
        return toStringArray(s, null,null);
    }
607    private boolean shouldIgnore (ISOMsg m) {
608        if (m != null && ignorerc != null
609            && ignorerc.length() > 0 && m.hasField(39))
610        {
611            return ignorerc.contains(m.getString(39));
612        }
613        return false;
614    }
615    private void append (StringBuffer sb, String name, int value) {
616        sb.append (name);
617        sb.append (value);
618    }
619    /**
620     * Tracks an asynchronous request awaiting a response, with optional timeout
621     * scheduling and elapsed-time measurement.
622     */
623    public class AsyncRequest implements Runnable {
624        ISOResponseListener rl;
625        Object handBack;
626        ScheduledFuture future;
627        Chronometer chrono;
628        /**
629         * Constructs an async request paired with the given listener and hand-back token.
630         *
631         * @param rl listener to invoke on response/expiration
632         * @param handBack opaque token relayed back to {@code rl}
633         */
634        public AsyncRequest (ISOResponseListener rl, Object handBack) {
635            super();
636            this.rl = rl;
637            this.handBack = handBack;
638            this.chrono = new Chronometer();
639        }
640        /**
641         * Sets the scheduled future used to enforce the request timeout.
642         *
643         * @param future timeout future, or {@code null} for no timeout
644         */
645        public void setFuture(ScheduledFuture future) {
646            this.future = future;
647        }
648        /**
649         * Notifies the listener that a response has been received, cancelling the timeout future.
650         *
651         * @param response inbound response message
652         */
653        public void responseReceived (ISOMsg response) {
654            if (future == null || future.cancel(false)) {
655                synchronized (QMUX.this) {
656                    rx++;
657                    rxPending--;
658                    lastTxn = System.currentTimeMillis();
659                }
660                long elapsed = chrono.elapsed();
661                metrics.record("all", elapsed);
662                metrics.record("ok", elapsed);
663                rl.responseReceived(response, handBack);
664            }
665        }
666        public void run() {
667            synchronized(QMUX.this) {
668                rxPending--;
669            }
670            metrics.record("all", chrono.elapsed());
671            rl.expired(handBack);
672        }
673    }
674
    /**
     * Creates this MUX's gauges, counters and response timer in the server's
     * meter registry, all tagged with the bean name.
     */
    private void initMeters() {
        // Fully qualified, presumably to avoid a clash with another Tags type
        // brought in by the wildcard imports -- TODO confirm
        var tags = io.micrometer.core.instrument.Tags.of("name", getName());
        var registry = getServer().getMeterRegistry();
        // 1 while isConnected() holds, 0 otherwise
        statusGauge =
          MeterFactory.gauge
            (registry, MeterInfo.MUX_STATUS,
              tags,
              null,
              () -> isConnected() ? 1 : 0
            );

        // Live view of the rxPending counter
        rxPendingGauge =
          MeterFactory.gauge
            (registry, MeterInfo.MUX_RX_PENDING,
              tags,
              null,
              () -> rxPending
            );

        txCounter = MeterFactory.counter(registry, MeterInfo.MUX_TX, tags);
        rxCounter = MeterFactory.counter(registry, MeterInfo.MUX_RX, tags);
        // Matched and unhandled counters carry an extra "type" tag
        rxMatchCounter = MeterFactory.counter(registry, MeterInfo.MUX_MATCH, tags.and("type", "match"));
        rxUnhandledCounter = MeterFactory.counter(registry, MeterInfo.MUX_UNHANDLED, tags.and("type", "unhandled"));
        responseTimer = MeterFactory.timer(registry, MeterInfo.MUX_RESPONSE_TIMER, tags);
    }
700
    /** Removes every meter created by {@link #initMeters()} from the server's meter registry. */
    private void removeMeters() {
        MeterFactory.remove (getServer().getMeterRegistry(),
          statusGauge, rxPendingGauge, txCounter, rxCounter, rxMatchCounter, rxUnhandledCounter, responseTimer
        );
    }
706}