001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.transaction.participant;
020
021import org.jdom2.Element;
022import org.jpos.core.ConfigurationException;
023import org.jpos.core.XmlConfigurable;
024import org.jpos.transaction.AbortParticipant;
025import org.jpos.transaction.TransactionConstants;
026import org.jpos.transaction.TransactionManager;
027import org.jpos.transaction.TransactionParticipant;
028
029import java.io.Serializable;
030import java.util.ArrayList;
031import java.util.Iterator;
032import java.util.List;
033
034/**
035 * Transaction participant that runs a list of nested participants concurrently
036 * (one virtual thread each) and merges their lifecycle results.
037 */
038@SuppressWarnings("unchecked")
039public class Join
040       implements TransactionConstants, AbortParticipant,
041                  XmlConfigurable
042{
043    /** Default constructor; no instance state to initialise. */
044    public Join() {}
045    private TransactionManager tm;
046    private final List<TransactionParticipant> participants = new ArrayList<> ();
047
048    /**
049     * Runs {@code prepare} on every nested participant in parallel and merges their results.
050     *
051     * @param id transaction id
052     * @param o transaction context
053     * @return the merged action mask
054     */
055    public int prepare (long id, Serializable o) {
056        return mergeActions(
057            joinRunners(prepare (createRunners(id, o)))
058        );
059    }
060    /**
061     * Runs {@code prepareForAbort} on every nested participant in parallel and merges their results.
062     *
063     * @param id transaction id
064     * @param o transaction context
065     * @return the merged action mask
066     */
067    public int prepareForAbort  (long id, Serializable o) {
068        return mergeActions(
069            joinRunners(prepareForAbort (createRunners(id, o)))
070        );
071    }
072    /**
073     * Runs {@code commit} on every nested participant in parallel.
074     *
075     * @param id transaction id
076     * @param o transaction context
077     */
078    public void commit (long id, Serializable o) {
079        joinRunners(commit (createRunners(id, o)));
080    }
081    /**
082     * Runs {@code abort} on every nested participant in parallel.
083     *
084     * @param id transaction id
085     * @param o transaction context
086     */
087    public void abort  (long id, Serializable o) {
088        joinRunners(abort (createRunners(id, o)));
089    }
090    /**
091     * Reads {@code <participant>} children from {@code e} and instantiates each via the
092     * associated {@link TransactionManager}.
093     *
094     * @param e XML configuration element
095     * @throws ConfigurationException if any nested participant fails to instantiate
096     */
097    public void setConfiguration (Element e) throws ConfigurationException {
098        for (Element element : e.getChildren("participant")) {
099            participants.add(tm.createParticipant(element));
100        }
101    }
102    /**
103     * Captures the {@link TransactionManager} used to construct nested participants.
104     *
105     * @param mgr the hosting transaction manager
106     */
107    public void setTransactionManager (TransactionManager mgr) {
108        this.tm = mgr;
109    }
110    private Runner[] prepare (Runner[] runners) {
111        for (Runner runner : runners) runner.prepare();
112        return runners;
113    }
114    private Runner[] prepareForAbort (Runner[] runners) {
115        for (Runner runner : runners) runner.prepareForAbort();
116        return runners;
117    }
118    private Runner[] commit (Runner[] runners) {
119        for (Runner runner : runners) runner.commit();
120        return runners;
121    }
122    private Runner[] abort (Runner[] runners) {
123        for (Runner runner : runners) runner.abort();
124        return runners;
125    }
126    private Runner[] createRunners(long id, Serializable o) {
127        Runner[] runners = new Runner[participants.size()];
128        Iterator<TransactionParticipant> iter = participants.iterator();
129        for (int i=0; iter.hasNext(); i++) {
130            runners[i] = new Runner (
131              iter.next(), id, o
132            );
133        }
134        return runners;
135    }
136    private Runner[] joinRunners (Runner[] runners) {
137        for (Runner runner : runners) runner.join();
138        return runners;
139    }
140    private int mergeActions (Runner[] runners) {
141        boolean prepared = true;
142        boolean readonly = true;
143        boolean no_join = true;
144        boolean retry = false;
145        for (Runner runner : runners) {
146            int action = runner.rc;
147            retry = (action & RETRY) == RETRY;
148            if (retry)
149                return RETRY;
150            if ((action & PREPARED) == ABORTED)
151                prepared = false;
152            if ((action & READONLY) != READONLY)
153                readonly = false;
154            if ((action & NO_JOIN) != NO_JOIN)
155                no_join = false;
156        }
157        return (prepared ? PREPARED : ABORTED) |
158               (no_join  ? NO_JOIN  : 0) |
159               (readonly ? READONLY : 0);
160    }
161    /**
162     * Wraps a single nested {@link TransactionParticipant} in its own virtual thread
163     * so its lifecycle calls can run concurrently with siblings.
164     */
165    public static class Runner implements Runnable {
166        private TransactionParticipant p;
167        /** Result code returned by the most recent lifecycle call. */
168        public int rc;
169        long id;
170        int mode;
171        private Serializable ctx;
172        Thread t;
173        /** Mode constant: prepare. */
174        public static final int PREPARE = 0;
175        /** Mode constant: prepareForAbort. */
176        public static final int PREPARE_FOR_ABORT = 1;
177        /** Mode constant: commit. */
178        public static final int COMMIT = 2;
179        /** Mode constant: abort. */
180        public static final int ABORT = 3;
181        /** Human-readable labels indexed by mode constant. */
182        public static final String[] MODES = {
183            "prepare", "prepareForAbort", "commit", "abort"
184        };
185
186        private String threadName;
187
188        /**
189         * Constructs a Runner for the given participant and transaction.
190         *
191         * @param p nested participant to invoke
192         * @param id transaction id
193         * @param ctx transaction context
194         */
195        public Runner (TransactionParticipant p, long id, Serializable ctx) {
196            this.p = p;
197            this.id = id;
198            this.ctx = ctx;
199        }
200        /** Schedules the participant's {@code prepare} call on a virtual thread. */
201        public void prepare() {
202            createThread (PREPARE);
203        }
204        /** Schedules the participant's {@code prepareForAbort} call on a virtual thread. */
205        public void prepareForAbort() {
206            createThread (PREPARE_FOR_ABORT);
207        }
208        /** Schedules the participant's {@code commit} call on a virtual thread. */
209        public void commit () {
210            createThread (COMMIT);
211        }
212        /** Schedules the participant's {@code abort} call on a virtual thread. */
213        public void abort () {
214            createThread (ABORT);
215        }
216        /** Invokes the appropriate lifecycle method on the wrapped participant. */
217        public void run() {
218            switch (mode) {
219                case PREPARE -> rc = p.prepare(id, ctx);
220                case PREPARE_FOR_ABORT -> {
221                    if (p instanceof AbortParticipant)
222                        rc = ((AbortParticipant) p).prepareForAbort(id, ctx);
223                }
224                case COMMIT -> {
225                    if ((rc & NO_JOIN) == 0)
226                        p.commit(id, ctx);
227                }
228                case ABORT -> {
229                    if ((rc & NO_JOIN) == 0)
230                        p.abort(id, ctx);
231                }
232            }
233        }
234        /** Waits for the runner's virtual thread to terminate, swallowing interrupts. */
235        public void join () {
236            try {
237                t.join ();
238            } catch (InterruptedException ignored) { }
239        }
240        private void createThread (int m) {
241            this.mode = m;
242            this.t = Thread.ofVirtual().name(
243              "%s%s:%s".formatted(
244                MODES[mode],
245                this.getClass().getName(),
246                p.getClass().getName()
247              )
248            ).start(this);
249        }
250    }
251}