001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.core.Environment; 024import org.jpos.iso.*; 025import org.jpos.space.SpaceUtil; 026import org.jpos.util.Loggeable; 027import org.jpos.util.NameRegistrar; 028 029import java.io.IOException; 030import java.util.Date; 031 032/** 033 * Q2 QBean that manages multiple concurrent sessions over a single ISOChannel. 034 * @author apr 035 * @since 1.8.5 036 */ 037@SuppressWarnings({"unused", "unchecked"}) 038public class MultiSessionChannelAdaptor 039 extends ChannelAdaptor 040 implements MultiSessionChannelAdaptorMBean, Channel, Loggeable 041{ 042 int sessions = 1; 043 ISOChannel[] channels; 044 int roundRobinCounter = 0; 045 046 /** Default constructor. */ 047 public MultiSessionChannelAdaptor () { 048 super (); 049 resetCounters(); 050 } 051 public void initService() throws ConfigurationException { 052 initSpaceAndQueues(); 053 NameRegistrar.register (getName(), this); 054 } 055 public void startService () { 056 try { 057 channels = new ISOChannel[sessions]; 058 for (int i=0; i<sessions; i++) { 059 ISOChannel c = initChannel(); 060 channels[i] = c; 061 if (!writeOnly) 062 new Thread (new Receiver (i), "channel-receiver-" + in + "-" + i).start (); 063 } 064 new Thread (new Sender (), "channel-sender-" + in).start (); 065 } catch (Exception e) { 066 getLog().warn ("error starting service", e); 067 } 068 } 069 070 public int getSessions() { 071 return sessions; 072 } 073 074 public void setSessions(int sessions) { 075 this.sessions = sessions; 076 } 077 /** Per-session worker that pulls requests from the in-queue and writes them to the channel. */ 078 @SuppressWarnings("unchecked") 079 public class Sender implements Runnable { 080 /** Default constructor. */ 081 public Sender () { 082 super (); 083 } 084 public void run () { 085 while (running ()){ 086 ISOChannel channel = null; 087 try { 088 if (!running()) 089 break; 090 if (sp.rd(ready, delay) == null) 091 continue; 092 Object o = sp.in (in, delay); 093 channel = getNextChannel(); // we want to call getNextChannel even if o is null so that 094 // it can pull the 'ready' indicator. 095 if (o instanceof ISOMsg && channel != null) { 096 channel.send ((ISOMsg) o); 097 tx++; 098 } 099 } catch (ISOFilter.VetoException e) { 100 getLog().warn ("channel-sender-"+in, e.getMessage ()); 101 } catch (ISOException e) { 102 getLog().warn ("channel-sender-"+in, e.getMessage ()); 103 if (!ignoreISOExceptions) { 104 disconnect (channel); 105 } 106 ISOUtil.sleep (1000); // slow down on errors 107 } catch (Exception e) { 108 getLog().warn ("channel-sender-"+in, e.getMessage ()); 109 disconnect (channel); 110 ISOUtil.sleep (1000); 111 } 112 } 113 disconnectAll(); 114 } 115 } 116 /** Per-session worker that drains the channel and posts inbound messages to the out-queue. */ 117 @SuppressWarnings("unchecked") 118 public class Receiver implements Runnable { 119 int slot; 120 ISOChannel channel; 121 /** 122 * Constructs a Receiver bound to a specific channel slot. 123 * 124 * @param slot index into the {@code channels} array 125 */ 126 public Receiver (int slot) { 127 super (); 128 this.channel = channels[slot]; 129 this.slot = slot; 130 } 131 public void run () { 132 ISOUtil.sleep(slot*10); // we don't want to blast a server at startup 133 while (running()) { 134 try { 135 if (!channel.isConnected()) { 136 connect(slot); 137 if (!channel.isConnected()) { 138 ISOUtil.sleep(delay); 139 continue; 140 } 141 } 142 ISOMsg m = channel.receive (); 143 rx++; 144 lastTxn = System.currentTimeMillis(); 145 if (timeout > 0) 146 sp.out (out, m, timeout); 147 else 148 sp.out (out, m); 149 } catch (ISOException e) { 150 if (running()) { 151 getLog().warn ("channel-receiver-"+out, e); 152 if (!ignoreISOExceptions) { 153 disconnect (channel); 154 } 155 ISOUtil.sleep(1000); 156 } 157 } catch (Exception e) { 158 if (running()) { 159 getLog().warn("channel-receiver-" + out, e); 160 disconnect (channel); 161 ISOUtil.sleep(1000); 162 } 163 } 164 } 165 } 166 } 167 @Override 168 protected void initSpaceAndQueues () throws ConfigurationException { 169 super.initSpaceAndQueues(); 170 Element persist = getPersist (); 171 String s = Environment.get(persist.getChildTextTrim("sessions")); 172 setSessions(s != null && s.length() > 0 ? Integer.parseInt(s) : 1); 173 } 174 175 private void connect (int slot) { 176 ISOChannel c = channels[slot]; 177 if (c != null && !c.isConnected()) { 178 try { 179 c.connect (); 180 sp.put (ready, new Date()); 181 } catch (IOException e) { 182 getLog().warn ("check-connection(" + slot + ") " + c.toString(), e.getMessage ()); 183 } 184 } 185 } 186 private void disconnect (ISOChannel channel) { 187 try { 188 if (getConnectedCount() <= 1) 189 SpaceUtil.wipe(sp, ready); 190 channel.disconnect (); 191 } catch (IOException e) { 192 getLog().warn ("disconnect", e); 193 } 194 } 195 private void disconnectAll() { 196 for (ISOChannel channel : channels) disconnect(channel); 197 } 198 private ISOChannel getNextChannel() { 199 ISOChannel c = null; 200 for (int size = channels.length; size > 0; size--) { 201 c = channels[roundRobinCounter++ % channels.length]; 202 if (c != null && c.isConnected()) 203 break; 204 } 205 if (c == null) 206 SpaceUtil.wipe(sp, ready); 207 return c; 208 } 209 private int getConnectedCount() { 210 int connected = 0; 211 for (ISOChannel c : channels) { 212 if (c != null && c.isConnected()) { 213 connected++; 214 } 215 } 216 return connected; 217 } 218}