001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.transaction; 020 021import io.micrometer.core.instrument.Counter; 022import io.micrometer.core.instrument.Gauge; 023import io.micrometer.core.instrument.Meter; 024import io.micrometer.core.instrument.Tags; 025import io.micrometer.core.instrument.Timer; 026import io.micrometer.core.instrument.binder.BaseUnits; 027import org.HdrHistogram.AtomicHistogram; 028import org.jdom2.Element; 029import org.jpos.core.Configuration; 030import org.jpos.core.ConfigurationException; 031import org.jpos.log.evt.Txn; 032import org.jpos.metrics.MeterInfo; 033import org.jpos.function.TriConsumer; 034import org.jpos.function.TriFunction; 035import org.jpos.jfr.TMEvent; 036import org.jpos.metrics.MeterFactory; 037import org.jpos.q2.QBeanSupport; 038import org.jpos.q2.QFactory; 039import org.jpos.space.*; 040import org.jpos.util.*; 041 042import java.io.PrintStream; 043import java.io.Serializable; 044import java.util.*; 045import java.util.concurrent.*; 046import java.util.concurrent.atomic.AtomicInteger; 047 048import java.time.Instant; 049import java.time.LocalDateTime; 050import java.time.ZoneId; 051import java.util.concurrent.atomic.AtomicLong; 052import 
java.util.concurrent.locks.Lock; 053import java.util.concurrent.locks.ReentrantLock; 054 055import org.jpos.iso.ISOUtil; 056import org.jpos.util.Metrics; 057 058import static org.jpos.transaction.ContextConstants.LOGEVT; 059import static org.jpos.transaction.ContextConstants.TIMESTAMP; 060 061 062/** 063 * Multi-participant transaction manager. 064 * 065 * <p>Reads contexts from a configured input space, walks each transaction 066 * through its prepare/commit/abort lifecycle, and persists state to a 067 * persistent space so in-flight transactions can be recovered after a 068 * restart.</p> 069 */ 070@SuppressWarnings("unchecked") 071public class TransactionManager 072 extends QBeanSupport 073 implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider { 074 075 /** Creates an unconfigured manager; configuration is supplied by Q2 at deploy time. */ 076 public TransactionManager() {} 077 078 /** Space-key prefix used to persist a transaction's serialised context. */ 079 public static final String CONTEXT = "$CONTEXT."; 080 /** Space-key prefix used to persist a transaction's lifecycle state. */ 081 public static final String STATE = "$STATE."; 082 /** Space-key prefix used to persist a transaction's group execution stack. */ 083 public static final String GROUPS = "$GROUPS."; 084 /** Space-key under which retried transactions are queued for re-execution. */ 085 public static final String RETRY_QUEUE = "$RETRY_QUEUE"; 086 /** State marker indicating a transaction is in the prepare phase. */ 087 public static final Integer PREPARING = 0; 088 /** State marker indicating a transaction is in the commit phase. */ 089 public static final Integer COMMITTING = 1; 090 /** State marker indicating a transaction has reached its terminal state. */ 091 public static final Integer DONE = 2; 092 /** Group name used when no explicit group is configured. 
*/
    public static final String DEFAULT_GROUP = "";

    /** Upper bound on participants visited in one prepare pass; going past it aborts the transaction (loop prevention). */
    public static final long MAX_PARTICIPANTS = 1_000L;

    /** Default maximum time, in milliseconds, that a session waits for input. */
    public static final long MAX_WAIT = 15_000L;

    /** Configured group-name to participant-list mapping; created in {@code initService} and filled by {@code initParticipants}. */
    protected Map<String,List<TransactionParticipant>> groups;

    // Participants implementing Destroyable; collected in createParticipant, destroyed in stopService.
    private Set<Destroyable> destroyables = new HashSet<>();

    // Per-thread transaction context and id (presumably maintained by setThreadLocal/removeThreadLocal - not visible here).
    private static final ThreadLocal<Serializable> tlContext = new ThreadLocal<>();
    private static final ThreadLocal<Long> tlId = new ThreadLocal<>();

    // Histogram-backed metrics, built in setConfiguration.
    private Metrics metrics;

    // Per-participant settings and timers, registered in createParticipant.
    private Map<TransactionParticipant,ParticipantParams> params = new HashMap<>();

    // Value of the "max-time" property; default max-time for participants that do not set their own.
    private long globalMaxTime;

    // Spaces resolved in initService.
    private Space<String,Object> sp;    // "space"
    private Space<String,Object> psp;   // "persistent-space"
    private Space<String,Object> isp;   // real input space
    private Space<String,Object> iisp;  // internal input space

    // Name of the input queue ("queue" property, mandatory).
    private String queue;

    // Serialises tail advancement in checkTail.
    private Lock tailLock = new ReentrantLock();

    // Status listeners; hasStatusListeners is kept in step by addListener/removeListener.
    private final List<TransactionStatusListener> statusListeners = new ArrayList<>();
    private boolean hasStatusListeners;

    // Behaviour switches read from the configuration.
    private boolean doRecover;
    private boolean callSelectorOnAbort;
    private boolean abortOnMisconfiguredGroups;

    // Session sizing read in setConfiguration.
    private int sessions;
    private int maxSessions;
    private int threshold;
    private int maxActiveTransactions;  // "max-active-sessions" property

    private final AtomicInteger activeSessions = new AtomicInteger();
    private final AtomicInteger pausedSessions = new AtomicInteger();

    // head: next transaction id to hand out; tail: oldest id not yet purged.
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();

    // Retry / pause tuning (milliseconds); overridable in setConfiguration.
    private long retryInterval = 5000L;
    private long retryTimeout  = 60000L;
    private long pauseTimeout  = 60000L;
    private boolean abortOnPauseTimeout = true;

    private Runnable retryTask = null;
    private TPS tps;
    private ExecutorService executor;
private final List<Meter> meters = new ArrayList<>(); 136 137 private Gauge activeSessionsGauge; 138 private Counter transactionCounter; 139 private boolean freezeLog; 140 private UUID uuid = UUID.randomUUID(); 141 142 @Override 143 protected String defaultRealm() { 144 return Realm.TXN; 145 } 146 147 @Override 148 public void initService () throws ConfigurationException { 149 queue = cfg.get ("queue", null); 150 if (queue == null) 151 throw new ConfigurationException ("queue property not specified"); 152 sp = SpaceFactory.getSpace (cfg.get ("space")); 153 isp = iisp = SpaceFactory.getSpace (cfg.get ("input-space", cfg.get ("space"))); 154 psp = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString())); 155 doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace); 156 tail.set(cfg.getLong ("initial-tail", 1)); 157 head.set(tail.get()); 158 groups = new HashMap<>(); 159 initParticipants (getPersist()); 160 initStatusListeners (getPersist()); 161 executor = QFactory.executorService(cfg.getBoolean("virtual-threads", true)); 162 } 163 164 @Override 165 public void startService () throws Exception { 166 recover(); 167 if (tps != null) 168 tps.stop(); 169 tps = new TPS (cfg.getBoolean ("auto-update-tps", true)); 170 Thread.ofPlatform().start(this); 171 if (psp.rdp (RETRY_QUEUE) != null) 172 checkRetryTask(); 173 174 if (iisp != isp) { 175 Thread.ofPlatform().unstarted( 176 new InputQueueMonitor() 177 ).start(); 178 } 179 NameRegistrar.register(getName(), this); 180 } 181 182 @Override 183 public void stopService () { 184 NameRegistrar.unregister(getName()); 185 if (iisp != isp) 186 for (Object o=iisp.inp(queue); o != null; o=iisp.inp(queue)) 187 isp.out(queue, o); // push back to replicated space 188 189 meters.forEach(getServer().getMeterRegistry()::remove); 190 tps.stop(); 191 for (Destroyable destroyable : destroyables) { 192 try { 193 destroyable.destroy(); 194 } catch (Throwable t) { 195 getLog().warn (t); 196 } 197 } 198 } 199 /** 200 * 
Enqueues a context at the tail of the input queue (FIFO ordering). 201 * 202 * @param context serialisable transaction context 203 */ 204 public void queue (Serializable context) { 205 iisp.out(queue, context); 206 } 207 /** 208 * Pushes a context onto the head of the input queue (LIFO ordering). 209 * 210 * @param context serialisable transaction context 211 */ 212 public void push (Serializable context) { 213 iisp.push(queue, context); 214 } 215 /** 216 * Returns the configured input queue name. 217 * 218 * @return the name of the input queue this manager consumes from 219 */ 220 @SuppressWarnings("unused") 221 public String getQueueName() { 222 return queue; 223 } 224 /** 225 * Returns the volatile space used for in-flight transaction state. 226 * 227 * @return the working {@link Space} 228 */ 229 public Space getSpace() { 230 return sp; 231 } 232 /** 233 * Returns the externally visible input space used for queueing contexts. 234 * 235 * @return the input {@link Space} 236 */ 237 public Space getInputSpace() { 238 return isp; 239 } 240 /** 241 * Returns the persistent space used to recover in-flight transactions 242 * across restarts. 
243 * 244 * @return the persistent {@link Space} 245 */ 246 public Space getPersistentSpace() { 247 return psp; 248 } 249 250 @Override 251 public void run () { 252 while (running()) { 253 if (heavyLoaded()) { 254 ISOUtil.sleep (100L); 255 getLog().info ("HeavyLoaded - active sessions: " + getActiveSessions()); 256 continue; 257 } 258 Object obj = iisp.in (queue, MAX_WAIT); 259 if (obj instanceof Serializable context) { 260 if (getActiveSessions() <= maxSessions) { 261 if (context instanceof Context ctx) 262 ctx.log ("active=%d, maxSessions=%d".formatted(getActiveSessions(), maxSessions)); 263 int session = activeSessions.incrementAndGet(); 264 transactionCounter.increment(); 265 executor.execute(() -> { 266 try { 267 runTransaction(context, session); 268 } finally { 269 activeSessions.decrementAndGet(); 270 } 271 }); 272 } 273 else { 274 iisp.push(queue, context); // push it back 275 ISOUtil.sleep(100L); 276 } 277 } 278 } 279 } 280 281 private void runTransaction (Serializable context, int session) { 282 long id = 0; 283 List<TransactionParticipant> members; 284 Iterator<TransactionParticipant> iter; 285 boolean abort; 286 LogEvent evt; 287 Profiler prof; 288 Thread thread = Thread.currentThread(); 289 290 prof = null; 291 evt = null; 292 thread.setName (getName() + "-" + session + ":idle"); 293 int action = -1; 294 id = head.getAndIncrement (); 295 TMEvent tme = new TMEvent(getName(), id); 296 Txn txn = new Txn(getName(), id); 297 298 tme.begin(); 299 try { 300 setThreadLocal(id, context); 301 if (hasStatusListeners) 302 notifyStatusListeners (session, TransactionStatusEvent.State.READY, id, "", null); 303 304 Chronometer chronometer = new Chronometer(getStart(context)); 305 306 abort = false; 307 members = new ArrayList<> (); 308 iter = getParticipants (DEFAULT_GROUP).iterator(); 309 evt = new LogEvent() 310 .withSource(log) 311 .withTraceId(getTraceId(id)); 312 evt.addMessage(txn); 313 evt.addMessage(context); 314 prof = new Profiler(); 315 snapshot (id, 
context, PREPARING); 316 action = prepare (session, id, context, members, iter, abort, evt, prof, chronometer); 317 switch (action) { 318 case PREPARED: 319 if (members.size() > 0) { 320 setState(id, COMMITTING); 321 commit(session, id, context, members, false, evt, prof); 322 } 323 break; 324 case ABORTED: 325 if (members.size() > 0) { 326 abort(session, id, context, members, false, evt, prof); 327 } 328 break; 329 case RETRY: 330 psp.out (RETRY_QUEUE, context); 331 checkRetryTask(); 332 break; 333 case NO_JOIN: 334 break; 335 } 336 snapshot (id, null, DONE); 337 if (id == tail.get()) { 338 checkTail (); 339 } else { 340 purge (id, false); 341 } 342 tps.tick(); 343 } catch (Throwable t) { 344 if (evt == null) 345 getLog().fatal (t); // should never happen 346 else 347 evt.addMessage (t); 348 } finally { 349 removeThreadLocal(); 350 if (hasStatusListeners) { 351 notifyStatusListeners ( 352 session, 353 TransactionStatusEvent.State.DONE, 354 id, "", context); 355 } 356 if (evt != null && (action == PREPARED || action == ABORTED || (action == -1 && prof != null))) { 357 switch (action) { 358 case PREPARED : 359 evt.setTag("commit"); 360 break; 361 case ABORTED : 362 evt.setTag ("abort"); 363 break; 364 case -1: 365 evt.setTag ("undefined"); 366 break; 367 } 368 if (getInTransit() > Math.max(maxActiveTransactions, activeSessions.get()) * 100L) { 369 evt.addMessage("WARNING: IN-TRANSIT TOO HIGH"); 370 } 371 evt.addMessage ( 372 String.format (" %s, elapsed=%dms", 373 tmInfo(), 374 prof.getElapsedInMillis() 375 ) 376 ); 377 evt.addMessage (prof); 378 try { 379 Logger.log(freeze(context, evt, prof)); 380 } catch (Throwable t) { 381 getLog().error(t); 382 } 383 } 384 tme.commit(); 385 } 386 } 387 388 @Override 389 public long getTail () { 390 return tail.get(); 391 } 392 393 @Override 394 public long getHead () { 395 return head.get(); 396 } 397 398 /** 399 * Returns the count of transactions that have been picked up but not yet completed. 
400 * 401 * @return number of in-flight transactions 402 */ 403 public long getInTransit () { 404 return head.get() - tail.get(); 405 } 406 407 @Override 408 public void setConfiguration (Configuration cfg) throws ConfigurationException { 409 super.setConfiguration (cfg); 410 retryInterval = cfg.getLong ("retry-interval", retryInterval); 411 retryTimeout = cfg.getLong ("retry-timeout", retryTimeout); 412 pauseTimeout = cfg.getLong ("pause-timeout", pauseTimeout); 413 abortOnPauseTimeout = cfg.getBoolean("abort-on-pause-timeout", true); 414 maxActiveTransactions = cfg.getInt ("max-active-sessions", 0); 415 sessions = cfg.getInt ("sessions", 1); 416 threshold = cfg.getInt ("threshold", sessions / 2); 417 maxSessions = cfg.getInt ("max-sessions", sessions); 418 globalMaxTime = cfg.getLong("max-time", 0L); 419 if (maxSessions < sessions) 420 throw new ConfigurationException("max-sessions < sessions"); 421 if (maxActiveTransactions > 0) { 422 if (maxActiveTransactions < sessions) 423 throw new ConfigurationException("max-active-sessions < sessions"); 424 if (maxActiveTransactions < maxSessions) 425 throw new ConfigurationException("max-active-sessions < max-sessions"); 426 } 427 callSelectorOnAbort = cfg.getBoolean("call-selector-on-abort", true); 428 metrics = new Metrics(new AtomicHistogram(cfg.getLong("metrics-highest-trackable-value", 60000), 2)); 429 abortOnMisconfiguredGroups = cfg.getBoolean("abort-on-misconfigured-groups"); 430 431 try { 432 activeSessionsGauge = MeterFactory.gauge 433 (getServer().getMeterRegistry(), MeterInfo.TM_ACTIVE, Tags.of("name", getName()), BaseUnits.SESSIONS, activeSessions::get 434 ); 435 transactionCounter = MeterFactory.counter 436 (getServer().getMeterRegistry(), MeterInfo.TM_COUNTER, Tags.of("name", getName()) 437 ); 438 meters.add(activeSessionsGauge); 439 meters.add(transactionCounter); 440 } catch (Exception e) { 441 throw new ConfigurationException (e); 442 } 443 freezeLog = cfg.getBoolean("freeze-log", true); 444 } 445 /** 
446 * Registers a listener that observes transaction lifecycle transitions. 447 * 448 * @param l listener to add 449 */ 450 public void addListener (TransactionStatusListener l) { 451 synchronized (statusListeners) { 452 statusListeners.add (l); 453 hasStatusListeners = true; 454 } 455 } 456 /** 457 * Removes a previously registered status listener. 458 * 459 * @param l listener to remove 460 */ 461 public void removeListener (TransactionStatusListener l) { 462 synchronized (statusListeners) { 463 statusListeners.remove(l); 464 hasStatusListeners = !statusListeners.isEmpty(); 465 } 466 } 467 /** 468 * Returns the TPS counter that tracks transactions per second for this manager. 469 * 470 * @return the {@link TPS} counter 471 */ 472 public TPS getTPS() { 473 return tps; 474 } 475 476 @Override 477 public String getTPSAsString() { 478 return tps.toString(); 479 } 480 481 @Override 482 public float getTPSAvg() { 483 return tps.getAvg(); 484 } 485 486 @Override 487 public int getTPSPeak() { 488 return tps.getPeak(); 489 } 490 491 @Override 492 public Date getTPSPeakWhen() { 493 return new Date(tps.getPeakWhen()); 494 } 495 496 @Override 497 public long getTPSElapsed() { 498 return tps.getElapsed(); 499 } 500 501 @Override 502 public void resetTPS() { 503 tps.reset(); 504 } 505 506 @Override 507 public Metrics getMetrics() { 508 return metrics; 509 } 510 511 @Override 512 public void dump (PrintStream ps, String indent) { 513 ps.printf ("%s%s%n", indent, tmInfo()); 514 if (metrics != null) { 515 metrics.dump(ps, indent); 516 } 517 } 518 519 /** 520 * Walks {@code members} in order, invoking each participant's commit phase 521 * and recording trace and timer information. 
522 * 523 * @param session session index that owns this transaction 524 * @param id transaction identifier 525 * @param context serialised transaction context 526 * @param members participants whose commit hooks will run 527 * @param recover when {@code true}, the manager is replaying a transaction 528 * after restart and should call {@link ContextRecovery} 529 * @param evt optional log event to receive trace messages, or {@code null} 530 * @param prof optional profiler to receive checkpoints, or {@code null} 531 */ 532 protected void commit 533 (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof) 534 { 535 for (TransactionParticipant p :members) { 536 var jfr = new TMEvent.Commit("%s:%s".formatted(getName(), p.getClass().getName()), id); 537 jfr.begin(); 538 ParticipantParams pp = getParams(p); 539 if (recover && p instanceof ContextRecovery cr) { 540 context = recover (cr, id, context, pp, true); 541 if (evt != null) 542 evt.addMessage (Trace.of("commit-recover", getName(p))); 543 } 544 if (hasStatusListeners) 545 notifyStatusListeners ( 546 session, TransactionStatusEvent.State.COMMITING, id, getName(p), context 547 ); 548 commitOrAbort (p, id, context, pp, this::commit); 549 if (evt != null) { 550 evt.addMessage (Trace.of("commit", getName(p))); 551 if (prof != null) 552 prof.checkPoint (" commit: " + getName(p)); 553 } 554 jfr.commit(); 555 } 556 } 557 /** 558 * Walks {@code members} in order, invoking each participant's abort phase 559 * and recording trace and timer information. 
560 * 561 * @param session session index that owns this transaction 562 * @param id transaction identifier 563 * @param context serialised transaction context 564 * @param members participants whose abort hooks will run 565 * @param recover when {@code true}, the manager is replaying a transaction 566 * after restart and should call {@link ContextRecovery} 567 * @param evt optional log event to receive trace messages, or {@code null} 568 * @param prof optional profiler to receive checkpoints, or {@code null} 569 */ 570 protected void abort 571 (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof) 572 { 573 for (TransactionParticipant p :members) { 574 ParticipantParams pp = getParams(p); 575 if (recover && p instanceof ContextRecovery cr) { 576 context = recover (cr, id, context, pp, true); 577 if (evt != null) 578 evt.addMessage (Trace.of("abort-recover", getName(p))); 579 } 580 if (hasStatusListeners) 581 notifyStatusListeners ( 582 session, TransactionStatusEvent.State.ABORTING, id, getName(p), context 583 ); 584 585 commitOrAbort (p, id, context, pp, this::abort); 586 if (evt != null) { 587 evt.addMessage (Trace.of("abort", getName(p))); 588 if (prof != null) 589 prof.checkPoint (" abort: " + getName(p)); 590 } 591 } 592 } 593 /** 594 * Invokes {@link AbortParticipant#prepareForAbort(long, Serializable)} on 595 * {@code p} when applicable, swallowing exceptions and recording timer 596 * metrics. 
597 * 598 * @param p participant 599 * @param id transaction identifier 600 * @param context serialised transaction context 601 * @return result code from the participant, or {@code ABORTED | NO_JOIN} 602 * when {@code p} is not an {@link AbortParticipant} or threw 603 */ 604 protected int prepareForAbort 605 (TransactionParticipant p, long id, Serializable context) 606 { 607 Chronometer c = new Chronometer(); 608 try { 609 if (p instanceof AbortParticipant) { 610 setThreadName(id, "prepareForAbort", p); 611 return ((AbortParticipant)p).prepareForAbort (id, context); 612 } 613 } catch (Throwable t) { 614 logParticipantWarning("PREPARE-FOR-ABORT: " + id, p, t); 615 } finally { 616 getParams(p).timers.prepareForAbortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 617 if (metrics != null) 618 metrics.record(getName(p) + "-prepare-for-abort", c.elapsed()); 619 } 620 return ABORTED | NO_JOIN; 621 } 622 /** 623 * Invokes the prepare phase on {@code p}, swallowing exceptions (logged as 624 * warnings) and recording timer metrics. 625 * 626 * @param p participant 627 * @param id transaction identifier 628 * @param context serialised transaction context 629 * @return result code from the participant, or {@code ABORTED} when {@code p} threw 630 */ 631 protected int prepare 632 (TransactionParticipant p, long id, Serializable context) 633 { 634 Chronometer c = new Chronometer(); 635 try { 636 setThreadName(id, "prepare", p); 637 return p.prepare (id, context); 638 } catch (Throwable t) { 639 logParticipantWarning("PREPARE: " + id, p, t); 640 } finally { 641 getParams(p).timers.prepareTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 642 if (metrics != null) { 643 metrics.record(getName(p) + "-prepare", c.elapsed()); 644 } 645 } 646 return ABORTED; 647 } 648 /** 649 * Invokes the commit phase on {@code p}, swallowing exceptions (logged as 650 * warnings) and recording timer metrics. 
651 * 652 * @param p participant 653 * @param id transaction identifier 654 * @param context serialised transaction context 655 */ 656 protected void commit 657 (TransactionParticipant p, long id, Serializable context) 658 { 659 Chronometer c = new Chronometer(); 660 try { 661 setThreadName(id, "commit", p); 662 p.commit(id, context); 663 } catch (Throwable t) { 664 logParticipantWarning("COMMIT: " + id, p, t); 665 } finally { 666 getParams(p).timers.commitTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 667 if (metrics != null) 668 metrics.record(getName(p) + "-commit", c.elapsed()); 669 } 670 } 671 /** 672 * Invokes the abort phase on {@code p}, swallowing exceptions (logged as 673 * warnings) and recording timer metrics. 674 * 675 * @param p participant 676 * @param id transaction identifier 677 * @param context serialised transaction context 678 */ 679 protected void abort 680 (TransactionParticipant p, long id, Serializable context) 681 { 682 Chronometer c = new Chronometer(); 683 try { 684 setThreadName(id, "abort", p); 685 p.abort(id, context); 686 } catch (Throwable t) { 687 logParticipantWarning("ABORT: " + id, p, t); 688 } finally { 689 getParams(p).timers.abortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 690 if (metrics != null) 691 metrics.record(getName(p) + "-abort", c.elapsed()); 692 } 693 } 694 /** 695 * Drives the prepare phase across {@code iter}, accumulating results into 696 * {@code members} and aborting on retry signals or participant exceptions. 
697 * 698 * @param session session index that owns this transaction 699 * @param id transaction identifier 700 * @param context serialised transaction context 701 * @param members accumulator of participants whose prepare ran 702 * @param iter participant iterator (may include selectors / groups) 703 * @param abort when {@code true}, drive the prepare-for-abort path 704 * @param evt optional log event to receive trace messages, or {@code null} 705 * @param prof optional profiler to receive checkpoints, or {@code null} 706 * @param chronometer chronometer used to enforce {@code max-time} 707 * @return the bitwise OR of every participant result, suitable for the 708 * lifecycle dispatcher 709 */ 710 protected int prepare 711 (int session, long id, Serializable context, List<TransactionParticipant> members, Iterator<TransactionParticipant> iter, boolean abort, LogEvent evt, Profiler prof, Chronometer chronometer) 712 { 713 boolean retry = false; 714 for (int i=0; iter.hasNext (); i++) { 715 int action; 716 if (i > MAX_PARTICIPANTS) { 717 getLog().warn ( 718 "loop detected - transaction " +id + " aborted." 
719 ); 720 return ABORTED; 721 } 722 TransactionParticipant p = iter.next(); 723 724 ParticipantParams pp = getParams(p); 725 if (!abort && pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime) { 726 abort = true; 727 if (evt != null) 728 evt.addMessage(" forcedAbort: " + getName(p) + " elapsed=" + chronometer.elapsed()); 729 } 730 731 TMEvent jfr; 732 if (abort) { 733 jfr = new TMEvent.PrepareForAbort("%s:%s".formatted(getName(), p.getClass().getName()), id); 734 jfr.begin(); 735 if (hasStatusListeners) 736 notifyStatusListeners ( 737 session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), context 738 ); 739 740 action = prepareOrAbort (p, id, context, pp, this::prepareForAbort); 741 742 if (evt != null && p instanceof AbortParticipant) { 743 evt.addMessage(Trace.of("prepareForAbort", getName(p))); 744 if (prof != null) 745 prof.checkPoint ("prepareForAbort: " + getName(p)); 746 } 747 } else { 748 if (hasStatusListeners) 749 notifyStatusListeners ( 750 session, TransactionStatusEvent.State.PREPARING, id, getName(p), context 751 ); 752 753 jfr = new TMEvent.Prepare("%s:%s".formatted(getName(), p.getClass().getName()), id); 754 jfr.begin(); 755 756 chronometer.lap(); 757 action = prepareOrAbort (p, id, context, pp, this::prepare); 758 boolean timeout = pp.timeout > 0 && chronometer.partial() > pp.timeout; 759 boolean maxTime = pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime; 760 if (timeout || maxTime) 761 action &= (PREPARED ^ 0xFFFF); 762 763 abort = (action & PREPARED) == ABORTED; 764 retry = (action & RETRY) == RETRY; 765 766 if (evt != null) { 767 evt.addMessage (Trace.of("prepare", getName(p), 768 (abort ? " ABORTED" : " PREPARED") 769 + (timeout ? " TIMEOUT" : "") 770 + (maxTime ? " MAX_TIMEOUT" : "") 771 + (retry ? " RETRY" : "") 772 + ((action & READONLY) == READONLY ? " READONLY" : "") 773 + ((action & NO_JOIN) == NO_JOIN ? 
" NO_JOIN" : "")) 774 ); 775 if (prof != null) 776 prof.checkPoint ("prepare: " + getName(p)); 777 } 778 } 779 780 if ((action & READONLY) == 0) { 781 Chronometer c = new Chronometer(); 782 snapshot (id, context); 783 getParams(p).timers.snapshotTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 784 if (metrics != null) 785 metrics.record(getName(p) + "-snapshot", c.elapsed()); 786 } 787 if ((action & NO_JOIN) == 0) { 788 members.add (p); 789 } 790 if (p instanceof GroupSelector && ((action & PREPARED) == PREPARED || callSelectorOnAbort)) { 791 String groupName = null; 792 Chronometer c = new Chronometer(); 793 try { 794 groupName = ((GroupSelector)p).select (id, context); 795 } catch (Exception e) { 796 if (evt != null) 797 evt.addMessage (" selector: " + getName(p) + " " + e.getMessage()); 798 else 799 getLog().error (" selector: " + getName(p) + " " + e.getMessage()); 800 } finally { 801 if (metrics != null) 802 metrics.record(getName(p) + "-selector", c.lap()); 803 } 804 if (evt != null) { 805 evt.addMessage (" selector: '" + groupName +"'"); 806 } 807 if (groupName != null) { 808 StringTokenizer st = new StringTokenizer (groupName, " ,"); 809 List<TransactionParticipant> participants = new ArrayList(); 810 while (st.hasMoreTokens ()) { 811 String grp = st.nextToken(); 812 addGroup (id, grp); 813 if (evt != null && groups.get(grp) == null) { 814 evt.addMessage (" WARNING: group '" + grp + "' not configured"); 815 if (abortOnMisconfiguredGroups) 816 abort = true; 817 } 818 participants.addAll (getParticipants (grp)); 819 } 820 while (iter.hasNext()) 821 participants.add (iter.next()); 822 823 iter = participants.iterator(); 824 } 825 } 826 jfr.commit(); 827 } 828 return abort ? retry ? RETRY : ABORTED : PREPARED; 829 } 830 /** 831 * Returns the configured participants for a named group. 
832 * 833 * @param groupName group name (use {@link #DEFAULT_GROUP} for the default chain) 834 * @return participants in declaration order; an empty list when {@code groupName} 835 * is unknown 836 */ 837 protected List<TransactionParticipant> getParticipants (String groupName) { 838 List<TransactionParticipant> participants = groups.get (groupName); 839 if (participants == null) { 840 participants = new ArrayList(); 841 } 842 return participants; 843 } 844 /** 845 * Returns the participant chain for a specific transaction, combining the 846 * default group with any groups pushed onto the persistent execution stack. 847 * 848 * @param id transaction identifier 849 * @return participants to invoke in order 850 */ 851 protected List<TransactionParticipant> getParticipants (long id) { 852 // Use a local copy of participant to avoid adding the 853 // GROUP participant to the DEFAULT_GROUP 854 List<TransactionParticipant> participantsChain = new ArrayList<>(); 855 List<TransactionParticipant> participants = getParticipants (DEFAULT_GROUP); 856 // Add DEFAULT_GROUP participants 857 participantsChain.addAll(participants); 858 String key = getKey(GROUPS, id); 859 String grp; 860 // now add participants of Group 861 while ( (grp = (String) psp.inp (key)) != null) { 862 participantsChain.addAll (getParticipants (grp)); 863 } 864 return participantsChain; 865 } 866 867 /** 868 * Instantiates and registers any {@code <status-listener>} children of the 869 * QBean descriptor. 
870 * 871 * @param config descriptor element 872 * @throws ConfigurationException if a listener cannot be instantiated or configured 873 */ 874 protected void initStatusListeners (Element config) throws ConfigurationException{ 875 final Iterator iter = config.getChildren ("status-listener").iterator(); 876 while (iter.hasNext()) { 877 final Element e = (Element) iter.next(); 878 final QFactory factory = getFactory(); 879 final TransactionStatusListener listener = (TransactionStatusListener) factory.newInstance (QFactory.getAttributeValue (e, "class")); 880 factory.setConfiguration (listener, config); 881 addListener(listener); 882 } 883 } 884 885 /** 886 * Builds the default participant group plus any named {@code <group>} 887 * children from the QBean descriptor. 888 * 889 * @param config descriptor element 890 * @throws ConfigurationException if a group is missing a name, duplicates an 891 * existing group, or a participant fails to 892 * instantiate 893 */ 894 protected void initParticipants (Element config) 895 throws ConfigurationException 896 { 897 groups.put (DEFAULT_GROUP, initGroup (config)); 898 for (Element e : config.getChildren("group")) { 899 String name = QFactory.getAttributeValue (e, "name"); 900 if (name == null) 901 throw new ConfigurationException ("missing group name"); 902 if (groups.containsKey(name)) { 903 throw new ConfigurationException ( 904 "Group '" + name + "' already defined" 905 ); 906 } 907 groups.put (name, initGroup (e)); 908 } 909 } 910 /** 911 * Instantiates the participants declared inside a {@code <group>} element, 912 * skipping any that are explicitly disabled. 
913 * 914 * @param e group element 915 * @return participants in declaration order 916 * @throws ConfigurationException if a participant fails to instantiate 917 */ 918 protected List<TransactionParticipant> initGroup (Element e) 919 throws ConfigurationException 920 { 921 List<TransactionParticipant> group = new ArrayList<>(); 922 for (Element el : e.getChildren ("participant")) { 923 if (QFactory.isEnabled(el)) { 924 group.add(createParticipant(el)); 925 } else { 926 getLog().warn ("participant ignored (enabled='" + QFactory.getEnabledAttribute(el) + "'): " + el.getAttributeValue("class") + "/" + el.getAttributeValue("realm")); 927 } 928 } 929 return group; 930 } 931 /** 932 * Instantiates a participant from a {@code <participant>} element, 933 * configures it, registers per-participant timers, and tracks any 934 * {@link Destroyable} for cleanup at shutdown. 935 * 936 * @param e participant element 937 * @return the configured participant 938 * @throws ConfigurationException if instantiation, configuration, or 939 * timer registration fails 940 */ 941 public TransactionParticipant createParticipant (Element e) 942 throws ConfigurationException 943 { 944 QFactory factory = getFactory(); 945 TransactionParticipant participant = factory.newInstance (QFactory.getAttributeValue (e, "class")); 946 factory.setLogger (participant, e); 947 QFactory.invoke (participant, "setTransactionManager", this, TransactionManager.class); 948 factory.setConfiguration (participant, e); 949 String realm = QFactory.getAttributeValue(e, "realm"); 950 951 try { 952 String participantShortName = participantName(participant, realm); 953 params.put(participant, new ParticipantParams( 954 participantShortName, 955 getLong (e, "timeout", 0L), 956 getLong (e, "max-time", globalMaxTime), 957 getSet(e.getChild("requires")), 958 getSet(e.getChild("provides")), 959 getSet(e.getChild("optional")), 960 getOrCreateTimers(participant) 961 ) 962 ); 963 } catch (Exception ex) { 964 throw new 
ConfigurationException (ex); 965 } 966 if (participant instanceof Destroyable) { 967 destroyables.add((Destroyable) participant); 968 } 969 return participant; 970 } 971 972 @Override 973 public int getOutstandingTransactions() { 974 if (iisp instanceof LocalSpace) 975 return ((LocalSpace) iisp).size(queue); 976 return -1; 977 } 978 /** 979 * Builds a persistent-space key by combining the manager name, a prefix, 980 * and the transaction id. 981 * 982 * @param prefix key namespace (e.g. {@link #CONTEXT}, {@link #STATE}) 983 * @param id transaction identifier 984 * @return the assembled space key 985 */ 986 protected String getKey (String prefix, long id) { 987 StringBuilder sb = new StringBuilder (getName()); 988 sb.append ('.'); 989 sb.append (prefix); 990 sb.append (id); 991 return sb.toString (); 992 } 993 /** 994 * Disables auto-commit on a {@link JDBMSpace}; no-op for other space types. 995 * 996 * @param sp the space whose auto-commit should be turned off 997 */ 998 protected void commitOff (Space sp) { 999 if (sp instanceof JDBMSpace jsp) { 1000 jsp.setAutoCommit(false); 1001 } 1002 } 1003 /** 1004 * Re-enables auto-commit on a {@link JDBMSpace}, flushing any pending 1005 * writes; no-op for other space types. 1006 * 1007 * @param sp the space whose auto-commit should be turned back on 1008 */ 1009 protected void commitOn (Space sp) { 1010 if (sp instanceof JDBMSpace jsp) { 1011 jsp.commit (); 1012 jsp.setAutoCommit(true); 1013 } 1014 } 1015 /** 1016 * Advances the persistent {@code tail} pointer past any contiguous 1017 * already-DONE transactions, purging their state from the space. 1018 */ 1019 protected void checkTail () { 1020 tailLock.lock(); 1021 try { 1022 while (tailDone()) { 1023 tail.incrementAndGet(); 1024 } 1025 } finally { 1026 tailLock.unlock(); 1027 } 1028 } 1029 /** 1030 * Returns whether the transaction at {@code tail} is already DONE; when 1031 * {@code true}, removes its persistent state. 
1032 * 1033 * @return {@code true} when the tail transaction has reached terminal state 1034 */ 1035 protected boolean tailDone () { 1036 String stateKey = getKey(STATE, tail.get()); 1037 if (DONE.equals (psp.rdp (stateKey))) { 1038 purge (tail.get(), true); 1039 return true; 1040 } 1041 return false; 1042 } 1043 /** 1044 * Snapshots the context for transaction {@code id} without changing its state. 1045 * 1046 * @param id transaction identifier 1047 * @param context serialised transaction context, or {@code null} to clear it 1048 */ 1049 protected void snapshot (long id, Serializable context) { 1050 snapshot (id, context, null); 1051 } 1052 /** 1053 * Snapshots the context and optionally updates the persisted state for 1054 * transaction {@code id}. Skipped when recovery is disabled and the 1055 * status is not {@link #DONE}. 1056 * 1057 * @param id transaction identifier 1058 * @param context serialised transaction context, or {@code null} to clear it 1059 * @param status new persisted state, or {@code null} to leave it unchanged 1060 */ 1061 protected void snapshot (long id, Serializable context, Integer status) { 1062 if (!doRecover && status != DONE) 1063 return; // nothing to do 1064 1065 var jfr = new TMEvent.Snapshot(getName()+":"+status, id); 1066 jfr.begin(); 1067 1068 String contextKey = getKey (CONTEXT, id); 1069 synchronized (psp) { 1070 commitOff (psp); 1071 SpaceUtil.wipe(psp, contextKey); 1072 if (context != null) 1073 psp.out (contextKey, context); 1074 1075 if (status != null) { 1076 String stateKey = getKey (STATE, id); 1077 psp.put (stateKey, status); 1078 } 1079 commitOn (psp); 1080 } 1081 jfr.commit(); 1082 } 1083 /** 1084 * Atomically replaces the persisted lifecycle state for {@code id}. 
1085 * 1086 * @param id transaction identifier 1087 * @param state new state, or {@code null} to clear it 1088 */ 1089 protected void setState (long id, Integer state) { 1090 String stateKey = getKey (STATE, id); 1091 synchronized (psp) { 1092 commitOff (psp); 1093 SpaceUtil.wipe(psp, stateKey); 1094 if (state!= null) 1095 psp.out (stateKey, state); 1096 commitOn (psp); 1097 } 1098 } 1099 /** 1100 * Pushes a group name onto the persistent execution stack for {@code id}. 1101 * 1102 * @param id transaction identifier 1103 * @param groupName group to schedule, or {@code null} for a no-op 1104 */ 1105 protected void addGroup (long id, String groupName) { 1106 if (groupName != null) 1107 psp.out (getKey (GROUPS, id), groupName); 1108 } 1109 /** 1110 * Removes persistent state associated with a transaction. 1111 * 1112 * @param id transaction identifier 1113 * @param full when {@code true}, also removes the lifecycle state entry 1114 * (otherwise only context and group entries are cleared) 1115 */ 1116 protected void purge (long id, boolean full) { 1117 String stateKey = getKey (STATE, id); 1118 String contextKey = getKey (CONTEXT, id); 1119 String groupsKey = getKey (GROUPS, id); 1120 synchronized (psp) { 1121 commitOff (psp); 1122 if (full) 1123 SpaceUtil.wipe(psp, stateKey); 1124 SpaceUtil.wipe(psp, contextKey); 1125 SpaceUtil.wipe(psp, groupsKey); 1126 commitOn (psp); 1127 } 1128 } 1129 1130 /** 1131 * Replays every in-flight transaction recorded in the persistent space, 1132 * advancing the tail pointer as each completes. No-op when recovery is disabled. 
1133 */ 1134 protected void recover () { 1135 if (doRecover) { 1136 if (tail.get() < head.get()) { 1137 getLog().info ("recover - tail=" +tail.get()+", head="+head.get()); 1138 } 1139 while (tail.get() < head.get()) { 1140 recover (tail.getAndIncrement()); 1141 } 1142 } 1143 } 1144 /** 1145 * Replays a single transaction by inspecting its persisted state and 1146 * driving either the commit or abort path before purging its state. 1147 * 1148 * @param id transaction identifier 1149 */ 1150 protected void recover (long id) { 1151 LogEvent evt = getLog().createLogEvent ("recover"); 1152 Profiler prof = new Profiler(); 1153 evt.addMessage ("<id>" + id + "</id>"); 1154 try { 1155 String stateKey = getKey (STATE, id); 1156 String contextKey = getKey (CONTEXT, id); 1157 Integer state = (Integer) psp.rdp (stateKey); 1158 if (state == null) { 1159 evt.addMessage ("unknown stateKey " + stateKey); 1160 SpaceUtil.wipe (psp, contextKey); // just in case ... 1161 return; 1162 } 1163 Serializable context = (Serializable) psp.rdp (contextKey); 1164 if (context != null) 1165 evt.addMessage (context); 1166 1167 if (DONE.equals (state)) { 1168 evt.addMessage ("<done/>"); 1169 } else if (COMMITTING.equals (state)) { 1170 commit (0, id, context, getParticipants (id), true, evt, prof); 1171 } else if (PREPARING.equals (state)) { 1172 abort (0, id, context, getParticipants (id), true, evt, prof); 1173 } 1174 purge (id, true); 1175 } finally { 1176 evt.addMessage (prof); 1177 Logger.log (evt); 1178 } 1179 } 1180 /** 1181 * Lazily starts the {@link RetryTask} that drains the retry queue back into 1182 * the input space. 1183 */ 1184 protected synchronized void checkRetryTask () { 1185 if (retryTask == null) { 1186 retryTask = new RetryTask(); 1187 Thread.ofVirtual().start(retryTask); 1188 } 1189 } 1190 1191 /** 1192 * This method gives the opportunity to decorate a LogEvent right before 1193 * it gets logged. 
When overriding it, unless you know what you're doing, 1194 * you should return a FrozenLogEvent in order to prevent concurrency issues. 1195 * 1196 * @param context current Context 1197 * @param evt current LogEvent 1198 * @param prof profiler (may be null) 1199 * @return FrozenLogEvent 1200 */ 1201 protected LogEvent freeze(Serializable context, LogEvent evt, Profiler prof) { 1202 return freezeLog ? new FrozenLogEvent(evt) : evt; 1203 } 1204 1205 /** 1206 * Background task that periodically pushes any contexts written to the 1207 * {@link #RETRY_QUEUE} back onto the input queue. 1208 */ 1209 public class RetryTask implements Runnable { 1210 /** Creates the retry task bound to the enclosing manager. */ 1211 public RetryTask() {} 1212 @Override 1213 public void run() { 1214 Thread.currentThread().setName (getName()+"-retry-task"); 1215 while (running()) { 1216 for (Serializable context; (context = (Serializable)psp.rdp (RETRY_QUEUE)) != null;) 1217 { 1218 iisp.out (queue, context, retryTimeout); 1219 psp.inp (RETRY_QUEUE); 1220 } 1221 ISOUtil.sleep(retryInterval); 1222 } 1223 } 1224 } 1225 1226 /** 1227 * Background task that throttles the input queue when active sessions 1228 * exceed the configured threshold. 1229 */ 1230 public class InputQueueMonitor implements Runnable { 1231 /** Creates the input-queue monitor bound to the enclosing manager. 
*/ 1232 public InputQueueMonitor() {} 1233 @Override 1234 public void run() { 1235 Thread.currentThread().setName (getName()+"-input-queue-monitor"); 1236 while (running()) { 1237 while (getOutstandingTransactions() > getActiveSessions() + threshold && running()) { 1238 ISOUtil.sleep(100L); 1239 } 1240 if (!running()) 1241 break; 1242 try { 1243 Object context = isp.in(queue, 1000L); 1244 if (context != null) { 1245 if (!running()) { 1246 isp.out(queue, context); // place it back 1247 break; 1248 } 1249 iisp.out(queue, context); 1250 } 1251 } catch (SpaceError e) { 1252 getLog().error(e); 1253 ISOUtil.sleep(1000L); // relax on error 1254 } 1255 } 1256 } 1257 } 1258 1259 /** 1260 * This method returns the number of sessions that can be started at this point in time 1261 * @return number of sessions 1262 */ 1263 protected int getSessionsToStandUp() { 1264 int outstandingTransactions = getOutstandingTransactions(); 1265 int activeSessions = getActiveSessions(); 1266 int count = 0; 1267 if (activeSessions < maxSessions && outstandingTransactions > threshold) { 1268 count = Math.min(outstandingTransactions, maxSessions - activeSessions); 1269 } 1270 return Math.min(1000, count); // reasonable value for virtual thread creation within one second 1271 } 1272 1273 /** 1274 * Hook used by subclasses to drain a session early. The default 1275 * implementation never asks a session to stand down. 1276 * 1277 * @return {@code true} if the current session should stop accepting new transactions 1278 */ 1279 protected boolean isSessionToStandDown() { 1280 return false; 1281 } 1282 1283 @Override 1284 public int getActiveSessions() { 1285 return activeSessions.intValue(); 1286 } 1287 /** 1288 * @return the maximum number of sessions this manager may scale up to 1289 */ 1290 public int getMaxSessions() { 1291 return maxSessions; 1292 } 1293 /** 1294 * Returns the current thread's transaction context as a raw {@link Serializable}. 
1295 * 1296 * @return the thread-local context, or {@code null} when no transaction is in progress 1297 */ 1298 public static Serializable getSerializable() { 1299 return tlContext.get(); 1300 } 1301 /** 1302 * Returns the current thread's transaction context, narrowed to the caller's expected type. 1303 * 1304 * @param <T> caller-supplied context type 1305 * @return the thread-local context, or {@code null} when no transaction is in progress 1306 */ 1307 public static <T extends Serializable> T getContext() { 1308 return (T) tlContext.get(); 1309 } 1310 /** 1311 * Returns the current thread's transaction id, when one is in progress. 1312 * 1313 * @return the thread-local transaction id, or {@code null} 1314 */ 1315 public static Long getId() { 1316 return tlId.get(); 1317 } 1318 1319 1320 private void notifyStatusListeners 1321 (int session, TransactionStatusEvent.State state, long id, String info, Serializable context) 1322 { 1323 TransactionStatusEvent e = new TransactionStatusEvent(session, state, id, info, context); 1324 synchronized (statusListeners) { 1325 for (TransactionStatusListener l : statusListeners) { 1326 l.update (e); 1327 } 1328 } 1329 } 1330 private void setThreadName (long id, String method, TransactionParticipant p) { 1331 Thread.currentThread().setName( 1332 String.format("%s:%d %s %s %s", getName(), id, method, p.getClass().getName(), 1333 LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault())) 1334 ); 1335 } 1336 private void setThreadLocal (long id, Serializable context) { 1337 tlId.set(id); 1338 tlContext.set(context); 1339 } 1340 private void removeThreadLocal() { 1341 tlId.remove(); 1342 tlContext.remove(); 1343 } 1344 1345 private String getName(TransactionParticipant p) { 1346 return Optional.ofNullable(params.get(p)).map(ParticipantParams::name).orElseGet(() -> defaultParticipantName(p)); 1347 } 1348 1349 private ParticipantParams getParams (TransactionParticipant p) { 1350 return 
Optional.ofNullable(params.get(p)).orElseGet(() -> 1351 new ParticipantParams(defaultParticipantName(p), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), 1352 getOrCreateTimers(p)) 1353 ); 1354 } 1355 1356 private String participantName(TransactionParticipant p, String alias) { 1357 String resolvedAlias = alias != null ? alias.trim() : ""; 1358 return !resolvedAlias.isEmpty() ? resolvedAlias : defaultParticipantName(p); 1359 } 1360 1361 private String defaultParticipantName(TransactionParticipant p) { 1362 return Caller.shortClassName(p.getClass().getName()); 1363 } 1364 1365 private void logParticipantWarning(String detail, TransactionParticipant participant, Throwable t) { 1366 Logger.log(getLog().createWarn(detail).withTag("participant", getName(participant)).add(t)); 1367 } 1368 1369 private String tmInfo() { 1370 return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s", 1371 getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(), 1372 getActiveSessions(), maxSessions, 1373 (tps != null ? 
", " + tps : "") 1374 ); 1375 } 1376 1377 private long getLong (Element e, String attributeName, long defValue) { 1378 String s = QFactory.getAttributeValue (e, attributeName); 1379 if (s != null) { 1380 try { 1381 return Long.parseLong(s); 1382 } catch (NumberFormatException ignored) {} 1383 } 1384 return defValue; 1385 } 1386 1387 private Instant getStart (Serializable context) { 1388 if (context instanceof Context) { 1389 Object o = ((Context) context).get(TIMESTAMP); 1390 if (o instanceof Instant) 1391 return (Instant) o; 1392 } 1393 return Instant.now(); 1394 } 1395 1396 private boolean heavyLoaded() { 1397 return getActiveSessions() >= maxSessions; 1398 } 1399 1400 private int pauseAndWait(Serializable context, int action) { 1401 if (context instanceof Pausable pausable) try { 1402 pausedSessions.incrementAndGet(); 1403 Future<Integer> paused = pausable.pause(); 1404 long timeout = pausable.getTimeout(); 1405 timeout = timeout > 0 ? Math.min (timeout, pauseTimeout) : pauseTimeout; 1406 try { 1407 action = paused.get(timeout, TimeUnit.MILLISECONDS); 1408 } catch (InterruptedException | ExecutionException e) { 1409 if (context instanceof Context ctx) 1410 ctx.log(e); 1411 } catch (TimeoutException e) { 1412 action &= (PREPARED ^ 0xFFFF); // turn off 'PREPARED' - we need to abort 1413 } finally { 1414 pausable.reset(); 1415 } 1416 } finally { 1417 pausedSessions.decrementAndGet(); 1418 } 1419 return action; 1420 } 1421 1422 /** 1423 * Per-participant configuration: short name, timeouts, dependency sets, 1424 * and the timer bundle used to record per-phase metrics. 
1425 * 1426 * @param name short name used in trace messages and timer tags 1427 * @param timeout per-transaction soft timeout in milliseconds 1428 * @param maxTime hard ceiling in milliseconds (overrides {@code timeout} when smaller) 1429 * @param requires names this participant requires from a prior {@code provides} 1430 * @param provides names this participant exports for downstream {@code requires} 1431 * @param optional names this participant may consume but does not require 1432 * @param timers per-phase Micrometer timers 1433 */ 1434 private record ParticipantParams ( 1435 String name, 1436 long timeout, 1437 long maxTime, 1438 Set<String> requires, 1439 Set<String> provides, 1440 Set<String> optional, 1441 Timers timers 1442 ) 1443 { 1444 public boolean isConstrained() { 1445 return !requires.isEmpty() || !optional.isEmpty(); 1446 } 1447 } 1448 private record Timers ( 1449 io.micrometer.core.instrument.Timer prepareTimer, 1450 io.micrometer.core.instrument.Timer prepareForAbortTimer, 1451 io.micrometer.core.instrument.Timer commitTimer, 1452 io.micrometer.core.instrument.Timer abortTimer, 1453 io.micrometer.core.instrument.Timer snapshotTimer) 1454 { } 1455 /** 1456 * Single line of participant trace data accumulated on a {@link LogEvent} 1457 * during a transaction's lifecycle. 1458 * 1459 * @param phase lifecycle phase tag (e.g. {@code prepare}, {@code commit}) 1460 * @param message free-form message, typically the participant short name 1461 * @param info optional trailing info, rendered after the message 1462 */ 1463 public record Trace (String phase, String message, String info) { 1464 @Override 1465 public String toString() { 1466 return "%15s: %s%s".formatted(phase, message, info); 1467 } 1468 /** 1469 * Creates a trace line with no trailing info. 
1470 * 1471 * @param phase lifecycle phase tag 1472 * @param message free-form message 1473 * @return the assembled trace 1474 */ 1475 public static Trace of (String phase, String message) { 1476 return new Trace (phase, message, ""); 1477 } 1478 /** 1479 * Creates a trace line with trailing info. 1480 * 1481 * @param phase lifecycle phase tag 1482 * @param message free-form message 1483 * @param info trailing info 1484 * @return the assembled trace 1485 */ 1486 public static Trace of (String phase, String message, String info) { 1487 return new Trace (phase, message, info); 1488 } 1489 } 1490 1491 private Set<String> getSet (Element e) { 1492 return e != null ? new HashSet<>(Arrays.asList(ISOUtil.commaDecode(e.getTextTrim()))) : Collections.emptySet(); 1493 } 1494 1495 private int prepareOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriFunction<TransactionParticipant, Long, Serializable, Integer> preparationFunction) { 1496 int action; 1497 1498 if (context instanceof Context ctx && pp.isConstrained()) { 1499 if (!ctx.hasKeys(pp.requires.toArray())) { 1500 ctx.log ("missing.requires: '%s'".formatted(ctx.keysNotPresent(pp.requires.toArray()))); 1501 action = ABORTED; 1502 } else { 1503 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1504 action = preparationFunction.apply(p, id, c); 1505 if (!pp.requires.contains(LOGEVT.toString())) { 1506 // if we are not inheriting parent's log event and there's a log event 1507 // in the childs context, copy it. 
1508 LogEvent evt = c.get(LOGEVT.toString()); 1509 if (evt != null) { 1510 LogEvent parentLogEvent = ctx.getLogEvent(); 1511 synchronized (parentLogEvent) { 1512 parentLogEvent.getPayLoad().addAll(evt.getPayLoad()); 1513 } 1514 c.remove(LOGEVT.toString()); 1515 } 1516 } 1517 ctx.merge(c.clone(pp.provides.toArray())); 1518 } 1519 } else { 1520 action = preparationFunction.apply(p, id, context); 1521 } 1522 if ((action & PAUSE) == PAUSE) { 1523 var jfrp = new TMEvent.Pause(getName(), id); 1524 jfrp.begin(); 1525 action = pauseAndWait(context, action); 1526 jfrp.commit(); 1527 } 1528 return action; 1529 } 1530 1531 private void commitOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriConsumer<TransactionParticipant, Long, Serializable> preparationFunction) { 1532 if (context instanceof Context ctx && pp.isConstrained()) { 1533 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1534 preparationFunction.accept(p, id, c); 1535 ctx.merge(c.clone(pp.provides.toArray())); 1536 } else { 1537 preparationFunction.accept(p, id, context); 1538 } 1539 } 1540 1541 private Serializable recover (ContextRecovery p, long id, Serializable context, ParticipantParams pp, boolean commit) { 1542 var jfr = new TMEvent.Recover("%s:%s".formatted(getName(), p.getClass().getName()), id); 1543 jfr.begin(); 1544 try { 1545 if (context instanceof Context ctx && pp.isConstrained()) { 1546 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1547 Serializable s = p.recover (id, c, commit); 1548 return (s instanceof Context rc) ? 
1549 rc.clone (pp.provides.toArray()) : s; 1550 } else { 1551 return p.recover (id, context, commit); 1552 } 1553 } finally { 1554 jfr.commit(); 1555 } 1556 } 1557 1558 private Timers getOrCreateTimers(TransactionParticipant p) { 1559 return Optional.ofNullable(params.get(p)).map(ParticipantParams::timers).orElseGet(() -> { 1560 String participantShortName = Optional.ofNullable(params.get(p)) 1561 .map(ParticipantParams::name) 1562 .orElseGet(() -> defaultParticipantName(p)); 1563 var mr = getServer().getMeterRegistry(); 1564 var tags = Tags.of("name", getName(), "participant", participantShortName); 1565 String realm = (p instanceof LogSource ls) ? ls.getRealm() : null; 1566 tags = tags.and("realm", (realm != null && !realm.isEmpty()) ? realm.trim() : ""); 1567 1568 return new Timers( 1569 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare"))), 1570 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare-for-abort"))), 1571 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "commit"))), 1572 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "abort"))), 1573 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "snapshot"))) 1574 ); 1575 }); 1576 } 1577 1578 private Timer addTimer (Timer m) { 1579 meters.add (m); 1580 return m; 1581 } 1582 1583 private UUID getTraceId (long transactionId) { 1584 return new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ transactionId); 1585 } 1586}