001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020 021import org.jpos.iso.ISOException; 022import org.jpos.iso.ISOMsg; 023import org.jpos.iso.ISOSource; 024import org.jpos.q2.Q2; 025 026import java.io.IOException; 027import java.io.Serializable; 028import java.util.UUID; 029 030/** 031 * Serializable {@link ISOSource} that forwards outbound messages through a 032 * {@link Space}, decoupling the originating source from the response-routing 033 * thread. Used by SendResponse and other deferred-send participants. 034 */ 035@SuppressWarnings("unused unchecked") 036public class SpaceSource implements ISOSource, SpaceListener<String,ISOMsg>, Serializable { 037 private static final long serialVersionUID = -2629671264411649185L; 038 039 private transient Space isp = SpaceFactory.getSpace(); 040 private transient LocalSpace sp; 041 /** Space key used to correlate request and reply messages. */ 042 private String key; 043 /** Message expiration timeout, in milliseconds. */ 044 private long timeout; 045 /** Last known connectivity state of the wrapped source. */ 046 private boolean connected; 047 048 /** 049 * Constructs a SpaceSource that publishes the original {@code source} into 050 * the internal jPOS space and listens on the correlation key for responses. 051 * 052 * @param sp local space used for response delivery 053 * @param source originating source whose connectivity is captured at construction 054 * @param timeout space-entry lease in milliseconds 055 */ 056 public SpaceSource(LocalSpace sp, ISOSource source, long timeout) { 057 this.key = "SS." + UUID.randomUUID().toString(); 058 this.connected = source.isConnected(); 059 this.sp = sp; 060 sp.addListener(key, this, timeout + 10000L); 061 isp.out (key, source, timeout); 062 } 063 064 /** 065 * Re-initialises the space binding after deserialization. 066 * 067 * @param sp local space used for response delivery 068 * @param timeout space-entry lease in milliseconds 069 */ 070 public void init (LocalSpace sp, long timeout) { 071 this.sp = sp; 072 this.timeout = timeout; 073 } 074 075 @Override 076 public void send(ISOMsg m) throws IOException, ISOException { 077 if (sp == null) 078 throw new IOException ("Space not configured"); 079 sp.out(key, m, timeout); 080 } 081 082 @Override 083 public boolean isConnected() { 084 return connected; // should be called _was_ connected 085 } 086 087 @Override 088 public void notify(String key, ISOMsg m) { 089 sp.removeListener(this.key, this); 090 ISOSource source = (ISOSource) isp.inp (key); 091 if (m != null && source != null && source.isConnected()) { 092 try { 093 source.send((ISOMsg) m.clone()); 094 sp.inp(key); // actually pick it 095 } catch (Exception e) { 096 Q2.getQ2().getLog().warn(e); 097 } 098 } 099 } 100}