001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.transaction; 020 021import io.micrometer.core.instrument.Counter; 022import io.micrometer.core.instrument.Gauge; 023import io.micrometer.core.instrument.Meter; 024import io.micrometer.core.instrument.Tags; 025import io.micrometer.core.instrument.Timer; 026import io.micrometer.core.instrument.binder.BaseUnits; 027import org.HdrHistogram.AtomicHistogram; 028import org.jdom2.Element; 029import org.jpos.core.Configuration; 030import org.jpos.core.ConfigurationException; 031import org.jpos.log.evt.Txn; 032import org.jpos.metrics.MeterInfo; 033import org.jpos.function.TriConsumer; 034import org.jpos.function.TriFunction; 035import org.jpos.jfr.TMEvent; 036import org.jpos.metrics.MeterFactory; 037import org.jpos.q2.QBeanSupport; 038import org.jpos.q2.QFactory; 039import org.jpos.space.*; 040import org.jpos.util.*; 041 042import java.io.PrintStream; 043import java.io.Serializable; 044import java.util.*; 045import java.util.concurrent.*; 046import java.util.concurrent.atomic.AtomicInteger; 047 048import java.time.Instant; 049import java.time.LocalDateTime; 050import java.time.ZoneId; 051import java.util.concurrent.atomic.AtomicLong; 052import 
java.util.concurrent.locks.Lock; 053import java.util.concurrent.locks.ReentrantLock; 054 055import org.jpos.iso.ISOUtil; 056import org.jpos.util.Metrics; 057 058import static org.jpos.transaction.ContextConstants.LOGEVT; 059import static org.jpos.transaction.ContextConstants.TIMESTAMP; 060 061 062@SuppressWarnings("unchecked") 063public class TransactionManager 064 extends QBeanSupport 065 implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider { 066 067 public static final String CONTEXT = "$CONTEXT."; 068 public static final String STATE = "$STATE."; 069 public static final String GROUPS = "$GROUPS."; 070 public static final String RETRY_QUEUE = "$RETRY_QUEUE"; 071 public static final Integer PREPARING = 0; 072 public static final Integer COMMITTING = 1; 073 public static final Integer DONE = 2; 074 public static final String DEFAULT_GROUP = ""; 075 public static final long MAX_PARTICIPANTS = 1000; // loop prevention 076 public static final long MAX_WAIT = 15000L; 077 protected Map<String,List<TransactionParticipant>> groups; 078 private Set<Destroyable> destroyables = new HashSet<>(); 079 private static final ThreadLocal<Serializable> tlContext = new ThreadLocal<>(); 080 private static final ThreadLocal<Long> tlId = new ThreadLocal<>(); 081 private Metrics metrics; 082 private Map<TransactionParticipant,ParticipantParams> params = new HashMap<>(); 083 private long globalMaxTime; 084 085 private Space<String,Object> sp; 086 private Space<String,Object> psp; 087 private Space<String,Object> isp; // real input space 088 private Space<String,Object> iisp; // internal input space 089 private String queue; 090 private Lock tailLock = new ReentrantLock(); 091 private final List<TransactionStatusListener> statusListeners = new ArrayList<>(); 092 private boolean hasStatusListeners; 093 private boolean doRecover; 094 private boolean callSelectorOnAbort; 095 private boolean abortOnMisconfiguredGroups; 096 private int sessions; 097 
private int maxSessions; 098 private int threshold; 099 private int maxActiveTransactions; 100 private final AtomicInteger activeSessions = new AtomicInteger(); 101 private final AtomicInteger pausedSessions = new AtomicInteger(); 102 103 private final AtomicLong head = new AtomicLong(); 104 private final AtomicLong tail = new AtomicLong(); 105 106 private long retryInterval = 5000L; 107 private long retryTimeout = 60000L; 108 private long pauseTimeout = 60000L; 109 private boolean abortOnPauseTimeout = true; 110 private Runnable retryTask = null; 111 private TPS tps; 112 private ExecutorService executor; 113 private final List<Meter> meters = new ArrayList<>(); 114 115 private Gauge activeSessionsGauge; 116 private Counter transactionCounter; 117 private boolean freezeLog; 118 private UUID uuid = UUID.randomUUID(); 119 120 @Override 121 public void initService () throws ConfigurationException { 122 queue = cfg.get ("queue", null); 123 if (queue == null) 124 throw new ConfigurationException ("queue property not specified"); 125 sp = SpaceFactory.getSpace (cfg.get ("space")); 126 isp = iisp = SpaceFactory.getSpace (cfg.get ("input-space", cfg.get ("space"))); 127 psp = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString())); 128 doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace); 129 tail.set(cfg.getLong ("initial-tail", 1)); 130 head.set(tail.get()); 131 groups = new HashMap<>(); 132 initParticipants (getPersist()); 133 initStatusListeners (getPersist()); 134 executor = QFactory.executorService(cfg.getBoolean("virtual-threads", true)); 135 } 136 137 @Override 138 public void startService () throws Exception { 139 recover(); 140 if (tps != null) 141 tps.stop(); 142 tps = new TPS (cfg.getBoolean ("auto-update-tps", true)); 143 Thread.ofPlatform().start(this); 144 if (psp.rdp (RETRY_QUEUE) != null) 145 checkRetryTask(); 146 147 if (iisp != isp) { 148 Thread.ofPlatform().unstarted( 149 new InputQueueMonitor() 150 ).start(); 151 } 152 
NameRegistrar.register(getName(), this); 153 } 154 155 @Override 156 public void stopService () { 157 NameRegistrar.unregister(getName()); 158 if (iisp != isp) 159 for (Object o=iisp.inp(queue); o != null; o=iisp.inp(queue)) 160 isp.out(queue, o); // push back to replicated space 161 162 meters.forEach(getServer().getMeterRegistry()::remove); 163 tps.stop(); 164 for (Destroyable destroyable : destroyables) { 165 try { 166 destroyable.destroy(); 167 } catch (Throwable t) { 168 getLog().warn (t); 169 } 170 } 171 } 172 public void queue (Serializable context) { 173 iisp.out(queue, context); 174 } 175 public void push (Serializable context) { 176 iisp.push(queue, context); 177 } 178 @SuppressWarnings("unused") 179 public String getQueueName() { 180 return queue; 181 } 182 public Space getSpace() { 183 return sp; 184 } 185 public Space getInputSpace() { 186 return isp; 187 } 188 public Space getPersistentSpace() { 189 return psp; 190 } 191 192 @Override 193 public void run () { 194 while (running()) { 195 if (heavyLoaded()) { 196 ISOUtil.sleep (100L); 197 getLog().info ("HeavyLoaded - active sessions: " + getActiveSessions()); 198 continue; 199 } 200 Object obj = iisp.in (queue, MAX_WAIT); 201 if (obj instanceof Serializable context) { 202 if (getActiveSessions() <= maxSessions) { 203 if (context instanceof Context ctx) 204 ctx.log ("active=%d, maxSessions=%d".formatted(getActiveSessions(), maxSessions)); 205 int session = activeSessions.incrementAndGet(); 206 transactionCounter.increment(); 207 executor.execute(() -> { 208 try { 209 runTransaction(context, session); 210 } finally { 211 activeSessions.decrementAndGet(); 212 } 213 }); 214 } 215 else { 216 iisp.push(queue, context); // push it back 217 ISOUtil.sleep(100L); 218 } 219 } 220 } 221 } 222 223 private void runTransaction (Serializable context, int session) { 224 long id = 0; 225 List<TransactionParticipant> members; 226 Iterator<TransactionParticipant> iter; 227 boolean abort; 228 LogEvent evt; 229 Profiler 
prof; 230 Thread thread = Thread.currentThread(); 231 232 prof = null; 233 evt = null; 234 thread.setName (getName() + "-" + session + ":idle"); 235 int action = -1; 236 id = head.getAndIncrement (); 237 TMEvent tme = new TMEvent(getName(), id); 238 Txn txn = new Txn(getName(), id); 239 240 tme.begin(); 241 try { 242 setThreadLocal(id, context); 243 if (hasStatusListeners) 244 notifyStatusListeners (session, TransactionStatusEvent.State.READY, id, "", null); 245 246 Chronometer chronometer = new Chronometer(getStart(context)); 247 248 abort = false; 249 members = new ArrayList<> (); 250 iter = getParticipants (DEFAULT_GROUP).iterator(); 251 evt = new LogEvent() 252 .withSource(log) 253 .withTraceId(getTraceId(id)); 254 evt.addMessage(txn); 255 evt.addMessage(context); 256 prof = new Profiler(); 257 snapshot (id, context, PREPARING); 258 action = prepare (session, id, context, members, iter, abort, evt, prof, chronometer); 259 switch (action) { 260 case PREPARED: 261 if (members.size() > 0) { 262 setState(id, COMMITTING); 263 commit(session, id, context, members, false, evt, prof); 264 } 265 break; 266 case ABORTED: 267 if (members.size() > 0) { 268 abort(session, id, context, members, false, evt, prof); 269 } 270 break; 271 case RETRY: 272 psp.out (RETRY_QUEUE, context); 273 checkRetryTask(); 274 break; 275 case NO_JOIN: 276 break; 277 } 278 snapshot (id, null, DONE); 279 if (id == tail.get()) { 280 checkTail (); 281 } else { 282 purge (id, false); 283 } 284 tps.tick(); 285 } catch (Throwable t) { 286 if (evt == null) 287 getLog().fatal (t); // should never happen 288 else 289 evt.addMessage (t); 290 } finally { 291 removeThreadLocal(); 292 if (hasStatusListeners) { 293 notifyStatusListeners ( 294 session, 295 TransactionStatusEvent.State.DONE, 296 id, "", context); 297 } 298 if (evt != null && (action == PREPARED || action == ABORTED || (action == -1 && prof != null))) { 299 switch (action) { 300 case PREPARED : 301 evt.setTag("commit"); 302 break; 303 case 
ABORTED : 304 evt.setTag ("abort"); 305 break; 306 case -1: 307 evt.setTag ("undefined"); 308 break; 309 } 310 if (getInTransit() > Math.max(maxActiveTransactions, activeSessions.get()) * 100L) { 311 evt.addMessage("WARNING: IN-TRANSIT TOO HIGH"); 312 } 313 evt.addMessage ( 314 String.format (" %s, elapsed=%dms", 315 tmInfo(), 316 prof.getElapsedInMillis() 317 ) 318 ); 319 evt.addMessage (prof); 320 try { 321 Logger.log(freeze(context, evt, prof)); 322 } catch (Throwable t) { 323 getLog().error(t); 324 } 325 } 326 tme.commit(); 327 } 328 } 329 330 @Override 331 public long getTail () { 332 return tail.get(); 333 } 334 335 @Override 336 public long getHead () { 337 return head.get(); 338 } 339 340 public long getInTransit () { 341 return head.get() - tail.get(); 342 } 343 344 @Override 345 public void setConfiguration (Configuration cfg) throws ConfigurationException { 346 super.setConfiguration (cfg); 347 retryInterval = cfg.getLong ("retry-interval", retryInterval); 348 retryTimeout = cfg.getLong ("retry-timeout", retryTimeout); 349 pauseTimeout = cfg.getLong ("pause-timeout", pauseTimeout); 350 abortOnPauseTimeout = cfg.getBoolean("abort-on-pause-timeout", true); 351 maxActiveTransactions = cfg.getInt ("max-active-sessions", 0); 352 sessions = cfg.getInt ("sessions", 1); 353 threshold = cfg.getInt ("threshold", sessions / 2); 354 maxSessions = cfg.getInt ("max-sessions", sessions); 355 globalMaxTime = cfg.getLong("max-time", 0L); 356 if (maxSessions < sessions) 357 throw new ConfigurationException("max-sessions < sessions"); 358 if (maxActiveTransactions > 0) { 359 if (maxActiveTransactions < sessions) 360 throw new ConfigurationException("max-active-sessions < sessions"); 361 if (maxActiveTransactions < maxSessions) 362 throw new ConfigurationException("max-active-sessions < max-sessions"); 363 } 364 callSelectorOnAbort = cfg.getBoolean("call-selector-on-abort", true); 365 metrics = new Metrics(new AtomicHistogram(cfg.getLong("metrics-highest-trackable-value", 
60000), 2)); 366 abortOnMisconfiguredGroups = cfg.getBoolean("abort-on-misconfigured-groups"); 367 368 try { 369 activeSessionsGauge = MeterFactory.gauge 370 (getServer().getMeterRegistry(), MeterInfo.TM_ACTIVE, Tags.of("name", getName()), BaseUnits.SESSIONS, activeSessions::get 371 ); 372 transactionCounter = MeterFactory.counter 373 (getServer().getMeterRegistry(), MeterInfo.TM_COUNTER, Tags.of("name", getName()) 374 ); 375 meters.add(activeSessionsGauge); 376 meters.add(transactionCounter); 377 } catch (Exception e) { 378 throw new ConfigurationException (e); 379 } 380 freezeLog = cfg.getBoolean("freeze-log", true); 381 } 382 public void addListener (TransactionStatusListener l) { 383 synchronized (statusListeners) { 384 statusListeners.add (l); 385 hasStatusListeners = true; 386 } 387 } 388 public void removeListener (TransactionStatusListener l) { 389 synchronized (statusListeners) { 390 statusListeners.remove(l); 391 hasStatusListeners = !statusListeners.isEmpty(); 392 } 393 } 394 public TPS getTPS() { 395 return tps; 396 } 397 398 @Override 399 public String getTPSAsString() { 400 return tps.toString(); 401 } 402 403 @Override 404 public float getTPSAvg() { 405 return tps.getAvg(); 406 } 407 408 @Override 409 public int getTPSPeak() { 410 return tps.getPeak(); 411 } 412 413 @Override 414 public Date getTPSPeakWhen() { 415 return new Date(tps.getPeakWhen()); 416 } 417 418 @Override 419 public long getTPSElapsed() { 420 return tps.getElapsed(); 421 } 422 423 @Override 424 public void resetTPS() { 425 tps.reset(); 426 } 427 428 @Override 429 public Metrics getMetrics() { 430 return metrics; 431 } 432 433 @Override 434 public void dump (PrintStream ps, String indent) { 435 ps.printf ("%s%s%n", indent, tmInfo()); 436 if (metrics != null) { 437 metrics.dump(ps, indent); 438 } 439 } 440 441 protected void commit 442 (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof) 443 { 444 for 
(TransactionParticipant p :members) { 445 var jfr = new TMEvent.Commit("%s:%s".formatted(getName(), p.getClass().getName()), id); 446 jfr.begin(); 447 ParticipantParams pp = getParams(p); 448 if (recover && p instanceof ContextRecovery cr) { 449 context = recover (cr, id, context, pp, true); 450 if (evt != null) 451 evt.addMessage (Trace.of("commit-recover", getName(p))); 452 } 453 if (hasStatusListeners) 454 notifyStatusListeners ( 455 session, TransactionStatusEvent.State.COMMITING, id, getName(p), context 456 ); 457 commitOrAbort (p, id, context, pp, this::commit); 458 if (evt != null) { 459 evt.addMessage (Trace.of("commit", getName(p))); 460 if (prof != null) 461 prof.checkPoint (" commit: " + getName(p)); 462 } 463 jfr.commit(); 464 } 465 } 466 protected void abort 467 (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof) 468 { 469 for (TransactionParticipant p :members) { 470 ParticipantParams pp = getParams(p); 471 if (recover && p instanceof ContextRecovery cr) { 472 context = recover (cr, id, context, pp, true); 473 if (evt != null) 474 evt.addMessage (Trace.of("abort-recover", getName(p))); 475 } 476 if (hasStatusListeners) 477 notifyStatusListeners ( 478 session, TransactionStatusEvent.State.ABORTING, id, getName(p), context 479 ); 480 481 commitOrAbort (p, id, context, pp, this::abort); 482 if (evt != null) { 483 evt.addMessage (Trace.of("abort", getName(p))); 484 if (prof != null) 485 prof.checkPoint (" abort: " + getName(p)); 486 } 487 } 488 } 489 protected int prepareForAbort 490 (TransactionParticipant p, long id, Serializable context) 491 { 492 Chronometer c = new Chronometer(); 493 try { 494 if (p instanceof AbortParticipant) { 495 setThreadName(id, "prepareForAbort", p); 496 return ((AbortParticipant)p).prepareForAbort (id, context); 497 } 498 } catch (Throwable t) { 499 getLog().warn ("PREPARE-FOR-ABORT: " + id, t); 500 } finally { 501 
getParams(p).timers.prepareForAbortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
            if (metrics != null)
                metrics.record(getName(p) + "-prepare-for-abort", c.elapsed());
        }
        return ABORTED | NO_JOIN;
    }
    /**
     * Calls {@code prepare} on a single participant, recording the elapsed time.
     *
     * @return the participant's answer, or {@code ABORTED} when the call throws
     */
    protected int prepare
      (TransactionParticipant p, long id, Serializable context)
    {
        Chronometer c = new Chronometer();
        try {
            setThreadName(id, "prepare", p);
            return p.prepare (id, context);
        } catch (Throwable t) {
            getLog().warn ("PREPARE: " + id, t);
        } finally {
            getParams(p).timers.prepareTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
            if (metrics != null) {
                metrics.record(getName(p) + "-prepare", c.elapsed());
            }
        }
        return ABORTED;
    }
    /**
     * Calls {@code commit} on a single participant, recording the elapsed time.
     * Anything the participant throws is logged and not propagated.
     */
    protected void commit
      (TransactionParticipant p, long id, Serializable context)
    {
        Chronometer c = new Chronometer();
        try {
            setThreadName(id, "commit", p);
            p.commit(id, context);
        } catch (Throwable t) {
            getLog().warn ("COMMIT: " + id, t);
        } finally {
            getParams(p).timers.commitTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
            if (metrics != null)
                metrics.record(getName(p) + "-commit", c.elapsed());
        }
    }
    /**
     * Calls {@code abort} on a single participant, recording the elapsed time.
     * Anything the participant throws is logged and not propagated.
     */
    protected void abort
      (TransactionParticipant p, long id, Serializable context)
    {
        Chronometer c = new Chronometer();
        try {
            setThreadName(id, "abort", p);
            p.abort(id, context);
        } catch (Throwable t) {
            getLog().warn ("ABORT: " + id, t);
        } finally {
            getParams(p).timers.abortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
            if (metrics != null)
                metrics.record(getName(p) + "-abort", c.elapsed());
        }
    }
    /**
     * Runs the prepare phase over the participant chain, following group
     * selectors as they are found.
     *
     * @param session     session number reported to status listeners
     * @param id          transaction id
     * @param context     transaction context
     * @param members     receives the participants that join the transaction
     * @param iter        participants still to visit
     * @param abort       {@code true} if the transaction is already aborting
     * @param evt         log event to trace into (may be {@code null})
     * @param prof        profiler to checkpoint (may be {@code null})
     * @param chronometer measures time since the transaction started
     * @return {@code PREPARED}, {@code ABORTED} or {@code RETRY}
     */
    protected int prepare
      (int session, long id, Serializable context, List<TransactionParticipant> members, Iterator<TransactionParticipant> iter, boolean abort, LogEvent evt, Profiler prof, Chronometer chronometer)
    {
        boolean retry = false;
        for (int i=0; iter.hasNext (); i++) {
            int action;
            if (i > MAX_PARTICIPANTS) {
getLog().warn ( 562 "loop detected - transaction " +id + " aborted." 563 ); 564 return ABORTED; 565 } 566 TransactionParticipant p = iter.next(); 567 568 ParticipantParams pp = getParams(p); 569 if (!abort && pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime) { 570 abort = true; 571 if (evt != null) 572 evt.addMessage(" forcedAbort: " + getName(p) + " elapsed=" + chronometer.elapsed()); 573 } 574 575 TMEvent jfr; 576 if (abort) { 577 jfr = new TMEvent.PrepareForAbort("%s:%s".formatted(getName(), p.getClass().getName()), id); 578 jfr.begin(); 579 if (hasStatusListeners) 580 notifyStatusListeners ( 581 session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), context 582 ); 583 584 action = prepareOrAbort (p, id, context, pp, this::prepareForAbort); 585 586 if (evt != null && p instanceof AbortParticipant) { 587 evt.addMessage(Trace.of("prepareForAbort", getName(p))); 588 if (prof != null) 589 prof.checkPoint ("prepareForAbort: " + getName(p)); 590 } 591 } else { 592 if (hasStatusListeners) 593 notifyStatusListeners ( 594 session, TransactionStatusEvent.State.PREPARING, id, getName(p), context 595 ); 596 597 jfr = new TMEvent.Prepare("%s:%s".formatted(getName(), p.getClass().getName()), id); 598 jfr.begin(); 599 600 chronometer.lap(); 601 action = prepareOrAbort (p, id, context, pp, this::prepare); 602 boolean timeout = pp.timeout > 0 && chronometer.partial() > pp.timeout; 603 boolean maxTime = pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime; 604 if (timeout || maxTime) 605 action &= (PREPARED ^ 0xFFFF); 606 607 abort = (action & PREPARED) == ABORTED; 608 retry = (action & RETRY) == RETRY; 609 610 if (evt != null) { 611 evt.addMessage (Trace.of("prepare", getName(p), 612 (abort ? " ABORTED" : " PREPARED") 613 + (timeout ? " TIMEOUT" : "") 614 + (maxTime ? " MAX_TIMEOUT" : "") 615 + (retry ? " RETRY" : "") 616 + ((action & READONLY) == READONLY ? " READONLY" : "") 617 + ((action & NO_JOIN) == NO_JOIN ? 
" NO_JOIN" : "")) 618 ); 619 if (prof != null) 620 prof.checkPoint ("prepare: " + getName(p)); 621 } 622 } 623 624 if ((action & READONLY) == 0) { 625 Chronometer c = new Chronometer(); 626 snapshot (id, context); 627 getParams(p).timers.snapshotTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); 628 if (metrics != null) 629 metrics.record(getName(p) + "-snapshot", c.elapsed()); 630 } 631 if ((action & NO_JOIN) == 0) { 632 members.add (p); 633 } 634 if (p instanceof GroupSelector && ((action & PREPARED) == PREPARED || callSelectorOnAbort)) { 635 String groupName = null; 636 Chronometer c = new Chronometer(); 637 try { 638 groupName = ((GroupSelector)p).select (id, context); 639 } catch (Exception e) { 640 if (evt != null) 641 evt.addMessage (" selector: " + getName(p) + " " + e.getMessage()); 642 else 643 getLog().error (" selector: " + getName(p) + " " + e.getMessage()); 644 } finally { 645 if (metrics != null) 646 metrics.record(getName(p) + "-selector", c.lap()); 647 } 648 if (evt != null) { 649 evt.addMessage (" selector: '" + groupName +"'"); 650 } 651 if (groupName != null) { 652 StringTokenizer st = new StringTokenizer (groupName, " ,"); 653 List<TransactionParticipant> participants = new ArrayList(); 654 while (st.hasMoreTokens ()) { 655 String grp = st.nextToken(); 656 addGroup (id, grp); 657 if (evt != null && groups.get(grp) == null) { 658 evt.addMessage (" WARNING: group '" + grp + "' not configured"); 659 if (abortOnMisconfiguredGroups) 660 abort = true; 661 } 662 participants.addAll (getParticipants (grp)); 663 } 664 while (iter.hasNext()) 665 participants.add (iter.next()); 666 667 iter = participants.iterator(); 668 } 669 } 670 jfr.commit(); 671 } 672 return abort ? retry ? 
RETRY : ABORTED : PREPARED; 673 } 674 protected List<TransactionParticipant> getParticipants (String groupName) { 675 List<TransactionParticipant> participants = groups.get (groupName); 676 if (participants == null) { 677 participants = new ArrayList(); 678 } 679 return participants; 680 } 681 protected List<TransactionParticipant> getParticipants (long id) { 682 // Use a local copy of participant to avoid adding the 683 // GROUP participant to the DEFAULT_GROUP 684 List<TransactionParticipant> participantsChain = new ArrayList<>(); 685 List<TransactionParticipant> participants = getParticipants (DEFAULT_GROUP); 686 // Add DEFAULT_GROUP participants 687 participantsChain.addAll(participants); 688 String key = getKey(GROUPS, id); 689 String grp; 690 // now add participants of Group 691 while ( (grp = (String) psp.inp (key)) != null) { 692 participantsChain.addAll (getParticipants (grp)); 693 } 694 return participantsChain; 695 } 696 697 protected void initStatusListeners (Element config) throws ConfigurationException{ 698 final Iterator iter = config.getChildren ("status-listener").iterator(); 699 while (iter.hasNext()) { 700 final Element e = (Element) iter.next(); 701 final QFactory factory = getFactory(); 702 final TransactionStatusListener listener = (TransactionStatusListener) factory.newInstance (QFactory.getAttributeValue (e, "class")); 703 factory.setConfiguration (listener, config); 704 addListener(listener); 705 } 706 } 707 708 protected void initParticipants (Element config) 709 throws ConfigurationException 710 { 711 groups.put (DEFAULT_GROUP, initGroup (config)); 712 for (Element e : config.getChildren("group")) { 713 String name = QFactory.getAttributeValue (e, "name"); 714 if (name == null) 715 throw new ConfigurationException ("missing group name"); 716 if (groups.containsKey(name)) { 717 throw new ConfigurationException ( 718 "Group '" + name + "' already defined" 719 ); 720 } 721 groups.put (name, initGroup (e)); 722 } 723 } 724 protected 
List<TransactionParticipant> initGroup (Element e) 725 throws ConfigurationException 726 { 727 List<TransactionParticipant> group = new ArrayList<>(); 728 for (Element el : e.getChildren ("participant")) { 729 if (QFactory.isEnabled(el)) { 730 group.add(createParticipant(el)); 731 } else { 732 getLog().warn ("participant ignored (enabled='" + QFactory.getEnabledAttribute(el) + "'): " + el.getAttributeValue("class") + "/" + el.getAttributeValue("realm")); 733 } 734 } 735 return group; 736 } 737 public TransactionParticipant createParticipant (Element e) 738 throws ConfigurationException 739 { 740 QFactory factory = getFactory(); 741 TransactionParticipant participant = factory.newInstance (QFactory.getAttributeValue (e, "class")); 742 factory.setLogger (participant, e); 743 QFactory.invoke (participant, "setTransactionManager", this, TransactionManager.class); 744 factory.setConfiguration (participant, e); 745 String realm = QFactory.getAttributeValue(e, "realm"); 746 747 try { 748 String participantShortName = Caller.shortClassName(participant.getClass().getName()); 749 params.put(participant, new ParticipantParams( 750 participantShortName + (realm != null && !realm.isEmpty() ? 
":" + realm : ""), 751 getLong (e, "timeout", 0L), 752 getLong (e, "max-time", globalMaxTime), 753 getSet(e.getChild("requires")), 754 getSet(e.getChild("provides")), 755 getSet(e.getChild("optional")), 756 getOrCreateTimers(participant) 757 ) 758 ); 759 } catch (Exception ex) { 760 throw new ConfigurationException (ex); 761 } 762 if (participant instanceof Destroyable) { 763 destroyables.add((Destroyable) participant); 764 } 765 return participant; 766 } 767 768 @Override 769 public int getOutstandingTransactions() { 770 if (iisp instanceof LocalSpace) 771 return ((LocalSpace) iisp).size(queue); 772 return -1; 773 } 774 protected String getKey (String prefix, long id) { 775 StringBuilder sb = new StringBuilder (getName()); 776 sb.append ('.'); 777 sb.append (prefix); 778 sb.append (id); 779 return sb.toString (); 780 } 781 protected void commitOff (Space sp) { 782 if (sp instanceof JDBMSpace jsp) { 783 jsp.setAutoCommit(false); 784 } 785 } 786 protected void commitOn (Space sp) { 787 if (sp instanceof JDBMSpace jsp) { 788 jsp.commit (); 789 jsp.setAutoCommit(true); 790 } 791 } 792 protected void checkTail () { 793 tailLock.lock(); 794 try { 795 while (tailDone()) { 796 tail.incrementAndGet(); 797 } 798 } finally { 799 tailLock.unlock(); 800 } 801 } 802 protected boolean tailDone () { 803 String stateKey = getKey(STATE, tail.get()); 804 if (DONE.equals (psp.rdp (stateKey))) { 805 purge (tail.get(), true); 806 return true; 807 } 808 return false; 809 } 810 protected void snapshot (long id, Serializable context) { 811 snapshot (id, context, null); 812 } 813 protected void snapshot (long id, Serializable context, Integer status) { 814 if (!doRecover && status != DONE) 815 return; // nothing to do 816 817 var jfr = new TMEvent.Snapshot(getName()+":"+status, id); 818 jfr.begin(); 819 820 String contextKey = getKey (CONTEXT, id); 821 synchronized (psp) { 822 commitOff (psp); 823 SpaceUtil.wipe(psp, contextKey); 824 if (context != null) 825 psp.out (contextKey, context); 
826 827 if (status != null) { 828 String stateKey = getKey (STATE, id); 829 psp.put (stateKey, status); 830 } 831 commitOn (psp); 832 } 833 jfr.commit(); 834 } 835 protected void setState (long id, Integer state) { 836 String stateKey = getKey (STATE, id); 837 synchronized (psp) { 838 commitOff (psp); 839 SpaceUtil.wipe(psp, stateKey); 840 if (state!= null) 841 psp.out (stateKey, state); 842 commitOn (psp); 843 } 844 } 845 protected void addGroup (long id, String groupName) { 846 if (groupName != null) 847 psp.out (getKey (GROUPS, id), groupName); 848 } 849 protected void purge (long id, boolean full) { 850 String stateKey = getKey (STATE, id); 851 String contextKey = getKey (CONTEXT, id); 852 String groupsKey = getKey (GROUPS, id); 853 synchronized (psp) { 854 commitOff (psp); 855 if (full) 856 SpaceUtil.wipe(psp, stateKey); 857 SpaceUtil.wipe(psp, contextKey); 858 SpaceUtil.wipe(psp, groupsKey); 859 commitOn (psp); 860 } 861 } 862 863 protected void recover () { 864 if (doRecover) { 865 if (tail.get() < head.get()) { 866 getLog().info ("recover - tail=" +tail.get()+", head="+head.get()); 867 } 868 while (tail.get() < head.get()) { 869 recover (tail.getAndIncrement()); 870 } 871 } 872 } 873 protected void recover (long id) { 874 LogEvent evt = getLog().createLogEvent ("recover"); 875 Profiler prof = new Profiler(); 876 evt.addMessage ("<id>" + id + "</id>"); 877 try { 878 String stateKey = getKey (STATE, id); 879 String contextKey = getKey (CONTEXT, id); 880 Integer state = (Integer) psp.rdp (stateKey); 881 if (state == null) { 882 evt.addMessage ("unknown stateKey " + stateKey); 883 SpaceUtil.wipe (psp, contextKey); // just in case ... 
return;
            }
            Serializable context = (Serializable) psp.rdp (contextKey);
            if (context != null)
                evt.addMessage (context);

            if (DONE.equals (state)) {
                evt.addMessage ("<done/>");
            } else if (COMMITTING.equals (state)) {
                commit (0, id, context, getParticipants (id), true, evt, prof);
            } else if (PREPARING.equals (state)) {
                abort  (0, id, context, getParticipants (id), true, evt, prof);
            }
            purge (id, true);
        } finally {
            evt.addMessage (prof);
            Logger.log (evt);
        }
    }
    /** Starts the retry task on a virtual thread the first time it is needed. */
    protected synchronized void checkRetryTask () {
        if (retryTask == null) {
            retryTask = new RetryTask();
            Thread.ofVirtual().start(retryTask);
        }
    }

    /**
     * This method gives the opportunity to decorate a LogEvent right before
     * it gets logged. When overriding it, unless you know what you're doing,
     * you should return a FrozenLogEvent in order to prevent concurrency issues.
     *
     * @param context current Context
     * @param evt current LogEvent
     * @param prof profiler (may be null)
     * @return FrozenLogEvent
     */
    protected LogEvent freeze(Serializable context, LogEvent evt, Profiler prof) {
        return freezeLog ?
new FrozenLogEvent(evt) : evt; 922 } 923 924 public class RetryTask implements Runnable { 925 @Override 926 public void run() { 927 Thread.currentThread().setName (getName()+"-retry-task"); 928 while (running()) { 929 for (Serializable context; (context = (Serializable)psp.rdp (RETRY_QUEUE)) != null;) 930 { 931 iisp.out (queue, context, retryTimeout); 932 psp.inp (RETRY_QUEUE); 933 } 934 ISOUtil.sleep(retryInterval); 935 } 936 } 937 } 938 939 public class InputQueueMonitor implements Runnable { 940 @Override 941 public void run() { 942 Thread.currentThread().setName (getName()+"-input-queue-monitor"); 943 while (running()) { 944 while (getOutstandingTransactions() > getActiveSessions() + threshold && running()) { 945 ISOUtil.sleep(100L); 946 } 947 if (!running()) 948 break; 949 try { 950 Object context = isp.in(queue, 1000L); 951 if (context != null) { 952 if (!running()) { 953 isp.out(queue, context); // place it back 954 break; 955 } 956 iisp.out(queue, context); 957 } 958 } catch (SpaceError e) { 959 getLog().error(e); 960 ISOUtil.sleep(1000L); // relax on error 961 } 962 } 963 } 964 } 965 966 /** 967 * This method returns the number of sessions that can be started at this point in time 968 * @return number of sessions 969 */ 970 protected int getSessionsToStandUp() { 971 int outstandingTransactions = getOutstandingTransactions(); 972 int activeSessions = getActiveSessions(); 973 int count = 0; 974 if (activeSessions < maxSessions && outstandingTransactions > threshold) { 975 count = Math.min(outstandingTransactions, maxSessions - activeSessions); 976 } 977 return Math.min(1000, count); // reasonable value for virtual thread creation within one second 978 } 979 980 /** 981 * This method returns true if current session should stop working on more messages 982 * @return 983 */ 984 protected boolean isSessionToStandDown() { 985 return false; 986 } 987 988 @Override 989 public int getActiveSessions() { 990 return activeSessions.intValue(); 991 } 992 public int 
getMaxSessions() { 993 return maxSessions; 994 } 995 public static Serializable getSerializable() { 996 return tlContext.get(); 997 } 998 public static <T extends Serializable> T getContext() { 999 return (T) tlContext.get(); 1000 } 1001 public static Long getId() { 1002 return tlId.get(); 1003 } 1004 1005 1006 private void notifyStatusListeners 1007 (int session, TransactionStatusEvent.State state, long id, String info, Serializable context) 1008 { 1009 TransactionStatusEvent e = new TransactionStatusEvent(session, state, id, info, context); 1010 synchronized (statusListeners) { 1011 for (TransactionStatusListener l : statusListeners) { 1012 l.update (e); 1013 } 1014 } 1015 } 1016 private void setThreadName (long id, String method, TransactionParticipant p) { 1017 Thread.currentThread().setName( 1018 String.format("%s:%d %s %s %s", getName(), id, method, p.getClass().getName(), 1019 LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault())) 1020 ); 1021 } 1022 private void setThreadLocal (long id, Serializable context) { 1023 tlId.set(id); 1024 tlContext.set(context); 1025 } 1026 private void removeThreadLocal() { 1027 tlId.remove(); 1028 tlContext.remove(); 1029 } 1030 1031 private String getName(TransactionParticipant p) { 1032 return p.getClass().getName(); 1033 } 1034 1035 private ParticipantParams getParams (TransactionParticipant p) { 1036 return Optional.ofNullable(params.get(p)).orElseGet(() -> 1037 new ParticipantParams(p.getClass().getName(), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), 1038 getOrCreateTimers(p)) 1039 ); 1040 } 1041 1042 private String tmInfo() { 1043 return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s", 1044 getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(), 1045 getActiveSessions(), maxSessions, 1046 (tps != null ? 
", " + tps : "") 1047 ); 1048 } 1049 1050 private long getLong (Element e, String attributeName, long defValue) { 1051 String s = QFactory.getAttributeValue (e, attributeName); 1052 if (s != null) { 1053 try { 1054 return Long.parseLong(s); 1055 } catch (NumberFormatException ignored) {} 1056 } 1057 return defValue; 1058 } 1059 1060 private Instant getStart (Serializable context) { 1061 if (context instanceof Context) { 1062 Object o = ((Context) context).get(TIMESTAMP); 1063 if (o instanceof Instant) 1064 return (Instant) o; 1065 } 1066 return Instant.now(); 1067 } 1068 1069 private boolean heavyLoaded() { 1070 return getActiveSessions() >= maxSessions; 1071 } 1072 1073 private int pauseAndWait(Serializable context, int action) { 1074 if (context instanceof Pausable pausable) try { 1075 pausedSessions.incrementAndGet(); 1076 Future<Integer> paused = pausable.pause(); 1077 long timeout = pausable.getTimeout(); 1078 timeout = timeout > 0 ? Math.min (timeout, pauseTimeout) : pauseTimeout; 1079 try { 1080 action = paused.get(timeout, TimeUnit.MILLISECONDS); 1081 } catch (InterruptedException | ExecutionException e) { 1082 if (context instanceof Context ctx) 1083 ctx.log(e); 1084 } catch (TimeoutException e) { 1085 action &= (PREPARED ^ 0xFFFF); // turn off 'PREPARED' - we need to abort 1086 } finally { 1087 pausable.reset(); 1088 } 1089 } finally { 1090 pausedSessions.decrementAndGet(); 1091 } 1092 return action; 1093 } 1094 1095 private record ParticipantParams ( 1096 String name, 1097 long timeout, 1098 long maxTime, 1099 Set<String> requires, 1100 Set<String> provides, 1101 Set<String> optional, 1102 Timers timers 1103 ) 1104 { 1105 public boolean isConstrained() { 1106 return !requires.isEmpty() || !optional.isEmpty(); 1107 } 1108 } 1109 private record Timers ( 1110 io.micrometer.core.instrument.Timer prepareTimer, 1111 io.micrometer.core.instrument.Timer prepareForAbortTimer, 1112 io.micrometer.core.instrument.Timer commitTimer, 1113 
io.micrometer.core.instrument.Timer abortTimer, 1114 io.micrometer.core.instrument.Timer snapshotTimer) 1115 { } 1116 public record Trace (String phase, String message, String info) { 1117 @Override 1118 public String toString() { 1119 return "%15s: %s%s".formatted(phase, message, info); 1120 } 1121 public static Trace of (String phase, String message) { 1122 return new Trace (phase, message, ""); 1123 } 1124 public static Trace of (String phase, String message, String info) { 1125 return new Trace (phase, message, info); 1126 } 1127 } 1128 1129 private Set<String> getSet (Element e) { 1130 return e != null ? new HashSet<>(Arrays.asList(ISOUtil.commaDecode(e.getTextTrim()))) : Collections.emptySet(); 1131 } 1132 1133 private int prepareOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriFunction<TransactionParticipant, Long, Serializable, Integer> preparationFunction) { 1134 int action; 1135 1136 if (context instanceof Context ctx && pp.isConstrained()) { 1137 if (!ctx.hasKeys(pp.requires.toArray())) { 1138 ctx.log ("missing.requires: '%s'".formatted(ctx.keysNotPresent(pp.requires.toArray()))); 1139 action = ABORTED; 1140 } else { 1141 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1142 action = preparationFunction.apply(p, id, c); 1143 if (!pp.requires.contains(LOGEVT.toString())) { 1144 // if we are not inheriting parent's log event and there's a log event 1145 // in the childs context, copy it. 
1146 LogEvent evt = c.get(LOGEVT.toString()); 1147 if (evt != null) { 1148 LogEvent parentLogEvent = ctx.getLogEvent(); 1149 synchronized (parentLogEvent) { 1150 parentLogEvent.getPayLoad().addAll(evt.getPayLoad()); 1151 } 1152 c.remove(LOGEVT.toString()); 1153 } 1154 } 1155 ctx.merge(c.clone(pp.provides.toArray())); 1156 } 1157 } else { 1158 action = preparationFunction.apply(p, id, context); 1159 } 1160 if ((action & PAUSE) == PAUSE) { 1161 var jfrp = new TMEvent.Pause(getName(), id); 1162 jfrp.begin(); 1163 action = pauseAndWait(context, action); 1164 jfrp.commit(); 1165 } 1166 return action; 1167 } 1168 1169 private void commitOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriConsumer<TransactionParticipant, Long, Serializable> preparationFunction) { 1170 if (context instanceof Context ctx && pp.isConstrained()) { 1171 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1172 preparationFunction.accept(p, id, c); 1173 ctx.merge(c.clone(pp.provides.toArray())); 1174 } else { 1175 preparationFunction.accept(p, id, context); 1176 } 1177 } 1178 1179 private Serializable recover (ContextRecovery p, long id, Serializable context, ParticipantParams pp, boolean commit) { 1180 var jfr = new TMEvent.Recover("%s:%s".formatted(getName(), p.getClass().getName()), id); 1181 jfr.begin(); 1182 try { 1183 if (context instanceof Context ctx && pp.isConstrained()) { 1184 Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); 1185 Serializable s = p.recover (id, c, commit); 1186 return (s instanceof Context rc) ? 
1187 rc.clone (pp.provides.toArray()) : s; 1188 } else { 1189 return p.recover (id, context, commit); 1190 } 1191 } finally { 1192 jfr.commit(); 1193 } 1194 } 1195 1196 private Timers getOrCreateTimers(TransactionParticipant p) { 1197 return Optional.ofNullable(params.get(p)).map(ParticipantParams::timers).orElseGet(() -> { 1198 String participantShortName = Caller.shortClassName(p.getClass().getName()); 1199 var mr = getServer().getMeterRegistry(); 1200 var tags = Tags.of("name", getName(), "participant", participantShortName); 1201 String realm = (p instanceof LogSource ls) ? ls.getRealm() : null; 1202 tags = tags.and("realm", (realm != null && !realm.isEmpty()) ? realm.trim() : ""); 1203 1204 return new Timers( 1205 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare"))), 1206 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare-for-abort"))), 1207 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "commit"))), 1208 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "abort"))), 1209 addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "snapshot"))) 1210 ); 1211 }); 1212 } 1213 1214 private Timer addTimer (Timer m) { 1215 meters.add (m); 1216 return m; 1217 } 1218 1219 private UUID getTraceId (long transactionId) { 1220 return new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ transactionId); 1221 } 1222}