001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import io.micrometer.core.instrument.Gauge; 022import io.micrometer.core.instrument.Tags; 023import io.micrometer.core.instrument.binder.BaseUnits; 024import org.jdom2.Element; 025import org.jpos.core.ConfigurationException; 026import org.jpos.core.Environment; 027import org.jpos.iso.*; 028import org.jpos.metrics.MeterFactory; 029import org.jpos.metrics.MeterInfo; 030import org.jpos.metrics.iso.ISOMsgMetrics; 031import org.jpos.q2.QBeanSupport; 032import org.jpos.q2.QFactory; 033import org.jpos.space.LocalSpace; 034import org.jpos.space.Space; 035import org.jpos.space.SpaceFactory; 036import org.jpos.space.SpaceListener; 037import org.jpos.util.LogSource; 038import org.jpos.util.NameRegistrar; 039import org.jpos.util.Realm; 040 041import java.util.concurrent.Executors; 042import java.util.concurrent.atomic.AtomicInteger; 043 044/** 045 * ISO Server wrapper. 046 * 047 * @author Alwyn Schoeman 048 * @version $Revision$ $Date$ 049 */ 050 051@SuppressWarnings("unchecked") 052public class QServer 053 extends QBeanSupport 054 implements QServerMBean, SpaceListener, ISORequestListener 055{ 056 private int port = 0; 057 private int maxSessions = 100; 058 private String channelString, packagerString, socketFactoryString; 059 private ISOChannel channel = null; // is never connected; but passed to ISOServer as a clonable "template" for new connections 060 private ISOServer server; 061 /** Local space used for inbound/outbound queue routing when configured. */ 062 protected LocalSpace sp; 063 private String inQueue; 064 private String outQueue; 065 private String sendMethod; 066 AtomicInteger msgn = new AtomicInteger(); 067 068 private Gauge connectionsGauge; 069 private static final String CHANNEL_NAME_REGEXP = " (?=\\d+ \\S+:\\S+)"; 070 071 /** Default constructor. */ 072 public QServer () { 073 super (); 074 } 075 076 @Override 077 protected String defaultRealm() { 078 return Realm.COMM_SERVER; 079 } 080 081 @Override 082 public void initService() throws ConfigurationException { 083 Element e = getPersist (); 084 sp = grabSpace (e.getChild ("space")); 085 } 086 087 private void newChannel () throws ConfigurationException { 088 Element persist = getPersist (); 089 Element e = persist.getChild ("channel"); 090 if (e == null) { 091 throw new ConfigurationException ("channel element missing"); 092 } 093 094 ChannelAdaptor adaptor = new ChannelAdaptor (); // leverage adaptor's newChannel logic 095 channel = adaptor.newChannel (e, getFactory (), getRealm()); 096 } 097 098 private void initServer () 099 throws ConfigurationException 100 { 101 if (port == 0) { 102 throw new ConfigurationException ("Port value not set"); 103 } 104 newChannel(); 105 if (channel == null) { 106 throw new ConfigurationException ("ISO Channel is null"); 107 } 108 109 if (!(channel instanceof ServerChannel)) { 110 throw new ConfigurationException (channelString + 111 "does not implement ServerChannel"); 112 } 113 114 server = new ISOServer (port, (ServerChannel) channel, maxSessions); 115 server.setLogger (log.getLogger(), getRealm()); 116 server.setName (getName ()); 117 if (socketFactoryString != null) { 118 ISOServerSocketFactory sFac = getFactory().newInstance(socketFactoryString); 119 if (sFac instanceof LogSource ls) { 120 ls.setLogger(log.getLogger(), getRealm()); 121 } 122 server.setSocketFactory(sFac); 123 } 124 getFactory().setConfiguration (server, getPersist()); 125 addServerSocketFactory(); 126 addListeners ();// ISORequestListener 127 addISOServerConnectionListeners(); 128 NameRegistrar.register (getName(), this); 129 initMeters(); // meters need 'server' to be initialized 130 Executors.newVirtualThreadPerTaskExecutor().submit(server); 131 } 132 private void initIn() { 133 Element persist = getPersist(); 134 inQueue = Environment.get(persist.getChildTextTrim("in")); 135 if (inQueue != null) { 136 /* 137 * We have an 'in' queue to monitor for messages to be 138 * sent out through server in our (SpaceListener)notify(Object, Object) method. 139 */ 140 sp.addListener(inQueue, this); 141 } 142 } 143 private void initOut() { 144 Element persist = getPersist(); 145 outQueue = Environment.get(persist.getChildTextTrim("out")); 146 if (outQueue != null) { 147 /* 148 * We have an 'out' queue to send any messages to that are received 149 * by our requestListener(this). 150 * 151 * Note, if additional ISORequestListeners are registered with the server after 152 * this point, then they won't see anything as our process(ISOSource, ISOMsg) 153 * always return true. 154 */ 155 server.addISORequestListener(this); 156 } 157 } 158 @Override 159 public void startService () { 160 try { 161 initServer (); 162 initIn(); 163 initOut(); 164 initWhoToSendTo(); 165 } catch (Exception e) { 166 getLog().warn ("error starting service", e); 167 } 168 } 169 private void initWhoToSendTo() { 170 Element persist = getPersist(); 171 sendMethod = persist.getChildText("send-request"); 172 if (sendMethod==null) { 173 sendMethod="LAST"; 174 } 175 } 176 177 @Override 178 public void stopService () { 179 if (server != null) { 180 server.shutdown (); 181 sp.removeListener(inQueue, this); 182 } 183 removeMeters(); 184 } 185 @Override 186 public void destroyService () { 187 NameRegistrar.unregister (getName()); 188 NameRegistrar.unregister ("server." + getName()); 189 } 190 191 @Override 192 public synchronized void setPort (int port) { 193 this.port = port; 194 setAttr (getAttrs (), "port", port); 195 setModified (true); 196 } 197 198 @Override 199 public int getPort () { 200 return port; 201 } 202 203 @Override 204 public synchronized void setPackager (String packager) { 205 packagerString = packager; 206 setAttr (getAttrs (), "packager", packagerString); 207 setModified (true); 208 } 209 210 @Override 211 public String getPackager () { 212 return packagerString; 213 } 214 215 @Override 216 public synchronized void setChannel (String channel) { 217 channelString = channel; 218 setAttr (getAttrs (), "channel", channelString); 219 setModified (true); 220 } 221 222 @Override 223 public String getChannel () { 224 return channelString; 225 } 226 227 @Override 228 public synchronized void setMaxSessions (int maxSessions) { 229 this.maxSessions = maxSessions; 230 setAttr (getAttrs (), "maxSessions", maxSessions); 231 setModified (true); 232 } 233 234 @Override 235 public int getMaxSessions () { 236 return maxSessions; 237 } 238 239 @Override 240 public synchronized void setSocketFactory (String sFactory) { 241 socketFactoryString = sFactory; 242 setAttr (getAttrs(),"socketFactory", socketFactoryString); 243 setModified (true); 244 } 245 246 @Override 247 public String getSocketFactory() { 248 return socketFactoryString; 249 } 250 251 @Override 252 public String getISOChannelNames() { 253 return server.getISOChannelNames(); 254 } 255 256 /** 257 * Returns the underlying {@link ISOServer} created by this bean. 258 * 259 * @return the live server, or {@code null} if not yet started 260 */ 261 public ISOServer getISOServer() { 262 return server; 263 } 264 265 @Override 266 public String getCountersAsString () { 267 return server.getCountersAsString (); 268 } 269 @Override 270 public String getCountersAsString (String isoChannelName) { 271 return server.getCountersAsString (isoChannelName); 272 } 273 274 private void addServerSocketFactory () throws ConfigurationException { 275 QFactory factory = getFactory (); 276 Element serverSocketFactoryElement = getPersist().getChild ("server-socket-factory"); 277 if (serverSocketFactoryElement != null) { 278 ISOServerSocketFactory serverSocketFactory= factory.newInstance(serverSocketFactoryElement); 279 if (serverSocketFactory != null) 280 server.setSocketFactory(serverSocketFactory); 281 } 282 283 } 284 285 private void addListeners () throws ConfigurationException { 286 QFactory factory = getFactory (); 287 for (Element l : getPersist().getChildren("request-listener")) { 288 ISORequestListener listener = factory.newInstance(l); 289 if (listener != null) 290 server.addISORequestListener (listener); 291 } 292 } 293 294 private void addISOServerConnectionListeners() throws ConfigurationException { 295 QFactory factory = getFactory (); 296 for (Element l : getPersist().getChildren("connection-listener")) { 297 ISOServerEventListener listener = factory.newInstance(l); 298 if (listener != null) 299 server.addServerEventListener(listener); 300 } 301 } 302 303 304 private LocalSpace grabSpace (Element e) throws ConfigurationException 305 { 306 String uri = e != null ? Environment.get(e.getTextTrim()) : ""; 307 Space sp = SpaceFactory.getSpace (uri); 308 if (sp instanceof LocalSpace) { 309 return (LocalSpace) sp; 310 } 311 throw new ConfigurationException ("Invalid space " + uri); 312 } 313 314 /* 315 * This method will be invoked through the SpaceListener interface we registered once 316 * we noticed we had an 'in' queue. 317 */ 318 @Override 319 public void notify(Object key, Object value) { 320 Object obj = sp.inp(key); 321 if (obj instanceof ISOMsg) { 322 ISOMsg m = (ISOMsg) obj; 323 if ("LAST".equals(sendMethod)) { 324 try { 325 ISOChannel c = server.getLastConnectedISOChannel(); 326 if (c == null) { 327 throw new ISOException("Server has no active connections"); 328 } 329 if (!c.isConnected()) { 330 throw new ISOException("Client disconnected"); 331 } 332 c.send(m); 333 } 334 catch (Exception e) { 335 getLog().warn("notify", e); 336 } 337 } 338 else if ("ALL".equals(sendMethod)) { 339 String channelNames = getISOChannelNames(); 340 String[] channelName; 341 if (channelNames != null) { 342 channelName = channelNames.split(CHANNEL_NAME_REGEXP); 343 for (String s : channelName) { 344 try { 345 ISOChannel c = server.getISOChannel(s); 346 if (c == null) { 347 throw new ISOException("Server has no active connections"); 348 } 349 if (!c.isConnected()) { 350 throw new ISOException("Client disconnected"); 351 } 352 c.send(m); 353 } catch (Exception e) { 354 getLog().warn("notify", e); 355 } 356 } 357 } 358 } 359 else if ("RR".equals(sendMethod)) { 360 String channelNames = getISOChannelNames(); 361 String[] channelName; 362 if (channelNames != null) { 363 channelName = channelNames.split(CHANNEL_NAME_REGEXP); 364 try { 365 ISOChannel c = server.getISOChannel(channelName[msgn.incrementAndGet() % channelName.length]); 366 if (c == null) { 367 throw new ISOException("Server has no active connections"); 368 } 369 if (!c.isConnected()) { 370 throw new ISOException("Client disconnected"); 371 } 372 c.send(m); 373 } catch (Exception e) { 374 getLog().warn("notify", e); 375 } 376 } 377 } 378 } 379 } 380 381 /* 382 * This method will be invoked through the ISORequestListener interface, *if* 383 * this QServer has an 'out' queue to handle. 384 */ 385 @Override 386 public boolean process(ISOSource source, ISOMsg m) { 387 sp.out(outQueue, m); 388 return true; 389 } 390 391 private void initMeters() { 392 var tags = Tags.of("name", getName(), 393 "type", "server"); 394 var registry = getServer().getMeterRegistry(); 395 396 connectionsGauge = 397 MeterFactory.gauge 398 (registry, MeterInfo.ISOSERVER_CONNECTION_COUNT, 399 tags.and("port", ""+getPort()), 400 BaseUnits.SESSIONS, 401 server::getActiveConnections 402 ); 403 404 if (channel instanceof ISOMsgMetrics.Source ms) { 405 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 406 if (mtr != null) { 407 mtr.addTags(tags); 408 mtr.register(registry); 409 } 410 } 411 } 412 413 private void removeMeters() { 414 var registry = getServer().getMeterRegistry(); 415 registry.remove(connectionsGauge); 416 417 if (channel instanceof ISOMsgMetrics.Source ms) { 418 ISOMsgMetrics mtr = ms.getISOMsgMetrics(); 419 if (mtr != null) 420 mtr.removeMeters(); 421 } 422 } 423}