001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.transaction;
020
021import io.micrometer.core.instrument.Counter;
022import io.micrometer.core.instrument.Gauge;
023import io.micrometer.core.instrument.Meter;
024import io.micrometer.core.instrument.Tags;
025import io.micrometer.core.instrument.Timer;
026import io.micrometer.core.instrument.binder.BaseUnits;
027import org.HdrHistogram.AtomicHistogram;
028import org.jdom2.Element;
029import org.jpos.core.Configuration;
030import org.jpos.core.ConfigurationException;
031import org.jpos.log.evt.Txn;
032import org.jpos.metrics.MeterInfo;
033import org.jpos.function.TriConsumer;
034import org.jpos.function.TriFunction;
035import org.jpos.jfr.TMEvent;
036import org.jpos.metrics.MeterFactory;
037import org.jpos.q2.QBeanSupport;
038import org.jpos.q2.QFactory;
039import org.jpos.space.*;
040import org.jpos.util.*;
041
042import java.io.PrintStream;
043import java.io.Serializable;
044import java.util.*;
045import java.util.concurrent.*;
046import java.util.concurrent.atomic.AtomicInteger;
047
048import java.time.Instant;
049import java.time.LocalDateTime;
050import java.time.ZoneId;
051import java.util.concurrent.atomic.AtomicLong;
052import java.util.concurrent.locks.Lock;
053import java.util.concurrent.locks.ReentrantLock;
054
055import org.jpos.iso.ISOUtil;
056import org.jpos.util.Metrics;
057
058import static org.jpos.transaction.ContextConstants.LOGEVT;
059import static org.jpos.transaction.ContextConstants.TIMESTAMP;
060
061
062/**
063 * Multi-participant transaction manager.
064 *
065 * <p>Reads contexts from a configured input space, walks each transaction
066 * through its prepare/commit/abort lifecycle, and persists state to a
067 * persistent space so in-flight transactions can be recovered after a
068 * restart.</p>
069 */
070@SuppressWarnings("unchecked")
071public class TransactionManager
072    extends QBeanSupport
073    implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider {
074
    /** Creates an unconfigured manager; configuration is supplied by Q2 at deploy time. */
    public TransactionManager() {}

    /** Space-key prefix used to persist a transaction's serialised context. */
    public static final String  CONTEXT    = "$CONTEXT.";
    /** Space-key prefix used to persist a transaction's lifecycle state. */
    public static final String  STATE      = "$STATE.";
    /** Space-key prefix used to persist a transaction's group execution stack. */
    public static final String  GROUPS     = "$GROUPS.";
    /** Space-key under which retried transactions are queued for re-execution. */
    public static final String  RETRY_QUEUE = "$RETRY_QUEUE";
    /** State marker indicating a transaction is in the prepare phase. */
    public static final Integer PREPARING  = 0;
    /** State marker indicating a transaction is in the commit phase. */
    public static final Integer COMMITTING = 1;
    /** State marker indicating a transaction has reached its terminal state. */
    public static final Integer DONE       = 2;
    /** Group name used when no explicit group is configured. */
    public static final String  DEFAULT_GROUP = "";
    /** Hard ceiling on participants traversed per transaction (loop prevention). */
    public static final long    MAX_PARTICIPANTS = 1000;  // loop prevention
    /** Default maximum time, in milliseconds, that a session waits for input. */
    public static final long    MAX_WAIT = 15000L;
    /** Configured group-name to participant-list mapping. */
    protected Map<String,List<TransactionParticipant>> groups;
    // Objects whose destroy() is invoked (failures logged as warnings) when the service stops.
    private Set<Destroyable> destroyables = new HashSet<>();
    // Per-thread context / transaction id; presumably maintained by setThreadLocal and
    // removeThreadLocal around each transaction - helpers not shown here, confirm.
    private static final ThreadLocal<Serializable> tlContext = new ThreadLocal<>();
    private static final ThreadLocal<Long> tlId = new ThreadLocal<>();
    // Histogram-backed metrics; created in setConfiguration and exposed through getMetrics().
    private Metrics metrics;
    // Per-participant parameters; presumably what getParams(p) looks up - TODO confirm.
    private Map<TransactionParticipant,ParticipantParams> params = new HashMap<>();
    // Value of the "max-time" property (0 when absent).
    private long globalMaxTime;

    // Space named by the "space" property.
    private Space<String,Object> sp;
    // Space named by the "persistent-space" property; also holds RETRY_QUEUE entries.
    private Space<String,Object> psp;
    private Space<String,Object> isp;  // real input space
    private Space<String,Object> iisp; // internal input space
    // Mandatory "queue" property: the key contexts are read from and written to.
    private String queue;
    // NOTE(review): not used in the visible part of the file; presumably guards tail updates - confirm.
    private Lock tailLock = new ReentrantLock();
    // Registered status listeners; mutated only while synchronized on the list itself.
    private final List<TransactionStatusListener> statusListeners = new ArrayList<>();
    // Cached "list is non-empty" flag, written under the statusListeners monitor and read
    // without synchronization on the transaction path.
    // NOTE(review): the field is not volatile - confirm that late listener registration is not expected.
    private boolean hasStatusListeners;
    // "recover" property; defaults to true when psp is a PersistentSpace.
    private boolean doRecover;
    // "call-selector-on-abort" property (default true).
    private boolean callSelectorOnAbort;
    // "abort-on-misconfigured-groups" property.
    private boolean abortOnMisconfiguredGroups;
    // "sessions" property (default 1).
    private int sessions;
    // "max-sessions" property (default: sessions); validated to be >= sessions.
    private int maxSessions;
    // "threshold" property (default: sessions / 2).
    private int threshold;
    // "max-active-sessions" property (default 0); when positive it must be >= max-sessions.
    private int maxActiveTransactions;
    // Incremented by run() before a transaction is dispatched and decremented when it finishes.
    private final AtomicInteger activeSessions = new AtomicInteger();
    // NOTE(review): not used in the visible part of the file.
    private final AtomicInteger pausedSessions = new AtomicInteger();

    // head supplies the next transaction id; both counters start at "initial-tail" and
    // getInTransit() reports head - tail.
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();

    // Defaults below can be overridden by the "retry-interval", "retry-timeout",
    // "pause-timeout" and "abort-on-pause-timeout" properties.
    private long retryInterval = 5000L;
    private long retryTimeout  = 60000L;
    private long pauseTimeout  = 60000L;
    private boolean abortOnPauseTimeout = true;
    // NOTE(review): presumably managed by checkRetryTask() - helper not shown here.
    private Runnable retryTask = null;
    // TPS meter, (re)created on every startService() and stopped in stopService().
    private TPS tps;
    // Executor that runs transactions; created in initService from the "virtual-threads" property.
    private ExecutorService executor;
    // Meters registered by setConfiguration; removed from the registry in stopService().
    private final List<Meter> meters = new ArrayList<>();

    // Gauge bound to activeSessions and counter incremented once per dispatched transaction.
    private Gauge activeSessionsGauge;
    private Counter transactionCounter;
    // "freeze-log" property (default true).
    private boolean freezeLog;
    // Random identifier generated per instance; usage is outside the visible part of the file.
    private UUID uuid = UUID.randomUUID();
141
142    @Override
143    protected String defaultRealm() {
144        return Realm.TXN;
145    }
146
147    @Override
148    public void initService () throws ConfigurationException {
149        queue = cfg.get ("queue", null);
150        if (queue == null)
151            throw new ConfigurationException ("queue property not specified");
152        sp  = SpaceFactory.getSpace (cfg.get ("space"));
153        isp = iisp = SpaceFactory.getSpace (cfg.get ("input-space", cfg.get ("space")));
154        psp  = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString()));
155        doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace);
156        tail.set(cfg.getLong ("initial-tail", 1));
157        head.set(tail.get());
158        groups = new HashMap<>();
159        initParticipants (getPersist());
160        initStatusListeners (getPersist());
161        executor = QFactory.executorService(cfg.getBoolean("virtual-threads", true));
162    }
163
164    @Override
165    public void startService () throws Exception {
166        recover();
167        if (tps != null)
168            tps.stop();
169        tps = new TPS (cfg.getBoolean ("auto-update-tps", true));
170        Thread.ofPlatform().start(this);
171        if (psp.rdp (RETRY_QUEUE) != null)
172            checkRetryTask();
173
174        if (iisp != isp) {
175            Thread.ofPlatform().unstarted(
176              new InputQueueMonitor()
177            ).start();
178        }
179        NameRegistrar.register(getName(), this);
180    }
181
182    @Override
183    public void stopService () {
184        NameRegistrar.unregister(getName());
185        if (iisp != isp)
186            for (Object o=iisp.inp(queue); o != null; o=iisp.inp(queue))
187                isp.out(queue, o); // push back to replicated space
188
189        meters.forEach(getServer().getMeterRegistry()::remove);
190        tps.stop();
191        for (Destroyable destroyable : destroyables) {
192            try {
193                destroyable.destroy();
194            } catch (Throwable t) {
195                getLog().warn (t);
196            }
197        }
198    }
199    /**
200     * Enqueues a context at the tail of the input queue (FIFO ordering).
201     *
202     * @param context serialisable transaction context
203     */
204    public void queue (Serializable context) {
205        iisp.out(queue, context);
206    }
207    /**
208     * Pushes a context onto the head of the input queue (LIFO ordering).
209     *
210     * @param context serialisable transaction context
211     */
212    public void push (Serializable context) {
213        iisp.push(queue, context);
214    }
215    /**
216     * Returns the configured input queue name.
217     *
218     * @return the name of the input queue this manager consumes from
219     */
220    @SuppressWarnings("unused")
221    public String getQueueName() {
222        return queue;
223    }
224    /**
225     * Returns the volatile space used for in-flight transaction state.
226     *
227     * @return the working {@link Space}
228     */
229    public Space getSpace() {
230        return sp;
231    }
232    /**
233     * Returns the externally visible input space used for queueing contexts.
234     *
235     * @return the input {@link Space}
236     */
237    public Space getInputSpace() {
238        return isp;
239    }
240    /**
241     * Returns the persistent space used to recover in-flight transactions
242     * across restarts.
243     *
244     * @return the persistent {@link Space}
245     */
246    public Space getPersistentSpace() {
247        return psp;
248    }
249
250    @Override
251    public void run () {
252        while (running()) {
253            if (heavyLoaded()) {
254                ISOUtil.sleep (100L);
255                getLog().info ("HeavyLoaded - active sessions: " + getActiveSessions());
256                continue;
257            }
258            Object obj = iisp.in (queue, MAX_WAIT);
259            if (obj instanceof Serializable context) {
260                if (getActiveSessions() <= maxSessions) {
261                    if (context instanceof Context ctx)
262                        ctx.log ("active=%d, maxSessions=%d".formatted(getActiveSessions(), maxSessions));
263                    int session = activeSessions.incrementAndGet();
264                    transactionCounter.increment();
265                    executor.execute(() -> {
266                        try {
267                            runTransaction(context, session);
268                        } finally {
269                            activeSessions.decrementAndGet();
270                        }
271                    });
272                }
273                else {
274                    iisp.push(queue, context);  // push it back
275                    ISOUtil.sleep(100L);
276                }
277            }
278        }
279    }
280
281    private void runTransaction (Serializable context, int session) {
282        long id = 0;
283        List<TransactionParticipant> members;
284        Iterator<TransactionParticipant> iter;
285        boolean abort;
286        LogEvent evt;
287        Profiler prof;
288        Thread thread = Thread.currentThread();
289
290        prof = null;
291        evt = null;
292        thread.setName (getName() + "-" + session + ":idle");
293        int action = -1;
294        id = head.getAndIncrement ();
295        TMEvent tme = new TMEvent(getName(), id);
296        Txn txn = new Txn(getName(), id);
297
298        tme.begin();
299        try {
300            setThreadLocal(id, context);
301            if (hasStatusListeners)
302                notifyStatusListeners (session, TransactionStatusEvent.State.READY, id, "", null);
303
304            Chronometer chronometer = new Chronometer(getStart(context));
305
306            abort = false;
307            members = new ArrayList<> ();
308            iter = getParticipants (DEFAULT_GROUP).iterator();
309            evt = new LogEvent()
310              .withSource(log)
311              .withTraceId(getTraceId(id));
312            evt.addMessage(txn);
313            evt.addMessage(context);
314            prof = new Profiler();
315            snapshot (id, context, PREPARING);
316            action = prepare (session, id, context, members, iter, abort, evt, prof, chronometer);
317            switch (action) {
318                case PREPARED:
319                    if (members.size() > 0) {
320                        setState(id, COMMITTING);
321                        commit(session, id, context, members, false, evt, prof);
322                    }
323                    break;
324                case ABORTED:
325                    if (members.size() > 0) {
326                        abort(session, id, context, members, false, evt, prof);
327                    }
328                    break;
329                case RETRY:
330                    psp.out (RETRY_QUEUE, context);
331                    checkRetryTask();
332                    break;
333                case NO_JOIN:
334                    break;
335            }
336            snapshot (id, null, DONE);
337            if (id == tail.get()) {
338                checkTail ();
339            } else {
340                purge (id, false);
341            }
342            tps.tick();
343        } catch (Throwable t) {
344            if (evt == null)
345                getLog().fatal (t); // should never happen
346            else
347                evt.addMessage (t);
348        } finally {
349            removeThreadLocal();
350            if (hasStatusListeners) {
351                notifyStatusListeners (
352                  session,
353                  TransactionStatusEvent.State.DONE,
354                  id, "", context);
355            }
356            if (evt != null && (action == PREPARED || action == ABORTED || (action == -1 && prof != null))) {
357                switch (action) {
358                    case PREPARED :
359                        evt.setTag("commit");
360                        break;
361                    case ABORTED :
362                        evt.setTag ("abort");
363                        break;
364                    case -1:
365                        evt.setTag ("undefined");
366                        break;
367                }
368                if (getInTransit() > Math.max(maxActiveTransactions, activeSessions.get()) * 100L) {
369                    evt.addMessage("WARNING: IN-TRANSIT TOO HIGH");
370                }
371                evt.addMessage (
372                  String.format (" %s, elapsed=%dms",
373                    tmInfo(),
374                    prof.getElapsedInMillis()
375                  )
376                );
377                evt.addMessage (prof);
378                try {
379                    Logger.log(freeze(context, evt, prof));
380                } catch (Throwable t) {
381                    getLog().error(t);
382                }
383            }
384            tme.commit();
385        }
386    }
387
388    @Override
389    public long getTail () {
390        return tail.get();
391    }
392
393    @Override
394    public long getHead () {
395        return head.get();
396    }
397
398    /**
399     * Returns the count of transactions that have been picked up but not yet completed.
400     *
401     * @return number of in-flight transactions
402     */
403    public long getInTransit () {
404        return head.get() - tail.get();
405    }
406
407    @Override
408    public void setConfiguration (Configuration cfg) throws ConfigurationException {
409        super.setConfiguration (cfg);
410        retryInterval = cfg.getLong ("retry-interval", retryInterval);
411        retryTimeout  = cfg.getLong ("retry-timeout", retryTimeout);
412        pauseTimeout  = cfg.getLong ("pause-timeout", pauseTimeout);
413        abortOnPauseTimeout = cfg.getBoolean("abort-on-pause-timeout", true);
414        maxActiveTransactions = cfg.getInt  ("max-active-sessions", 0);
415        sessions = cfg.getInt ("sessions", 1);
416        threshold = cfg.getInt ("threshold", sessions / 2);
417        maxSessions = cfg.getInt ("max-sessions", sessions);
418        globalMaxTime = cfg.getLong("max-time", 0L);
419        if (maxSessions < sessions)
420            throw new ConfigurationException("max-sessions < sessions");
421        if (maxActiveTransactions > 0) {
422            if (maxActiveTransactions < sessions)
423                throw new ConfigurationException("max-active-sessions < sessions");
424            if (maxActiveTransactions < maxSessions)
425                throw new ConfigurationException("max-active-sessions < max-sessions");
426        }
427        callSelectorOnAbort = cfg.getBoolean("call-selector-on-abort", true);
428        metrics = new Metrics(new AtomicHistogram(cfg.getLong("metrics-highest-trackable-value", 60000), 2));
429        abortOnMisconfiguredGroups = cfg.getBoolean("abort-on-misconfigured-groups");
430
431        try {
432            activeSessionsGauge = MeterFactory.gauge
433              (getServer().getMeterRegistry(), MeterInfo.TM_ACTIVE, Tags.of("name", getName()), BaseUnits.SESSIONS, activeSessions::get
434            );
435            transactionCounter = MeterFactory.counter
436              (getServer().getMeterRegistry(), MeterInfo.TM_COUNTER, Tags.of("name", getName())
437            );
438            meters.add(activeSessionsGauge);
439            meters.add(transactionCounter);
440        } catch (Exception e) {
441            throw new ConfigurationException (e);
442        }
443        freezeLog = cfg.getBoolean("freeze-log", true);
444    }
445    /**
446     * Registers a listener that observes transaction lifecycle transitions.
447     *
448     * @param l listener to add
449     */
450    public void addListener (TransactionStatusListener l) {
451        synchronized (statusListeners) {
452            statusListeners.add (l);
453            hasStatusListeners = true;
454        }
455    }
456    /**
457     * Removes a previously registered status listener.
458     *
459     * @param l listener to remove
460     */
461    public void removeListener (TransactionStatusListener l) {
462        synchronized (statusListeners) {
463            statusListeners.remove(l);
464            hasStatusListeners = !statusListeners.isEmpty();
465        }
466    }
467    /**
468     * Returns the TPS counter that tracks transactions per second for this manager.
469     *
470     * @return the {@link TPS} counter
471     */
472    public TPS getTPS() {
473        return tps;
474    }
475
476    @Override
477    public String getTPSAsString() {
478        return tps.toString();
479    }
480
481    @Override
482    public float getTPSAvg() {
483        return tps.getAvg();
484    }
485
486    @Override
487    public int getTPSPeak() {
488        return tps.getPeak();
489    }
490
491    @Override
492    public Date getTPSPeakWhen() {
493        return new Date(tps.getPeakWhen());
494    }
495
496    @Override
497    public long getTPSElapsed() {
498        return tps.getElapsed();
499    }
500
501    @Override
502    public void resetTPS() {
503        tps.reset();
504    }
505
506    @Override
507    public Metrics getMetrics() {
508        return metrics;
509    }
510
511    @Override
512    public void dump (PrintStream ps, String indent) {
513        ps.printf ("%s%s%n", indent, tmInfo());
514        if (metrics != null) {
515            metrics.dump(ps, indent);
516        }
517    }
518
519    /**
520     * Walks {@code members} in order, invoking each participant's commit phase
521     * and recording trace and timer information.
522     *
523     * @param session  session index that owns this transaction
524     * @param id       transaction identifier
525     * @param context  serialised transaction context
526     * @param members  participants whose commit hooks will run
527     * @param recover  when {@code true}, the manager is replaying a transaction
528     *                 after restart and should call {@link ContextRecovery}
529     * @param evt      optional log event to receive trace messages, or {@code null}
530     * @param prof     optional profiler to receive checkpoints, or {@code null}
531     */
532    protected void commit
533        (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof)
534    {
535        for (TransactionParticipant p :members) {
536            var jfr = new TMEvent.Commit("%s:%s".formatted(getName(), p.getClass().getName()), id);
537            jfr.begin();
538            ParticipantParams pp = getParams(p);
539            if (recover && p instanceof ContextRecovery cr) {
540                context = recover (cr, id, context, pp, true);
541                if (evt != null)
542                    evt.addMessage (Trace.of("commit-recover", getName(p)));
543            }
544            if (hasStatusListeners)
545                notifyStatusListeners (
546                    session, TransactionStatusEvent.State.COMMITING, id, getName(p), context
547                );
548            commitOrAbort (p, id, context, pp, this::commit);
549            if (evt != null) {
550                evt.addMessage (Trace.of("commit", getName(p)));
551                if (prof != null)
552                    prof.checkPoint (" commit: " + getName(p));
553            }
554            jfr.commit();
555        }
556    }
557    /**
558     * Walks {@code members} in order, invoking each participant's abort phase
559     * and recording trace and timer information.
560     *
561     * @param session  session index that owns this transaction
562     * @param id       transaction identifier
563     * @param context  serialised transaction context
564     * @param members  participants whose abort hooks will run
565     * @param recover  when {@code true}, the manager is replaying a transaction
566     *                 after restart and should call {@link ContextRecovery}
567     * @param evt      optional log event to receive trace messages, or {@code null}
568     * @param prof     optional profiler to receive checkpoints, or {@code null}
569     */
570    protected void abort
571        (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof)
572    {
573        for (TransactionParticipant p :members) {
574            ParticipantParams pp = getParams(p);
575            if (recover && p instanceof ContextRecovery cr) {
576                context = recover (cr, id, context, pp, true);
577                if (evt != null)
578                    evt.addMessage (Trace.of("abort-recover", getName(p)));
579            }
580            if (hasStatusListeners)
581                notifyStatusListeners (
582                    session, TransactionStatusEvent.State.ABORTING, id, getName(p), context
583                );
584
585            commitOrAbort (p, id, context, pp, this::abort);
586            if (evt != null) {
587                evt.addMessage (Trace.of("abort", getName(p)));
588                if (prof != null)
589                    prof.checkPoint ("  abort: " + getName(p));
590            }
591        }
592    }
593    /**
594     * Invokes {@link AbortParticipant#prepareForAbort(long, Serializable)} on
595     * {@code p} when applicable, swallowing exceptions and recording timer
596     * metrics.
597     *
598     * @param p       participant
599     * @param id      transaction identifier
600     * @param context serialised transaction context
601     * @return result code from the participant, or {@code ABORTED | NO_JOIN}
602     *         when {@code p} is not an {@link AbortParticipant} or threw
603     */
604    protected int prepareForAbort
605        (TransactionParticipant p, long id, Serializable context)
606    {
607        Chronometer c = new Chronometer();
608        try {
609            if (p instanceof AbortParticipant) {
610                setThreadName(id, "prepareForAbort", p);
611                return ((AbortParticipant)p).prepareForAbort (id, context);
612            }
613        } catch (Throwable t) {
614            logParticipantWarning("PREPARE-FOR-ABORT: " + id, p, t);
615        } finally {
616            getParams(p).timers.prepareForAbortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
617            if (metrics != null)
618                metrics.record(getName(p) + "-prepare-for-abort", c.elapsed());
619        }
620        return ABORTED | NO_JOIN;
621    }
622    /**
623     * Invokes the prepare phase on {@code p}, swallowing exceptions (logged as
624     * warnings) and recording timer metrics.
625     *
626     * @param p       participant
627     * @param id      transaction identifier
628     * @param context serialised transaction context
629     * @return result code from the participant, or {@code ABORTED} when {@code p} threw
630     */
631    protected int prepare
632        (TransactionParticipant p, long id, Serializable context)
633    {
634        Chronometer c = new Chronometer();
635        try {
636            setThreadName(id, "prepare", p);
637            return p.prepare (id, context);
638        } catch (Throwable t) {
639            logParticipantWarning("PREPARE: " + id, p, t);
640        } finally {
641            getParams(p).timers.prepareTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
642            if (metrics != null) {
643                metrics.record(getName(p) + "-prepare", c.elapsed());
644            }
645        }
646        return ABORTED;
647    }
648    /**
649     * Invokes the commit phase on {@code p}, swallowing exceptions (logged as
650     * warnings) and recording timer metrics.
651     *
652     * @param p       participant
653     * @param id      transaction identifier
654     * @param context serialised transaction context
655     */
656    protected void commit
657        (TransactionParticipant p, long id, Serializable context)
658    {
659        Chronometer c = new Chronometer();
660        try {
661            setThreadName(id, "commit", p);
662            p.commit(id, context);
663        } catch (Throwable t) {
664            logParticipantWarning("COMMIT: " + id, p, t);
665        } finally {
666            getParams(p).timers.commitTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
667            if (metrics != null)
668                metrics.record(getName(p) + "-commit", c.elapsed());
669        }
670    }
671    /**
672     * Invokes the abort phase on {@code p}, swallowing exceptions (logged as
673     * warnings) and recording timer metrics.
674     *
675     * @param p       participant
676     * @param id      transaction identifier
677     * @param context serialised transaction context
678     */
679    protected void abort
680        (TransactionParticipant p, long id, Serializable context)
681    {
682        Chronometer c = new Chronometer();
683        try {
684            setThreadName(id, "abort", p);
685            p.abort(id, context);
686        } catch (Throwable t) {
687            logParticipantWarning("ABORT: " + id, p, t);
688        } finally {
689            getParams(p).timers.abortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
690            if (metrics != null)
691                metrics.record(getName(p) + "-abort", c.elapsed());
692        }
693    }
694    /**
695     * Drives the prepare phase across {@code iter}, accumulating results into
696     * {@code members} and aborting on retry signals or participant exceptions.
697     *
698     * @param session     session index that owns this transaction
699     * @param id          transaction identifier
700     * @param context     serialised transaction context
701     * @param members     accumulator of participants whose prepare ran
702     * @param iter        participant iterator (may include selectors / groups)
703     * @param abort       when {@code true}, drive the prepare-for-abort path
704     * @param evt         optional log event to receive trace messages, or {@code null}
705     * @param prof        optional profiler to receive checkpoints, or {@code null}
706     * @param chronometer chronometer used to enforce {@code max-time}
707     * @return the bitwise OR of every participant result, suitable for the
708     *         lifecycle dispatcher
709     */
710    protected int prepare
711        (int session, long id, Serializable context, List<TransactionParticipant> members, Iterator<TransactionParticipant> iter, boolean abort, LogEvent evt, Profiler prof, Chronometer chronometer)
712    {
713        boolean retry = false;
714        for (int i=0; iter.hasNext (); i++) {
715            int action;
716            if (i > MAX_PARTICIPANTS) {
717                getLog().warn (
718                    "loop detected - transaction " +id + " aborted."
719                );
720                return ABORTED;
721            }
722            TransactionParticipant p = iter.next();
723
724            ParticipantParams pp = getParams(p);
725            if (!abort && pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime) {
726                abort = true;
727                if (evt != null)
728                    evt.addMessage("    forcedAbort: " + getName(p) + " elapsed=" + chronometer.elapsed());
729            }
730
731            TMEvent jfr;
732            if (abort) {
733                jfr = new TMEvent.PrepareForAbort("%s:%s".formatted(getName(), p.getClass().getName()), id);
734                jfr.begin();
735                if (hasStatusListeners)
736                    notifyStatusListeners (
737                        session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), context
738                    );
739
740                action = prepareOrAbort (p, id, context, pp, this::prepareForAbort);
741
742                if (evt != null && p instanceof AbortParticipant) {
743                    evt.addMessage(Trace.of("prepareForAbort", getName(p)));
744                    if (prof != null)
745                        prof.checkPoint ("prepareForAbort: " + getName(p));
746                }
747            } else {
748                if (hasStatusListeners)
749                    notifyStatusListeners (
750                        session, TransactionStatusEvent.State.PREPARING, id, getName(p), context
751                    );
752
753                jfr = new TMEvent.Prepare("%s:%s".formatted(getName(), p.getClass().getName()), id);
754                jfr.begin();
755
756                chronometer.lap();
757                action = prepareOrAbort (p, id, context, pp, this::prepare);
758                boolean timeout = pp.timeout > 0 && chronometer.partial() > pp.timeout;
759                boolean maxTime = pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime;
760                if (timeout || maxTime)
761                    action &= (PREPARED ^ 0xFFFF);
762
763                abort  = (action & PREPARED) == ABORTED;
764                retry  = (action & RETRY) == RETRY;
765
766                if (evt != null) {
767                    evt.addMessage (Trace.of("prepare", getName(p),
768                            (abort ? " ABORTED" : " PREPARED")
769                            + (timeout ? " TIMEOUT" : "")
770                            + (maxTime ? " MAX_TIMEOUT" : "")
771                            + (retry ? " RETRY" : "")
772                            + ((action & READONLY) == READONLY ? " READONLY" : "")
773                            + ((action & NO_JOIN) == NO_JOIN ? " NO_JOIN" : ""))
774                    );
775                    if (prof != null)
776                        prof.checkPoint ("prepare: " + getName(p));
777                }
778            }
779
780            if ((action & READONLY) == 0) {
781                Chronometer c = new Chronometer();
782                snapshot (id, context);
783                getParams(p).timers.snapshotTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
784                if (metrics != null)
785                    metrics.record(getName(p) + "-snapshot", c.elapsed());
786            }
787            if ((action & NO_JOIN) == 0) {
788                members.add (p);
789            }
790            if (p instanceof GroupSelector && ((action & PREPARED) == PREPARED || callSelectorOnAbort)) {
791                String groupName = null;
792                Chronometer c = new Chronometer();
793                try {
794                    groupName = ((GroupSelector)p).select (id, context);
795                } catch (Exception e) {
796                    if (evt != null) 
797                        evt.addMessage ("       selector: " + getName(p) + " " + e.getMessage());
798                    else 
799                        getLog().error ("       selector: " + getName(p) + " " + e.getMessage());
800                } finally {
801                    if (metrics != null)
802                        metrics.record(getName(p) + "-selector", c.lap());
803                }
804                if (evt != null) {
805                    evt.addMessage ("       selector: '" + groupName +"'");
806                }
807                if (groupName != null) {
808                    StringTokenizer st = new StringTokenizer (groupName, " ,");
809                    List<TransactionParticipant> participants = new ArrayList();
810                    while (st.hasMoreTokens ()) {
811                        String grp = st.nextToken();
812                        addGroup (id, grp);
813                        if (evt != null && groups.get(grp) == null) {
814                            evt.addMessage ("                 WARNING: group '" + grp + "' not configured");
815                            if (abortOnMisconfiguredGroups)
816                                abort = true;
817                        }
818                        participants.addAll (getParticipants (grp));
819                    }
820                    while (iter.hasNext())
821                        participants.add (iter.next());
822
823                    iter = participants.iterator();
824                }
825            }
826            jfr.commit();
827        }
828        return abort ? retry ? RETRY : ABORTED : PREPARED;
829    }
830    /**
831     * Returns the configured participants for a named group.
832     *
833     * @param groupName group name (use {@link #DEFAULT_GROUP} for the default chain)
834     * @return participants in declaration order; an empty list when {@code groupName}
835     *         is unknown
836     */
837    protected List<TransactionParticipant> getParticipants (String groupName) {
838        List<TransactionParticipant> participants = groups.get (groupName);
839        if (participants == null) {
840            participants = new ArrayList();
841        }
842        return participants;
843    }
844    /**
845     * Returns the participant chain for a specific transaction, combining the
846     * default group with any groups pushed onto the persistent execution stack.
847     *
848     * @param id transaction identifier
849     * @return participants to invoke in order
850     */
851    protected List<TransactionParticipant> getParticipants (long id) {
852        // Use a local copy of participant to avoid adding the 
853        // GROUP participant to the DEFAULT_GROUP
854        List<TransactionParticipant> participantsChain = new ArrayList<>();
855        List<TransactionParticipant> participants = getParticipants (DEFAULT_GROUP);
856        // Add DEFAULT_GROUP participants 
857        participantsChain.addAll(participants);
858        String key = getKey(GROUPS, id);
859        String grp;
860        // now add participants of Group 
861        while ( (grp = (String) psp.inp (key)) != null) {
862            participantsChain.addAll (getParticipants (grp));
863        }
864        return participantsChain;
865    }
866
867    /**
868     * Instantiates and registers any {@code <status-listener>} children of the
869     * QBean descriptor.
870     *
871     * @param config descriptor element
872     * @throws ConfigurationException if a listener cannot be instantiated or configured
873     */
874    protected void initStatusListeners (Element config)  throws ConfigurationException{
875        final Iterator iter = config.getChildren ("status-listener").iterator();
876        while (iter.hasNext()) {
877            final Element e = (Element) iter.next();
878            final QFactory factory = getFactory();
879            final TransactionStatusListener listener = (TransactionStatusListener) factory.newInstance (QFactory.getAttributeValue (e, "class"));
880            factory.setConfiguration (listener, config);
881            addListener(listener);
882        }
883    }
884
885    /**
886     * Builds the default participant group plus any named {@code <group>}
887     * children from the QBean descriptor.
888     *
889     * @param config descriptor element
890     * @throws ConfigurationException if a group is missing a name, duplicates an
891     *                                existing group, or a participant fails to
892     *                                instantiate
893     */
894    protected void initParticipants (Element config)
895        throws ConfigurationException
896    {
897        groups.put (DEFAULT_GROUP,  initGroup (config));
898        for (Element e : config.getChildren("group")) {
899            String name = QFactory.getAttributeValue (e, "name");
900            if (name == null) 
901                throw new ConfigurationException ("missing group name");
902            if (groups.containsKey(name)) {
903                throw new ConfigurationException (
904                    "Group '" + name + "' already defined"
905                );
906            }
907            groups.put (name, initGroup (e));
908        }
909    }
910    /**
911     * Instantiates the participants declared inside a {@code <group>} element,
912     * skipping any that are explicitly disabled.
913     *
914     * @param e group element
915     * @return participants in declaration order
916     * @throws ConfigurationException if a participant fails to instantiate
917     */
918    protected List<TransactionParticipant> initGroup (Element e)
919        throws ConfigurationException
920    {
921        List<TransactionParticipant> group = new ArrayList<>();
922        for (Element el : e.getChildren ("participant")) {
923            if (QFactory.isEnabled(el)) {
924                group.add(createParticipant(el));
925            } else {
926                getLog().warn ("participant ignored (enabled='" + QFactory.getEnabledAttribute(el) + "'): " + el.getAttributeValue("class") + "/" + el.getAttributeValue("realm"));
927            }
928        }
929        return group;
930    }
931    /**
932     * Instantiates a participant from a {@code <participant>} element,
933     * configures it, registers per-participant timers, and tracks any
934     * {@link Destroyable} for cleanup at shutdown.
935     *
936     * @param e participant element
937     * @return the configured participant
938     * @throws ConfigurationException if instantiation, configuration, or
939     *                                timer registration fails
940     */
941    public TransactionParticipant createParticipant (Element e)
942        throws ConfigurationException
943    {
944        QFactory factory = getFactory();
945        TransactionParticipant participant = factory.newInstance (QFactory.getAttributeValue (e, "class"));
946        factory.setLogger (participant, e);
947        QFactory.invoke (participant, "setTransactionManager", this, TransactionManager.class);
948        factory.setConfiguration (participant, e);
949        String realm = QFactory.getAttributeValue(e, "realm");
950
951        try {
952            String participantShortName = participantName(participant, realm);
953            params.put(participant, new ParticipantParams(
954                participantShortName,
955                getLong (e, "timeout", 0L),
956                getLong (e, "max-time", globalMaxTime),
957                getSet(e.getChild("requires")),
958                getSet(e.getChild("provides")),
959                getSet(e.getChild("optional")),
960                getOrCreateTimers(participant)
961              )
962            );
963        } catch (Exception ex) {
964            throw new ConfigurationException (ex);
965        }
966        if (participant instanceof Destroyable) {
967            destroyables.add((Destroyable) participant);
968        }
969        return participant;
970    }
971
972    @Override
973    public int getOutstandingTransactions() {
974        if (iisp instanceof LocalSpace)
975            return ((LocalSpace) iisp).size(queue);
976        return -1;
977    }
978    /**
979     * Builds a persistent-space key by combining the manager name, a prefix,
980     * and the transaction id.
981     *
982     * @param prefix key namespace (e.g. {@link #CONTEXT}, {@link #STATE})
983     * @param id     transaction identifier
984     * @return the assembled space key
985     */
986    protected String getKey (String prefix, long id) {
987        StringBuilder sb = new StringBuilder (getName());
988        sb.append ('.');
989        sb.append (prefix);
990        sb.append (id);
991        return sb.toString ();
992    }
993    /**
994     * Disables auto-commit on a {@link JDBMSpace}; no-op for other space types.
995     *
996     * @param sp the space whose auto-commit should be turned off
997     */
998    protected void commitOff (Space sp) {
999        if (sp instanceof JDBMSpace jsp) {
1000            jsp.setAutoCommit(false);
1001        }
1002    }
1003    /**
1004     * Re-enables auto-commit on a {@link JDBMSpace}, flushing any pending
1005     * writes; no-op for other space types.
1006     *
1007     * @param sp the space whose auto-commit should be turned back on
1008     */
1009    protected void commitOn (Space sp) {
1010        if (sp instanceof JDBMSpace jsp) {
1011            jsp.commit ();
1012            jsp.setAutoCommit(true);
1013        }
1014    }
1015    /**
1016     * Advances the persistent {@code tail} pointer past any contiguous
1017     * already-DONE transactions, purging their state from the space.
1018     */
1019    protected void checkTail () {
1020        tailLock.lock();
1021        try {
1022            while (tailDone()) {
1023                tail.incrementAndGet();
1024            }
1025        } finally {
1026            tailLock.unlock();
1027        }
1028    }
1029    /**
1030     * Returns whether the transaction at {@code tail} is already DONE; when
1031     * {@code true}, removes its persistent state.
1032     *
1033     * @return {@code true} when the tail transaction has reached terminal state
1034     */
1035    protected boolean tailDone () {
1036        String stateKey = getKey(STATE, tail.get());
1037        if (DONE.equals (psp.rdp (stateKey))) {
1038            purge (tail.get(), true);
1039            return true;
1040        }
1041        return false;
1042    }
1043    /**
1044     * Snapshots the context for transaction {@code id} without changing its state.
1045     *
1046     * @param id      transaction identifier
1047     * @param context serialised transaction context, or {@code null} to clear it
1048     */
1049    protected void snapshot (long id, Serializable context) {
1050        snapshot (id, context, null);
1051    }
1052    /**
1053     * Snapshots the context and optionally updates the persisted state for
1054     * transaction {@code id}. Skipped when recovery is disabled and the
1055     * status is not {@link #DONE}.
1056     *
1057     * @param id      transaction identifier
1058     * @param context serialised transaction context, or {@code null} to clear it
1059     * @param status  new persisted state, or {@code null} to leave it unchanged
1060     */
1061    protected void snapshot (long id, Serializable context, Integer status) {
1062        if (!doRecover && status != DONE)
1063            return; // nothing to do
1064
1065        var jfr = new TMEvent.Snapshot(getName()+":"+status, id);
1066        jfr.begin();
1067
1068        String contextKey = getKey (CONTEXT, id);
1069        synchronized (psp) {
1070            commitOff (psp);
1071            SpaceUtil.wipe(psp, contextKey);
1072            if (context != null)
1073                psp.out (contextKey, context);
1074
1075            if (status != null) {
1076                String stateKey  = getKey (STATE, id);
1077                psp.put (stateKey, status);
1078            }
1079            commitOn (psp);
1080        }
1081        jfr.commit();
1082    }
1083    /**
1084     * Atomically replaces the persisted lifecycle state for {@code id}.
1085     *
1086     * @param id    transaction identifier
1087     * @param state new state, or {@code null} to clear it
1088     */
1089    protected void setState (long id, Integer state) {
1090        String stateKey  = getKey (STATE, id);
1091        synchronized (psp) {
1092            commitOff (psp);
1093            SpaceUtil.wipe(psp, stateKey);
1094            if (state!= null)
1095                psp.out (stateKey, state);
1096            commitOn (psp);
1097        }
1098    }
1099    /**
1100     * Pushes a group name onto the persistent execution stack for {@code id}.
1101     *
1102     * @param id        transaction identifier
1103     * @param groupName group to schedule, or {@code null} for a no-op
1104     */
1105    protected void addGroup (long id, String groupName) {
1106        if (groupName != null)
1107            psp.out (getKey (GROUPS, id), groupName);
1108    }
1109    /**
1110     * Removes persistent state associated with a transaction.
1111     *
1112     * @param id   transaction identifier
1113     * @param full when {@code true}, also removes the lifecycle state entry
1114     *             (otherwise only context and group entries are cleared)
1115     */
1116    protected void purge (long id, boolean full) {
1117        String stateKey   = getKey (STATE, id);
1118        String contextKey = getKey (CONTEXT, id);
1119        String groupsKey  = getKey (GROUPS, id);
1120        synchronized (psp) {
1121            commitOff (psp);
1122            if (full)
1123                SpaceUtil.wipe(psp, stateKey);
1124            SpaceUtil.wipe(psp, contextKey);
1125            SpaceUtil.wipe(psp, groupsKey);
1126            commitOn (psp);
1127        }
1128    }
1129
1130    /**
1131     * Replays every in-flight transaction recorded in the persistent space,
1132     * advancing the tail pointer as each completes. No-op when recovery is disabled.
1133     */
1134    protected void recover () {
1135        if (doRecover) {
1136            if (tail.get() < head.get()) {
1137                getLog().info ("recover - tail=" +tail.get()+", head="+head.get());
1138            }
1139            while (tail.get() < head.get()) {
1140                recover (tail.getAndIncrement());
1141            }
1142        }
1143    }
1144    /**
1145     * Replays a single transaction by inspecting its persisted state and
1146     * driving either the commit or abort path before purging its state.
1147     *
1148     * @param id transaction identifier
1149     */
1150    protected void recover (long id) {
1151        LogEvent evt = getLog().createLogEvent ("recover");
1152        Profiler prof = new Profiler();
1153        evt.addMessage ("<id>" + id + "</id>");
1154        try {
1155            String stateKey   = getKey (STATE, id);
1156            String contextKey = getKey (CONTEXT, id);
1157            Integer state = (Integer) psp.rdp (stateKey);
1158            if (state == null) {
1159                evt.addMessage ("unknown stateKey " + stateKey);
1160                SpaceUtil.wipe (psp, contextKey);   // just in case ...
1161                return;
1162            }
1163            Serializable context = (Serializable) psp.rdp (contextKey);
1164            if (context != null)
1165                evt.addMessage (context);
1166
1167            if (DONE.equals (state)) {
1168                evt.addMessage ("<done/>");
1169            } else if (COMMITTING.equals (state)) {
1170                commit (0, id, context, getParticipants (id), true, evt, prof);
1171            } else if (PREPARING.equals (state)) {
1172                abort (0, id, context, getParticipants (id), true, evt, prof);
1173            }
1174            purge (id, true);
1175        } finally {
1176            evt.addMessage (prof);
1177            Logger.log (evt);
1178        }
1179    }
1180    /**
1181     * Lazily starts the {@link RetryTask} that drains the retry queue back into
1182     * the input space.
1183     */
1184    protected synchronized void checkRetryTask () {
1185        if (retryTask == null) {
1186            retryTask = new RetryTask();
1187            Thread.ofVirtual().start(retryTask);
1188        }
1189    }
1190
1191    /**
1192     * This method gives the opportunity to decorate a LogEvent right before
1193     * it gets logged. When overriding it, unless you know what you're doing,
1194     * you should return a FrozenLogEvent in order to prevent concurrency issues.
1195     *
1196     * @param context current Context
1197     * @param evt current LogEvent
1198     * @param prof profiler (may be null)
1199     * @return FrozenLogEvent
1200     */
1201    protected LogEvent freeze(Serializable context, LogEvent evt, Profiler prof) {
1202        return freezeLog ? new FrozenLogEvent(evt) : evt;
1203    }
1204
1205    /**
1206     * Background task that periodically pushes any contexts written to the
1207     * {@link #RETRY_QUEUE} back onto the input queue.
1208     */
1209    public class RetryTask implements Runnable {
1210        /** Creates the retry task bound to the enclosing manager. */
1211        public RetryTask() {}
1212        @Override
1213        public void run() {
1214            Thread.currentThread().setName (getName()+"-retry-task");
1215            while (running()) {
1216                for (Serializable context; (context = (Serializable)psp.rdp (RETRY_QUEUE)) != null;) 
1217                {
1218                    iisp.out (queue, context, retryTimeout);
1219                    psp.inp (RETRY_QUEUE);
1220                }
1221                ISOUtil.sleep(retryInterval);
1222            }
1223        }
1224    }
1225
1226    /**
1227     * Background task that throttles the input queue when active sessions
1228     * exceed the configured threshold.
1229     */
1230    public class InputQueueMonitor implements Runnable {
1231        /** Creates the input-queue monitor bound to the enclosing manager. */
1232        public InputQueueMonitor() {}
1233        @Override
1234        public void run() {
1235            Thread.currentThread().setName (getName()+"-input-queue-monitor");
1236            while (running()) {
1237                while (getOutstandingTransactions() > getActiveSessions() + threshold && running()) {
1238                    ISOUtil.sleep(100L);
1239                }
1240                if (!running())
1241                    break;
1242                try {
1243                    Object context = isp.in(queue, 1000L);
1244                    if (context != null) {
1245                        if (!running()) {
1246                            isp.out(queue, context); // place it back
1247                            break;
1248                        }
1249                        iisp.out(queue, context);
1250                    }
1251                } catch (SpaceError e) {
1252                    getLog().error(e);
1253                    ISOUtil.sleep(1000L); // relax on error
1254                }
1255            }
1256        }
1257    }
1258
1259    /**
1260     * This method returns the number of sessions that can be started at this point in time
1261     * @return number of sessions
1262     */
1263    protected int getSessionsToStandUp() {
1264        int outstandingTransactions = getOutstandingTransactions();
1265        int activeSessions = getActiveSessions();
1266        int count = 0;
1267        if (activeSessions < maxSessions && outstandingTransactions > threshold) {
1268            count = Math.min(outstandingTransactions, maxSessions - activeSessions);
1269        }
1270        return Math.min(1000, count); // reasonable value for virtual thread creation within one second
1271    }
1272  
1273    /**
1274     * Hook used by subclasses to drain a session early. The default
1275     * implementation never asks a session to stand down.
1276     *
1277     * @return {@code true} if the current session should stop accepting new transactions
1278     */
1279    protected boolean isSessionToStandDown() {
1280        return false;
1281    }
1282
1283    @Override
1284    public int getActiveSessions() {
1285        return activeSessions.intValue();
1286    }
1287    /**
1288     * @return the maximum number of sessions this manager may scale up to
1289     */
1290    public int getMaxSessions() {
1291        return maxSessions;
1292    }
1293    /**
1294     * Returns the current thread's transaction context as a raw {@link Serializable}.
1295     *
1296     * @return the thread-local context, or {@code null} when no transaction is in progress
1297     */
1298    public static Serializable getSerializable() {
1299        return tlContext.get();
1300    }
1301    /**
1302     * Returns the current thread's transaction context, narrowed to the caller's expected type.
1303     *
1304     * @param <T> caller-supplied context type
1305     * @return the thread-local context, or {@code null} when no transaction is in progress
1306     */
1307    public static <T extends Serializable> T getContext() {
1308        return (T) tlContext.get();
1309    }
1310    /**
1311     * Returns the current thread's transaction id, when one is in progress.
1312     *
1313     * @return the thread-local transaction id, or {@code null}
1314     */
1315    public static Long getId() {
1316        return tlId.get();
1317    }
1318
1319
1320    private void notifyStatusListeners
1321            (int session, TransactionStatusEvent.State state, long id, String info, Serializable context)
1322    {
1323        TransactionStatusEvent e = new TransactionStatusEvent(session, state, id, info, context);
1324        synchronized (statusListeners) {
1325            for (TransactionStatusListener l : statusListeners) {
1326                l.update (e);
1327            }
1328        }
1329    }
1330    private void setThreadName (long id, String method, TransactionParticipant p) {
1331        Thread.currentThread().setName(
1332            String.format("%s:%d %s %s %s", getName(), id, method, p.getClass().getName(),
1333                LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()))
1334        );
1335    }
1336    private void setThreadLocal (long id, Serializable context) {
1337        tlId.set(id);
1338        tlContext.set(context);
1339    }
1340    private void removeThreadLocal() {
1341        tlId.remove();
1342        tlContext.remove();
1343    }
1344
1345    private String getName(TransactionParticipant p) {
1346        return Optional.ofNullable(params.get(p)).map(ParticipantParams::name).orElseGet(() -> defaultParticipantName(p));
1347    }
1348
1349    private ParticipantParams getParams (TransactionParticipant p) {
1350        return Optional.ofNullable(params.get(p)).orElseGet(() ->
1351          new ParticipantParams(defaultParticipantName(p), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
1352            getOrCreateTimers(p))
1353        );
1354    }
1355
1356    private String participantName(TransactionParticipant p, String alias) {
1357        String resolvedAlias = alias != null ? alias.trim() : "";
1358        return !resolvedAlias.isEmpty() ? resolvedAlias : defaultParticipantName(p);
1359    }
1360
1361    private String defaultParticipantName(TransactionParticipant p) {
1362        return Caller.shortClassName(p.getClass().getName());
1363    }
1364
1365    private void logParticipantWarning(String detail, TransactionParticipant participant, Throwable t) {
1366        Logger.log(getLog().createWarn(detail).withTag("participant", getName(participant)).add(t));
1367    }
1368
1369    private String tmInfo() {
1370        return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s",
1371          getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(),
1372          getActiveSessions(), maxSessions,
1373          (tps != null ? ", " + tps : "")
1374        );
1375    }
1376
1377    private long getLong (Element e, String attributeName, long defValue) {
1378        String s = QFactory.getAttributeValue (e, attributeName);
1379        if (s != null) {
1380            try {
1381                return Long.parseLong(s);
1382            } catch (NumberFormatException ignored) {}
1383        }
1384        return defValue;
1385    }
1386
1387    private Instant getStart (Serializable context) {
1388        if (context instanceof Context) {
1389            Object o = ((Context) context).get(TIMESTAMP);
1390            if (o instanceof Instant)
1391                return (Instant) o;
1392        }
1393        return Instant.now();
1394    }
1395
1396    private boolean heavyLoaded() {
1397        return getActiveSessions() >= maxSessions;
1398    }
1399
1400    private int pauseAndWait(Serializable context, int action) {
1401        if (context instanceof Pausable pausable) try {
1402            pausedSessions.incrementAndGet();
1403            Future<Integer> paused = pausable.pause();
1404            long timeout = pausable.getTimeout();
1405            timeout = timeout > 0 ? Math.min (timeout, pauseTimeout) : pauseTimeout;
1406            try {
1407                action = paused.get(timeout, TimeUnit.MILLISECONDS);
1408            } catch (InterruptedException | ExecutionException e) {
1409                if (context instanceof Context ctx)
1410                    ctx.log(e);
1411            } catch (TimeoutException e) {
1412                action &= (PREPARED ^ 0xFFFF); // turn off 'PREPARED' - we need to abort
1413            } finally {
1414                pausable.reset();
1415            }
1416        } finally {
1417            pausedSessions.decrementAndGet();
1418        }
1419        return action;
1420    }
1421
1422    /**
1423     * Per-participant configuration: short name, timeouts, dependency sets,
1424     * and the timer bundle used to record per-phase metrics.
1425     *
1426     * @param name     short name used in trace messages and timer tags
1427     * @param timeout  per-transaction soft timeout in milliseconds
1428     * @param maxTime  hard ceiling in milliseconds (overrides {@code timeout} when smaller)
1429     * @param requires names this participant requires from a prior {@code provides}
1430     * @param provides names this participant exports for downstream {@code requires}
1431     * @param optional names this participant may consume but does not require
1432     * @param timers   per-phase Micrometer timers
1433     */
1434    private record ParticipantParams (
1435      String name,
1436      long timeout,
1437      long maxTime,
1438      Set<String> requires,
1439      Set<String> provides,
1440      Set<String> optional,
1441      Timers timers
1442    )
1443    {
1444        public boolean isConstrained() {
1445            return !requires.isEmpty() || !optional.isEmpty();
1446        }
1447    }
1448    private record Timers (
1449        io.micrometer.core.instrument.Timer prepareTimer,
1450        io.micrometer.core.instrument.Timer prepareForAbortTimer,
1451        io.micrometer.core.instrument.Timer commitTimer,
1452        io.micrometer.core.instrument.Timer abortTimer,
1453        io.micrometer.core.instrument.Timer snapshotTimer)
1454    { }
1455    /**
1456     * Single line of participant trace data accumulated on a {@link LogEvent}
1457     * during a transaction's lifecycle.
1458     *
1459     * @param phase   lifecycle phase tag (e.g. {@code prepare}, {@code commit})
1460     * @param message free-form message, typically the participant short name
1461     * @param info    optional trailing info, rendered after the message
1462     */
1463    public record Trace (String phase, String message, String info) {
1464        @Override
1465        public String toString() {
1466            return "%15s: %s%s".formatted(phase, message, info);
1467        }
1468        /**
1469         * Creates a trace line with no trailing info.
1470         *
1471         * @param phase   lifecycle phase tag
1472         * @param message free-form message
1473         * @return the assembled trace
1474         */
1475        public static Trace of (String phase, String message) {
1476            return new Trace (phase, message, "");
1477        }
1478        /**
1479         * Creates a trace line with trailing info.
1480         *
1481         * @param phase   lifecycle phase tag
1482         * @param message free-form message
1483         * @param info    trailing info
1484         * @return the assembled trace
1485         */
1486        public static Trace of (String phase, String message, String info) {
1487            return new Trace (phase, message, info);
1488        }
1489    }
1490
1491    private Set<String> getSet (Element e) {
1492        return e != null ? new HashSet<>(Arrays.asList(ISOUtil.commaDecode(e.getTextTrim()))) : Collections.emptySet();
1493    }
1494
1495    private int prepareOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriFunction<TransactionParticipant, Long, Serializable, Integer> preparationFunction) {
1496        int action;
1497
1498        if (context instanceof Context ctx && pp.isConstrained()) {
1499            if (!ctx.hasKeys(pp.requires.toArray())) {
1500                ctx.log ("missing.requires: '%s'".formatted(ctx.keysNotPresent(pp.requires.toArray())));
1501                action = ABORTED;
1502            } else {
1503                Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1504                action = preparationFunction.apply(p, id, c);
1505                if (!pp.requires.contains(LOGEVT.toString())) {
1506                    // if we are not inheriting parent's log event and there's a log event
1507                    // in the childs context, copy it.
1508                    LogEvent evt = c.get(LOGEVT.toString());
1509                    if (evt != null) {
1510                        LogEvent parentLogEvent = ctx.getLogEvent();
1511                        synchronized (parentLogEvent) {
1512                            parentLogEvent.getPayLoad().addAll(evt.getPayLoad());
1513                        }
1514                        c.remove(LOGEVT.toString());
1515                    }
1516                }
1517                ctx.merge(c.clone(pp.provides.toArray()));
1518            }
1519        } else {
1520            action = preparationFunction.apply(p, id, context);
1521        }
1522        if ((action & PAUSE) == PAUSE) {
1523            var jfrp = new TMEvent.Pause(getName(), id);
1524            jfrp.begin();
1525            action = pauseAndWait(context, action);
1526            jfrp.commit();
1527        }
1528        return action;
1529    }
1530
1531    private void commitOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriConsumer<TransactionParticipant, Long, Serializable> preparationFunction) {
1532        if (context instanceof Context ctx && pp.isConstrained()) {
1533            Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1534            preparationFunction.accept(p, id, c);
1535            ctx.merge(c.clone(pp.provides.toArray()));
1536        } else {
1537            preparationFunction.accept(p, id, context);
1538        }
1539    }
1540
1541    private Serializable recover (ContextRecovery p, long id, Serializable context, ParticipantParams pp, boolean commit) {
1542        var jfr = new TMEvent.Recover("%s:%s".formatted(getName(), p.getClass().getName()), id);
1543        jfr.begin();
1544        try {
1545            if (context instanceof Context ctx && pp.isConstrained()) {
1546                Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1547                Serializable s = p.recover (id, c, commit);
1548                return (s instanceof Context rc) ?
1549                  rc.clone (pp.provides.toArray()) : s;
1550            } else {
1551                return p.recover (id, context, commit);
1552            }
1553        } finally {
1554            jfr.commit();
1555        }
1556    }
1557
1558    private Timers getOrCreateTimers(TransactionParticipant p) {
1559        return Optional.ofNullable(params.get(p)).map(ParticipantParams::timers).orElseGet(() -> {
1560            String participantShortName = Optional.ofNullable(params.get(p))
1561              .map(ParticipantParams::name)
1562              .orElseGet(() -> defaultParticipantName(p));
1563            var mr = getServer().getMeterRegistry();
1564            var tags = Tags.of("name", getName(), "participant", participantShortName);
1565            String realm = (p instanceof LogSource ls) ? ls.getRealm() : null;
1566            tags = tags.and("realm", (realm != null && !realm.isEmpty()) ? realm.trim() : "");
1567
1568            return new Timers(
1569              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare"))),
1570              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare-for-abort"))),
1571              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "commit"))),
1572              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "abort"))),
1573              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "snapshot")))
1574            );
1575        });
1576    }
1577
    /**
     * Adds the given timer to {@code meters} and returns it, so the call can be nested
     * inside the expression that creates the timer.
     *
     * @param m timer to record
     * @return the same timer instance that was passed in
     */
    private Timer addTimer (Timer m) {
        meters.add (m);
        return m;
    }
1582
1583    private UUID getTraceId (long transactionId) {
1584        return new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ transactionId);
1585    }
1586}