001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.Gauge;
022import io.micrometer.core.instrument.Tags;
023import io.micrometer.core.instrument.binder.BaseUnits;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.iso.*;
028import org.jpos.metrics.MeterFactory;
029import org.jpos.metrics.MeterInfo;
030import org.jpos.metrics.iso.ISOMsgMetrics;
031import org.jpos.q2.QBeanSupport;
032import org.jpos.q2.QFactory;
033import org.jpos.space.LocalSpace;
034import org.jpos.space.Space;
035import org.jpos.space.SpaceFactory;
036import org.jpos.space.SpaceListener;
037import org.jpos.util.LogSource;
038import org.jpos.util.NameRegistrar;
039import org.jpos.util.Realm;
040
041import java.util.concurrent.Executors;
042import java.util.concurrent.atomic.AtomicInteger;
043
044/**
045 * ISO Server wrapper.
046 *
047 * @author Alwyn Schoeman
048 * @version $Revision$ $Date$
049 */
050
051@SuppressWarnings("unchecked")
052public class QServer
053    extends QBeanSupport
054    implements QServerMBean, SpaceListener, ISORequestListener
055{
056    private int port = 0;
057    private int maxSessions = 100;
058    private String channelString, packagerString, socketFactoryString;
059    private ISOChannel channel = null;       // is never connected; but passed to ISOServer as a clonable "template" for new connections
060    private ISOServer server;
061    /** Local space used for inbound/outbound queue routing when configured. */
062    protected LocalSpace sp;
063    private String inQueue;
064    private String outQueue;
065    private String sendMethod;
066    AtomicInteger msgn = new AtomicInteger();
067
068    private Gauge connectionsGauge;
069    private static final String CHANNEL_NAME_REGEXP = " (?=\\d+ \\S+:\\S+)";
070
071    /** Default constructor. */
072    public QServer () {
073        super ();
074    }
075
076    @Override
077    protected String defaultRealm() {
078        return Realm.COMM_SERVER;
079    }
080
081    @Override
082    public void initService() throws ConfigurationException {
083        Element e = getPersist ();
084        sp        = grabSpace (e.getChild ("space"));
085    }
086
087    private void newChannel () throws ConfigurationException {
088        Element persist = getPersist ();
089        Element e = persist.getChild ("channel");
090        if (e == null) {
091            throw new ConfigurationException ("channel element missing");
092        }
093
094        ChannelAdaptor adaptor = new ChannelAdaptor ();     // leverage adaptor's newChannel logic
095        channel = adaptor.newChannel (e, getFactory (), getRealm());
096    }
097
098    private void initServer ()
099        throws ConfigurationException
100    {
101        if (port == 0) {
102            throw new ConfigurationException ("Port value not set");
103        }
104        newChannel();
105        if (channel == null) {
106            throw new ConfigurationException ("ISO Channel is null");
107        }
108
109        if (!(channel instanceof ServerChannel)) {
110            throw new ConfigurationException (channelString +
111                  "does not implement ServerChannel");
112        }
113
114        server = new ISOServer (port, (ServerChannel) channel, maxSessions);
115        server.setLogger (log.getLogger(), getRealm());
116        server.setName (getName ());
117        if (socketFactoryString != null) {
118            ISOServerSocketFactory sFac = getFactory().newInstance(socketFactoryString);
119            if (sFac instanceof LogSource ls) {
120                ls.setLogger(log.getLogger(), getRealm());
121            }
122            server.setSocketFactory(sFac);
123        }
124        getFactory().setConfiguration (server, getPersist());
125        addServerSocketFactory();
126        addListeners ();// ISORequestListener
127        addISOServerConnectionListeners();
128        NameRegistrar.register (getName(), this);
129        initMeters(); // meters need 'server' to be initialized
130        Executors.newVirtualThreadPerTaskExecutor().submit(server);
131    }
132    private void initIn() {
133        Element persist = getPersist();
134        inQueue = Environment.get(persist.getChildTextTrim("in"));
135        if (inQueue != null) {
136            /*
137             * We have an 'in' queue to monitor for messages to be
138             * sent out through server in our (SpaceListener)notify(Object, Object) method.
139             */
140            sp.addListener(inQueue, this);
141        }
142    }
143    private void initOut() {
144        Element persist = getPersist();
145        outQueue = Environment.get(persist.getChildTextTrim("out"));
146        if (outQueue != null) {
147            /*
148             * We have an 'out' queue to send any messages to that are received
149             * by our requestListener(this).
150             *
151             * Note, if additional ISORequestListeners are registered with the server after
152             *  this point, then they won't see anything as our process(ISOSource, ISOMsg)
153             *  always return true.
154             */
155           server.addISORequestListener(this);
156        }
157    }
158    @Override
159    public void startService () {
160        try {
161            initServer ();
162            initIn();
163            initOut();
164            initWhoToSendTo();
165        } catch (Exception e) {
166            getLog().warn ("error starting service", e);
167        }
168    }
169    private void initWhoToSendTo() {
170        Element persist = getPersist();
171        sendMethod = persist.getChildText("send-request");
172        if (sendMethod==null) {
173            sendMethod="LAST";
174        }
175    }
176
177    @Override
178    public void stopService () {
179        if (server != null) {
180            server.shutdown ();
181            sp.removeListener(inQueue, this);
182        }
183        removeMeters();
184    }
185    @Override
186    public void destroyService () {
187        NameRegistrar.unregister (getName());
188        NameRegistrar.unregister ("server." + getName());
189    }
190
191    @Override
192    public synchronized void setPort (int port) {
193        this.port = port;
194        setAttr (getAttrs (), "port", port);
195        setModified (true);
196    }
197
198    @Override
199    public int getPort () {
200        return port;
201    }
202
203    @Override
204    public synchronized void setPackager (String packager) {
205        packagerString = packager;
206        setAttr (getAttrs (), "packager", packagerString);
207        setModified (true);
208    }
209
210    @Override
211    public String getPackager () {
212        return packagerString;
213    }
214
215    @Override
216    public synchronized void setChannel (String channel) {
217        channelString = channel;
218        setAttr (getAttrs (), "channel", channelString);
219        setModified (true);
220    }
221
222    @Override
223    public String getChannel () {
224        return channelString;
225    }
226
227    @Override
228    public synchronized void setMaxSessions (int maxSessions) {
229        this.maxSessions = maxSessions;
230        setAttr (getAttrs (), "maxSessions", maxSessions);
231        setModified (true);
232    }
233
234    @Override
235    public int getMaxSessions () {
236        return maxSessions;
237    }
238
239    @Override
240    public synchronized void setSocketFactory (String sFactory) {
241        socketFactoryString = sFactory;
242        setAttr (getAttrs(),"socketFactory", socketFactoryString);
243        setModified (true);
244    }
245
246    @Override
247    public String getSocketFactory() {
248        return socketFactoryString;
249    }
250
251    @Override
252    public String getISOChannelNames() {
253        return server.getISOChannelNames();
254    }
255
256    /**
257     * Returns the underlying {@link ISOServer} created by this bean.
258     *
259     * @return the live server, or {@code null} if not yet started
260     */
261    public ISOServer getISOServer() {
262        return server;
263    }
264
265    @Override
266    public String getCountersAsString () {
267        return server.getCountersAsString ();
268    }
269    @Override
270    public String getCountersAsString (String isoChannelName) {
271        return server.getCountersAsString (isoChannelName);
272    }
273
274    private void addServerSocketFactory () throws ConfigurationException {
275        QFactory factory = getFactory ();
276        Element serverSocketFactoryElement = getPersist().getChild ("server-socket-factory");
277        if (serverSocketFactoryElement != null) {
278            ISOServerSocketFactory serverSocketFactory= factory.newInstance(serverSocketFactoryElement);
279            if (serverSocketFactory != null)
280                server.setSocketFactory(serverSocketFactory);
281        }
282
283    }
284
285    private void addListeners () throws ConfigurationException {
286        QFactory factory = getFactory ();
287        for (Element l : getPersist().getChildren("request-listener")) {
288            ISORequestListener listener = factory.newInstance(l);
289            if (listener != null)
290                server.addISORequestListener (listener);
291        }
292    }
293
294    private void addISOServerConnectionListeners() throws ConfigurationException {
295        QFactory factory = getFactory ();
296        for (Element l : getPersist().getChildren("connection-listener")) {
297            ISOServerEventListener listener = factory.newInstance(l);
298            if (listener != null)
299                server.addServerEventListener(listener);
300        }
301    }
302
303
304    private LocalSpace grabSpace (Element e) throws ConfigurationException
305    {
306        String uri = e != null ? Environment.get(e.getTextTrim()) : "";
307        Space sp = SpaceFactory.getSpace (uri);
308        if (sp instanceof LocalSpace) {
309            return (LocalSpace) sp;
310        }
311        throw new ConfigurationException ("Invalid space " + uri);
312    }
313
314    /*
315     * This method will be invoked through the SpaceListener interface we registered once
316     * we noticed we had an 'in' queue.
317     */
318    @Override
319    public void notify(Object key, Object value) {
320        Object obj = sp.inp(key);
321        if (obj instanceof ISOMsg) {
322            ISOMsg m = (ISOMsg) obj;
323            if ("LAST".equals(sendMethod)) {
324                try {
325                    ISOChannel c = server.getLastConnectedISOChannel();
326                    if (c == null) {
327                        throw new ISOException("Server has no active connections");
328                    }
329                    if (!c.isConnected()) {
330                        throw new ISOException("Client disconnected");
331                    }
332                    c.send(m);
333                }
334                catch (Exception e) {
335                    getLog().warn("notify", e);
336                }
337            }
338            else if ("ALL".equals(sendMethod)) {
339                String channelNames = getISOChannelNames();
340                String[] channelName;
341                if (channelNames != null) {
342                    channelName = channelNames.split(CHANNEL_NAME_REGEXP);
343                    for (String s : channelName) {
344                        try {
345                            ISOChannel c = server.getISOChannel(s);
346                            if (c == null) {
347                                throw new ISOException("Server has no active connections");
348                            }
349                            if (!c.isConnected()) {
350                                throw new ISOException("Client disconnected");
351                            }
352                            c.send(m);
353                        } catch (Exception e) {
354                            getLog().warn("notify", e);
355                        }
356                    }
357                }
358            }
359            else if ("RR".equals(sendMethod)) {
360                String channelNames = getISOChannelNames();
361                String[] channelName;
362                if (channelNames != null) {
363                    channelName = channelNames.split(CHANNEL_NAME_REGEXP);
364                    try {
365                        ISOChannel c = server.getISOChannel(channelName[msgn.incrementAndGet() % channelName.length]);
366                        if (c == null) {
367                            throw new ISOException("Server has no active connections");
368                        }
369                        if (!c.isConnected()) {
370                            throw new ISOException("Client disconnected");
371                        }
372                        c.send(m);
373                    } catch (Exception e) {
374                        getLog().warn("notify", e);
375                    }
376                }
377            }
378        }
379    }
380
381    /*
382     * This method will be invoked through the ISORequestListener interface, *if*
383     * this QServer has an 'out' queue to handle.
384     */
385    @Override
386    public boolean process(ISOSource source, ISOMsg m) {
387        sp.out(outQueue, m);
388        return true;
389    }
390
391    private void initMeters() {
392        var tags =  Tags.of("name", getName(),
393                            "type", "server");
394        var registry = getServer().getMeterRegistry();
395
396        connectionsGauge =
397          MeterFactory.gauge
398            (registry, MeterInfo.ISOSERVER_CONNECTION_COUNT,
399              tags.and("port", ""+getPort()),
400              BaseUnits.SESSIONS,
401              server::getActiveConnections
402            );
403
404        if (channel instanceof ISOMsgMetrics.Source ms) {
405            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
406            if (mtr != null) {
407                mtr.addTags(tags);
408                mtr.register(registry);
409            }
410        }
411    }
412
413    private void removeMeters() {
414        var registry = getServer().getMeterRegistry();
415        registry.remove(connectionsGauge);
416
417        if (channel instanceof ISOMsgMetrics.Source ms) {
418            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
419            if (mtr != null)
420                mtr.removeMeters();
421        }
422    }
423}