/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.q2.iso;

import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.iso.*;
import org.jpos.space.SpaceUtil;
import org.jpos.util.LogSource;
import org.jpos.util.Loggeable;
import org.jpos.util.NameRegistrar;

import java.io.IOException;
import java.util.Date;

/**
 * Channel adaptor that drives several {@link ISOChannel} sessions at once.
 *
 * <p>{@link #startService()} creates {@code sessions} channels, one
 * {@link Receiver} thread per channel (unless {@code writeOnly} is set) and a
 * single {@link Sender} thread that hands outgoing messages to the channels in
 * round-robin order.
 *
 * @author apr
 * @since 1.8.5
 */
@SuppressWarnings({"unused", "unchecked"})
public class MultiSessionChannelAdaptor
    extends ChannelAdaptor
    implements MultiSessionChannelAdaptorMBean, Channel, Loggeable
{
    /** Number of channels created by {@link #startService()}; defaults to 1. */
    int sessions = 1;
    /** One channel per session, indexed by slot. */
    ISOChannel[] channels;
    /** Monotonically increasing counter used to pick the next slot to probe. */
    int roundRobinCounter = 0;

    public MultiSessionChannelAdaptor () {
        super ();
        resetCounters();
    }

    /**
     * Initializes space/queue settings and registers this adaptor in the
     * {@link NameRegistrar} under {@code getName()}.
     *
     * @throws ConfigurationException propagated from {@link #initSpaceAndQueues()}
     */
    public void initService() throws ConfigurationException {
        initSpaceAndQueues();
        NameRegistrar.register (getName(), this);
    }

    /**
     * Creates the channels and starts the receiver and sender threads.
     *
     * <p>Any exception raised while doing so is logged as a warning and not
     * rethrown.
     */
    public void startService () {
        try {
            channels = new ISOChannel[sessions];
            for (int i=0; i<sessions; i++) {
                ISOChannel c = initChannel();
                if (c instanceof LogSource) {
                    // give every session its own realm by appending the slot number
                    LogSource ls = (LogSource) c;
                    ls.setLogger(ls.getLogger(), ls.getRealm()+"-"+i);
                }
                channels[i] = c;
                if (!writeOnly)
                    new Thread (new Receiver (i), "channel-receiver-" + in + "-" + i).start ();
            }
            new Thread (new Sender (), "channel-sender-" + in).start ();
        } catch (Exception e) {
            getLog().warn ("error starting service", e);
        }
    }

    /**
     * @return the configured number of sessions
     */
    public int getSessions() {
        return sessions;
    }

    /**
     * @param sessions number of channels to create on the next {@link #startService()}
     */
    public void setSessions(int sessions) {
        this.sessions = sessions;
    }

    /**
     * Takes messages from the {@code in} queue and sends each one through the
     * channel returned by {@link #getNextChannel()}.
     */
    @SuppressWarnings("unchecked")
    public class Sender implements Runnable {
        public Sender () {
            super ();
        }
        public void run () {
            while (running ()){
                ISOChannel channel = null;
                try {
                    if (!running())
                        break;
                    // nothing to do until the 'ready' entry is present in the space
                    if (sp.rd(ready, delay) == null)
                        continue;
                    Object o = sp.in (in, delay);
                    channel = getNextChannel(); // we want to call getNextChannel even if o is null so that
                                                // it can pull the 'ready' indicator.
                    if (o instanceof ISOMsg && channel != null) {
                        channel.send ((ISOMsg) o);
                        tx++;
                    }
                } catch (ISOFilter.VetoException e) {
                    getLog().warn ("channel-sender-"+in, e.getMessage ());
                } catch (ISOException e) {
                    getLog().warn ("channel-sender-"+in, e.getMessage ());
                    if (!ignoreISOExceptions) {
                        disconnect (channel);
                    }
                    ISOUtil.sleep (1000);   // slow down on errors
                } catch (Exception e) {
                    getLog().warn ("channel-sender-"+in, e.getMessage ());
                    // channel may still be null here; disconnect() tolerates that
                    disconnect (channel);
                    ISOUtil.sleep (1000);
                }
            }
            disconnectAll();
        }
    }

    /**
     * Keeps one channel slot connected and publishes every message received
     * from it to the {@code out} queue.
     */
    @SuppressWarnings("unchecked")
    public class Receiver implements Runnable {
        int slot;
        ISOChannel channel;
        public Receiver (int slot) {
            super ();
            this.channel = channels[slot];
            this.slot = slot;
        }
        public void run () {
            ISOUtil.sleep(slot*10); // we don't want to blast a server at startup
            while (running()) {
                try {
                    if (!channel.isConnected()) {
                        connect(slot);
                        if (!channel.isConnected()) {
                            // connection attempt failed; wait before retrying
                            ISOUtil.sleep(delay);
                            continue;
                        }
                    }
                    ISOMsg m = channel.receive ();
                    rx++;
                    lastTxn = System.currentTimeMillis();
                    if (timeout > 0)
                        sp.out (out, m, timeout);
                    else
                        sp.out (out, m);
                } catch (ISOException e) {
                    // errors are only reported while the service is still running
                    if (running()) {
                        getLog().warn ("channel-receiver-"+out, e);
                        if (!ignoreISOExceptions) {
                            disconnect (channel);
                        }
                        ISOUtil.sleep(1000);
                    }
                } catch (Exception e) {
                    if (running()) {
                        getLog().warn("channel-receiver-" + out, e);
                        disconnect (channel);
                        ISOUtil.sleep(1000);
                    }
                }
            }
        }
    }

    /**
     * Extends the parent initialization by reading the {@code sessions} child
     * element of the persisted configuration (resolved through
     * {@link Environment#get(String)}); a missing or empty value means 1.
     *
     * @throws ConfigurationException propagated from the parent implementation
     */
    @Override
    protected void initSpaceAndQueues () throws ConfigurationException {
        super.initSpaceAndQueues();
        Element persist = getPersist ();
        String s = Environment.get(persist.getChildTextTrim("sessions"));
        setSessions(s != null && s.length() > 0 ? Integer.parseInt(s) : 1);
    }

    /**
     * Connects the channel in the given slot if it exists and is not yet
     * connected, then puts a fresh {@code ready} entry in the space.
     * An {@link IOException} during connect is logged and swallowed.
     *
     * @param slot index into {@link #channels}
     */
    private void connect (int slot) {
        ISOChannel c = channels[slot];
        if (c != null && !c.isConnected()) {
            try {
                c.connect ();
                sp.put (ready, new Date());
            } catch (IOException e) {
                getLog().warn ("check-connection(" + slot + ") " + c.toString(), e.getMessage ());
            }
        }
    }

    /**
     * Disconnects the given channel, wiping the {@code ready} entry first when
     * at most one channel is currently connected.
     *
     * @param channel channel to disconnect; {@code null} is ignored
     */
    private void disconnect (ISOChannel channel) {
        // The sender may get here from a catch block before it has picked a
        // channel; without this guard the resulting NullPointerException would
        // escape the catch and terminate the sender thread.
        if (channel == null)
            return;
        try {
            if (getConnectedCount() <= 1)
                SpaceUtil.wipe(sp, ready);
            channel.disconnect ();
        } catch (IOException e) {
            getLog().warn ("disconnect", e);
        }
    }

    /** Disconnects every channel slot. */
    private void disconnectAll() {
        for (ISOChannel channel : channels) disconnect(channel);
    }

    /**
     * Probes up to {@code channels.length} slots in round-robin order and
     * stops at the first connected channel.
     *
     * <p>If no slot is connected, the last slot probed is returned as is (it
     * may be a disconnected channel); the {@code ready} entry is wiped only
     * when that result is {@code null}.
     *
     * @return the selected channel, or {@code null}
     */
    private ISOChannel getNextChannel() {
        ISOChannel c = null;
        for (int size = channels.length; size > 0; size--) {
            // floorMod keeps the index non-negative after the int counter
            // overflows; plain '%' would produce a negative index.
            c = channels[Math.floorMod(roundRobinCounter++, channels.length)];
            if (c != null && c.isConnected())
                break;
        }
        if (c == null)
            SpaceUtil.wipe(sp, ready);
        return c;
    }

    /**
     * @return how many non-null channels report {@code isConnected()}
     */
    private int getConnectedCount() {
        int connected = 0;
        for (ISOChannel c : channels) {
            if (c != null && c.isConnected()) {
                connected++;
            }
        }
        return connected;
    }
}