001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.*;
025import org.jpos.space.SpaceUtil;
026import org.jpos.util.Loggeable;
027import org.jpos.util.NameRegistrar;
028
029import java.io.IOException;
030import java.util.Date;
031
032/**
033 * Q2 QBean that manages multiple concurrent sessions over a single ISOChannel.
034 * @author apr
035 * @since 1.8.5
036 */
037@SuppressWarnings({"unused", "unchecked"})
038public class MultiSessionChannelAdaptor
039    extends ChannelAdaptor
040    implements MultiSessionChannelAdaptorMBean, Channel, Loggeable
041{
042    int sessions = 1;
043    ISOChannel[] channels;
044    int roundRobinCounter = 0;
045
046    /** Default constructor. */
047    public MultiSessionChannelAdaptor () {
048        super ();
049        resetCounters();
050    }
051    public void initService() throws ConfigurationException {
052        initSpaceAndQueues();
053        NameRegistrar.register (getName(), this);
054    }
055    public void startService () {
056        try {
057            channels = new ISOChannel[sessions];
058            for (int i=0; i<sessions; i++) {
059                ISOChannel c = initChannel();
060                channels[i] = c;
061                if (!writeOnly)
062                    new Thread (new Receiver (i), "channel-receiver-" + in + "-" + i).start ();
063            }
064            new Thread (new Sender (), "channel-sender-" + in).start ();
065        } catch (Exception e) {
066            getLog().warn ("error starting service", e);
067        }
068    }
069
070    public int getSessions() {
071        return sessions;
072    }
073
074    public void setSessions(int sessions) {
075        this.sessions = sessions;
076    }
077    /** Per-session worker that pulls requests from the in-queue and writes them to the channel. */
078    @SuppressWarnings("unchecked")
079    public class Sender implements Runnable {
080        /** Default constructor. */
081        public Sender () {
082            super ();
083        }
084        public void run () {
085            while (running ()){
086                ISOChannel channel = null;
087                try {
088                    if (!running())
089                        break;
090                    if (sp.rd(ready, delay) == null)
091                        continue;
092                    Object o = sp.in (in, delay);
093                    channel = getNextChannel(); // we want to call getNextChannel even if o is null so that
094                                                // it can pull the 'ready' indicator.
095                    if (o instanceof ISOMsg && channel != null) {
096                        channel.send ((ISOMsg) o);
097                        tx++;
098                    }
099                } catch (ISOFilter.VetoException e) {
100                    getLog().warn ("channel-sender-"+in, e.getMessage ());
101                } catch (ISOException e) {
102                    getLog().warn ("channel-sender-"+in, e.getMessage ());
103                    if (!ignoreISOExceptions) {
104                        disconnect (channel);
105                    }
106                    ISOUtil.sleep (1000); // slow down on errors
107                } catch (Exception e) {
108                    getLog().warn ("channel-sender-"+in, e.getMessage ());
109                    disconnect (channel);
110                    ISOUtil.sleep (1000);
111                }
112            }
113            disconnectAll();
114        }
115    }
116    /** Per-session worker that drains the channel and posts inbound messages to the out-queue. */
117    @SuppressWarnings("unchecked")
118    public class Receiver implements Runnable {
119        int slot;
120        ISOChannel channel;
121        /**
122         * Constructs a Receiver bound to a specific channel slot.
123         *
124         * @param slot index into the {@code channels} array
125         */
126        public Receiver (int slot) {
127            super ();
128            this.channel = channels[slot];
129            this.slot = slot;
130        }
131        public void run () {
132            ISOUtil.sleep(slot*10); // we don't want to blast a server at startup
133            while (running()) {
134                try {
135                    if (!channel.isConnected()) {
136                        connect(slot);
137                        if (!channel.isConnected()) {
138                            ISOUtil.sleep(delay);
139                            continue;
140                        }
141                    }
142                    ISOMsg m = channel.receive ();
143                    rx++;
144                    lastTxn = System.currentTimeMillis();
145                    if (timeout > 0)
146                        sp.out (out, m, timeout);
147                    else
148                        sp.out (out, m);
149                } catch (ISOException e) {
150                    if (running()) {
151                        getLog().warn ("channel-receiver-"+out, e);
152                        if (!ignoreISOExceptions) {
153                            disconnect (channel);
154                        }
155                        ISOUtil.sleep(1000);
156                    }
157                } catch (Exception e) {
158                    if (running()) {
159                        getLog().warn("channel-receiver-" + out, e);
160                        disconnect (channel);
161                        ISOUtil.sleep(1000);
162                    }
163                }
164            }
165        }
166    }
167    @Override
168    protected void initSpaceAndQueues () throws ConfigurationException {
169        super.initSpaceAndQueues();
170        Element persist = getPersist ();
171        String s = Environment.get(persist.getChildTextTrim("sessions"));
172        setSessions(s != null && s.length() > 0 ? Integer.parseInt(s) : 1);
173    }
174
175    private void connect (int slot) {
176        ISOChannel c = channels[slot];
177        if (c != null && !c.isConnected()) {
178            try {
179                c.connect ();
180                sp.put (ready, new Date());
181            } catch (IOException e) {
182                getLog().warn ("check-connection(" + slot + ") " + c.toString(), e.getMessage ());
183            }
184        }
185    }
186    private void disconnect (ISOChannel channel) {
187        try {
188            if (getConnectedCount() <= 1)
189                SpaceUtil.wipe(sp, ready);
190            channel.disconnect ();
191        } catch (IOException e) {
192            getLog().warn ("disconnect", e);
193        }
194    }
195    private void disconnectAll() {
196        for (ISOChannel channel : channels) disconnect(channel);
197    }
198    private ISOChannel getNextChannel() {
199        ISOChannel c = null;
200        for (int size = channels.length; size > 0; size--) {
201            c = channels[roundRobinCounter++ % channels.length];
202            if (c != null && c.isConnected())
203                break;
204        }
205        if (c == null)
206            SpaceUtil.wipe(sp, ready);
207        return c;
208    }
209    private int getConnectedCount() {
210        int connected = 0;
211        for (ISOChannel c : channels) {
212            if (c != null && c.isConnected()) {
213                connected++;
214            }
215        }
216        return connected;
217    }
218}