001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import org.jpos.iso.ISOException;
022import org.jpos.iso.ISOMsg;
023import org.jpos.iso.ISOSource;
024import org.jpos.q2.Q2;
025
026import java.io.IOException;
027import java.io.Serializable;
028import java.util.UUID;
029
030/**
031 * Serializable {@link ISOSource} that forwards outbound messages through a
032 * {@link Space}, decoupling the originating source from the response-routing
033 * thread. Used by SendResponse and other deferred-send participants.
034 */
035@SuppressWarnings("unused unchecked")
036public class SpaceSource implements ISOSource, SpaceListener<String,ISOMsg>, Serializable {
037    private static final long serialVersionUID = -2629671264411649185L;
038
039    private transient Space isp = SpaceFactory.getSpace();
040    private transient LocalSpace sp;
041    /** Space key used to correlate request and reply messages. */
042    private String key;
043    /** Message expiration timeout, in milliseconds. */
044    private long timeout;
045    /** Last known connectivity state of the wrapped source. */
046    private boolean connected;
047
048    /**
049     * Constructs a SpaceSource that publishes the original {@code source} into
050     * the internal jPOS space and listens on the correlation key for responses.
051     *
052     * @param sp local space used for response delivery
053     * @param source originating source whose connectivity is captured at construction
054     * @param timeout space-entry lease in milliseconds
055     */
056    public SpaceSource(LocalSpace sp, ISOSource source, long timeout) {
057        this.key = "SS." + UUID.randomUUID().toString();
058        this.connected = source.isConnected();
059        this.sp = sp;
060        sp.addListener(key, this, timeout + 10000L);
061        isp.out (key, source, timeout);
062    }
063
064    /**
065     * Re-initialises the space binding after deserialization.
066     *
067     * @param sp local space used for response delivery
068     * @param timeout space-entry lease in milliseconds
069     */
070    public void init (LocalSpace sp, long timeout) {
071        this.sp = sp;
072        this.timeout = timeout;
073    }
074
075    @Override
076    public void send(ISOMsg m) throws IOException, ISOException {
077        if (sp == null)
078            throw new IOException ("Space not configured");
079        sp.out(key, m, timeout);
080    }
081
082    @Override
083    public boolean isConnected() {
084        return connected; // should be called _was_ connected
085    }
086
087    @Override
088    public void notify(String key, ISOMsg m) {
089        sp.removeListener(this.key, this);
090        ISOSource source = (ISOSource) isp.inp (key);
091        if (m != null && source != null && source.isConnected()) {
092            try {
093                source.send((ISOMsg) m.clone());
094                sp.inp(key); // actually pick it
095            } catch (Exception e) {
096                Q2.getQ2().getLog().warn(e);
097            }
098        }
099    }
100}