/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.q2.iso;

import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.iso.BaseChannel;
import org.jpos.iso.Channel;
import org.jpos.iso.FactoryChannel;
import org.jpos.iso.FilteredChannel;
import org.jpos.iso.ISOChannel;
import org.jpos.iso.ISOClientSocketFactory;
import org.jpos.iso.ISOFilter;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.ISOPackager;
import org.jpos.q2.QBeanSupport;
import org.jpos.q2.QFactory;
import org.jpos.space.Space;
import org.jpos.space.SpaceFactory;
import org.jpos.space.SpaceUtil;
import org.jpos.util.LogEvent;
import org.jpos.util.LogSource;
import org.jpos.util.Logger;
import org.jpos.util.NameRegistrar;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * OneShotChannelAdaptorMK2 connects and disconnects a channel for every message
 * exchange. It is similar to OneShotChannelAdaptor but uses a thread pool instead
 * of opening threads statically and supports mux pooling by exposing channel readiness.
 *
 * <p>Requests are read from the {@code in} queue of the configured space, each one is
 * handed to a {@link Worker} running on the thread pool, and the response is written to
 * the {@code out} queue. A periodic {@link CheckChannelTask} probes the remote endpoint
 * and maintains a "ready" entry ({@code <name>.ready}) in the space, which
 * {@link #isConnected()} reports.
 *
 * @author Alejandro Revilla
 * @author Thomas L. Kjeldsen
 * @author Victor Salaman
 */
@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
public class OneShotChannelAdaptorMK2
        extends QBeanSupport
        implements OneShotChannelAdaptorMK2MBean, Channel, Runnable
{
    /** Space holding the request queue, the response queue and the readiness entry. */
    Space<String, Object> sp;
    /** Space keys: request queue, response queue and readiness marker. */
    String in, out, ready;
    /** Timeout, in milliseconds, passed to each read of the request queue. */
    long delay;
    /** Period, in milliseconds, between two connectivity probes. */
    long checkInterval;
    /** Upper bound of the worker thread pool. */
    int maxConnections;
    /** Request fields copied back onto the response (may be empty). */
    int[] handbackFields;
    ThreadPoolExecutor threadPool = null;
    /** Sequence used to label workers and their channels; wraps after 9999. */
    AtomicInteger cnt;
    /** The {@code <channel>} configuration element used to build every channel. */
    Element channelElement;

    ScheduledExecutorService checkTimer;

    public OneShotChannelAdaptorMK2()
    {
        super();
    }

    /**
     * Resolves the space named by the given element.
     *
     * @param e the {@code <space>} element, or {@code null} to look up the space
     *          registered under the empty name
     * @return the space returned by {@link SpaceFactory#getSpace(String)}
     */
    @SuppressWarnings("unchecked")
    private Space<String, Object> grabSpace(Element e)
    {
        return (Space<String, Object>) SpaceFactory.getSpace(e != null ? e.getText() : "");
    }

    /**
     * Reads the bean configuration and registers this adaptor in the {@link NameRegistrar}.
     *
     * <p>Defaults when an element is absent: {@code max-connections} = 1,
     * {@code delay} = 2500, {@code check-interval} = 60000.
     *
     * @throws ConfigurationException if the {@code channel} element is missing, or if
     *         {@code max-connections} is below 1 or {@code check-interval} is not positive
     * @throws NumberFormatException if a numeric element cannot be parsed
     */
    @Override
    protected void initService() throws Exception
    {
        Element persist = getPersist();
        channelElement = persist.getChild("channel");
        if (channelElement == null)
        {
            throw new ConfigurationException("channel element missing");
        }
        sp = grabSpace(persist.getChild("space"));
        in = Environment.get(persist.getChildTextTrim("in"));
        out = Environment.get(persist.getChildTextTrim("out"));
        ready = getName() + ".ready";

        String s = Environment.get(persist.getChildTextTrim("max-connections"));
        maxConnections = s != null ? Integer.parseInt(s) : 1;
        if (maxConnections < 1)
        {
            // Fail here with a configuration error instead of letting the
            // ThreadPoolExecutor constructor reject the value at start time.
            throw new ConfigurationException(
                    "max-connections must be at least 1, got " + maxConnections);
        }
        handbackFields = cfg.getInts("handback-field");

        // delay and checkInterval are long fields, so parse them as longs.
        s = Environment.get(persist.getChildTextTrim("delay"));
        delay = s != null ? Long.parseLong(s) : 2500L;

        s = Environment.get(persist.getChildTextTrim("check-interval"));
        checkInterval = s != null ? Long.parseLong(s) : 60000L;
        if (checkInterval <= 0)
        {
            // scheduleAtFixedRate rejects a non-positive period.
            throw new ConfigurationException(
                    "check-interval must be greater than 0, got " + checkInterval);
        }

        NameRegistrar.register(getName(), this);
    }

    /**
     * Creates the worker pool, starts the dispatcher thread ({@link #run()}) and
     * schedules the periodic connectivity probe.
     */
    public void startService()
    {
        setRealm(getName());
        cnt = new AtomicInteger(0);
        // SynchronousQueue: tasks are never queued; each one needs a free or new thread.
        threadPool = new ThreadPoolExecutor(1,
                                            maxConnections,
                                            10,
                                            TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
        new Thread(this).start();

        checkTimer = Executors.newScheduledThreadPool(1);
        checkTimer.scheduleAtFixedRate(new CheckChannelTask(), 0L, checkInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the connectivity probe, marks the channel offline and waits for the
     * worker pool to drain.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is
     * restored and the wait is abandoned.
     */
    public void stopService()
    {
        if (checkTimer != null)
        {
            checkTimer.shutdown();
            checkTimer = null;
        }

        takeOffline();
        // Post a non-ISOMsg entry on the request queue; run() ignores such entries,
        // so this presumably only serves to release a pending read.
        sp.out(in, new Object());

        if (threadPool != null)
        {
            threadPool.shutdown();
            try
            {
                while (!threadPool.awaitTermination(1000L, TimeUnit.MILLISECONDS))
                {
                    // in-flight workers are still running; keep waiting
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }

        // Give the bean up to 11 half-second intervals to leave the running state.
        for (int attempts = 0; attempts <= 10 && running(); attempts++)
        {
            try
            {
                Thread.sleep(500);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Removes this adaptor from the {@link NameRegistrar}. */
    public void destroyService()
    {
        NameRegistrar.unregister(getName());
    }

    /**
     * @return {@code true} when the space is available and currently holds a
     *         readiness entry for this adaptor
     */
    public boolean isConnected()
    {
        return sp != null && sp.rdp(ready) != null;
    }

    /**
     * Dispatcher loop: while the bean is running, reads entries from the request
     * queue and submits one {@link Worker} per {@link ISOMsg}.
     *
     * <p>Entries that are not {@code ISOMsg} instances are ignored, and messages read
     * while the adaptor is not marked ready are not processed. Any exception
     * (including a rejection by the thread pool) is logged as a warning and the loop
     * continues.
     */
    @Override
    @SuppressWarnings({"StatementWithEmptyBody", "ConstantConditions"})
    public void run()
    {
        while (running())
        {
            try
            {
                Object o = sp.in(in, delay);
                if (o instanceof ISOMsg)
                {
                    if (!isConnected())
                    {
                        continue;
                    }
                    ISOMsg m = (ISOMsg) o;
                    int i = cnt.incrementAndGet();
                    if (i > 9999)
                    {
                        // Wrap the worker sequence back to 1.
                        cnt.set(0);
                        i = cnt.incrementAndGet();
                    }
                    threadPool.execute(new Worker(m, i));
                }
            }
            catch (Exception e)
            {
                getLog().warn(getName(), e.getMessage());
            }
        }
    }

    /**
     * Periodic task that probes the remote endpoint and keeps the readiness entry in
     * the space in sync with the result, logging online/offline transitions.
     */
    private class CheckChannelTask implements Runnable
    {
        /**
         * Runs one probe. Throwables are caught and logged so that a failure does not
         * propagate to the scheduler.
         */
        @Override
        public void run()
        {
            try
            {
                Date lastOnline = (Date) sp.rdp(ready);
                final LogEvent ev = getLog().createLogEvent("status");
                if (isChannelConnectable(true))
                {
                    if (lastOnline == null)
                    {
                        ev.addMessage("Channel is now online");
                        Logger.log(ev);
                        // Discard whatever is on the request queue before going online.
                        flushInput();
                    }
                    takeOnline();
                }
                else
                {
                    takeOffline();
                    if (lastOnline != null)
                    {
                        ev.addMessage("Channel is now offline");
                        Logger.log(ev);
                    }
                }
            }
            catch (Throwable e)
            {
                getLog().warn(getName(), e.getMessage());
            }
        }

        /**
         * Builds a throw-away channel and tries to connect it.
         *
         * @param showExceptions whether a failure is reported through the bean's log
         * @return {@code true} if {@code connect()} completed without throwing
         */
        private boolean isChannelConnectable(boolean showExceptions)
        {
            boolean res = false;

            ISOChannel channel = null;
            try
            {
                channel = newChannel(channelElement, getFactory());
                if (channel instanceof BaseChannel)
                {
                    // Detach the logger from the probe channel (presumably to keep
                    // probe connections out of the log).
                    BaseChannel bc = (BaseChannel) channel;
                    bc.setLogger(null, null);
                }
                channel.connect();
                res = true;
            }
            catch (Exception e)
            {
                if (showExceptions)
                {
                    getLog().error(e.getMessage());
                }
            }
            finally
            {
                if (channel != null && channel.isConnected())
                {
                    try
                    {
                        channel.disconnect();
                    }
                    catch (IOException e)
                    {
                        getLog().error(e);
                    }
                    NameRegistrar.unregister("channel." + channel.getName());
                }
            }

            return res;
        }
    }

    /** Wipes the request queue. */
    private void flushInput()
    {
        SpaceUtil.wipe(sp, in);
    }

    /** Wipes the readiness entry, so {@link #isConnected()} reports {@code false}. */
    private void takeOffline()
    {
        SpaceUtil.wipe(sp, ready);
    }

    /** Stores the current time as the readiness entry. */
    private void takeOnline()
    {
        sp.put(ready, new Date());
    }

    /**
     * Queues a request on the {@code in} queue.
     *
     * @param m the request message
     */
    public void send(ISOMsg m)
    {
        sp.out(in, m);
    }

    /**
     * Queues a request on the {@code in} queue with a space timeout.
     *
     * @param m       the request message
     * @param timeout timeout passed to the space for the queued entry
     */
    public void send(ISOMsg m, long timeout)
    {
        sp.out(in, m, timeout);
    }

    /**
     * @return the next entry of the {@code out} queue, cast to {@link ISOMsg}
     */
    public ISOMsg receive()
    {
        return (ISOMsg) sp.in(out);
    }

    /**
     * @param timeout timeout passed to the space read
     * @return the entry read from the {@code out} queue, cast to {@link ISOMsg}
     */
    public ISOMsg receive(long timeout)
    {
        return (ISOMsg) sp.in(out, timeout);
    }

    /**
     * Instantiates and configures a channel from the given {@code <channel>} element:
     * packager, header, logger, configuration, filters and, for a
     * {@link FactoryChannel}, the optional socket factory.
     *
     * @param e the {@code <channel>} element
     * @param f factory used to instantiate and configure the channel and packager
     * @return the configured, not yet connected, channel
     * @throws ConfigurationException if the {@code class} attribute is missing or a
     *         component cannot be created or configured
     */
    private ISOChannel newChannel(Element e, QFactory f)
            throws ConfigurationException
    {
        String channelName = QFactory.getAttributeValue(e, "class");
        if (channelName == null)
        {
            throw new ConfigurationException("class attribute missing from channel element.");
        }

        String packagerName = QFactory.getAttributeValue(e, "packager");

        ISOChannel channel = (ISOChannel) f.newInstance(channelName);
        ISOPackager packager;
        if (packagerName != null)
        {
            packager = (ISOPackager) f.newInstance(packagerName);
            channel.setPackager(packager);
            f.setConfiguration(packager, e);
        }
        QFactory.invoke(channel, "setHeader", QFactory.getAttributeValue(e, "header"));
        f.setLogger(channel, e);
        f.setConfiguration(channel, e);

        if (channel instanceof FilteredChannel)
        {
            addFilters((FilteredChannel) channel, e, f);
        }

        String socketFactoryString = getSocketFactory();
        if (socketFactoryString != null && channel instanceof FactoryChannel)
        {
            ISOClientSocketFactory sFac = (ISOClientSocketFactory) getFactory().newInstance(socketFactoryString);
            // instanceof is already false for null, so no separate null check is needed.
            if (sFac instanceof LogSource)
            {
                ((LogSource) sFac).setLogger(log.getLogger(), getName() + ".socket-factory");
            }
            getFactory().setConfiguration(sFac, e);
            ((FactoryChannel) channel).setSocketFactory(sFac);
        }

        return channel;
    }

    /**
     * Instantiates every {@code <filter>} child of the channel element and attaches
     * it according to its {@code direction} attribute: no attribute uses
     * {@code addFilter}; {@code incoming}, {@code outgoing} and {@code both}
     * (case-insensitive) select the matching side(s); any other value attaches nothing.
     *
     * @param channel the channel receiving the filters
     * @param e       the {@code <channel>} element
     * @param fact    factory used to instantiate and configure each filter
     * @throws ConfigurationException if a filter cannot be created or configured
     */
    private void addFilters(FilteredChannel channel, Element e, QFactory fact)
            throws ConfigurationException
    {
        for (Object o : e.getChildren("filter"))
        {
            Element f = (Element) o;
            String clazz = QFactory.getAttributeValue(f, "class");
            ISOFilter filter = (ISOFilter) fact.newInstance(clazz);
            fact.setLogger(filter, f);
            fact.setConfiguration(filter, f);
            String direction = QFactory.getAttributeValue(f, "direction");
            if (direction == null)
            {
                channel.addFilter(filter);
            }
            else if ("incoming".equalsIgnoreCase(direction))
            {
                channel.addIncomingFilter(filter);
            }
            else if ("outgoing".equalsIgnoreCase(direction))
            {
                channel.addOutgoingFilter(filter);
            }
            else if ("both".equalsIgnoreCase(direction))
            {
                channel.addIncomingFilter(filter);
                channel.addOutgoingFilter(filter);
            }
        }
    }

    /** @return the space key of the request queue */
    public String getInQueue()
    {
        return in;
    }

    /**
     * Switches the request queue and records the new name in the persisted
     * configuration.
     *
     * @param in the new request queue name
     */
    public synchronized void setInQueue(String in)
    {
        String old = this.in;
        this.in = in;
        if (old != null)
        {
            // Post a non-ISOMsg entry on the previous queue; run() ignores such
            // entries, so a reader still waiting on the old key is presumably released.
            sp.out(old, new Object());
        }

        getPersist().getChild("in").setText(in);
        setModified(true);
    }

    /** @return the space key of the response queue */
    public String getOutQueue()
    {
        return out;
    }

    /**
     * Switches the response queue and records the new name in the persisted
     * configuration.
     *
     * @param out the new response queue name
     */
    public synchronized void setOutQueue(String out)
    {
        this.out = out;
        getPersist().getChild("out").setText(out);
        setModified(true);
    }

    /** @return the {@code host} property of the channel configuration */
    public String getHost()
    {
        return getProperty(getProperties("channel"), "host");
    }

    /**
     * Sets the {@code host} property of the channel configuration.
     *
     * @param host the remote host
     */
    public synchronized void setHost(String host)
    {
        setProperty(getProperties("channel"), "host", host);
        setModified(true);
    }

    /**
     * @return the {@code port} property of the channel configuration, or 0 (after
     *         logging the error) when it is absent or not a valid integer
     */
    public int getPort()
    {
        int port = 0;
        try
        {
            port = Integer.parseInt(
                    getProperty(getProperties("channel"), "port")
            );
        }
        catch (NumberFormatException e)
        {
            getLog().error(e);
        }
        return port;
    }

    /**
     * Sets the {@code port} property of the channel configuration.
     *
     * @param port the remote port
     */
    public synchronized void setPort(int port)
    {
        setProperty(
                getProperties("channel"), "port", Integer.toString(port)
        );
        setModified(true);
    }

    /** @return the {@code socketFactory} property of the channel configuration */
    public String getSocketFactory()
    {
        return getProperty(getProperties("channel"), "socketFactory");
    }

    /**
     * Sets the {@code socketFactory} property of the channel configuration.
     *
     * @param sFac class name of the socket factory
     */
    public synchronized void setSocketFactory(String sFac)
    {
        setProperty(getProperties("channel"), "socketFactory", sFac);
        setModified(true);
    }

    /**
     * Performs one request/response exchange on a freshly created channel: connect,
     * send, receive, disconnect, then publish the response on the {@code out} queue.
     */
    public class Worker implements Runnable
    {
        ISOMsg req;
        int id;

        /**
         * @param req the request to send
         * @param id  sequence number used in the thread and channel names
         */
        public Worker(ISOMsg req, int id)
        {
            this.req = req;
            this.id = id;
        }

        /**
         * Runs the exchange. A failed connect marks the adaptor offline and the
         * request is dropped without a response; any other exception is logged as a
         * warning. The channel is always disconnected and its registrar entry
         * removed on exit.
         */
        public void run()
        {
            Thread.currentThread().setName("channel-worker-" + id);
            ISOChannel channel = null;

            try
            {
                channel = newChannel(channelElement, getFactory());
                if (getName() != null)
                {
                    channel.setName(getName() + id);
                }

                // Keep a copy of the configured request fields to merge into the response.
                ISOMsg handBack = null;
                if (handbackFields.length > 0)
                {
                    handBack = (ISOMsg) req.clone(handbackFields);
                }
                try
                {
                    channel.connect();
                }
                catch (Throwable e)
                {
                    // Connection failure: flag the adaptor as not ready; the
                    // isConnected() check below then skips the exchange.
                    takeOffline();
                }
                if (channel.isConnected())
                {
                    takeOnline();
                    channel.send(req);
                    ISOMsg rsp = channel.receive();
                    channel.disconnect();
                    if (handBack != null)
                    {
                        rsp.merge(handBack);
                    }
                    sp.out(out, rsp);
                }
            }
            catch (Exception e)
            {
                getLog().warn("channel-worker-" + id, e.getMessage());
            }
            finally
            {
                try
                {
                    if (channel != null)
                    {
                        channel.disconnect();
                    }
                }
                catch (Exception e)
                {
                    getLog().warn("channel-worker-" + id, e.getMessage());
                }
                finally
                {
                    NameRegistrar.unregister("channel." + getName() + id);
                }
            }
        }
    }
}