001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.transaction.participant; 020 021import org.jdom2.Element; 022import org.jpos.core.ConfigurationException; 023import org.jpos.core.XmlConfigurable; 024import org.jpos.transaction.AbortParticipant; 025import org.jpos.transaction.TransactionConstants; 026import org.jpos.transaction.TransactionManager; 027import org.jpos.transaction.TransactionParticipant; 028 029import java.io.Serializable; 030import java.util.ArrayList; 031import java.util.Iterator; 032import java.util.List; 033 034/** 035 * Transaction participant that runs a list of nested participants concurrently 036 * (one virtual thread each) and merges their lifecycle results. 037 */ 038@SuppressWarnings("unchecked") 039public class Join 040 implements TransactionConstants, AbortParticipant, 041 XmlConfigurable 042{ 043 /** Default constructor; no instance state to initialise. */ 044 public Join() {} 045 private TransactionManager tm; 046 private final List<TransactionParticipant> participants = new ArrayList<> (); 047 048 /** 049 * Runs {@code prepare} on every nested participant in parallel and merges their results. 050 * 051 * @param id transaction id 052 * @param o transaction context 053 * @return the merged action mask 054 */ 055 public int prepare (long id, Serializable o) { 056 return mergeActions( 057 joinRunners(prepare (createRunners(id, o))) 058 ); 059 } 060 /** 061 * Runs {@code prepareForAbort} on every nested participant in parallel and merges their results. 062 * 063 * @param id transaction id 064 * @param o transaction context 065 * @return the merged action mask 066 */ 067 public int prepareForAbort (long id, Serializable o) { 068 return mergeActions( 069 joinRunners(prepareForAbort (createRunners(id, o))) 070 ); 071 } 072 /** 073 * Runs {@code commit} on every nested participant in parallel. 074 * 075 * @param id transaction id 076 * @param o transaction context 077 */ 078 public void commit (long id, Serializable o) { 079 joinRunners(commit (createRunners(id, o))); 080 } 081 /** 082 * Runs {@code abort} on every nested participant in parallel. 083 * 084 * @param id transaction id 085 * @param o transaction context 086 */ 087 public void abort (long id, Serializable o) { 088 joinRunners(abort (createRunners(id, o))); 089 } 090 /** 091 * Reads {@code <participant>} children from {@code e} and instantiates each via the 092 * associated {@link TransactionManager}. 093 * 094 * @param e XML configuration element 095 * @throws ConfigurationException if any nested participant fails to instantiate 096 */ 097 public void setConfiguration (Element e) throws ConfigurationException { 098 for (Element element : e.getChildren("participant")) { 099 participants.add(tm.createParticipant(element)); 100 } 101 } 102 /** 103 * Captures the {@link TransactionManager} used to construct nested participants. 104 * 105 * @param mgr the hosting transaction manager 106 */ 107 public void setTransactionManager (TransactionManager mgr) { 108 this.tm = mgr; 109 } 110 private Runner[] prepare (Runner[] runners) { 111 for (Runner runner : runners) runner.prepare(); 112 return runners; 113 } 114 private Runner[] prepareForAbort (Runner[] runners) { 115 for (Runner runner : runners) runner.prepareForAbort(); 116 return runners; 117 } 118 private Runner[] commit (Runner[] runners) { 119 for (Runner runner : runners) runner.commit(); 120 return runners; 121 } 122 private Runner[] abort (Runner[] runners) { 123 for (Runner runner : runners) runner.abort(); 124 return runners; 125 } 126 private Runner[] createRunners(long id, Serializable o) { 127 Runner[] runners = new Runner[participants.size()]; 128 Iterator<TransactionParticipant> iter = participants.iterator(); 129 for (int i=0; iter.hasNext(); i++) { 130 runners[i] = new Runner ( 131 iter.next(), id, o 132 ); 133 } 134 return runners; 135 } 136 private Runner[] joinRunners (Runner[] runners) { 137 for (Runner runner : runners) runner.join(); 138 return runners; 139 } 140 private int mergeActions (Runner[] runners) { 141 boolean prepared = true; 142 boolean readonly = true; 143 boolean no_join = true; 144 boolean retry = false; 145 for (Runner runner : runners) { 146 int action = runner.rc; 147 retry = (action & RETRY) == RETRY; 148 if (retry) 149 return RETRY; 150 if ((action & PREPARED) == ABORTED) 151 prepared = false; 152 if ((action & READONLY) != READONLY) 153 readonly = false; 154 if ((action & NO_JOIN) != NO_JOIN) 155 no_join = false; 156 } 157 return (prepared ? PREPARED : ABORTED) | 158 (no_join ? NO_JOIN : 0) | 159 (readonly ? READONLY : 0); 160 } 161 /** 162 * Wraps a single nested {@link TransactionParticipant} in its own virtual thread 163 * so its lifecycle calls can run concurrently with siblings. 164 */ 165 public static class Runner implements Runnable { 166 private TransactionParticipant p; 167 /** Result code returned by the most recent lifecycle call. */ 168 public int rc; 169 long id; 170 int mode; 171 private Serializable ctx; 172 Thread t; 173 /** Mode constant: prepare. */ 174 public static final int PREPARE = 0; 175 /** Mode constant: prepareForAbort. */ 176 public static final int PREPARE_FOR_ABORT = 1; 177 /** Mode constant: commit. */ 178 public static final int COMMIT = 2; 179 /** Mode constant: abort. */ 180 public static final int ABORT = 3; 181 /** Human-readable labels indexed by mode constant. */ 182 public static final String[] MODES = { 183 "prepare", "prepareForAbort", "commit", "abort" 184 }; 185 186 private String threadName; 187 188 /** 189 * Constructs a Runner for the given participant and transaction. 190 * 191 * @param p nested participant to invoke 192 * @param id transaction id 193 * @param ctx transaction context 194 */ 195 public Runner (TransactionParticipant p, long id, Serializable ctx) { 196 this.p = p; 197 this.id = id; 198 this.ctx = ctx; 199 } 200 /** Schedules the participant's {@code prepare} call on a virtual thread. */ 201 public void prepare() { 202 createThread (PREPARE); 203 } 204 /** Schedules the participant's {@code prepareForAbort} call on a virtual thread. */ 205 public void prepareForAbort() { 206 createThread (PREPARE_FOR_ABORT); 207 } 208 /** Schedules the participant's {@code commit} call on a virtual thread. */ 209 public void commit () { 210 createThread (COMMIT); 211 } 212 /** Schedules the participant's {@code abort} call on a virtual thread. */ 213 public void abort () { 214 createThread (ABORT); 215 } 216 /** Invokes the appropriate lifecycle method on the wrapped participant. */ 217 public void run() { 218 switch (mode) { 219 case PREPARE -> rc = p.prepare(id, ctx); 220 case PREPARE_FOR_ABORT -> { 221 if (p instanceof AbortParticipant) 222 rc = ((AbortParticipant) p).prepareForAbort(id, ctx); 223 } 224 case COMMIT -> { 225 if ((rc & NO_JOIN) == 0) 226 p.commit(id, ctx); 227 } 228 case ABORT -> { 229 if ((rc & NO_JOIN) == 0) 230 p.abort(id, ctx); 231 } 232 } 233 } 234 /** Waits for the runner's virtual thread to terminate, swallowing interrupts. */ 235 public void join () { 236 try { 237 t.join (); 238 } catch (InterruptedException ignored) { } 239 } 240 private void createThread (int m) { 241 this.mode = m; 242 this.t = Thread.ofVirtual().name( 243 "%s%s:%s".formatted( 244 MODES[mode], 245 this.getClass().getName(), 246 p.getClass().getName() 247 ) 248 ).start(this); 249 } 250 } 251}