001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.*;
022import io.micrometer.core.instrument.Timer;
023import org.HdrHistogram.AtomicHistogram;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.iso.*;
028import org.jpos.metrics.MeterFactory;
029import org.jpos.metrics.MeterInfo;
030import org.jpos.q2.QBeanSupport;
031import org.jpos.q2.QFactory;
032import org.jpos.space.*;
033import org.jpos.util.*;
034import org.jpos.util.Metrics;
035
036import java.io.IOException;
037import java.io.PrintStream;
038import java.util.*;
039import java.util.concurrent.ScheduledFuture;
040import java.util.concurrent.TimeUnit;
041
042/**
043 * @author Alejandro Revilla
044 */
045@SuppressWarnings("unchecked")
046public class QMUX
047    extends QBeanSupport
048    implements SpaceListener, MUX, QMUXMBean, Loggeable, MetricsProvider
049{
050    static final String nomap = "0123456789";
051    static final String DEFAULT_KEY = "41, 11";
052    protected LocalSpace sp;
053    protected String in, out, unhandled;
054    protected String[] ready;
055    protected String[] key;
056    protected String ignorerc;
057    protected String[] mtiMapping;
058    private boolean headerIsKey;
059    private boolean returnRejects;
060    private LocalSpace isp; // internal space
061    private Map<String,String[]> mtiKey = new HashMap<>();
062    private Metrics metrics = new Metrics(new AtomicHistogram(60000, 2));
063
064    List<ISORequestListener> listeners;
065    private volatile int rx, tx, rxExpired, txExpired, rxPending, rxUnhandled, rxForwarded;
066    private volatile long lastTxn = 0L;
067    private boolean listenerRegistered;
068
069    private Gauge statusGauge;
070    private Gauge rxPendingGauge;
071    private Timer responseTimer;
072
073    private Counter txCounter;
074    private Counter rxCounter;
075    private Counter rxMatchCounter;
076    private Counter rxUnhandledCounter;
077
078    public QMUX () {
079        super ();
080        listeners = new ArrayList<>();
081    }
082    public void initService () throws ConfigurationException {
083        Element e = getPersist ();
084        sp        = grabSpace (e.getChild ("space"));
085        isp       = cfg.getBoolean("reuse-space", false) ? sp : new TSpace();
086        in        = Environment.get(e.getChildTextTrim ("in"));
087        out       = Environment.get(e.getChildTextTrim ("out"));
088
089        if (in == null || out == null) {
090            throw new ConfigurationException ("Misconfigured QMUX. Please verify in/out queues");
091        }
092        ignorerc  = Environment.get(e.getChildTextTrim ("ignore-rc"));
093        key = toStringArray(DEFAULT_KEY, ", ", null);
094        returnRejects = cfg.getBoolean("return-rejects", false);
095        for (Element keyElement : e.getChildren("key")) {
096            String mtiOverride = QFactory.getAttributeValue(keyElement, "mti");
097            if (mtiOverride != null && mtiOverride.length() >= 2) {
098                String pcode = sanitizePcode(QFactory.getAttributeValue(keyElement, "pcode"));
099                String mapKey = buildMtiKey(mtiOverride.substring(0,2), pcode);
100                mtiKey.put (mapKey, toStringArray(keyElement.getTextTrim(), ", ", null));
101            } else {
102                key = toStringArray(e.getChildTextTrim("key"), ", ", DEFAULT_KEY);
103            }
104        }
105        ready     = toStringArray(Environment.get(e.getChildTextTrim ("ready")));
106        mtiMapping = toStringArray(Environment.get(e.getChildTextTrim ("mtimapping")));
107        if (mtiMapping == null || mtiMapping.length != 3)
108            mtiMapping = new String[] { nomap, nomap, "0022446689" };
109        addListeners ();
110        unhandled = Environment.get(e.getChildTextTrim ("unhandled"));
111        initMeters();
112        NameRegistrar.register ("mux."+getName (), this);
113    }
114    public void startService () {
115        if (!listenerRegistered) {
116            listenerRegistered = true;
117            // Handle messages that could be in the in queue at start time
118            synchronized (sp) {
119                Object[] pending = SpaceUtil.inpAll(sp, in);
120                sp.addListener (in, this);
121                for (Object o : pending)
122                    sp.out(in, o);
123            }
124        }
125    }
126    public void stopService () {
127        listenerRegistered = false;
128        sp.removeListener (in, this);
129        removeMeters();
130    }
131    public void destroyService () {
132        NameRegistrar.unregister ("mux."+getName ());
133    }
134
135    /**
136     * @return MUX with name using NameRegistrar
137     * @throws NameRegistrar.NotFoundException
138     * @see NameRegistrar
139     */
140    public static MUX getMUX (String name)
141        throws NameRegistrar.NotFoundException
142    {
143        return (MUX) NameRegistrar.get ("mux."+name);
144    }
145
146    /**
147     * @param m message to send
148     * @param timeout amount of time in millis to wait for a response
149     * @return response or null
150     */
151    public ISOMsg request (ISOMsg m, long timeout) throws ISOException {
152        String key = getKey (m);
153        String req = key + ".req";
154        synchronized (isp) {
155            if (isp.rdp (req) != null)
156                throw new ISOException ("Duplicate key '" + req + "' detected");
157            isp.out (req, m);
158        }
159        m.setDirection(0);
160        Chronometer c = new Chronometer();
161        if (timeout > 0)
162            sp.out (out, m, timeout);
163        else
164            sp.out (out, m);
165
166        txCounter.increment();
167        ISOMsg resp;
168        try {
169            synchronized (this) { tx++; rxPending++; }
170
171            for (;;) {
172                resp = (ISOMsg) isp.in (key, timeout);
173                if (!shouldIgnore (resp))
174                    break;
175            }
176            if (resp == null && isp.inp (req) == null) {
177                // possible race condition, retry for a few extra seconds
178                resp = (ISOMsg) isp.in (key, 10000);
179            }
180            synchronized (this) {
181                if (resp != null) {
182                    rx++;
183                    lastTxn = System.currentTimeMillis();
184                } else {
185                    rxExpired++;
186                    if (m.getDirection() != ISOMsg.OUTGOING)
187                        txExpired++;
188                }
189            }
190        } finally {
191            synchronized (this) { rxPending--; }
192        }
193        long elapsed = c.elapsed();
194        metrics.record("all", elapsed);
195        if (resp != null) {
196            responseTimer.record(elapsed, TimeUnit.MILLISECONDS);
197            metrics.record("ok", elapsed);
198        }
199        return resp;
200    }
201    public void request (ISOMsg m, long timeout, ISOResponseListener rl, Object handBack)
202      throws ISOException
203    {
204        String key = getKey (m);
205        String req = key + ".req";
206        synchronized (isp) {
207            if (isp.rdp (req) != null)
208                throw new ISOException ("Duplicate key '" + req + "' detected.");
209            m.setDirection(0);
210            AsyncRequest ar = new AsyncRequest (rl, handBack);
211            synchronized (ar) {
212                if (timeout > 0)
213                    ar.setFuture(getScheduledThreadPoolExecutor().schedule(ar, timeout, TimeUnit.MILLISECONDS));
214            }
215            isp.out (req, ar, timeout);
216        }
217        if (timeout > 0)
218            sp.out (out, m, timeout);
219        else
220            sp.out (out, m);
221        synchronized (this) { tx++; rxPending++; }
222    }
223
224    protected boolean isNotifyEligible(ISOMsg msg) {
225        if (returnRejects)
226            return true;
227
228        try {
229            return msg.isResponse();
230        } catch (RuntimeException | ISOException ex) {
231            // * ArrayIndexOutOfBoundsException - It may occur for messages where
232            // MTI is not standard 4 characters (eg. FSDISOMsg), then notification is expected.
233            // * ISOException: When there is no field 0, the error should be logged
234            return true;
235        }
236    }
237
238    @Override
239    public void notify (Object k, Object value) {
240        Object obj = sp.inp (k);
241        if (obj instanceof ISOMsg) {
242            ISOMsg m = (ISOMsg) obj;
243            rxCounter.increment();
244            try {
245                if (isNotifyEligible(m)) {
246                    String key = getKey (m);
247                    String req = key + ".req";
248                    Object r = isp.inp (req);
249                    if (r != null) {
250                        if (r instanceof AsyncRequest ar) {
251                            ar.responseReceived (m);
252                        } else {
253                            isp.out (key, m);
254                        }
255                        rxMatchCounter.increment();
256                        return;
257                    }
258                }
259            } catch (ISOException e) {
260                LogEvent evt = getLog().createLogEvent("notify");
261                evt.addMessage(e);
262                evt.addMessage(obj);
263                Logger.log(evt);
264            }
265            processUnhandled (m);
266        }
267    }
268
269    public String getKey (ISOMsg m) throws ISOException {
270        if (out == null)
271            throw new NullPointerException ("Misconfigured QMUX. Please verify out queue is not null.");
272        StringBuilder sb = new StringBuilder (out);
273        sb.append ('.');
274        sb.append (mapMTI(m.getMTI()));
275        if (headerIsKey && m.getHeader()!=null) {
276            sb.append ('.');
277            sb.append(ISOUtil.hexString(m.getHeader()));
278            sb.append ('.');
279        }
280        boolean hasFields = false;
281        String mti = m.getMTI();
282        String mtiPrefix = mti.substring(0,2);
283        String[] k = null;
284        String pcode = m.hasField(3) ? sanitizePcode(m.getString(3)) : null;
285        if (pcode != null) {
286            k = mtiKey.get(buildMtiKey(mtiPrefix, pcode));
287        }
288        if (k == null) {
289            k = mtiKey.getOrDefault(mtiPrefix, key);
290        }
291        for (String f : k) {
292            String v = m.getString(f);
293            if (v != null) {
294                if ("11".equals(f)) {
295                    String vt = v.trim();
296                    int l = m.getMTI().charAt(0) == '2' ? 12 : 6;
297                    if (vt.length() < l)
298                        v = ISOUtil.zeropad(vt, l);
299                }
300                if ("41".equals(f)) {
301                    v = ISOUtil.zeropad(v.trim(), 16); // BIC ANSI to ISO hack
302                }
303                hasFields = true;
304                sb.append(v);
305            }
306        }
307        if (!hasFields)
308            throw new ISOException ("Key fields not found - not sending " + sb.toString());
309        return sb.toString();
310    }
311
312    private String sanitizePcode(String pcode) {
313        if (pcode == null)
314            return null;
315        String trimmed = pcode.trim();
316        return trimmed.isEmpty() ? null : trimmed;
317    }
318
319    private String buildMtiKey(String mtiPrefix, String pcode) {
320        return pcode == null ? mtiPrefix : mtiPrefix + ':' + pcode;
321    }
322
323    @Override
324    public Metrics getMetrics() {
325        return metrics;
326    }
327
328    private String mapMTI (String mti) throws ISOException {
329        StringBuilder sb = new StringBuilder();
330        if (mti != null) {
331            if (mti.length() < 4)
332                mti = ISOUtil.zeropad(mti, 4); // #jPOS-55
333            if (mti.length() == 4) {
334                for (int i=0; i<mtiMapping.length; i++) {
335                    int c = mti.charAt (i) - '0';
336                    if (c >= 0 && c < 10)
337                        sb.append (mtiMapping[i].charAt(c));
338                }
339            }
340        }
341        return sb.toString();
342    }
343    public synchronized void setInQueue (String in) {
344        this.in = in;
345        getPersist().getChild("in").setText (in);
346        setModified (true);
347    }
348    public String getInQueue () {
349        return in;
350    }
351    public synchronized void setOutQueue (String out) {
352        this.out = out;
353        getPersist().getChild("out").setText (out);
354        setModified (true);
355    }
356    public String getOutQueue () {
357        return out;
358    }
359    public Space getSpace() {
360        return sp;
361    }
362    public synchronized void setUnhandledQueue (String unhandled) {
363        this.unhandled = unhandled;
364        getPersist().getChild("unhandled").setText (unhandled);
365        setModified (true);
366    }
367    public String getUnhandledQueue () {
368        return unhandled;
369    }
370    @SuppressWarnings("unused")
371    public String[] getReadyIndicatorNames() {
372        return ready;
373    }
374
375    private void addListeners() throws ConfigurationException {
376        QFactory factory = getFactory ();
377        for (Element l : getPersist().getChildren("request-listener")) {
378            ISORequestListener listener = factory.newInstance(l);
379            if (listener != null)
380                addISORequestListener (listener);
381        }
382    }
383    public void addISORequestListener(ISORequestListener l) {
384        listeners.add (l);
385    }
386    public boolean removeISORequestListener(ISORequestListener l) {
387        return listeners.remove(l);
388    }
389    public synchronized void resetCounters() {
390        rx = tx = rxExpired = txExpired = rxPending = rxUnhandled = rxForwarded = 0;
391        lastTxn = 0l;
392    }
393    public String getCountersAsString () {
394        StringBuffer sb = new StringBuffer();
395        append (sb, "tx=", tx);
396        append (sb, ", rx=", rx);
397        append (sb, ", tx_expired=", getTXExpired());
398        append (sb, ", tx_pending=", getTXPending());
399        append (sb, ", rx_expired=", getRXExpired());
400        append (sb, ", rx_pending=", getRXPending());
401        append (sb, ", rx_unhandled=", getRXUnhandled());
402        append (sb, ", rx_forwarded=", getRXForwarded());
403        sb.append (", connected=");
404        sb.append (Boolean.toString(isConnected()));
405        sb.append (", last=");
406        sb.append (lastTxn);
407        if (lastTxn > 0) {
408            sb.append (", idle=");
409            sb.append(System.currentTimeMillis() - lastTxn);
410            sb.append ("ms");
411        }
412        return sb.toString();
413    }
414
415    public int getTXCounter() {
416        return tx;
417    }
418    public int getRXCounter() {
419        return rx;
420    }
421
422    @Override
423    public int getTXExpired() {
424        return txExpired;
425    }
426
427    @Override
428    public int getTXPending() {
429        return sp.size(out);
430    }
431
432    @Override
433    public int getRXExpired() {
434        return rxExpired;
435    }
436
437    @Override
438    public int getRXPending() {
439        return rxPending;
440    }
441
442    @Override
443    public int getRXUnhandled() {
444        return rxUnhandled;
445    }
446
447    @Override
448    public int getRXForwarded() {
449        return rxForwarded;
450    }
451
452    public long getLastTxnTimestampInMillis() {
453        return lastTxn;
454    }
455    public long getIdleTimeInMillis() {
456        return lastTxn > 0L ? System.currentTimeMillis() - lastTxn : -1L;
457    }
458
459    protected void processUnhandled (ISOMsg m) {
460        ISOSource source = m.getSource();
461        source = source != null ? source : this;
462        rxUnhandledCounter.increment();
463        Iterator<ISORequestListener> iter = listeners.iterator();
464        if (iter.hasNext())
465            synchronized (this) { rxForwarded++; }
466        while (iter.hasNext())
467            if (iter.next().process (source, m))
468                return;
469        if (unhandled != null) {
470            synchronized (this) { rxUnhandled++; }
471            sp.out (unhandled, m, 120000);
472        }
473    }
474    private LocalSpace grabSpace (Element e)
475        throws ConfigurationException
476    {
477        String uri = e != null ? e.getText() : "";
478        Space sp = SpaceFactory.getSpace (uri);
479        if (sp instanceof LocalSpace) {
480            return (LocalSpace) sp;
481        }
482        throw new ConfigurationException ("Invalid space " + uri);
483    }
484
485    /**
486     * sends (or hands back) an ISOMsg
487     *
488     * @param m the Message to be sent
489     * @throws java.io.IOException
490     * @throws org.jpos.iso.ISOException
491     * @throws org.jpos.iso.ISOFilter.VetoException;
492     */
493    public void send(ISOMsg m) throws IOException, ISOException {
494        if (!isConnected())
495            throw new ISOException ("MUX is not connected");
496        sp.out (out, m);
497        txCounter.increment();
498    }
499
500    public boolean isConnected() {
501        if (running() && ready != null && ready.length > 0) {
502            for (String aReady : ready)
503                if (sp.rdp(aReady) != null)
504                    return true;
505            return false;
506        }
507        return running();
508    }
509    public void dump (PrintStream p, String indent) {
510        p.println (indent + getCountersAsString());
511        metrics.dump (p, indent);
512    }
513    private String[] toStringArray(String s, String delimiter, String def) {
514        if (s == null)
515            s = def;
516        String[] arr = null;
517        if (s != null && s.length() > 0) {
518            StringTokenizer st;
519            if (delimiter != null)
520                st = new StringTokenizer(s, delimiter);
521            else
522                st = new StringTokenizer(s);
523
524            List<String> l = new ArrayList<String>();
525            while (st.hasMoreTokens()) {
526                String t = st.nextToken();
527                if ("header".equalsIgnoreCase(t)) {
528                    headerIsKey = true;
529                } else {
530                    l.add (t);
531                }
532            }
533            arr = l.toArray(new String[l.size()]);
534        }
535        return arr;
536    }
537    private String[] toStringArray(String s) {
538        return toStringArray(s, null,null);
539    }
540    private boolean shouldIgnore (ISOMsg m) {
541        if (m != null && ignorerc != null
542            && ignorerc.length() > 0 && m.hasField(39))
543        {
544            return ignorerc.contains(m.getString(39));
545        }
546        return false;
547    }
548    private void append (StringBuffer sb, String name, int value) {
549        sb.append (name);
550        sb.append (value);
551    }
552    public class AsyncRequest implements Runnable {
553        ISOResponseListener rl;
554        Object handBack;
555        ScheduledFuture future;
556        Chronometer chrono;
557        public AsyncRequest (ISOResponseListener rl, Object handBack) {
558            super();
559            this.rl = rl;
560            this.handBack = handBack;
561            this.chrono = new Chronometer();
562        }
563        public void setFuture(ScheduledFuture future) {
564            this.future = future;
565        }
566        public void responseReceived (ISOMsg response) {
567            if (future == null || future.cancel(false)) {
568                synchronized (QMUX.this) {
569                    rx++;
570                    rxPending--;
571                    lastTxn = System.currentTimeMillis();
572                }
573                long elapsed = chrono.elapsed();
574                metrics.record("all", elapsed);
575                metrics.record("ok", elapsed);
576                rl.responseReceived(response, handBack);
577            }
578        }
579        public void run() {
580            synchronized(QMUX.this) {
581                rxPending--;
582            }
583            metrics.record("all", chrono.elapsed());
584            rl.expired(handBack);
585        }
586    }
587
588    private void initMeters() {
589        var tags = io.micrometer.core.instrument.Tags.of("name", getName());
590        var registry = getServer().getMeterRegistry();
591        statusGauge =
592          MeterFactory.gauge
593            (registry, MeterInfo.MUX_STATUS,
594              tags,
595              null,
596              () -> isConnected() ? 1 : 0
597            );
598
599        rxPendingGauge =
600          MeterFactory.gauge
601            (registry, MeterInfo.MUX_RX_PENDING,
602              tags,
603              null,
604              () -> rxPending
605            );
606
607        txCounter = MeterFactory.counter(registry, MeterInfo.MUX_TX, tags);
608        rxCounter = MeterFactory.counter(registry, MeterInfo.MUX_RX, tags);
609        rxMatchCounter = MeterFactory.counter(registry, MeterInfo.MUX_MATCH, tags.and("type", "match"));
610        rxUnhandledCounter = MeterFactory.counter(registry, MeterInfo.MUX_UNHANDLED, tags.and("type", "unhandled"));
611        responseTimer = MeterFactory.timer(registry, MeterInfo.MUX_RESPONSE_TIMER, tags);
612    }
613
614    private void removeMeters() {
615        MeterFactory.remove (getServer().getMeterRegistry(),
616          statusGauge, rxPendingGauge, txCounter, rxCounter, rxMatchCounter, rxUnhandledCounter, responseTimer
617        );
618    }
619}