001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.iso.*;
024import org.jpos.q2.QBeanSupport;
025import org.jpos.q2.QFactory;
026import org.jpos.space.Space;
027import org.jpos.space.SpaceFactory;
028import org.jpos.util.NameRegistrar;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.concurrent.atomic.AtomicInteger;
034
035/**
036 * @author apr
037 */
038public class MUXPool extends QBeanSupport implements MUX, MUXPoolMBean {
039    public static final int PRIMARY_SECONDARY = 0;
040    public static final int ROUND_ROBIN = 1;
041    public static final int ROUND_ROBIN_WITH_OVERRIDE = 2;
042    public static final int SPLIT_BY_DIVISOR = 3;
043
044    int strategy = 0;
045    String[] muxName;
046    MUX[] mux;
047    AtomicInteger msgno = new AtomicInteger();
048    String[] overrideMTIs;
049    String originalChannelField = "";
050    String splitField = "";
051    boolean checkEnabled;
052    Space sp;
053    StrategyHandler strategyHandler;
054
055    public void initService () throws ConfigurationException {
056        Element e = getPersist ();
057        muxName = toStringArray(e.getChildTextTrim ("muxes"));
058        strategy = getStrategy(e.getChildTextTrim("strategy"));
059        overrideMTIs = toStringArray(e.getChildTextTrim("follower-override"));
060        originalChannelField = e.getChildTextTrim("original-channel-field");
061        splitField = e.getChildTextTrim("split-field");
062        checkEnabled = cfg.getBoolean("check-enabled");
063        sp = grabSpace (e.getChild ("space"));
064
065        // Sanitize muxes list using only muxes that exist
066        if (muxName == null)
067            throw new ConfigurationException(
068                String.format("<muxes> element not configured for %s '%s'", getClass().getName(), getName()));
069
070        List<MUX> muxes = new ArrayList<>();
071        List<String> found = new ArrayList<>();
072        List<String> notFound = new ArrayList<>();
073        for (String s : muxName) {
074            MUX m = NameRegistrar.getIfExists("mux." + s);
075            if (m != null) {
076                found.add(s);
077                muxes.add(m);
078            } else
079                notFound.add(s);
080        }
081
082        if (!notFound.isEmpty()) {
083            log.warn("MUXPool "+getName()+": some muxes not found and will be removed "+notFound);
084        }
085        if (muxes.isEmpty()) {
086            throw new ConfigurationException ("MUXPool "+getName()+" has no available muxes");
087        }
088
089        muxName = found.toArray(new String[0]);
090        mux = muxes.toArray(new MUX[0]);
091
092        initHandler(e.getChild("strategy-handler"));
093        NameRegistrar.register ("mux."+getName (), this);
094    }
095
096    public void stopService () {
097        NameRegistrar.unregister ("mux."+getName ());
098    }
099
100    protected void initHandler(Element e) throws ConfigurationException {
101        if (e == null)
102            return;
103        strategyHandler = getFactory().newInstance(e);
104    }
105
106    public ISOMsg request (ISOMsg m, long timeout) throws ISOException {
107        if (timeout == 0) {
108            // a zero timeout intent is to fire-and-forget,
109            // you should use 'send' instead of 'request'
110            try {
111                send(m);
112            } catch (IOException e) {
113                throw new ISOException(e.getMessage(), e);
114            }
115        }
116
117        long maxWait = System.currentTimeMillis() + timeout;
118        MUX mux = getMUX(m,maxWait);
119        if (mux != null) {
120            long remainingTimeout = maxWait - System.currentTimeMillis();
121            if (remainingTimeout >= 0)
122                return mux.request(m, remainingTimeout);
123        }
124        return null;
125    }
126
127    public void request(ISOMsg m, long timeout, final ISOResponseListener r, final Object handBack)
128        throws ISOException
129    {
130        if (timeout == 0) {
131            // a zero timeout intent is to fire-and-forget,
132            // you should use 'send' instead of 'request'
133            try {
134                send(m);
135                new Thread(() -> r.expired(handBack)).start();
136            } catch (IOException e) {
137                throw new ISOException(e.getMessage(), e);
138            }
139        }
140
141        long maxWait = System.currentTimeMillis() + timeout;
142        MUX mux = getMUX(m,maxWait);
143        if (mux != null) {
144            long remainingTimeout = maxWait - System.currentTimeMillis();
145            if (remainingTimeout >= 0)
146                mux.request(m, remainingTimeout, r, handBack);
147            else {
148                new Thread(()->r.expired(handBack)).start();
149            }
150        } else
151            throw new ISOException ("No MUX available");
152    }
153
154    public void send (ISOMsg m) throws ISOException, IOException {
155        long maxWait = System.currentTimeMillis() + 1000L; // reasonable default
156        MUX mux = getMUX(m,maxWait);
157
158        if (mux == null)
159            throw new ISOException ("No available MUX");
160
161        mux.send(m);
162    }
163
164    protected MUX firstAvailableMUX (long maxWait) {
165        return nextAvailableMUX(0, maxWait);
166    }
167
168    protected MUX nextAvailableMUX (int mnumber, long maxWait) {
169        do {
170            for (int i=0; i<mux.length; i++) {
171                int j = (mnumber+i) % mux.length;
172                if (isUsable(mux[j]))
173                    return mux[j];
174                msgno.incrementAndGet();
175            }
176            ISOUtil.sleep (1000);
177        } while (System.currentTimeMillis() < maxWait);
178        return null;
179    }
180
181    private boolean overrideMTI(String mtiReq) {
182        if(overrideMTIs != null){
183            for (String mti : overrideMTIs) {
184                if(mti.equals(mtiReq))
185                    return true;
186            }
187        }
188        return false;
189    }
190
191    private MUX nextAvailableWithOverrideMUX(ISOMsg m, long maxWait) {
192        try{
193            if(originalChannelField != null && !"".equals(originalChannelField)){
194                String channelName = m.getString(originalChannelField);
195                if(channelName != null && !"".equals(channelName) && overrideMTI(m.getMTI())){
196                    ChannelAdaptor channel = NameRegistrar.get (channelName);
197                    for (MUX mx : mux) {
198                        if(channel != null && ((QMUX)mx).getInQueue().equals(channel.getOutQueue())){
199                            if(isUsable(mx))
200                                return mx;
201                        }
202                    }
203                }
204            }
205            return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
206        }catch(Exception e){
207            getLog().warn(e);
208        }
209        return null;
210    }
211
212    private MUX splitByDivisorMUX(ISOMsg m, long maxWait) {
213        try{
214            if(splitField != null && !"".equals(splitField)){
215                String split = m.getString(splitField);
216                if(split != null && ISOUtil.isNumeric(split, 10)){
217                    MUX mx = mux[Integer.parseInt(split) % mux.length];
218                    if(isUsable(mx))
219                        return mx;
220                }
221            }
222            return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
223        }catch(Exception e){
224            getLog().warn(e);
225        }
226        return null;
227    }
228
229    private int getStrategy(String stg) {
230        if (stg == null)
231            return PRIMARY_SECONDARY;
232
233        stg = stg.trim();
234        switch (stg) {
235            case "round-robin": return ROUND_ROBIN;
236            case "round-robin-with-override": return ROUND_ROBIN_WITH_OVERRIDE;
237            case "split-by-divisor": return SPLIT_BY_DIVISOR;
238            default: return PRIMARY_SECONDARY;
239        }
240    }
241
242    private MUX getMUX(ISOMsg m, long maxWait) {
243        // maxWait should be interpreted as currentTimeMillis "deadline"
244        MUX mux = null;
245        if (strategyHandler != null) {
246             mux = strategyHandler.getMUX(this, m, maxWait);
247        }
248
249        if (mux != null)
250            return mux;
251
252        switch (strategy) {
253            case ROUND_ROBIN: return nextAvailableMUX(msgno.incrementAndGet(), maxWait);
254            case ROUND_ROBIN_WITH_OVERRIDE: return nextAvailableWithOverrideMUX(m, maxWait);
255            case SPLIT_BY_DIVISOR: return splitByDivisorMUX(m, maxWait);
256            default: return firstAvailableMUX(maxWait);
257        }
258    }
259
260    @Override
261    public String[] getMuxNames() {
262        return muxName;
263    }
264
265    @Override
266    public int getStrategy() {
267        return strategy;
268    }
269
270    public StrategyHandler getStrategyHandler() {
271        return strategyHandler;
272    }
273
274    private Space grabSpace (Element e) throws ConfigurationException
275    {
276        String uri = e != null ? e.getText() : "";
277        return SpaceFactory.getSpace (uri);
278    }
279
280    public boolean isConnected() {
281        for (MUX m : mux)
282            if (isUsable(m))
283                return true;
284        return false;
285    }
286
287    @SuppressWarnings("unchecked")
288    private boolean isUsable (MUX mux) {
289        if (!checkEnabled || !(mux instanceof QMUX))
290            return mux.isConnected();
291
292        QMUX qmux = (QMUX) mux;
293        String enabledKey = qmux.getName() + ".enabled";
294        String[] readyNames = qmux.getReadyIndicatorNames();
295        if (readyNames != null && readyNames.length == 1) {
296            // check that 'mux.enabled' entry has the same content as 'ready'
297            return mux.isConnected() && sp.rdp (enabledKey) == sp.rdp (readyNames[0]);
298        }
299        return mux.isConnected() && sp.rdp (enabledKey) != null;
300    }
301
302    private String[] toStringArray (String s) {
303        return (s != null && s.length() > 0) ? ISOUtil.toStringArray(s) : null;
304    }
305
306
307    /**
308     * A class implementing this interface can be added to a {@link MUXPool} to override the classical built-in strategies.<br>
309     *
310     * It could be added to a {@code MUXPool} like this:<br>
311     *
312     * <pre>
313     *    &lt;mux class="org.jpos.q2.iso.MUXPool" logger="Q2" name="my-pool">
314     *      &lt;muxes>mux1 mux2 mux3&lt;/muxes>
315     *      &lt;strategy>round-robin&lt;/strategy>
316     *
317     *      &lt;strategy-handler class="xxx.yyy.MyPoolStrategy">
318     *        &lt;!-- some config here --&gt;
319     *      &lt;/strategy-handler>
320     *    &lt;/mux>
321     * </pre>
322     *
323     * If the {@code strategy-handler} returns {@code null}, the {@link MUXPool} will fall back to the
324     * defined {@code strategy} (or the default one, if none defined).
325     *
326     * @author barspi@transactility.com
327     */
328    public interface StrategyHandler {
329        /** If this method returns null, the {@link MUXPool} will fall back to the configured built-in
330         *  strategy.
331         *
332         * @param pool the {@link MUXPool} using this strategy handler
333         * @param m the {@link ISOMsg} that we wish to send
334         * @param maxWait deadline in milliseconds (epoch value as given by {@code System.currentTimeMillis()})
335         * @return an appropriate {@link MUX} for this strategy, or {@code null} if none is found
336         */
337        MUX getMUX(MUXPool pool, ISOMsg m, long maxWait);
338    }
339}
340