001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.iso.*;
024import org.jpos.q2.QBeanSupport;
025import org.jpos.q2.QFactory;
026import org.jpos.space.Space;
027import org.jpos.space.SpaceFactory;
028import org.jpos.util.NameRegistrar;
029import org.jpos.util.Realm;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.concurrent.atomic.AtomicInteger;
035
036/**
037 * Pool of {@link MUX} instances that selects a delegate per request based on
038 * the configured strategy (one of {@link #PRIMARY_SECONDARY}, {@link #ROUND_ROBIN},
039 * {@link #ROUND_ROBIN_WITH_OVERRIDE}, or {@link #SPLIT_BY_DIVISOR}).
040 *
041 * @author apr
042 */
043public class MUXPool extends QBeanSupport implements MUX, MUXPoolMBean {
044    /** Default constructor; no instance state to initialise. */
045    public MUXPool() {}
046    /** Strategy: try MUXes in order, falling through to the next when unusable. */
047    public static final int PRIMARY_SECONDARY = 0;
048    /** Strategy: round-robin selection across the registered MUXes. */
049    public static final int ROUND_ROBIN = 1;
050    /** Strategy: round-robin selection with per-MTI override routing. */
051    public static final int ROUND_ROBIN_WITH_OVERRIDE = 2;
052    /** Strategy: hash-by-divisor selection driven by a configured field value. */
053    public static final int SPLIT_BY_DIVISOR = 3;
054
055    int strategy = 0;
056    String[] muxName;
057    MUX[] mux;
058    AtomicInteger msgno = new AtomicInteger();
059    String[] overrideMTIs;
060    String originalChannelField = "";
061    String splitField = "";
062    boolean checkEnabled;
063    Space sp;
064    StrategyHandler strategyHandler;
065
066    @Override
067    protected String defaultRealm() {
068        return Realm.COMM_MUX;
069    }
070
071    public void initService () throws ConfigurationException {
072        Element e = getPersist ();
073        muxName = toStringArray(e.getChildTextTrim ("muxes"));
074        strategy = getStrategy(e.getChildTextTrim("strategy"));
075        overrideMTIs = toStringArray(e.getChildTextTrim("follower-override"));
076        originalChannelField = e.getChildTextTrim("original-channel-field");
077        splitField = e.getChildTextTrim("split-field");
078        checkEnabled = cfg.getBoolean("check-enabled");
079        sp = grabSpace (e.getChild ("space"));
080
081        // Sanitize muxes list using only muxes that exist
082        if (muxName == null)
083            throw new ConfigurationException(
084                String.format("<muxes> element not configured for %s '%s'", getClass().getName(), getName()));
085
086        List<MUX> muxes = new ArrayList<>();
087        List<String> found = new ArrayList<>();
088        List<String> notFound = new ArrayList<>();
089        for (String s : muxName) {
090            MUX m = NameRegistrar.getIfExists("mux." + s);
091            if (m != null) {
092                found.add(s);
093                muxes.add(m);
094            } else
095                notFound.add(s);
096        }
097
098        if (!notFound.isEmpty()) {
099            log.warn("MUXPool "+getName()+": some muxes not found and will be removed "+notFound);
100        }
101        if (muxes.isEmpty()) {
102            throw new ConfigurationException ("MUXPool "+getName()+" has no available muxes");
103        }
104
105        muxName = found.toArray(new String[0]);
106        mux = muxes.toArray(new MUX[0]);
107
108        initHandler(e.getChild("strategy-handler"));
109        NameRegistrar.register ("mux."+getName (), this);
110    }
111
112    public void stopService () {
113        NameRegistrar.unregister ("mux."+getName ());
114    }
115
116    /**
117     * Instantiates a custom {@link StrategyHandler} from an XML configuration element.
118     *
119     * @param e {@code <strategy-handler>} element, or {@code null} to leave the handler unset
120     * @throws ConfigurationException if the handler cannot be instantiated
121     */
122    protected void initHandler(Element e) throws ConfigurationException {
123        if (e == null)
124            return;
125        strategyHandler = getFactory().newInstance(e);
126    }
127
128    public ISOMsg request (ISOMsg m, long timeout) throws ISOException {
129        if (timeout == 0) {
130            // a zero timeout intent is to fire-and-forget,
131            // you should use 'send' instead of 'request'
132            try {
133                send(m);
134            } catch (IOException e) {
135                throw new ISOException(e.getMessage(), e);
136            }
137        }
138
139        long maxWait = System.currentTimeMillis() + timeout;
140        MUX mux = getMUX(m,maxWait);
141        if (mux != null) {
142            long remainingTimeout = maxWait - System.currentTimeMillis();
143            if (remainingTimeout >= 0)
144                return mux.request(m, remainingTimeout);
145        }
146        return null;
147    }
148
149    public void request(ISOMsg m, long timeout, final ISOResponseListener r, final Object handBack)
150        throws ISOException
151    {
152        if (timeout == 0) {
153            // a zero timeout intent is to fire-and-forget,
154            // you should use 'send' instead of 'request'
155            try {
156                send(m);
157                new Thread(() -> r.expired(handBack)).start();
158            } catch (IOException e) {
159                throw new ISOException(e.getMessage(), e);
160            }
161        }
162
163        long maxWait = System.currentTimeMillis() + timeout;
164        MUX mux = getMUX(m,maxWait);
165        if (mux != null) {
166            long remainingTimeout = maxWait - System.currentTimeMillis();
167            if (remainingTimeout >= 0)
168                mux.request(m, remainingTimeout, r, handBack);
169            else {
170                new Thread(()->r.expired(handBack)).start();
171            }
172        } else
173            throw new ISOException ("No MUX available");
174    }
175
176    public void send (ISOMsg m) throws ISOException, IOException {
177        long maxWait = System.currentTimeMillis() + 1000L; // reasonable default
178        MUX mux = getMUX(m,maxWait);
179
180        if (mux == null)
181            throw new ISOException ("No available MUX");
182
183        mux.send(m);
184    }
185
186    /**
187     * Returns the first usable MUX, scanning from index 0 and waiting up to {@code maxWait}.
188     *
189     * @param maxWait wall-clock deadline in milliseconds since epoch
190     * @return a usable MUX, or {@code null} if none became available before the deadline
191     */
192    protected MUX firstAvailableMUX (long maxWait) {
193        return nextAvailableMUX(0, maxWait);
194    }
195
196    /**
197     * Returns the next usable MUX starting from {@code mnumber}, waiting up to {@code maxWait}.
198     *
199     * @param mnumber starting index (round-robin counter)
200     * @param maxWait wall-clock deadline in milliseconds since epoch
201     * @return a usable MUX, or {@code null} if none became available before the deadline
202     */
203    protected MUX nextAvailableMUX (int mnumber, long maxWait) {
204        do {
205            for (int i=0; i<mux.length; i++) {
206                int j = (mnumber+i) % mux.length;
207                if (isUsable(mux[j]))
208                    return mux[j];
209                msgno.incrementAndGet();
210            }
211            ISOUtil.sleep (1000);
212        } while (System.currentTimeMillis() < maxWait);
213        return null;
214    }
215
216    private boolean overrideMTI(String mtiReq) {
217        if(overrideMTIs != null){
218            for (String mti : overrideMTIs) {
219                if(mti.equals(mtiReq))
220                    return true;
221            }
222        }
223        return false;
224    }
225
226    private MUX nextAvailableWithOverrideMUX(ISOMsg m, long maxWait) {
227        try{
228            if(originalChannelField != null && !"".equals(originalChannelField)){
229                String channelName = m.getString(originalChannelField);
230                if(channelName != null && !"".equals(channelName) && overrideMTI(m.getMTI())){
231                    ChannelAdaptor channel = NameRegistrar.get (channelName);
232                    for (MUX mx : mux) {
233                        if(channel != null && ((QMUX)mx).getInQueue().equals(channel.getOutQueue())){
234                            if(isUsable(mx))
235                                return mx;
236                        }
237                    }
238                }
239            }
240            return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
241        }catch(Exception e){
242            getLog().warn(e);
243        }
244        return null;
245    }
246
247    private MUX splitByDivisorMUX(ISOMsg m, long maxWait) {
248        try{
249            if(splitField != null && !"".equals(splitField)){
250                String split = m.getString(splitField);
251                if(split != null && ISOUtil.isNumeric(split, 10)){
252                    MUX mx = mux[Integer.parseInt(split) % mux.length];
253                    if(isUsable(mx))
254                        return mx;
255                }
256            }
257            return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
258        }catch(Exception e){
259            getLog().warn(e);
260        }
261        return null;
262    }
263
264    private int getStrategy(String stg) {
265        if (stg == null)
266            return PRIMARY_SECONDARY;
267
268        stg = stg.trim();
269        switch (stg) {
270            case "round-robin": return ROUND_ROBIN;
271            case "round-robin-with-override": return ROUND_ROBIN_WITH_OVERRIDE;
272            case "split-by-divisor": return SPLIT_BY_DIVISOR;
273            default: return PRIMARY_SECONDARY;
274        }
275    }
276
277    private MUX getMUX(ISOMsg m, long maxWait) {
278        // maxWait should be interpreted as currentTimeMillis "deadline"
279        MUX mux = null;
280        if (strategyHandler != null) {
281             mux = strategyHandler.getMUX(this, m, maxWait);
282        }
283
284        if (mux != null)
285            return mux;
286
287        switch (strategy) {
288            case ROUND_ROBIN: return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
289            case ROUND_ROBIN_WITH_OVERRIDE: return nextAvailableWithOverrideMUX(m, maxWait);
290            case SPLIT_BY_DIVISOR: return splitByDivisorMUX(m, maxWait);
291            default: return firstAvailableMUX(maxWait);
292        }
293    }
294
295    @Override
296    public String[] getMuxNames() {
297        return muxName;
298    }
299
300    @Override
301    public int getStrategy() {
302        return strategy;
303    }
304
305    /**
306     * Returns the configured custom strategy handler, if any.
307     *
308     * @return the {@link StrategyHandler}, or {@code null} when not configured
309     */
310    public StrategyHandler getStrategyHandler() {
311        return strategyHandler;
312    }
313
314    private Space grabSpace (Element e) throws ConfigurationException
315    {
316        String uri = e != null ? e.getText() : "";
317        return SpaceFactory.getSpace (uri);
318    }
319
320    public boolean isConnected() {
321        for (MUX m : mux)
322            if (isUsable(m))
323                return true;
324        return false;
325    }
326
327    @SuppressWarnings("unchecked")
328    private boolean isUsable (MUX mux) {
329        if (!checkEnabled || !(mux instanceof QMUX))
330            return mux.isConnected();
331
332        QMUX qmux = (QMUX) mux;
333        String enabledKey = qmux.getName() + ".enabled";
334        String[] readyNames = qmux.getReadyIndicatorNames();
335        if (readyNames != null && readyNames.length == 1) {
336            // check that 'mux.enabled' entry has the same content as 'ready'
337            return mux.isConnected() && sp.rdp (enabledKey) == sp.rdp (readyNames[0]);
338        }
339        return mux.isConnected() && sp.rdp (enabledKey) != null;
340    }
341
342    private String[] toStringArray (String s) {
343        return (s != null && s.length() > 0) ? ISOUtil.toStringArray(s) : null;
344    }
345
346
347    /**
348     * A class implementing this interface can be added to a {@link MUXPool} to override the classical built-in strategies.<br>
349     *
350     * It could be added to a {@code MUXPool} like this:<br>
351     *
352     * <pre>
353     *    &lt;mux class="org.jpos.q2.iso.MUXPool" logger="Q2" name="my-pool">
354     *      &lt;muxes>mux1 mux2 mux3&lt;/muxes>
355     *      &lt;strategy>round-robin&lt;/strategy>
356     *
357     *      &lt;strategy-handler class="xxx.yyy.MyPoolStrategy">
358     *        &lt;!-- some config here --&gt;
359     *      &lt;/strategy-handler>
360     *    &lt;/mux>
361     * </pre>
362     *
363     * If the {@code strategy-handler} returns {@code null}, the {@link MUXPool} will fall back to the
364     * defined {@code strategy} (or the default one, if none defined).
365     *
366     * @author barspi@transactility.com
367     */
368    public interface StrategyHandler {
369        /** If this method returns null, the {@link MUXPool} will fall back to the configured built-in
370         *  strategy.
371         *
372         * @param pool the {@link MUXPool} using this strategy handler
373         * @param m the {@link ISOMsg} that we wish to send
374         * @param maxWait deadline in milliseconds (epoch value as given by {@code System.currentTimeMillis()})
375         * @return an appropriate {@link MUX} for this strategy, or {@code null} if none is found
376         */
377        MUX getMUX(MUXPool pool, ISOMsg m, long maxWait);
378    }
379}