001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.q2.iso; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.core.Environment; 024import org.jpos.iso.BaseChannel; 025import org.jpos.iso.Channel; 026import org.jpos.iso.FactoryChannel; 027import org.jpos.iso.FilteredChannel; 028import org.jpos.iso.ISOChannel; 029import org.jpos.iso.ISOClientSocketFactory; 030import org.jpos.iso.ISOFilter; 031import org.jpos.iso.ISOMsg; 032import org.jpos.iso.ISOPackager; 033import org.jpos.q2.QBeanSupport; 034import org.jpos.q2.QFactory; 035import org.jpos.space.Space; 036import org.jpos.space.SpaceFactory; 037import org.jpos.space.SpaceUtil; 038import org.jpos.util.LogEvent; 039import org.jpos.util.LogSource; 040import org.jpos.util.Logger; 041import org.jpos.util.NameRegistrar; 042import org.jpos.util.Realm; 043 044import java.io.IOException; 045import java.util.Date; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ScheduledExecutorService; 048import java.util.concurrent.SynchronousQueue; 049import java.util.concurrent.ThreadPoolExecutor; 050import java.util.concurrent.TimeUnit; 051import java.util.concurrent.atomic.AtomicInteger; 052 053/** 054 * OneShotChannelAdaptorMK2 connects and disconnects a channel for every message 055 * exchange. It is similar to OneShotChannelAdaptor but uses a thread pool instead 056 * of opening threads statically and supports mux pooling by exposing channel readiness. 057 * 058 * @author Alejandro Revilla 059 * @author Thomas L. Kjeldsen 060 * @author Victor Salaman 061 */ 062@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"}) 063public class OneShotChannelAdaptorMK2 064 extends QBeanSupport 065 implements OneShotChannelAdaptorMK2MBean, Channel, Runnable 066{ 067 Space<String, Object> sp; 068 String in, out, ready; 069 long delay; 070 long checkInterval; 071 int maxConnections; 072 int[] handbackFields; 073 ThreadPoolExecutor threadPool = null; 074 AtomicInteger cnt; 075 Element channelElement; 076 077 ScheduledExecutorService checkTimer; 078 079 /** Default constructor. */ 080 public OneShotChannelAdaptorMK2() 081 { 082 super(); 083 } 084 085 @Override 086 protected String defaultRealm() { 087 return Realm.COMM_CLIENT; 088 } 089 090 @SuppressWarnings("unchecked") 091 private Space<String, Object> grabSpace(Element e) 092 { 093 return (Space<String, Object>) SpaceFactory.getSpace(e != null ? e.getText() : ""); 094 } 095 096 @Override 097 protected void initService() throws Exception 098 { 099 Element persist = getPersist(); 100 channelElement = persist.getChild("channel"); 101 if (channelElement == null) 102 { 103 throw new ConfigurationException("channel element missing"); 104 } 105 sp = grabSpace(persist.getChild("space")); 106 in = Environment.get(persist.getChildTextTrim ("in")); 107 out = Environment.get(persist.getChildTextTrim ("out")); 108 ready = getName() + ".ready"; 109 110 String s = Environment.get(persist.getChildTextTrim ("max-connections")); 111 maxConnections = s != null ? Integer.parseInt(s) : 1; 112 handbackFields = cfg.getInts("handback-field"); 113 114 s = Environment.get(persist.getChildTextTrim ("delay")); 115 delay = s != null ? Integer.parseInt(s) : 2500; 116 117 s = Environment.get(persist.getChildTextTrim("check-interval")); 118 checkInterval = s != null ? Integer.parseInt(s) : 60000; 119 120 NameRegistrar.register(getName(), this); 121 } 122 123 public void startService() 124 { 125 cnt = new AtomicInteger(0); 126 threadPool = new ThreadPoolExecutor(1, 127 maxConnections, 128 10, 129 TimeUnit.SECONDS, 130 new SynchronousQueue<Runnable>()); 131 new Thread(this).start(); 132 133 checkTimer=Executors.newScheduledThreadPool(1); 134 checkTimer.scheduleAtFixedRate(new CheckChannelTask(), 0L, checkInterval,TimeUnit.MILLISECONDS); 135 } 136 137 public void stopService() 138 { 139 if(checkTimer!=null) 140 { 141 checkTimer.shutdown(); 142 checkTimer=null; 143 } 144 145 takeOffline(); 146 sp.out(in, new Object()); 147 threadPool.shutdown(); 148 while (!threadPool.isTerminated()) 149 { 150 try 151 { 152 Thread.sleep(1000L); 153 } 154 catch (InterruptedException e) 155 { 156 } 157 } 158 159 int c=0; 160 while(running()) 161 { 162 try 163 { 164 Thread.sleep(500); 165 } 166 catch (InterruptedException e) 167 { 168 } 169 c++; 170 if(c>10) break; 171 } 172 } 173 174 public void destroyService() 175 { 176 NameRegistrar.unregister(getName()); 177 } 178 179 public boolean isConnected() 180 { 181 return sp != null && sp.rdp(ready) != null; 182 } 183 184 @Override 185 @SuppressWarnings({"StatementWithEmptyBody", "ConstantConditions"}) 186 public void run() 187 { 188 while (running()) 189 { 190 try 191 { 192 Object o = sp.in(in, delay); 193 if (o instanceof ISOMsg) 194 { 195 if(!isConnected()) 196 { 197 continue; 198 } 199 ISOMsg m = (ISOMsg) o; 200 int i = cnt.incrementAndGet(); 201 if (i > 9999) 202 { 203 cnt.set(0); 204 i = cnt.incrementAndGet(); 205 } 206 threadPool.execute(new Worker(m, i)); 207 } 208 } 209 catch (Exception e) 210 { 211 getLog().warn(getName(), e.getMessage()); 212 } 213 } 214 } 215 216 private class CheckChannelTask implements Runnable 217 { 218 @Override 219 public void run() 220 { 221 try 222 { 223 Date lastOnline = (Date) sp.rdp(ready); 224 final LogEvent ev = getLog().createLogEvent("status"); 225 if (isChannelConnectable(true)) 226 { 227 if (lastOnline == null) 228 { 229 ev.addMessage("Channel is now online"); 230 Logger.log(ev); 231 flushInput(); 232 } 233 takeOnline(); 234 } 235 else 236 { 237 takeOffline(); 238 if (lastOnline != null) 239 { 240 ev.addMessage("Channel is now offline"); 241 Logger.log(ev); 242 } 243 } 244 } 245 catch (Throwable e) 246 { 247 getLog().warn(getName(), e.getMessage()); 248 } 249 } 250 251 private boolean isChannelConnectable(boolean showExceptions) 252 { 253 boolean res = false; 254 255 ISOChannel channel = null; 256 try 257 { 258 channel = newChannel(channelElement, getFactory()); 259 if (channel instanceof BaseChannel) 260 { 261 BaseChannel bc = (BaseChannel) channel; 262 bc.setLogger(null, null); 263 } 264 channel.connect(); 265 res = true; 266 } 267 catch (Exception e) 268 { 269 if (showExceptions) 270 { 271 getLog().error(e.getMessage()); 272 } 273 } 274 finally 275 { 276 if (channel != null && channel.isConnected()) 277 { 278 try 279 { 280 channel.disconnect(); 281 } 282 catch (IOException e) 283 { 284 getLog().error(e); 285 } 286 NameRegistrar.unregister("channel."+channel.getName()); 287 } 288 } 289 290 return res; 291 } 292 } 293 294 private void flushInput() 295 { 296 SpaceUtil.wipe(sp,in); 297 } 298 299 private void takeOffline() 300 { 301 SpaceUtil.wipe(sp, ready); 302 } 303 304 private void takeOnline() 305 { 306 sp.put(ready, new Date()); 307 } 308 309 /** 310 * Sends a message via the inbound queue with no expiration. 311 * 312 * @param m message to send 313 */ 314 public void send(ISOMsg m) 315 { 316 sp.out(in, m); 317 } 318 319 /** 320 * Sends a message via the inbound queue with a per-entry lease. 321 * 322 * @param m message to send 323 * @param timeout entry lease in milliseconds 324 */ 325 public void send(ISOMsg m, long timeout) 326 { 327 sp.out(in, m, timeout); 328 } 329 330 public ISOMsg receive() 331 { 332 return (ISOMsg) sp.in(out); 333 } 334 335 public ISOMsg receive(long timeout) 336 { 337 return (ISOMsg) sp.in(out, timeout); 338 } 339 340 private ISOChannel newChannel(Element e, QFactory f) 341 throws ConfigurationException 342 { 343 String channelName = QFactory.getAttributeValue(e, "class"); 344 if (channelName == null) 345 { 346 throw new ConfigurationException("class attribute missing from channel element."); 347 } 348 349 String packagerName = QFactory.getAttributeValue(e, "packager"); 350 351 ISOChannel channel = (ISOChannel) f.newInstance(channelName); 352 ISOPackager packager; 353 if (packagerName != null) 354 { 355 packager = (ISOPackager) f.newInstance(packagerName); 356 channel.setPackager(packager); 357 f.setConfiguration(packager, e); 358 } 359 QFactory.invoke(channel, "setHeader", QFactory.getAttributeValue(e, "header")); 360 f.setLogger(channel, e, getRealm()); 361 f.setConfiguration(channel, e); 362 363 if (channel instanceof FilteredChannel) 364 { 365 addFilters((FilteredChannel) channel, e, f); 366 } 367 368 String socketFactoryString = getSocketFactory(); 369 if (socketFactoryString != null && channel instanceof FactoryChannel) 370 { 371 ISOClientSocketFactory sFac = (ISOClientSocketFactory) getFactory().newInstance(socketFactoryString); 372 if (sFac != null && sFac instanceof LogSource) 373 { 374 ((LogSource) sFac).setLogger(log.getLogger(), getRealm()); 375 } 376 getFactory().setConfiguration(sFac, e); 377 ((FactoryChannel) channel).setSocketFactory(sFac); 378 } 379 380 return channel; 381 } 382 383 private void addFilters(FilteredChannel channel, Element e, QFactory fact) 384 throws ConfigurationException 385 { 386 for (Object o : e.getChildren("filter")) 387 { 388 Element f = (Element) o; 389 String clazz = QFactory.getAttributeValue(f, "class"); 390 ISOFilter filter = (ISOFilter) fact.newInstance(clazz); 391 fact.setLogger(filter, f); 392 fact.setConfiguration(filter, f); 393 String direction = QFactory.getAttributeValue(f, "direction"); 394 if (direction == null) 395 { 396 channel.addFilter(filter); 397 } 398 else if ("incoming".equalsIgnoreCase(direction)) 399 { 400 channel.addIncomingFilter(filter); 401 } 402 else if ("outgoing".equalsIgnoreCase(direction)) 403 { 404 channel.addOutgoingFilter(filter); 405 } 406 else if ("both".equalsIgnoreCase(direction)) 407 { 408 channel.addIncomingFilter(filter); 409 channel.addOutgoingFilter(filter); 410 } 411 } 412 } 413 414 public String getInQueue() 415 { 416 return in; 417 } 418 419 public synchronized void setInQueue(String in) 420 { 421 String old = this.in; 422 this.in = in; 423 if (old != null) 424 { 425 sp.out(old, new Object()); 426 } 427 428 getPersist().getChild("in").setText(in); 429 setModified(true); 430 } 431 432 public String getOutQueue() 433 { 434 return out; 435 } 436 437 public synchronized void setOutQueue(String out) 438 { 439 this.out = out; 440 getPersist().getChild("out").setText(out); 441 setModified(true); 442 } 443 444 public String getHost() 445 { 446 return getProperty(getProperties("channel"), "host"); 447 } 448 449 public synchronized void setHost(String host) 450 { 451 setProperty(getProperties("channel"), "host", host); 452 setModified(true); 453 } 454 455 public int getPort() 456 { 457 int port = 0; 458 try 459 { 460 port = Integer.parseInt( 461 getProperty(getProperties("channel"), "port") 462 ); 463 } 464 catch (NumberFormatException e) 465 { 466 getLog().error(e); 467 } 468 return port; 469 } 470 471 public synchronized void setPort(int port) 472 { 473 setProperty( 474 getProperties("channel"), "port", Integer.toString(port) 475 ); 476 setModified(true); 477 } 478 479 /** 480 * Returns the configured socket-factory class name. 481 * 482 * @return socket factory class name 483 */ 484 public String getSocketFactory() 485 { 486 return getProperty(getProperties("channel"), "socketFactory"); 487 } 488 489 /** 490 * Sets the socket-factory class name. 491 * 492 * @param sFac socket factory class name 493 */ 494 public synchronized void setSocketFactory(String sFac) 495 { 496 setProperty(getProperties("channel"), "socketFactory", sFac); 497 setModified(true); 498 } 499 500 /** Per-message worker that opens an ad-hoc channel, sends a request, and forwards the response. */ 501 public class Worker implements Runnable 502 { 503 ISOMsg req; 504 int id; 505 506 /** 507 * Constructs a Worker for the given request. 508 * 509 * @param req message to send 510 * @param id worker identifier (used in the thread name) 511 */ 512 public Worker(ISOMsg req, int id) 513 { 514 this.req = req; 515 this.id = id; 516 } 517 518 public void run() 519 { 520 Thread.currentThread().setName("channel-worker-" + id); 521 ISOChannel channel = null; 522 523 try 524 { 525 channel = newChannel(channelElement, getFactory()); 526 if (getName() != null) 527 { 528 channel.setName(getName() + id); 529 } 530 531 ISOMsg handBack = null; 532 if (handbackFields.length > 0) 533 { 534 handBack = (ISOMsg) req.clone(handbackFields); 535 } 536 try 537 { 538 channel.connect(); 539 } 540 catch (Throwable e) 541 { 542 takeOffline(); 543 } 544 if (channel.isConnected()) 545 { 546 takeOnline(); 547 channel.send(req); 548 ISOMsg rsp = channel.receive(); 549 channel.disconnect(); 550 if (handBack != null) 551 { 552 rsp.merge(handBack); 553 } 554 sp.out(out, rsp); 555 } 556 } 557 catch (Exception e) 558 { 559 getLog().warn("channel-worker-" + id, e.getMessage()); 560 } 561 finally 562 { 563 try 564 { 565 if (channel != null) 566 { 567 channel.disconnect(); 568 } 569 } 570 catch (Exception e) 571 { 572 getLog().warn("channel-worker-" + id, e.getMessage()); 573 } 574 finally 575 { 576 NameRegistrar.unregister("channel." + getName() + id); 577 } 578 } 579 } 580 } 581}