001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.iso.*; 024import org.jpos.q2.QBeanSupport; 025import org.jpos.q2.QFactory; 026import org.jpos.space.Space; 027import org.jpos.space.SpaceFactory; 028import org.jpos.util.NameRegistrar; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.concurrent.atomic.AtomicInteger; 034 035/** 036 * @author apr 037 */ 038public class MUXPool extends QBeanSupport implements MUX, MUXPoolMBean { 039 public static final int PRIMARY_SECONDARY = 0; 040 public static final int ROUND_ROBIN = 1; 041 public static final int ROUND_ROBIN_WITH_OVERRIDE = 2; 042 public static final int SPLIT_BY_DIVISOR = 3; 043 044 int strategy = 0; 045 String[] muxName; 046 MUX[] mux; 047 AtomicInteger msgno = new AtomicInteger(); 048 String[] overrideMTIs; 049 String originalChannelField = ""; 050 String splitField = ""; 051 boolean checkEnabled; 052 Space sp; 053 StrategyHandler strategyHandler; 054 055 public void initService () throws ConfigurationException { 056 Element e = getPersist (); 057 muxName = toStringArray(e.getChildTextTrim ("muxes")); 058 strategy = getStrategy(e.getChildTextTrim("strategy")); 059 overrideMTIs = toStringArray(e.getChildTextTrim("follower-override")); 060 originalChannelField = e.getChildTextTrim("original-channel-field"); 061 splitField = e.getChildTextTrim("split-field"); 062 checkEnabled = cfg.getBoolean("check-enabled"); 063 sp = grabSpace (e.getChild ("space")); 064 065 // Sanitize muxes list using only muxes that exist 066 if (muxName == null) 067 throw new ConfigurationException( 068 String.format("<muxes> element not configured for %s '%s'", getClass().getName(), getName())); 069 070 List<MUX> muxes = new ArrayList<>(); 071 List<String> found = new ArrayList<>(); 072 List<String> notFound = new ArrayList<>(); 073 for (String s : muxName) { 074 MUX m = NameRegistrar.getIfExists("mux." + s); 075 if (m != null) { 076 found.add(s); 077 muxes.add(m); 078 } else 079 notFound.add(s); 080 } 081 082 if (!notFound.isEmpty()) { 083 log.warn("MUXPool "+getName()+": some muxes not found and will be removed "+notFound); 084 } 085 if (muxes.isEmpty()) { 086 throw new ConfigurationException ("MUXPool "+getName()+" has no available muxes"); 087 } 088 089 muxName = found.toArray(new String[0]); 090 mux = muxes.toArray(new MUX[0]); 091 092 initHandler(e.getChild("strategy-handler")); 093 NameRegistrar.register ("mux."+getName (), this); 094 } 095 096 public void stopService () { 097 NameRegistrar.unregister ("mux."+getName ()); 098 } 099 100 protected void initHandler(Element e) throws ConfigurationException { 101 if (e == null) 102 return; 103 strategyHandler = getFactory().newInstance(e); 104 } 105 106 public ISOMsg request (ISOMsg m, long timeout) throws ISOException { 107 if (timeout == 0) { 108 // a zero timeout intent is to fire-and-forget, 109 // you should use 'send' instead of 'request' 110 try { 111 send(m); 112 } catch (IOException e) { 113 throw new ISOException(e.getMessage(), e); 114 } 115 } 116 117 long maxWait = System.currentTimeMillis() + timeout; 118 MUX mux = getMUX(m,maxWait); 119 if (mux != null) { 120 long remainingTimeout = maxWait - System.currentTimeMillis(); 121 if (remainingTimeout >= 0) 122 return mux.request(m, remainingTimeout); 123 } 124 return null; 125 } 126 127 public void request(ISOMsg m, long timeout, final ISOResponseListener r, final Object handBack) 128 throws ISOException 129 { 130 if (timeout == 0) { 131 // a zero timeout intent is to fire-and-forget, 132 // you should use 'send' instead of 'request' 133 try { 134 send(m); 135 new Thread(() -> r.expired(handBack)).start(); 136 } catch (IOException e) { 137 throw new ISOException(e.getMessage(), e); 138 } 139 } 140 141 long maxWait = System.currentTimeMillis() + timeout; 142 MUX mux = getMUX(m,maxWait); 143 if (mux != null) { 144 long remainingTimeout = maxWait - System.currentTimeMillis(); 145 if (remainingTimeout >= 0) 146 mux.request(m, remainingTimeout, r, handBack); 147 else { 148 new Thread(()->r.expired(handBack)).start(); 149 } 150 } else 151 throw new ISOException ("No MUX available"); 152 } 153 154 public void send (ISOMsg m) throws ISOException, IOException { 155 long maxWait = System.currentTimeMillis() + 1000L; // reasonable default 156 MUX mux = getMUX(m,maxWait); 157 158 if (mux == null) 159 throw new ISOException ("No available MUX"); 160 161 mux.send(m); 162 } 163 164 protected MUX firstAvailableMUX (long maxWait) { 165 return nextAvailableMUX(0, maxWait); 166 } 167 168 protected MUX nextAvailableMUX (int mnumber, long maxWait) { 169 do { 170 for (int i=0; i<mux.length; i++) { 171 int j = (mnumber+i) % mux.length; 172 if (isUsable(mux[j])) 173 return mux[j]; 174 msgno.incrementAndGet(); 175 } 176 ISOUtil.sleep (1000); 177 } while (System.currentTimeMillis() < maxWait); 178 return null; 179 } 180 181 private boolean overrideMTI(String mtiReq) { 182 if(overrideMTIs != null){ 183 for (String mti : overrideMTIs) { 184 if(mti.equals(mtiReq)) 185 return true; 186 } 187 } 188 return false; 189 } 190 191 private MUX nextAvailableWithOverrideMUX(ISOMsg m, long maxWait) { 192 try{ 193 if(originalChannelField != null && !"".equals(originalChannelField)){ 194 String channelName = m.getString(originalChannelField); 195 if(channelName != null && !"".equals(channelName) && overrideMTI(m.getMTI())){ 196 ChannelAdaptor channel = NameRegistrar.get (channelName); 197 for (MUX mx : mux) { 198 if(channel != null && ((QMUX)mx).getInQueue().equals(channel.getOutQueue())){ 199 if(isUsable(mx)) 200 return mx; 201 } 202 } 203 } 204 } 205 return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 206 }catch(Exception e){ 207 getLog().warn(e); 208 } 209 return null; 210 } 211 212 private MUX splitByDivisorMUX(ISOMsg m, long maxWait) { 213 try{ 214 if(splitField != null && !"".equals(splitField)){ 215 String split = m.getString(splitField); 216 if(split != null && ISOUtil.isNumeric(split, 10)){ 217 MUX mx = mux[Integer.parseInt(split) % mux.length]; 218 if(isUsable(mx)) 219 return mx; 220 } 221 } 222 return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 223 }catch(Exception e){ 224 getLog().warn(e); 225 } 226 return null; 227 } 228 229 private int getStrategy(String stg) { 230 if (stg == null) 231 return PRIMARY_SECONDARY; 232 233 stg = stg.trim(); 234 switch (stg) { 235 case "round-robin": return ROUND_ROBIN; 236 case "round-robin-with-override": return ROUND_ROBIN_WITH_OVERRIDE; 237 case "split-by-divisor": return SPLIT_BY_DIVISOR; 238 default: return PRIMARY_SECONDARY; 239 } 240 } 241 242 private MUX getMUX(ISOMsg m, long maxWait) { 243 // maxWait should be interpreted as currentTimeMillis "deadline" 244 MUX mux = null; 245 if (strategyHandler != null) { 246 mux = strategyHandler.getMUX(this, m, maxWait); 247 } 248 249 if (mux != null) 250 return mux; 251 252 switch (strategy) { 253 case ROUND_ROBIN: return nextAvailableMUX(msgno.incrementAndGet(), maxWait); 254 case ROUND_ROBIN_WITH_OVERRIDE: return nextAvailableWithOverrideMUX(m, maxWait); 255 case SPLIT_BY_DIVISOR: return splitByDivisorMUX(m, maxWait); 256 default: return firstAvailableMUX(maxWait); 257 } 258 } 259 260 @Override 261 public String[] getMuxNames() { 262 return muxName; 263 } 264 265 @Override 266 public int getStrategy() { 267 return strategy; 268 } 269 270 public StrategyHandler getStrategyHandler() { 271 return strategyHandler; 272 } 273 274 private Space grabSpace (Element e) throws ConfigurationException 275 { 276 String uri = e != null ? e.getText() : ""; 277 return SpaceFactory.getSpace (uri); 278 } 279 280 public boolean isConnected() { 281 for (MUX m : mux) 282 if (isUsable(m)) 283 return true; 284 return false; 285 } 286 287 @SuppressWarnings("unchecked") 288 private boolean isUsable (MUX mux) { 289 if (!checkEnabled || !(mux instanceof QMUX)) 290 return mux.isConnected(); 291 292 QMUX qmux = (QMUX) mux; 293 String enabledKey = qmux.getName() + ".enabled"; 294 String[] readyNames = qmux.getReadyIndicatorNames(); 295 if (readyNames != null && readyNames.length == 1) { 296 // check that 'mux.enabled' entry has the same content as 'ready' 297 return mux.isConnected() && sp.rdp (enabledKey) == sp.rdp (readyNames[0]); 298 } 299 return mux.isConnected() && sp.rdp (enabledKey) != null; 300 } 301 302 private String[] toStringArray (String s) { 303 return (s != null && s.length() > 0) ? ISOUtil.toStringArray(s) : null; 304 } 305 306 307 /** 308 * A class implementing this interface can be added to a {@link MUXPool} to override the classical built-in strategies.<br> 309 * 310 * It could be added to a {@code MUXPool} like this:<br> 311 * 312 * <pre> 313 * <mux class="org.jpos.q2.iso.MUXPool" logger="Q2" name="my-pool"> 314 * <muxes>mux1 mux2 mux3</muxes> 315 * <strategy>round-robin</strategy> 316 * 317 * <strategy-handler class="xxx.yyy.MyPoolStrategy"> 318 * <!-- some config here --> 319 * </strategy-handler> 320 * </mux> 321 * </pre> 322 * 323 * If the {@code strategy-handler} returns {@code null}, the {@link MUXPool} will fall back to the 324 * defined {@code strategy} (or the default one, if none defined). 325 * 326 * @author barspi@transactility.com 327 */ 328 public interface StrategyHandler { 329 /** If this method returns null, the {@link MUXPool} will fall back to the configured built-in 330 * strategy. 331 * 332 * @param pool the {@link MUXPool} using this strategy handler 333 * @param m the {@link ISOMsg} that we wish to send 334 * @param maxWait deadline in milliseconds (epoch value as given by {@code System.currentTimeMillis()}) 335 * @return an appropriate {@link MUX} for this strategy, or {@code null} if none is found 336 */ 337 MUX getMUX(MUXPool pool, ISOMsg m, long maxWait); 338 } 339} 340