001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.iso;
020
021import org.jpos.core.Configurable;
022import org.jpos.core.Configuration;
023import org.jpos.core.ConfigurationException;
024import org.jpos.util.*;
025import org.jpos.util.NameRegistrar.NotFoundException;
026
027import java.io.IOException;
028
029/**
030 * Connector implements ISORequestListener
031 * and forward all incoming messages to a given
032 * destination MUX, or Channel handling back responses
033 *
034 * @author <a href="mailto:apr@cs.com.uy">Alejandro P. Revilla</a>
035 * @version $Revision$ $Date$
036 * @see org.jpos.iso.ISORequestListener
037 */
038public class Connector 
039    implements ISORequestListener, LogSource, Configurable
040{
041    private Logger logger;
042    private String realm;
043    private boolean preserveSourceHeader = true;
044    /** Name of the QMUX to look up. */
045    protected String muxName;
046    /** Name of the channel to look up. */
047    protected String channelName;
048    /** Response timeout in milliseconds (0 = no timeout). */
049    protected int timeout = 0;
050    /** Shared thread pool for processing incoming messages. */
051    protected static ThreadPool pool;
052
053    /** Default constructor. */
054    public Connector () {
055        super();
056    }
057    
058    public void setLogger (Logger logger, String realm) {
059        this.logger = logger;
060        this.realm  = realm;
061    }
062    public String getRealm () {
063        return realm;
064    }
065    public Logger getLogger() {
066        return logger;
067    }
068   /**
069    * Destination can be a Channel or a MUX. If Destination is a Channel
070    * then timeout applies (used on ISORequest to get a Response).
071    * <ul>
072    * <li>destination-mux
073    * <li>destination-channel
074    * <li>timeout
075    * <li>poolsize
076    * </ul>
077    * @param cfg Configuration
078    */
079    public void setConfiguration (Configuration cfg)
080        throws ConfigurationException
081    {
082        timeout = cfg.getInt ("timeout");
083        if (pool == null)
084            pool    = new ThreadPool (1, cfg.getInt ("poolsize", 10));
085        muxName     = cfg.get ("destination-mux", null);
086        channelName = cfg.get ("destination-channel", null);
087        preserveSourceHeader = cfg.getBoolean ("preserve-source-header", true);
088        if (muxName == null && channelName == null) {
089            throw new ConfigurationException("Neither destination mux nor channel were specified.");
090        }
091    }
092    
093    /** Runnable that processes a single incoming message through the MUX. */
094    protected class Process implements Runnable {
095        ISOSource source;
096        ISOMsg m;
097        Process (ISOSource source, ISOMsg m) {
098            super();
099            this.source = source;
100            this.m = m;
101        }
102        public void run () {
103            LogEvent evt = new LogEvent (Connector.this, 
104                "connector-request-listener");
105            try {
106                ISOMsg c = (ISOMsg) m.clone();
107                evt.addMessage (c);
108                if (muxName != null) {
109                    MUX destMux = (MUX) NameRegistrar.get (muxName);
110                    ISOMsg response = destMux.request (c, timeout);
111                    if (response != null) {
112                        if (preserveSourceHeader)
113                            response.setHeader (c.getISOHeader());
114                        source.send(response);
115                    }
116                } else if (channelName != null) {
117                    Channel destChannel = (Channel) NameRegistrar.get (channelName);
118                    destChannel.send (c);
119                }
120            } catch (ISOException e) {
121                evt.addMessage (e);
122            } catch (IOException e) {
123                evt.addMessage (e);
124            } catch (NotFoundException e) {
125                evt.addMessage(e);
126            }
127            Logger.log (evt);
128        }
129
130    }
131    public boolean process (ISOSource source, ISOMsg m) {
132        if (pool == null) 
133            pool = new ThreadPool (1, 10);
134
135        pool.execute (new Process (source, m));
136        return true;
137    }
138}