001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.transaction.participant;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.XmlConfigurable;
024import org.jpos.transaction.AbortParticipant;
025import org.jpos.transaction.TransactionConstants;
026import org.jpos.transaction.TransactionManager;
027import org.jpos.transaction.TransactionParticipant;
028
029import java.io.Serializable;
030import java.util.ArrayList;
031import java.util.Iterator;
032import java.util.List;
033
034@SuppressWarnings("unchecked")
035public class Join
036       implements TransactionConstants, AbortParticipant, 
037                  XmlConfigurable
038{
039    private TransactionManager tm;
040    private final List<TransactionParticipant> participants = new ArrayList<> ();
041
042    public int prepare (long id, Serializable o) {
043        return mergeActions(
044            joinRunners(prepare (createRunners(id, o)))
045        );
046    }
047    public int prepareForAbort  (long id, Serializable o) { 
048        return mergeActions(
049            joinRunners(prepareForAbort (createRunners(id, o)))
050        );
051    }
052    public void commit (long id, Serializable o) { 
053        joinRunners(commit (createRunners(id, o)));
054    }
055    public void abort  (long id, Serializable o) { 
056        joinRunners(abort (createRunners(id, o)));
057    }
058    public void setConfiguration (Element e) throws ConfigurationException {
059        for (Element element : e.getChildren("participant")) {
060            participants.add(tm.createParticipant(element));
061        }
062    }
063    public void setTransactionManager (TransactionManager mgr) {
064        this.tm = mgr;
065    }
066    private Runner[] prepare (Runner[] runners) {
067        for (Runner runner : runners) runner.prepare();
068        return runners;
069    }
070    private Runner[] prepareForAbort (Runner[] runners) {
071        for (Runner runner : runners) runner.prepareForAbort();
072        return runners;
073    }
074    private Runner[] commit (Runner[] runners) {
075        for (Runner runner : runners) runner.commit();
076        return runners;
077    }
078    private Runner[] abort (Runner[] runners) {
079        for (Runner runner : runners) runner.abort();
080        return runners;
081    }
082    private Runner[] createRunners(long id, Serializable o) {
083        Runner[] runners = new Runner[participants.size()];
084        Iterator<TransactionParticipant> iter = participants.iterator();
085        for (int i=0; iter.hasNext(); i++) {
086            runners[i] = new Runner (
087              iter.next(), id, o
088            );
089        }
090        return runners;
091    }
092    private Runner[] joinRunners (Runner[] runners) {
093        for (Runner runner : runners) runner.join();
094        return runners;
095    }
096    private int mergeActions (Runner[] runners) {
097        boolean prepared = true;
098        boolean readonly = true;
099        boolean no_join = true;
100        boolean retry = false;
101        for (Runner runner : runners) {
102            int action = runner.rc;
103            retry = (action & RETRY) == RETRY;
104            if (retry)
105                return RETRY;
106            if ((action & PREPARED) == ABORTED)
107                prepared = false;
108            if ((action & READONLY) != READONLY)
109                readonly = false;
110            if ((action & NO_JOIN) != NO_JOIN)
111                no_join = false;
112        }
113        return (prepared ? PREPARED : ABORTED) |
114               (no_join  ? NO_JOIN  : 0) |
115               (readonly ? READONLY : 0);
116    }
117    public static class Runner implements Runnable {
118        private TransactionParticipant p;
119        public int rc;
120        long id;
121        int mode;
122        private Serializable ctx;
123        Thread t;
124        public static final int PREPARE = 0;
125        public static final int PREPARE_FOR_ABORT = 1;
126        public static final int COMMIT = 2;
127        public static final int ABORT = 3;
128        public static final String[] MODES = {
129            "prepare", "prepareForAbort", "commit", "abort"
130        };
131
132        private String threadName;
133
134        public Runner (TransactionParticipant p, long id, Serializable ctx) {
135            this.p = p;
136            this.id = id;
137            this.ctx = ctx;
138        }
139        public void prepare() {
140            createThread (PREPARE);
141        }
142        public void prepareForAbort() {
143            createThread (PREPARE_FOR_ABORT);
144        }
145        public void commit () {
146            createThread (COMMIT);
147        }
148        public void abort () {
149            createThread (ABORT);
150        }
151        public void run() {
152            switch (mode) {
153                case PREPARE -> rc = p.prepare(id, ctx);
154                case PREPARE_FOR_ABORT -> {
155                    if (p instanceof AbortParticipant)
156                        rc = ((AbortParticipant) p).prepareForAbort(id, ctx);
157                }
158                case COMMIT -> {
159                    if ((rc & NO_JOIN) == 0)
160                        p.commit(id, ctx);
161                }
162                case ABORT -> {
163                    if ((rc & NO_JOIN) == 0)
164                        p.abort(id, ctx);
165                }
166            }
167        }
168        public void join () {
169            try {
170                t.join ();
171            } catch (InterruptedException ignored) { }
172        }
173        private void createThread (int m) {
174            this.mode = m;
175            this.t = Thread.ofVirtual().name(
176              "%s%s:%s".formatted(
177                MODES[mode],
178                this.getClass().getName(),
179                p.getClass().getName()
180              )
181            ).start(this);
182        }
183    }
184}