001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.q2.iso;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.Environment;
024import org.jpos.iso.*;
025import org.jpos.space.SpaceUtil;
026import org.jpos.util.LogSource;
027import org.jpos.util.Loggeable;
028import org.jpos.util.NameRegistrar;
029
030import java.io.IOException;
031import java.util.Date;
032
033/**
034 * @author apr
035 * @since 1.8.5
036 */
037@SuppressWarnings({"unused", "unchecked"})
038public class MultiSessionChannelAdaptor
039    extends ChannelAdaptor
040    implements MultiSessionChannelAdaptorMBean, Channel, Loggeable
041{
042    int sessions = 1;
043    ISOChannel[] channels;
044    int roundRobinCounter = 0;
045
046    public MultiSessionChannelAdaptor () {
047        super ();
048        resetCounters();
049    }
050    public void initService() throws ConfigurationException {
051        initSpaceAndQueues();
052        NameRegistrar.register (getName(), this);
053    }
054    public void startService () {
055        try {
056            channels = new ISOChannel[sessions];
057            for (int i=0; i<sessions; i++) {
058                ISOChannel c = initChannel();
059                if (c instanceof LogSource) {
060                    LogSource ls = (LogSource) c;
061                    ls.setLogger(ls.getLogger(), ls.getRealm()+"-"+i);
062
063                }
064                channels[i] = c;
065                if (!writeOnly)
066                    new Thread (new Receiver (i), "channel-receiver-" + in + "-" + i).start ();
067            }
068            new Thread (new Sender (), "channel-sender-" + in).start ();
069        } catch (Exception e) {
070            getLog().warn ("error starting service", e);
071        }
072    }
073
074    public int getSessions() {
075        return sessions;
076    }
077
078    public void setSessions(int sessions) {
079        this.sessions = sessions;
080    }
081    @SuppressWarnings("unchecked")
082    public class Sender implements Runnable {
083        public Sender () {
084            super ();
085        }
086        public void run () {
087            while (running ()){
088                ISOChannel channel = null;
089                try {
090                    if (!running())
091                        break;
092                    if (sp.rd(ready, delay) == null)
093                        continue;
094                    Object o = sp.in (in, delay);
095                    channel = getNextChannel(); // we want to call getNextChannel even if o is null so that
096                                                // it can pull the 'ready' indicator.
097                    if (o instanceof ISOMsg && channel != null) {
098                        channel.send ((ISOMsg) o);
099                        tx++;
100                    }
101                } catch (ISOFilter.VetoException e) {
102                    getLog().warn ("channel-sender-"+in, e.getMessage ());
103                } catch (ISOException e) {
104                    getLog().warn ("channel-sender-"+in, e.getMessage ());
105                    if (!ignoreISOExceptions) {
106                        disconnect (channel);
107                    }
108                    ISOUtil.sleep (1000); // slow down on errors
109                } catch (Exception e) {
110                    getLog().warn ("channel-sender-"+in, e.getMessage ());
111                    disconnect (channel);
112                    ISOUtil.sleep (1000);
113                }
114            }
115            disconnectAll();
116        }
117    }
118    @SuppressWarnings("unchecked")
119    public class Receiver implements Runnable {
120        int slot;
121        ISOChannel channel;
122        public Receiver (int slot) {
123            super ();
124            this.channel = channels[slot];
125            this.slot = slot;
126        }
127        public void run () {
128            ISOUtil.sleep(slot*10); // we don't want to blast a server at startup
129            while (running()) {
130                try {
131                    if (!channel.isConnected()) {
132                        connect(slot);
133                        if (!channel.isConnected()) {
134                            ISOUtil.sleep(delay);
135                            continue;
136                        }
137                    }
138                    ISOMsg m = channel.receive ();
139                    rx++;
140                    lastTxn = System.currentTimeMillis();
141                    if (timeout > 0)
142                        sp.out (out, m, timeout);
143                    else
144                        sp.out (out, m);
145                } catch (ISOException e) {
146                    if (running()) {
147                        getLog().warn ("channel-receiver-"+out, e);
148                        if (!ignoreISOExceptions) {
149                            disconnect (channel);
150                        }
151                        ISOUtil.sleep(1000);
152                    }
153                } catch (Exception e) {
154                    if (running()) {
155                        getLog().warn("channel-receiver-" + out, e);
156                        disconnect (channel);
157                        ISOUtil.sleep(1000);
158                    }
159                }
160            }
161        }
162    }
163    @Override
164    protected void initSpaceAndQueues () throws ConfigurationException {
165        super.initSpaceAndQueues();
166        Element persist = getPersist ();
167        String s = Environment.get(persist.getChildTextTrim("sessions"));
168        setSessions(s != null && s.length() > 0 ? Integer.parseInt(s) : 1);
169    }
170
171    private void connect (int slot) {
172        ISOChannel c = channels[slot];
173        if (c != null && !c.isConnected()) {
174            try {
175                c.connect ();
176                sp.put (ready, new Date());
177            } catch (IOException e) {
178                getLog().warn ("check-connection(" + slot + ") " + c.toString(), e.getMessage ());
179            }
180        }
181    }
182    private void disconnect (ISOChannel channel) {
183        try {
184            if (getConnectedCount() <= 1)
185                SpaceUtil.wipe(sp, ready);
186            channel.disconnect ();
187        } catch (IOException e) {
188            getLog().warn ("disconnect", e);
189        }
190    }
191    private void disconnectAll() {
192        for (ISOChannel channel : channels) disconnect(channel);
193    }
194    private ISOChannel getNextChannel() {
195        ISOChannel c = null;
196        for (int size = channels.length; size > 0; size--) {
197            c = channels[roundRobinCounter++ % channels.length];
198            if (c != null && c.isConnected())
199                break;
200        }
201        if (c == null)
202            SpaceUtil.wipe(sp, ready);
203        return c;
204    }
205    private int getConnectedCount() {
206        int connected = 0;
207        for (ISOChannel c : channels) {
208            if (c != null && c.isConnected()) {
209                connected++;
210            }
211        }
212        return connected;
213    }
214}