001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import io.micrometer.core.instrument.Gauge; 022import io.micrometer.core.instrument.Tags; 023import io.micrometer.core.instrument.binder.BaseUnits; 024import org.jdom2.Element; 025import org.jpos.core.ConfigurationException; 026import org.jpos.core.Environment; 027import org.jpos.iso.*; 028import org.jpos.metrics.MeterFactory; 029import org.jpos.metrics.MeterInfo; 030import org.jpos.metrics.iso.ISOMsgMetrics; 031import org.jpos.q2.QBeanSupport; 032import org.jpos.q2.QFactory; 033import org.jpos.space.LocalSpace; 034import org.jpos.space.Space; 035import org.jpos.space.SpaceFactory; 036import org.jpos.space.SpaceListener; 037import org.jpos.util.LogSource; 038import org.jpos.util.NameRegistrar; 039 040import java.util.concurrent.Executors; 041import java.util.concurrent.atomic.AtomicInteger; 042 043/** 044 * ISO Server wrapper. 045 * 046 * @author Alwyn Schoeman 047 * @version $Revision$ $Date$ 048 */ 049 050@SuppressWarnings("unchecked") 051public class QServer 052 extends QBeanSupport 053 implements QServerMBean, SpaceListener, ISORequestListener 054{ 055 private int port = 0; 056 private int maxSessions = 100; 057 private String channelString, packagerString, socketFactoryString; 058 private ISOChannel channel = null; // is never connected; but passed to ISOServer as a clonable "template" for new connections 059 private ISOServer server; 060 protected LocalSpace sp; 061 private String inQueue; 062 private String outQueue; 063 private String sendMethod; 064 AtomicInteger msgn = new AtomicInteger(); 065 066 private Gauge connectionsGauge; 067 private static final String CHANNEL_NAME_REGEXP = " (?=\\d+ \\S+:\\S+)"; 068 069 public QServer () { 070 super (); 071 } 072 073 @Override 074 public void initService() throws ConfigurationException { 075 Element e = getPersist (); 076 sp = grabSpace (e.getChild ("space")); 077 } 078 079 private void newChannel () throws ConfigurationException { 080 Element persist = getPersist (); 081 Element e = persist.getChild ("channel"); 082 if (e == null) { 083 throw new ConfigurationException ("channel element missing"); 084 } 085 086 ChannelAdaptor adaptor = new ChannelAdaptor (); // leverage adaptor's newChannel logic 087 channel = adaptor.newChannel (e, getFactory ()); 088 } 089 090 private void initServer () 091 throws ConfigurationException 092 { 093 if (port == 0) { 094 throw new ConfigurationException ("Port value not set"); 095 } 096 newChannel(); 097 if (channel == null) { 098 throw new ConfigurationException ("ISO Channel is null"); 099 } 100 101 if (!(channel instanceof ServerChannel)) { 102 throw new ConfigurationException (channelString + 103 "does not implement ServerChannel"); 104 } 105 106 server = new ISOServer (port, (ServerChannel) channel, maxSessions); 107 server.setLogger (log.getLogger(), getName() + ".server"); 108 server.setName (getName ()); 109 if (socketFactoryString != null) { 110 ISOServerSocketFactory sFac = getFactory().newInstance(socketFactoryString); 111 if (sFac instanceof LogSource ls) { 112 ls.setLogger(log.getLogger(),getName() + ".socket-factory"); 113 } 114 server.setSocketFactory(sFac); 115 } 116 getFactory().setConfiguration (server, getPersist()); 117 addServerSocketFactory(); 118 addListeners ();// ISORequestListener 119 addISOServerConnectionListeners(); 120 NameRegistrar.register (getName(), this); 121 initMeters(); // meters need 'server' to be initialized 122 Executors.newVirtualThreadPerTaskExecutor().submit(server); 123 } 124 private void initIn() { 125 Element persist = getPersist(); 126 inQueue = Environment.get(persist.getChildTextTrim("in")); 127 if (inQueue != null) { 128 /* 129 * We have an 'in' queue to monitor for messages to be 130 * sent out through server in our (SpaceListener)notify(Object, Object) method. 131 */ 132 sp.addListener(inQueue, this); 133 } 134 } 135 private void initOut() { 136 Element persist = getPersist(); 137 outQueue = Environment.get(persist.getChildTextTrim("out")); 138 if (outQueue != null) { 139 /* 140 * We have an 'out' queue to send any messages to that are received 141 * by our requestListener(this). 142 * 143 * Note, if additional ISORequestListeners are registered with the server after 144 * this point, then they won't see anything as our process(ISOSource, ISOMsg) 145 * always return true. 146 */ 147 server.addISORequestListener(this); 148 } 149 } 150 @Override 151 public void startService () { 152 try { 153 initServer (); 154 initIn(); 155 initOut(); 156 initWhoToSendTo(); 157 } catch (Exception e) { 158 getLog().warn ("error starting service", e); 159 } 160 } 161 private void initWhoToSendTo() { 162 Element persist = getPersist(); 163 sendMethod = persist.getChildText("send-request"); 164 if (sendMethod==null) { 165 sendMethod="LAST"; 166 } 167 } 168 169 @Override 170 public void stopService () { 171 if (server != null) { 172 server.shutdown (); 173 sp.removeListener(inQueue, this); 174 } 175 removeMeters(); 176 } 177 @Override 178 public void destroyService () { 179 NameRegistrar.unregister (getName()); 180 NameRegistrar.unregister ("server." + getName()); 181 } 182 183 @Override 184 public synchronized void setPort (int port) { 185 this.port = port; 186 setAttr (getAttrs (), "port", port); 187 setModified (true); 188 } 189 190 @Override 191 public int getPort () { 192 return port; 193 } 194 195 @Override 196 public synchronized void setPackager (String packager) { 197 packagerString = packager; 198 setAttr (getAttrs (), "packager", packagerString); 199 setModified (true); 200 } 201 202 @Override 203 public String getPackager () { 204 return packagerString; 205 } 206 207 @Override 208 public synchronized void setChannel (String channel) { 209 channelString = channel; 210 setAttr (getAttrs (), "channel", channelString); 211 setModified (true); 212 } 213 214 @Override 215 public String getChannel () { 216 return channelString; 217 } 218 219 @Override 220 public synchronized void setMaxSessions (int maxSessions) { 221 this.maxSessions = maxSessions; 222 setAttr (getAttrs (), "maxSessions", maxSessions); 223 setModified (true); 224 } 225 226 @Override 227 public int getMaxSessions () { 228 return maxSessions; 229 } 230 231 @Override 232 public synchronized void setSocketFactory (String sFactory) { 233 socketFactoryString = sFactory; 234 setAttr (getAttrs(),"socketFactory", socketFactoryString); 235 setModified (true); 236 } 237 238 @Override 239 public String getSocketFactory() { 240 return socketFactoryString; 241 } 242 243 @Override 244 public String getISOChannelNames() { 245 return server.getISOChannelNames(); 246 } 247 248 public ISOServer getISOServer() { 249 return server; 250 } 251 252 @Override 253 public String getCountersAsString () { 254 return server.getCountersAsString (); 255 } 256 @Override 257 public String getCountersAsString (String isoChannelName) { 258 return server.getCountersAsString (isoChannelName); 259 } 260 261 private void addServerSocketFactory () throws ConfigurationException { 262 QFactory factory = getFactory (); 263 Element serverSocketFactoryElement = getPersist().getChild ("server-socket-factory"); 264 if (serverSocketFactoryElement != null) { 265 ISOServerSocketFactory serverSocketFactory= factory.newInstance(serverSocketFactoryElement); 266 if (serverSocketFactory != null) 267 server.setSocketFactory(serverSocketFactory); 268 } 269 270 } 271 272 private void addListeners () throws ConfigurationException { 273 QFactory factory = getFactory (); 274 for (Element l : getPersist().getChildren("request-listener")) { 275 ISORequestListener listener = factory.newInstance(l); 276 if (listener != null) 277 server.addISORequestListener (listener); 278 } 279 } 280 281 private void addISOServerConnectionListeners() throws ConfigurationException { 282 QFactory factory = getFactory (); 283 for (Element l : getPersist().getChildren("connection-listener")) { 284 ISOServerEventListener listener = factory.newInstance(l); 285 if (listener != null) 286 server.addServerEventListener(listener); 287 } 288 } 289 290 291 private LocalSpace grabSpace (Element e) throws ConfigurationException 292 { 293 String uri = e != null ? Environment.get(e.getTextTrim()) : ""; 294 Space sp = SpaceFactory.getSpace (uri); 295 if (sp instanceof LocalSpace) { 296 return (LocalSpace) sp; 297 } 298 throw new ConfigurationException ("Invalid space " + uri); 299 } 300 301 /* 302 * This method will be invoked through the SpaceListener interface we registered once 303 * we noticed we had an 'in' queue. 304 */ 305 @Override 306 public void notify(Object key, Object value) { 307 Object obj = sp.inp(key); 308 if (obj instanceof ISOMsg) { 309 ISOMsg m = (ISOMsg) obj; 310 if ("LAST".equals(sendMethod)) { 311 try { 312 ISOChannel c = server.getLastConnectedISOChannel(); 313 if (c == null) { 314 throw new ISOException("Server has no active connections"); 315 } 316 if (!c.isConnected()) { 317 throw new ISOException("Client disconnected"); 318 } 319 c.send(m); 320 } 321 catch (Exception e) { 322 getLog().warn("notify", e); 323 } 324 } 325 else if ("ALL".equals(sendMethod)) { 326 String channelNames = getISOChannelNames(); 327 String[] channelName; 328 if (channelNames != null) { 329 channelName = channelNames.split(CHANNEL_NAME_REGEXP); 330 for (String s : channelName) { 331 try { 332 ISOChannel c = server.getISOChannel(s); 333 if (c == null) { 334 throw new ISOException("Server has no active connections"); 335 } 336 if (!c.isConnected()) { 337 throw new ISOException("Client disconnected"); 338 } 339 c.send(m); 340 } catch (Exception e) { 341 getLog().warn("notify", e); 342 } 343 } 344 } 345 } 346 else if ("RR".equals(sendMethod)) { 347 String channelNames = getISOChannelNames(); 348 String[] channelName; 349 if (channelNames != null) { 350 channelName = channelNames.split(CHANNEL_NAME_REGEXP); 351 try { 352 ISOChannel c = server.getISOChannel(channelName[msgn.incrementAndGet() % channelName.length]); 353 if (c == null) { 354 throw new ISOException("Server has no active connections"); 355 } 356 if (!c.isConnected()) { 357 throw new ISOException("Client disconnected"); 358 } 359 c.send(m); 360 } catch (Exception e) { 361 getLog().warn("notify", e); 362 } 363 } 364 } 365 } 366 } 367 368 /* 369 * This method will be invoked through the ISORequestListener interface, *if* 370 * this QServer has an 'out' queue to handle. 371 */ 372 @Override 373 public boolean process(ISOSource source, ISOMsg m) { 374 sp.out(outQueue, m); 375 return true; 376 } 377 378 private void initMeters() { 379 var tags = Tags.of("name", getName(), 380 "type", "server"); 381 var registry = getServer().getMeterRegistry(); 382 383 connectionsGauge = 384 MeterFactory.gauge 385 (registry, MeterInfo.ISOSERVER_CONNECTION_COUNT, 386 tags.and("port", ""+getPort()), 387 BaseUnits.SESSIONS, 388 server::getActiveConnections 389 ); 390 391 if (channel instanceof ISOMsgMetrics.Source ms) { 392 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 393 if (mtr != null) { 394 mtr.addTags(tags); 395 mtr.register(registry); 396 } 397 } 398 } 399 400 private void removeMeters() { 401 var registry = getServer().getMeterRegistry(); 402 registry.remove(connectionsGauge); 403 404 if (channel instanceof ISOMsgMetrics.Source ms) { 405 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 406 if (mtr != null) 407 mtr.removeMeters(); 408 } 409 } 410}