001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import io.micrometer.core.instrument.Gauge;
022import io.micrometer.core.instrument.Tags;
023import io.micrometer.core.instrument.binder.BaseUnits;
024import org.jdom2.Element;
025import org.jpos.core.ConfigurationException;
026import org.jpos.core.Environment;
027import org.jpos.iso.*;
028import org.jpos.metrics.MeterFactory;
029import org.jpos.metrics.MeterInfo;
030import org.jpos.metrics.iso.ISOMsgMetrics;
031import org.jpos.q2.QBeanSupport;
032import org.jpos.q2.QFactory;
033import org.jpos.space.LocalSpace;
034import org.jpos.space.Space;
035import org.jpos.space.SpaceFactory;
036import org.jpos.space.SpaceListener;
037import org.jpos.util.LogSource;
038import org.jpos.util.NameRegistrar;
039
040import java.util.concurrent.Executors;
041import java.util.concurrent.atomic.AtomicInteger;
042
043/**
044 * ISO Server wrapper.
045 *
046 * @author Alwyn Schoeman
047 * @version $Revision$ $Date$
048 */
049
050@SuppressWarnings("unchecked")
051public class QServer
052    extends QBeanSupport
053    implements QServerMBean, SpaceListener, ISORequestListener
054{
055    private int port = 0;
056    private int maxSessions = 100;
057    private String channelString, packagerString, socketFactoryString;
058    private ISOChannel channel = null;       // is never connected; but passed to ISOServer as a clonable "template" for new connections
059    private ISOServer server;
060    protected LocalSpace sp;
061    private String inQueue;
062    private String outQueue;
063    private String sendMethod;
064    AtomicInteger msgn = new AtomicInteger();
065
066    private Gauge connectionsGauge;
067    private static final String CHANNEL_NAME_REGEXP = " (?=\\d+ \\S+:\\S+)";
068
069    public QServer () {
070        super ();
071    }
072
073    @Override
074    public void initService() throws ConfigurationException {
075        Element e = getPersist ();
076        sp        = grabSpace (e.getChild ("space"));
077    }
078
079    private void newChannel () throws ConfigurationException {
080        Element persist = getPersist ();
081        Element e = persist.getChild ("channel");
082        if (e == null) {
083            throw new ConfigurationException ("channel element missing");
084        }
085
086        ChannelAdaptor adaptor = new ChannelAdaptor ();     // leverage adaptor's newChannel logic
087        channel = adaptor.newChannel (e, getFactory ());
088    }
089
090    private void initServer ()
091        throws ConfigurationException
092    {
093        if (port == 0) {
094            throw new ConfigurationException ("Port value not set");
095        }
096        newChannel();
097        if (channel == null) {
098            throw new ConfigurationException ("ISO Channel is null");
099        }
100
101        if (!(channel instanceof ServerChannel)) {
102            throw new ConfigurationException (channelString +
103                  "does not implement ServerChannel");
104        }
105
106        server = new ISOServer (port, (ServerChannel) channel, maxSessions);
107        server.setLogger (log.getLogger(), getName() + ".server");
108        server.setName (getName ());
109        if (socketFactoryString != null) {
110            ISOServerSocketFactory sFac = getFactory().newInstance(socketFactoryString);
111            if (sFac instanceof LogSource ls) {
112                ls.setLogger(log.getLogger(),getName() + ".socket-factory");
113            }
114            server.setSocketFactory(sFac);
115        }
116        getFactory().setConfiguration (server, getPersist());
117        addServerSocketFactory();
118        addListeners ();// ISORequestListener
119        addISOServerConnectionListeners();
120        NameRegistrar.register (getName(), this);
121        initMeters(); // meters need 'server' to be initialized
122        Executors.newVirtualThreadPerTaskExecutor().submit(server);
123    }
124    private void initIn() {
125        Element persist = getPersist();
126        inQueue = Environment.get(persist.getChildTextTrim("in"));
127        if (inQueue != null) {
128            /*
129             * We have an 'in' queue to monitor for messages to be
130             * sent out through server in our (SpaceListener)notify(Object, Object) method.
131             */
132            sp.addListener(inQueue, this);
133        }
134    }
135    private void initOut() {
136        Element persist = getPersist();
137        outQueue = Environment.get(persist.getChildTextTrim("out"));
138        if (outQueue != null) {
139            /*
140             * We have an 'out' queue to send any messages to that are received
141             * by our requestListener(this).
142             *
143             * Note, if additional ISORequestListeners are registered with the server after
144             *  this point, then they won't see anything as our process(ISOSource, ISOMsg)
145             *  always return true.
146             */
147           server.addISORequestListener(this);
148        }
149    }
150    @Override
151    public void startService () {
152        try {
153            initServer ();
154            initIn();
155            initOut();
156            initWhoToSendTo();
157        } catch (Exception e) {
158            getLog().warn ("error starting service", e);
159        }
160    }
161    private void initWhoToSendTo() {
162        Element persist = getPersist();
163        sendMethod = persist.getChildText("send-request");
164        if (sendMethod==null) {
165            sendMethod="LAST";
166        }
167    }
168
169    @Override
170    public void stopService () {
171        if (server != null) {
172            server.shutdown ();
173            sp.removeListener(inQueue, this);
174        }
175        removeMeters();
176    }
177    @Override
178    public void destroyService () {
179        NameRegistrar.unregister (getName());
180        NameRegistrar.unregister ("server." + getName());
181    }
182
183    @Override
184    public synchronized void setPort (int port) {
185        this.port = port;
186        setAttr (getAttrs (), "port", port);
187        setModified (true);
188    }
189
190    @Override
191    public int getPort () {
192        return port;
193    }
194
195    @Override
196    public synchronized void setPackager (String packager) {
197        packagerString = packager;
198        setAttr (getAttrs (), "packager", packagerString);
199        setModified (true);
200    }
201
202    @Override
203    public String getPackager () {
204        return packagerString;
205    }
206
207    @Override
208    public synchronized void setChannel (String channel) {
209        channelString = channel;
210        setAttr (getAttrs (), "channel", channelString);
211        setModified (true);
212    }
213
214    @Override
215    public String getChannel () {
216        return channelString;
217    }
218
219    @Override
220    public synchronized void setMaxSessions (int maxSessions) {
221        this.maxSessions = maxSessions;
222        setAttr (getAttrs (), "maxSessions", maxSessions);
223        setModified (true);
224    }
225
226    @Override
227    public int getMaxSessions () {
228        return maxSessions;
229    }
230
231    @Override
232    public synchronized void setSocketFactory (String sFactory) {
233        socketFactoryString = sFactory;
234        setAttr (getAttrs(),"socketFactory", socketFactoryString);
235        setModified (true);
236    }
237
238    @Override
239    public String getSocketFactory() {
240        return socketFactoryString;
241    }
242
243    @Override
244    public String getISOChannelNames() {
245        return server.getISOChannelNames();
246    }
247
248    public ISOServer getISOServer() {
249        return server;
250    }
251
252    @Override
253    public String getCountersAsString () {
254        return server.getCountersAsString ();
255    }
256    @Override
257    public String getCountersAsString (String isoChannelName) {
258        return server.getCountersAsString (isoChannelName);
259    }
260
261    private void addServerSocketFactory () throws ConfigurationException {
262        QFactory factory = getFactory ();
263        Element serverSocketFactoryElement = getPersist().getChild ("server-socket-factory");
264        if (serverSocketFactoryElement != null) {
265            ISOServerSocketFactory serverSocketFactory= factory.newInstance(serverSocketFactoryElement);
266            if (serverSocketFactory != null)
267                server.setSocketFactory(serverSocketFactory);
268        }
269
270    }
271
272    private void addListeners () throws ConfigurationException {
273        QFactory factory = getFactory ();
274        for (Element l : getPersist().getChildren("request-listener")) {
275            ISORequestListener listener = factory.newInstance(l);
276            if (listener != null)
277                server.addISORequestListener (listener);
278        }
279    }
280
281    private void addISOServerConnectionListeners() throws ConfigurationException {
282        QFactory factory = getFactory ();
283        for (Element l : getPersist().getChildren("connection-listener")) {
284            ISOServerEventListener listener = factory.newInstance(l);
285            if (listener != null)
286                server.addServerEventListener(listener);
287        }
288    }
289
290
291    private LocalSpace grabSpace (Element e) throws ConfigurationException
292    {
293        String uri = e != null ? Environment.get(e.getTextTrim()) : "";
294        Space sp = SpaceFactory.getSpace (uri);
295        if (sp instanceof LocalSpace) {
296            return (LocalSpace) sp;
297        }
298        throw new ConfigurationException ("Invalid space " + uri);
299    }
300
301    /*
302     * This method will be invoked through the SpaceListener interface we registered once
303     * we noticed we had an 'in' queue.
304     */
305    @Override
306    public void notify(Object key, Object value) {
307        Object obj = sp.inp(key);
308        if (obj instanceof ISOMsg) {
309            ISOMsg m = (ISOMsg) obj;
310            if ("LAST".equals(sendMethod)) {
311                try {
312                    ISOChannel c = server.getLastConnectedISOChannel();
313                    if (c == null) {
314                        throw new ISOException("Server has no active connections");
315                    }
316                    if (!c.isConnected()) {
317                        throw new ISOException("Client disconnected");
318                    }
319                    c.send(m);
320                }
321                catch (Exception e) {
322                    getLog().warn("notify", e);
323                }
324            }
325            else if ("ALL".equals(sendMethod)) {
326                String channelNames = getISOChannelNames();
327                String[] channelName;
328                if (channelNames != null) {
329                    channelName = channelNames.split(CHANNEL_NAME_REGEXP);
330                    for (String s : channelName) {
331                        try {
332                            ISOChannel c = server.getISOChannel(s);
333                            if (c == null) {
334                                throw new ISOException("Server has no active connections");
335                            }
336                            if (!c.isConnected()) {
337                                throw new ISOException("Client disconnected");
338                            }
339                            c.send(m);
340                        } catch (Exception e) {
341                            getLog().warn("notify", e);
342                        }
343                    }
344                }
345            }
346            else if ("RR".equals(sendMethod)) {
347                String channelNames = getISOChannelNames();
348                String[] channelName;
349                if (channelNames != null) {
350                    channelName = channelNames.split(CHANNEL_NAME_REGEXP);
351                    try {
352                        ISOChannel c = server.getISOChannel(channelName[msgn.incrementAndGet() % channelName.length]);
353                        if (c == null) {
354                            throw new ISOException("Server has no active connections");
355                        }
356                        if (!c.isConnected()) {
357                            throw new ISOException("Client disconnected");
358                        }
359                        c.send(m);
360                    } catch (Exception e) {
361                        getLog().warn("notify", e);
362                    }
363                }
364            }
365        }
366    }
367
368    /*
369     * This method will be invoked through the ISORequestListener interface, *if*
370     * this QServer has an 'out' queue to handle.
371     */
372    @Override
373    public boolean process(ISOSource source, ISOMsg m) {
374        sp.out(outQueue, m);
375        return true;
376    }
377
378    private void initMeters() {
379        var tags =  Tags.of("name", getName(),
380                            "type", "server");
381        var registry = getServer().getMeterRegistry();
382
383        connectionsGauge =
384          MeterFactory.gauge
385            (registry, MeterInfo.ISOSERVER_CONNECTION_COUNT,
386              tags.and("port", ""+getPort()),
387              BaseUnits.SESSIONS,
388              server::getActiveConnections
389            );
390
391        if (channel instanceof ISOMsgMetrics.Source ms) {
392            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
393            if (mtr != null) {
394                mtr.addTags(tags);
395                mtr.register(registry);
396            }
397        }
398    }
399
400    private void removeMeters() {
401        var registry = getServer().getMeterRegistry();
402        registry.remove(connectionsGauge);
403
404        if (channel instanceof ISOMsgMetrics.Source ms) {
405            ISOMsgMetrics mtr = ms.getISOMsgMetrics();
406            if (mtr != null)
407                mtr.removeMeters();
408        }
409    }
410}