001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.iso.*; 024import org.jpos.q2.QBeanSupport; 025import org.jpos.q2.QFactory; 026import org.jpos.space.Space; 027import org.jpos.space.SpaceFactory; 028import org.jpos.util.NameRegistrar; 029import org.jpos.util.Realm; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.concurrent.atomic.AtomicInteger; 035 036/** 037 * Pool of {@link MUX} instances that selects a delegate per request based on 038 * the configured strategy (one of {@link #PRIMARY_SECONDARY}, {@link #ROUND_ROBIN}, 039 * {@link #ROUND_ROBIN_WITH_OVERRIDE}, or {@link #SPLIT_BY_DIVISOR}). 040 * 041 * @author apr 042 */ 043public class MUXPool extends QBeanSupport implements MUX, MUXPoolMBean { 044 /** Default constructor; no instance state to initialise. */ 045 public MUXPool() {} 046 /** Strategy: try MUXes in order, falling through to the next when unusable. */ 047 public static final int PRIMARY_SECONDARY = 0; 048 /** Strategy: round-robin selection across the registered MUXes. */ 049 public static final int ROUND_ROBIN = 1; 050 /** Strategy: round-robin selection with per-MTI override routing. */ 051 public static final int ROUND_ROBIN_WITH_OVERRIDE = 2; 052 /** Strategy: hash-by-divisor selection driven by a configured field value. */ 053 public static final int SPLIT_BY_DIVISOR = 3; 054 055 int strategy = 0; 056 String[] muxName; 057 MUX[] mux; 058 AtomicInteger msgno = new AtomicInteger(); 059 String[] overrideMTIs; 060 String originalChannelField = ""; 061 String splitField = ""; 062 boolean checkEnabled; 063 Space sp; 064 StrategyHandler strategyHandler; 065 066 @Override 067 protected String defaultRealm() { 068 return Realm.COMM_MUX; 069 } 070 071 public void initService () throws ConfigurationException { 072 Element e = getPersist (); 073 muxName = toStringArray(e.getChildTextTrim ("muxes")); 074 strategy = getStrategy(e.getChildTextTrim("strategy")); 075 overrideMTIs = toStringArray(e.getChildTextTrim("follower-override")); 076 originalChannelField = e.getChildTextTrim("original-channel-field"); 077 splitField = e.getChildTextTrim("split-field"); 078 checkEnabled = cfg.getBoolean("check-enabled"); 079 sp = grabSpace (e.getChild ("space")); 080 081 // Sanitize muxes list using only muxes that exist 082 if (muxName == null) 083 throw new ConfigurationException( 084 String.format("<muxes> element not configured for %s '%s'", getClass().getName(), getName())); 085 086 List<MUX> muxes = new ArrayList<>(); 087 List<String> found = new ArrayList<>(); 088 List<String> notFound = new ArrayList<>(); 089 for (String s : muxName) { 090 MUX m = NameRegistrar.getIfExists("mux." + s); 091 if (m != null) { 092 found.add(s); 093 muxes.add(m); 094 } else 095 notFound.add(s); 096 } 097 098 if (!notFound.isEmpty()) { 099 log.warn("MUXPool "+getName()+": some muxes not found and will be removed "+notFound); 100 } 101 if (muxes.isEmpty()) { 102 throw new ConfigurationException ("MUXPool "+getName()+" has no available muxes"); 103 } 104 105 muxName = found.toArray(new String[0]); 106 mux = muxes.toArray(new MUX[0]); 107 108 initHandler(e.getChild("strategy-handler")); 109 NameRegistrar.register ("mux."+getName (), this); 110 } 111 112 public void stopService () { 113 NameRegistrar.unregister ("mux."+getName ()); 114 } 115 116 /** 117 * Instantiates a custom {@link StrategyHandler} from an XML configuration element. 118 * 119 * @param e {@code <strategy-handler>} element, or {@code null} to leave the handler unset 120 * @throws ConfigurationException if the handler cannot be instantiated 121 */ 122 protected void initHandler(Element e) throws ConfigurationException { 123 if (e == null) 124 return; 125 strategyHandler = getFactory().newInstance(e); 126 } 127 128 public ISOMsg request (ISOMsg m, long timeout) throws ISOException { 129 if (timeout == 0) { 130 // a zero timeout intent is to fire-and-forget, 131 // you should use 'send' instead of 'request' 132 try { 133 send(m); 134 } catch (IOException e) { 135 throw new ISOException(e.getMessage(), e); 136 } 137 } 138 139 long maxWait = System.currentTimeMillis() + timeout; 140 MUX mux = getMUX(m,maxWait); 141 if (mux != null) { 142 long remainingTimeout = maxWait - System.currentTimeMillis(); 143 if (remainingTimeout >= 0) 144 return mux.request(m, remainingTimeout); 145 } 146 return null; 147 } 148 149 public void request(ISOMsg m, long timeout, final ISOResponseListener r, final Object handBack) 150 throws ISOException 151 { 152 if (timeout == 0) { 153 // a zero timeout intent is to fire-and-forget, 154 // you should use 'send' instead of 'request' 155 try { 156 send(m); 157 new Thread(() -> r.expired(handBack)).start(); 158 } catch (IOException e) { 159 throw new ISOException(e.getMessage(), e); 160 } 161 } 162 163 long maxWait = System.currentTimeMillis() + timeout; 164 MUX mux = getMUX(m,maxWait); 165 if (mux != null) { 166 long remainingTimeout = maxWait - System.currentTimeMillis(); 167 if (remainingTimeout >= 0) 168 mux.request(m, remainingTimeout, r, handBack); 169 else { 170 new Thread(()->r.expired(handBack)).start(); 171 } 172 } else 173 throw new ISOException ("No MUX available"); 174 } 175 176 public void send (ISOMsg m) throws ISOException, IOException { 177 long maxWait = System.currentTimeMillis() + 1000L; // reasonable default 178 MUX mux = getMUX(m,maxWait); 179 180 if (mux == null) 181 throw new ISOException ("No available MUX"); 182 183 mux.send(m); 184 } 185 186 /** 187 * Returns the first usable MUX, scanning from index 0 and waiting up to {@code maxWait}. 188 * 189 * @param maxWait wall-clock deadline in milliseconds since epoch 190 * @return a usable MUX, or {@code null} if none became available before the deadline 191 */ 192 protected MUX firstAvailableMUX (long maxWait) { 193 return nextAvailableMUX(0, maxWait); 194 } 195 196 /** 197 * Returns the next usable MUX starting from {@code mnumber}, waiting up to {@code maxWait}. 198 * 199 * @param mnumber starting index (round-robin counter) 200 * @param maxWait wall-clock deadline in milliseconds since epoch 201 * @return a usable MUX, or {@code null} if none became available before the deadline 202 */ 203 protected MUX nextAvailableMUX (int mnumber, long maxWait) { 204 do { 205 for (int i=0; i<mux.length; i++) { 206 int j = (mnumber+i) % mux.length; 207 if (isUsable(mux[j])) 208 return mux[j]; 209 msgno.incrementAndGet(); 210 } 211 ISOUtil.sleep (1000); 212 } while (System.currentTimeMillis() < maxWait); 213 return null; 214 } 215 216 private boolean overrideMTI(String mtiReq) { 217 if(overrideMTIs != null){ 218 for (String mti : overrideMTIs) { 219 if(mti.equals(mtiReq)) 220 return true; 221 } 222 } 223 return false; 224 } 225 226 private MUX nextAvailableWithOverrideMUX(ISOMsg m, long maxWait) { 227 try{ 228 if(originalChannelField != null && !"".equals(originalChannelField)){ 229 String channelName = m.getString(originalChannelField); 230 if(channelName != null && !"".equals(channelName) && overrideMTI(m.getMTI())){ 231 ChannelAdaptor channel = NameRegistrar.get (channelName); 232 for (MUX mx : mux) { 233 if(channel != null && ((QMUX)mx).getInQueue().equals(channel.getOutQueue())){ 234 if(isUsable(mx)) 235 return mx; 236 } 237 } 238 } 239 } 240 return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 241 }catch(Exception e){ 242 getLog().warn(e); 243 } 244 return null; 245 } 246 247 private MUX splitByDivisorMUX(ISOMsg m, long maxWait) { 248 try{ 249 if(splitField != null && !"".equals(splitField)){ 250 String split = m.getString(splitField); 251 if(split != null && ISOUtil.isNumeric(split, 10)){ 252 MUX mx = mux[Integer.parseInt(split) % mux.length]; 253 if(isUsable(mx)) 254 return mx; 255 } 256 } 257 return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 258 }catch(Exception e){ 259 getLog().warn(e); 260 } 261 return null; 262 } 263 264 private int getStrategy(String stg) { 265 if (stg == null) 266 return PRIMARY_SECONDARY; 267 268 stg = stg.trim(); 269 switch (stg) { 270 case "round-robin": return ROUND_ROBIN; 271 case "round-robin-with-override": return ROUND_ROBIN_WITH_OVERRIDE; 272 case "split-by-divisor": return SPLIT_BY_DIVISOR; 273 default: return PRIMARY_SECONDARY; 274 } 275 } 276 277 private MUX getMUX(ISOMsg m, long maxWait) { 278 // maxWait should be interpreted as currentTimeMillis "deadline" 279 MUX mux = null; 280 if (strategyHandler != null) { 281 mux = strategyHandler.getMUX(this, m, maxWait); 282 } 283 284 if (mux != null) 285 return mux; 286 287 switch (strategy) { 288 case ROUND_ROBIN: return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 289 case ROUND_ROBIN_WITH_OVERRIDE: return nextAvailableWithOverrideMUX(m, maxWait); 290 case SPLIT_BY_DIVISOR: return splitByDivisorMUX(m, maxWait); 291 default: return firstAvailableMUX(maxWait); 292 } 293 } 294 295 @Override 296 public String[] getMuxNames() { 297 return muxName; 298 } 299 300 @Override 301 public int getStrategy() { 302 return strategy; 303 } 304 305 /** 306 * Returns the configured custom strategy handler, if any. 307 * 308 * @return the {@link StrategyHandler}, or {@code null} when not configured 309 */ 310 public StrategyHandler getStrategyHandler() { 311 return strategyHandler; 312 } 313 314 private Space grabSpace (Element e) throws ConfigurationException 315 { 316 String uri = e != null ? e.getText() : ""; 317 return SpaceFactory.getSpace (uri); 318 } 319 320 public boolean isConnected() { 321 for (MUX m : mux) 322 if (isUsable(m)) 323 return true; 324 return false; 325 } 326 327 @SuppressWarnings("unchecked") 328 private boolean isUsable (MUX mux) { 329 if (!checkEnabled || !(mux instanceof QMUX)) 330 return mux.isConnected(); 331 332 QMUX qmux = (QMUX) mux; 333 String enabledKey = qmux.getName() + ".enabled"; 334 String[] readyNames = qmux.getReadyIndicatorNames(); 335 if (readyNames != null && readyNames.length == 1) { 336 // check that 'mux.enabled' entry has the same content as 'ready' 337 return mux.isConnected() && sp.rdp (enabledKey) == sp.rdp (readyNames[0]); 338 } 339 return mux.isConnected() && sp.rdp (enabledKey) != null; 340 } 341 342 private String[] toStringArray (String s) { 343 return (s != null && s.length() > 0) ? ISOUtil.toStringArray(s) : null; 344 } 345 346 347 /** 348 * A class implementing this interface can be added to a {@link MUXPool} to override the classical built-in strategies.<br> 349 * 350 * It could be added to a {@code MUXPool} like this:<br> 351 * 352 * <pre> 353 * <mux class="org.jpos.q2.iso.MUXPool" logger="Q2" name="my-pool"> 354 * <muxes>mux1 mux2 mux3</muxes> 355 * <strategy>round-robin</strategy> 356 * 357 * <strategy-handler class="xxx.yyy.MyPoolStrategy"> 358 * <!-- some config here --> 359 * </strategy-handler> 360 * </mux> 361 * </pre> 362 * 363 * If the {@code strategy-handler} returns {@code null}, the {@link MUXPool} will fall back to the 364 * defined {@code strategy} (or the default one, if none defined). 365 * 366 * @author barspi@transactility.com 367 */ 368 public interface StrategyHandler { 369 /** If this method returns null, the {@link MUXPool} will fall back to the configured built-in 370 * strategy. 371 * 372 * @param pool the {@link MUXPool} using this strategy handler 373 * @param m the {@link ISOMsg} that we wish to send 374 * @param maxWait deadline in milliseconds (epoch value as given by {@code System.currentTimeMillis()}) 375 * @return an appropriate {@link MUX} for this strategy, or {@code null} if none is found 376 */ 377 MUX getMUX(MUXPool pool, ISOMsg m, long maxWait); 378 } 379}