/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.transaction.participant;

import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.XmlConfigurable;
import org.jpos.transaction.AbortParticipant;
import org.jpos.transaction.TransactionConstants;
import org.jpos.transaction.TransactionManager;
import org.jpos.transaction.TransactionParticipant;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Composite participant that runs a set of nested participants concurrently.
 *
 * <p>Every {@code <participant>} child of this participant's XML configuration
 * is instantiated through the owning {@link TransactionManager}. For each
 * transaction phase (prepare, prepareForAbort, commit, abort) one virtual
 * thread per nested participant is started, all of them are joined, and - for
 * the two prepare phases - the individual results are merged into one.</p>
 *
 * <p>NOTE(review): a fresh {@link Runner} is created for every phase, so the
 * result of a nested participant's {@code prepare} is not carried over to
 * {@code commit}/{@code abort}; nested participants are therefore invoked on
 * commit/abort regardless of the flags they returned while preparing.</p>
 */
@SuppressWarnings("unchecked")
public class Join
    implements TransactionConstants, AbortParticipant,
               XmlConfigurable
{
    private TransactionManager tm;
    private final List<TransactionParticipant> participants = new ArrayList<> ();

    /**
     * Runs {@code prepare} on every nested participant in parallel.
     *
     * @param id transaction id
     * @param o  transaction context
     * @return the merged result, see {@link #mergeActions(Runner[])}
     */
    @Override
    public int prepare (long id, Serializable o) {
        return mergeActions(
            joinRunners(prepare (createRunners(id, o)))
        );
    }

    /**
     * Runs {@code prepareForAbort} in parallel on every nested participant
     * that implements {@link AbortParticipant}.
     *
     * @param id transaction id
     * @param o  transaction context
     * @return the merged result, see {@link #mergeActions(Runner[])}
     */
    @Override
    public int prepareForAbort (long id, Serializable o) {
        return mergeActions(
            joinRunners(prepareForAbort (createRunners(id, o)))
        );
    }

    /**
     * Runs {@code commit} on every nested participant in parallel and waits
     * for all of them.
     *
     * @param id transaction id
     * @param o  transaction context
     */
    @Override
    public void commit (long id, Serializable o) {
        joinRunners(commit (createRunners(id, o)));
    }

    /**
     * Runs {@code abort} on every nested participant in parallel and waits
     * for all of them.
     *
     * @param id transaction id
     * @param o  transaction context
     */
    @Override
    public void abort (long id, Serializable o) {
        joinRunners(abort (createRunners(id, o)));
    }

    /**
     * Instantiates one nested participant per {@code <participant>} child.
     *
     * @param e this participant's configuration element
     * @throws ConfigurationException if no transaction manager has been set,
     *         or if the manager fails to create a nested participant
     */
    @Override
    public void setConfiguration (Element e) throws ConfigurationException {
        if (tm == null) {
            // Fail with a descriptive configuration error instead of a bare NPE.
            throw new ConfigurationException (
                "TransactionManager not set; unable to create nested participants"
            );
        }
        for (Element element : e.getChildren("participant")) {
            participants.add(tm.createParticipant(element));
        }
    }

    /**
     * Sets the manager used to instantiate the nested participants.
     *
     * @param mgr owning transaction manager
     */
    public void setTransactionManager (TransactionManager mgr) {
        this.tm = mgr;
    }

    /** Starts the prepare phase on every runner. */
    private Runner[] prepare (Runner[] runners) {
        for (Runner runner : runners) {
            runner.prepare();
        }
        return runners;
    }

    /** Starts the prepareForAbort phase on every runner. */
    private Runner[] prepareForAbort (Runner[] runners) {
        for (Runner runner : runners) {
            runner.prepareForAbort();
        }
        return runners;
    }

    /** Starts the commit phase on every runner. */
    private Runner[] commit (Runner[] runners) {
        for (Runner runner : runners) {
            runner.commit();
        }
        return runners;
    }

    /** Starts the abort phase on every runner. */
    private Runner[] abort (Runner[] runners) {
        for (Runner runner : runners) {
            runner.abort();
        }
        return runners;
    }

    /** Creates one (not yet started) runner per nested participant. */
    private Runner[] createRunners (long id, Serializable o) {
        Runner[] runners = new Runner[participants.size()];
        Iterator<TransactionParticipant> iter = participants.iterator();
        for (int i = 0; iter.hasNext(); i++) {
            runners[i] = new Runner (iter.next(), id, o);
        }
        return runners;
    }

    /** Waits for every runner's thread to terminate. */
    private Runner[] joinRunners (Runner[] runners) {
        for (Runner runner : runners) {
            runner.join();
        }
        return runners;
    }

    /**
     * Folds the individual runner results into a single action code.
     *
     * <p>If any runner asks for {@code RETRY}, {@code RETRY} is returned
     * immediately. Otherwise the result is {@code PREPARED} only if every
     * runner prepared, and {@code NO_JOIN} / {@code READONLY} are set only if
     * every runner set them. With no runners at all the result is
     * {@code PREPARED | NO_JOIN | READONLY}.</p>
     *
     * @param runners runners whose {@code rc} is to be merged
     * @return merged action code
     */
    private int mergeActions (Runner[] runners) {
        boolean prepared = true;
        boolean readonly = true;
        boolean no_join  = true;
        for (Runner runner : runners) {
            int action = runner.rc;
            if ((action & RETRY) == RETRY) {
                return RETRY;
            }
            // presumably ABORTED is 0, i.e. "PREPARED bit not set" - verify
            // against TransactionConstants
            if ((action & PREPARED) == ABORTED) {
                prepared = false;
            }
            if ((action & READONLY) != READONLY) {
                readonly = false;
            }
            if ((action & NO_JOIN) != NO_JOIN) {
                no_join = false;
            }
        }
        return (prepared ? PREPARED : ABORTED) |
               (no_join  ? NO_JOIN  : 0) |
               (readonly ? READONLY : 0);
    }

    /**
     * Executes one phase of a single participant on its own virtual thread.
     *
     * <p>Call one of {@link #prepare()}, {@link #prepareForAbort()},
     * {@link #commit()} or {@link #abort()} to start the thread, then
     * {@link #join()} to wait for it; the participant's result (if the phase
     * produces one) is then available in {@link #rc}.</p>
     */
    public static class Runner implements Runnable {
        private final TransactionParticipant p;
        /**
         * Result of the last prepare / prepareForAbort call; stays 0 until the
         * participant returns. Volatile because {@link #join()} may return
         * early when the waiting thread is interrupted, in which case the
         * reader has no happens-before edge with the runner thread.
         */
        public volatile int rc;
        long id;
        int mode;
        private final Serializable ctx;
        Thread t;
        public static final int PREPARE = 0;
        public static final int PREPARE_FOR_ABORT = 1;
        public static final int COMMIT = 2;
        public static final int ABORT = 3;
        /** Phase names, indexed by mode; used to build the thread name. */
        public static final String[] MODES = {
            "prepare", "prepareForAbort", "commit", "abort"
        };

        /**
         * @param p   participant to drive
         * @param id  transaction id
         * @param ctx transaction context
         */
        public Runner (TransactionParticipant p, long id, Serializable ctx) {
            this.p = p;
            this.id = id;
            this.ctx = ctx;
        }

        /** Starts a thread that calls the participant's {@code prepare}. */
        public void prepare () {
            createThread (PREPARE);
        }

        /** Starts a thread that calls the participant's {@code prepareForAbort}. */
        public void prepareForAbort () {
            createThread (PREPARE_FOR_ABORT);
        }

        /** Starts a thread that calls the participant's {@code commit}. */
        public void commit () {
            createThread (COMMIT);
        }

        /** Starts a thread that calls the participant's {@code abort}. */
        public void abort () {
            createThread (ABORT);
        }

        /** Dispatches to the participant method selected by {@code mode}. */
        @Override
        public void run () {
            switch (mode) {
                case PREPARE -> rc = p.prepare(id, ctx);
                case PREPARE_FOR_ABORT -> {
                    // Participants that are not AbortParticipants are skipped
                    // and leave rc untouched.
                    if (p instanceof AbortParticipant ap) {
                        rc = ap.prepareForAbort(id, ctx);
                    }
                }
                case COMMIT -> {
                    // rc is whatever an earlier phase on THIS runner left behind.
                    if ((rc & NO_JOIN) == 0) {
                        p.commit(id, ctx);
                    }
                }
                case ABORT -> {
                    if ((rc & NO_JOIN) == 0) {
                        p.abort(id, ctx);
                    }
                }
            }
        }

        /**
         * Waits for the thread started by the last phase call to terminate.
         *
         * <p>If the waiting thread is interrupted, the wait ends early and the
         * interrupt status is restored so callers up the stack can observe
         * it; {@link #rc} may then not yet reflect the participant's
         * result.</p>
         */
        public void join () {
            try {
                t.join ();
            } catch (InterruptedException e) {
                // Do not swallow the interruption: re-assert the flag.
                Thread.currentThread().interrupt();
            }
        }

        /** Records the phase and starts a named virtual thread running it. */
        private void createThread (int m) {
            this.mode = m;
            this.t = Thread.ofVirtual().name(
                "%s%s:%s".formatted(
                    MODES[mode],
                    this.getClass().getName(),
                    p.getClass().getName()
                )
            ).start(this);
        }
    }
}