001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.transaction;
020
021import io.micrometer.core.instrument.Counter;
022import io.micrometer.core.instrument.Gauge;
023import io.micrometer.core.instrument.Meter;
024import io.micrometer.core.instrument.Tags;
025import io.micrometer.core.instrument.Timer;
026import io.micrometer.core.instrument.binder.BaseUnits;
027import org.HdrHistogram.AtomicHistogram;
028import org.jdom2.Element;
029import org.jpos.core.Configuration;
030import org.jpos.core.ConfigurationException;
031import org.jpos.log.evt.Txn;
032import org.jpos.metrics.MeterInfo;
033import org.jpos.function.TriConsumer;
034import org.jpos.function.TriFunction;
035import org.jpos.jfr.TMEvent;
036import org.jpos.metrics.MeterFactory;
037import org.jpos.q2.QBeanSupport;
038import org.jpos.q2.QFactory;
039import org.jpos.space.*;
040import org.jpos.util.*;
041
042import java.io.PrintStream;
043import java.io.Serializable;
044import java.util.*;
045import java.util.concurrent.*;
046import java.util.concurrent.atomic.AtomicInteger;
047
048import java.time.Instant;
049import java.time.LocalDateTime;
050import java.time.ZoneId;
051import java.util.concurrent.atomic.AtomicLong;
052import java.util.concurrent.locks.Lock;
053import java.util.concurrent.locks.ReentrantLock;
054
055import org.jpos.iso.ISOUtil;
056import org.jpos.util.Metrics;
057
058import static org.jpos.transaction.ContextConstants.LOGEVT;
059import static org.jpos.transaction.ContextConstants.TIMESTAMP;
060
061
062@SuppressWarnings("unchecked")
063public class TransactionManager 
064    extends QBeanSupport 
065    implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider {
066
    // Key prefixes combined with the transaction id (see getKey) to address
    // per-transaction entries in the persistent space.
    public static final String  CONTEXT    = "$CONTEXT.";
    public static final String  STATE      = "$STATE.";
    public static final String  GROUPS     = "$GROUPS.";
    // Persistent-space key where contexts whose prepare phase asked for RETRY are parked.
    public static final String  RETRY_QUEUE = "$RETRY_QUEUE";
    // Transaction states handed to snapshot/setState.
    public static final Integer PREPARING  = 0;
    public static final Integer COMMITTING = 1;
    public static final Integer DONE       = 2;
    // Name under which the top-level (ungrouped) participants are registered.
    public static final String  DEFAULT_GROUP = "";
    public static final long    MAX_PARTICIPANTS = 1000;  // loop prevention
    // Timeout (passed to Space.in) used by run() while waiting for a queued context.
    public static final long    MAX_WAIT = 15000L;
    // Participants per group name; DEFAULT_GROUP holds the top-level ones.
    protected Map<String,List<TransactionParticipant>> groups;
    // Participants implementing Destroyable, destroyed in stopService.
    private Set<Destroyable> destroyables = new HashSet<>();
    private static final ThreadLocal<Serializable> tlContext = new ThreadLocal<>();
    private static final ThreadLocal<Long> tlId = new ThreadLocal<>();
    private Metrics metrics;
    // Per-participant settings registered by createParticipant.
    private Map<TransactionParticipant,ParticipantParams> params = new HashMap<>();
    // "max-time" property; default max-time for participants that do not set their own.
    private long globalMaxTime;

    private Space<String,Object> sp;
    private Space<String,Object> psp;
    private Space<String,Object> isp;  // real input space
    private Space<String,Object> iisp; // internal input space
    private String queue;
    // Serializes tail advancement (see checkTail).
    private Lock tailLock = new ReentrantLock();
    private final List<TransactionStatusListener> statusListeners = new ArrayList<>();
    // Cached "statusListeners is not empty" flag, maintained by addListener/removeListener.
    private boolean hasStatusListeners;
    private boolean doRecover;
    private boolean callSelectorOnAbort;
    private boolean abortOnMisconfiguredGroups;
    private int sessions;
    private int maxSessions;
    private int threshold;
    private int maxActiveTransactions;
    private final AtomicInteger activeSessions = new AtomicInteger();
    private final AtomicInteger pausedSessions = new AtomicInteger();

    // head: next transaction id to hand out; tail: oldest id not yet purged by checkTail.
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();

    private long retryInterval = 5000L;
    private long retryTimeout  = 60000L;
    private long pauseTimeout  = 60000L;
    private boolean abortOnPauseTimeout = true;
    private Runnable retryTask = null;
    private TPS tps;
    private ExecutorService executor;
    // Meters registered in setConfiguration and removed from the registry in stopService.
    private final List<Meter> meters = new ArrayList<>();

    private Gauge activeSessionsGauge;
    private Counter transactionCounter;
    private boolean freezeLog;
    private UUID uuid = UUID.randomUUID();
119
120    @Override
121    public void initService () throws ConfigurationException {
122        queue = cfg.get ("queue", null);
123        if (queue == null)
124            throw new ConfigurationException ("queue property not specified");
125        sp  = SpaceFactory.getSpace (cfg.get ("space"));
126        isp = iisp = SpaceFactory.getSpace (cfg.get ("input-space", cfg.get ("space")));
127        psp  = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString()));
128        doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace);
129        tail.set(cfg.getLong ("initial-tail", 1));
130        head.set(tail.get());
131        groups = new HashMap<>();
132        initParticipants (getPersist());
133        initStatusListeners (getPersist());
134        executor = QFactory.executorService(cfg.getBoolean("virtual-threads", true));
135    }
136
137    @Override
138    public void startService () throws Exception {
139        recover();
140        if (tps != null)
141            tps.stop();
142        tps = new TPS (cfg.getBoolean ("auto-update-tps", true));
143        Thread.ofPlatform().start(this);
144        if (psp.rdp (RETRY_QUEUE) != null)
145            checkRetryTask();
146
147        if (iisp != isp) {
148            Thread.ofPlatform().unstarted(
149              new InputQueueMonitor()
150            ).start();
151        }
152        NameRegistrar.register(getName(), this);
153    }
154
155    @Override
156    public void stopService () {
157        NameRegistrar.unregister(getName());
158        if (iisp != isp)
159            for (Object o=iisp.inp(queue); o != null; o=iisp.inp(queue))
160                isp.out(queue, o); // push back to replicated space
161
162        meters.forEach(getServer().getMeterRegistry()::remove);
163        tps.stop();
164        for (Destroyable destroyable : destroyables) {
165            try {
166                destroyable.destroy();
167            } catch (Throwable t) {
168                getLog().warn (t);
169            }
170        }
171    }
172    public void queue (Serializable context) {
173        iisp.out(queue, context);
174    }
175    public void push (Serializable context) {
176        iisp.push(queue, context);
177    }
178    @SuppressWarnings("unused")
179    public String getQueueName() {
180        return queue;
181    }
182    public Space getSpace() {
183        return sp;
184    }
185    public Space getInputSpace() {
186        return isp;
187    }
188    public Space getPersistentSpace() {
189        return psp;
190    }
191
192    @Override
193    public void run () {
194        while (running()) {
195            if (heavyLoaded()) {
196                ISOUtil.sleep (100L);
197                getLog().info ("HeavyLoaded - active sessions: " + getActiveSessions());
198                continue;
199            }
200            Object obj = iisp.in (queue, MAX_WAIT);
201            if (obj instanceof Serializable context) {
202                if (getActiveSessions() <= maxSessions) {
203                    if (context instanceof Context ctx)
204                        ctx.log ("active=%d, maxSessions=%d".formatted(getActiveSessions(), maxSessions));
205                    int session = activeSessions.incrementAndGet();
206                    transactionCounter.increment();
207                    executor.execute(() -> {
208                        try {
209                            runTransaction(context, session);
210                        } finally {
211                            activeSessions.decrementAndGet();
212                        }
213                    });
214                }
215                else {
216                    iisp.push(queue, context);  // push it back
217                    ISOUtil.sleep(100L);
218                }
219            }
220        }
221    }
222
223    private void runTransaction (Serializable context, int session) {
224        long id = 0;
225        List<TransactionParticipant> members;
226        Iterator<TransactionParticipant> iter;
227        boolean abort;
228        LogEvent evt;
229        Profiler prof;
230        Thread thread = Thread.currentThread();
231
232        prof = null;
233        evt = null;
234        thread.setName (getName() + "-" + session + ":idle");
235        int action = -1;
236        id = head.getAndIncrement ();
237        TMEvent tme = new TMEvent(getName(), id);
238        Txn txn = new Txn(getName(), id);
239
240        tme.begin();
241        try {
242            setThreadLocal(id, context);
243            if (hasStatusListeners)
244                notifyStatusListeners (session, TransactionStatusEvent.State.READY, id, "", null);
245
246            Chronometer chronometer = new Chronometer(getStart(context));
247
248            abort = false;
249            members = new ArrayList<> ();
250            iter = getParticipants (DEFAULT_GROUP).iterator();
251            evt = new LogEvent()
252              .withSource(log)
253              .withTraceId(getTraceId(id));
254            evt.addMessage(txn);
255            evt.addMessage(context);
256            prof = new Profiler();
257            snapshot (id, context, PREPARING);
258            action = prepare (session, id, context, members, iter, abort, evt, prof, chronometer);
259            switch (action) {
260                case PREPARED:
261                    if (members.size() > 0) {
262                        setState(id, COMMITTING);
263                        commit(session, id, context, members, false, evt, prof);
264                    }
265                    break;
266                case ABORTED:
267                    if (members.size() > 0) {
268                        abort(session, id, context, members, false, evt, prof);
269                    }
270                    break;
271                case RETRY:
272                    psp.out (RETRY_QUEUE, context);
273                    checkRetryTask();
274                    break;
275                case NO_JOIN:
276                    break;
277            }
278            snapshot (id, null, DONE);
279            if (id == tail.get()) {
280                checkTail ();
281            } else {
282                purge (id, false);
283            }
284            tps.tick();
285        } catch (Throwable t) {
286            if (evt == null)
287                getLog().fatal (t); // should never happen
288            else
289                evt.addMessage (t);
290        } finally {
291            removeThreadLocal();
292            if (hasStatusListeners) {
293                notifyStatusListeners (
294                  session,
295                  TransactionStatusEvent.State.DONE,
296                  id, "", context);
297            }
298            if (evt != null && (action == PREPARED || action == ABORTED || (action == -1 && prof != null))) {
299                switch (action) {
300                    case PREPARED :
301                        evt.setTag("commit");
302                        break;
303                    case ABORTED :
304                        evt.setTag ("abort");
305                        break;
306                    case -1:
307                        evt.setTag ("undefined");
308                        break;
309                }
310                if (getInTransit() > Math.max(maxActiveTransactions, activeSessions.get()) * 100L) {
311                    evt.addMessage("WARNING: IN-TRANSIT TOO HIGH");
312                }
313                evt.addMessage (
314                  String.format (" %s, elapsed=%dms",
315                    tmInfo(),
316                    prof.getElapsedInMillis()
317                  )
318                );
319                evt.addMessage (prof);
320                try {
321                    Logger.log(freeze(context, evt, prof));
322                } catch (Throwable t) {
323                    getLog().error(t);
324                }
325            }
326            tme.commit();
327        }
328    }
329
330    @Override
331    public long getTail () {
332        return tail.get();
333    }
334
335    @Override
336    public long getHead () {
337        return head.get();
338    }
339
340    public long getInTransit () {
341        return head.get() - tail.get();
342    }
343
344    @Override
345    public void setConfiguration (Configuration cfg) throws ConfigurationException {
346        super.setConfiguration (cfg);
347        retryInterval = cfg.getLong ("retry-interval", retryInterval);
348        retryTimeout  = cfg.getLong ("retry-timeout", retryTimeout);
349        pauseTimeout  = cfg.getLong ("pause-timeout", pauseTimeout);
350        abortOnPauseTimeout = cfg.getBoolean("abort-on-pause-timeout", true);
351        maxActiveTransactions = cfg.getInt  ("max-active-sessions", 0);
352        sessions = cfg.getInt ("sessions", 1);
353        threshold = cfg.getInt ("threshold", sessions / 2);
354        maxSessions = cfg.getInt ("max-sessions", sessions);
355        globalMaxTime = cfg.getLong("max-time", 0L);
356        if (maxSessions < sessions)
357            throw new ConfigurationException("max-sessions < sessions");
358        if (maxActiveTransactions > 0) {
359            if (maxActiveTransactions < sessions)
360                throw new ConfigurationException("max-active-sessions < sessions");
361            if (maxActiveTransactions < maxSessions)
362                throw new ConfigurationException("max-active-sessions < max-sessions");
363        }
364        callSelectorOnAbort = cfg.getBoolean("call-selector-on-abort", true);
365        metrics = new Metrics(new AtomicHistogram(cfg.getLong("metrics-highest-trackable-value", 60000), 2));
366        abortOnMisconfiguredGroups = cfg.getBoolean("abort-on-misconfigured-groups");
367
368        try {
369            activeSessionsGauge = MeterFactory.gauge
370              (getServer().getMeterRegistry(), MeterInfo.TM_ACTIVE, Tags.of("name", getName()), BaseUnits.SESSIONS, activeSessions::get
371            );
372            transactionCounter = MeterFactory.counter
373              (getServer().getMeterRegistry(), MeterInfo.TM_COUNTER, Tags.of("name", getName())
374            );
375            meters.add(activeSessionsGauge);
376            meters.add(transactionCounter);
377        } catch (Exception e) {
378            throw new ConfigurationException (e);
379        }
380        freezeLog = cfg.getBoolean("freeze-log", true);
381    }
382    public void addListener (TransactionStatusListener l) {
383        synchronized (statusListeners) {
384            statusListeners.add (l);
385            hasStatusListeners = true;
386        }
387    }
388    public void removeListener (TransactionStatusListener l) {
389        synchronized (statusListeners) {
390            statusListeners.remove(l);
391            hasStatusListeners = !statusListeners.isEmpty();
392        }
393    }
394    public TPS getTPS() {
395        return tps;
396    }
397
398    @Override
399    public String getTPSAsString() {
400        return tps.toString();
401    }
402
403    @Override
404    public float getTPSAvg() {
405        return tps.getAvg();
406    }
407
408    @Override
409    public int getTPSPeak() {
410        return tps.getPeak();
411    }
412
413    @Override
414    public Date getTPSPeakWhen() {
415        return new Date(tps.getPeakWhen());
416    }
417
418    @Override
419    public long getTPSElapsed() {
420        return tps.getElapsed();
421    }
422
423    @Override
424    public void resetTPS() {
425        tps.reset();
426    }
427
428    @Override
429    public Metrics getMetrics() {
430        return metrics;
431    }
432
433    @Override
434    public void dump (PrintStream ps, String indent) {
435        ps.printf ("%s%s%n", indent, tmInfo());
436        if (metrics != null) {
437            metrics.dump(ps, indent);
438        }
439    }
440
441    protected void commit
442        (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof)
443    {
444        for (TransactionParticipant p :members) {
445            var jfr = new TMEvent.Commit("%s:%s".formatted(getName(), p.getClass().getName()), id);
446            jfr.begin();
447            ParticipantParams pp = getParams(p);
448            if (recover && p instanceof ContextRecovery cr) {
449                context = recover (cr, id, context, pp, true);
450                if (evt != null)
451                    evt.addMessage (Trace.of("commit-recover", getName(p)));
452            }
453            if (hasStatusListeners)
454                notifyStatusListeners (
455                    session, TransactionStatusEvent.State.COMMITING, id, getName(p), context
456                );
457            commitOrAbort (p, id, context, pp, this::commit);
458            if (evt != null) {
459                evt.addMessage (Trace.of("commit", getName(p)));
460                if (prof != null)
461                    prof.checkPoint (" commit: " + getName(p));
462            }
463            jfr.commit();
464        }
465    }
466    protected void abort 
467        (int session, long id, Serializable context, List<TransactionParticipant> members, boolean recover, LogEvent evt, Profiler prof)
468    {
469        for (TransactionParticipant p :members) {
470            ParticipantParams pp = getParams(p);
471            if (recover && p instanceof ContextRecovery cr) {
472                context = recover (cr, id, context, pp, true);
473                if (evt != null)
474                    evt.addMessage (Trace.of("abort-recover", getName(p)));
475            }
476            if (hasStatusListeners)
477                notifyStatusListeners (
478                    session, TransactionStatusEvent.State.ABORTING, id, getName(p), context
479                );
480
481            commitOrAbort (p, id, context, pp, this::abort);
482            if (evt != null) {
483                evt.addMessage (Trace.of("abort", getName(p)));
484                if (prof != null)
485                    prof.checkPoint ("  abort: " + getName(p));
486            }
487        }
488    }
489    protected int prepareForAbort
490        (TransactionParticipant p, long id, Serializable context) 
491    {
492        Chronometer c = new Chronometer();
493        try {
494            if (p instanceof AbortParticipant) {
495                setThreadName(id, "prepareForAbort", p);
496                return ((AbortParticipant)p).prepareForAbort (id, context);
497            }
498        } catch (Throwable t) {
499            getLog().warn ("PREPARE-FOR-ABORT: " + id, t);
500        } finally {
501            getParams(p).timers.prepareForAbortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
502            if (metrics != null)
503                metrics.record(getName(p) + "-prepare-for-abort", c.elapsed());
504        }
505        return ABORTED | NO_JOIN;
506    }
507    protected int prepare 
508        (TransactionParticipant p, long id, Serializable context) 
509    {
510        Chronometer c = new Chronometer();
511        try {
512            setThreadName(id, "prepare", p);
513            return p.prepare (id, context);
514        } catch (Throwable t) {
515            getLog().warn ("PREPARE: " + id, t);
516        } finally {
517            getParams(p).timers.prepareTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
518            if (metrics != null) {
519                metrics.record(getName(p) + "-prepare", c.elapsed());
520            }
521        }
522        return ABORTED;
523    }
524    protected void commit 
525        (TransactionParticipant p, long id, Serializable context) 
526    {
527        Chronometer c = new Chronometer();
528        try {
529            setThreadName(id, "commit", p);
530            p.commit(id, context);
531        } catch (Throwable t) {
532            getLog().warn ("COMMIT: " + id, t);
533        } finally {
534            getParams(p).timers.commitTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
535            if (metrics != null)
536                metrics.record(getName(p) + "-commit", c.elapsed());
537        }
538    }
539    protected void abort 
540        (TransactionParticipant p, long id, Serializable context) 
541    {
542        Chronometer c = new Chronometer();
543        try {
544            setThreadName(id, "abort", p);
545            p.abort(id, context);
546        } catch (Throwable t) {
547            getLog().warn ("ABORT: " + id, t);
548        } finally {
549            getParams(p).timers.abortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
550            if (metrics != null)
551                metrics.record(getName(p) + "-abort", c.elapsed());
552        }
553    }
554    protected int prepare
555        (int session, long id, Serializable context, List<TransactionParticipant> members, Iterator<TransactionParticipant> iter, boolean abort, LogEvent evt, Profiler prof, Chronometer chronometer)
556    {
557        boolean retry = false;
558        for (int i=0; iter.hasNext (); i++) {
559            int action;
560            if (i > MAX_PARTICIPANTS) {
561                getLog().warn (
562                    "loop detected - transaction " +id + " aborted."
563                );
564                return ABORTED;
565            }
566            TransactionParticipant p = iter.next();
567
568            ParticipantParams pp = getParams(p);
569            if (!abort && pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime) {
570                abort = true;
571                if (evt != null)
572                    evt.addMessage("    forcedAbort: " + getName(p) + " elapsed=" + chronometer.elapsed());
573            }
574
575            TMEvent jfr;
576            if (abort) {
577                jfr = new TMEvent.PrepareForAbort("%s:%s".formatted(getName(), p.getClass().getName()), id);
578                jfr.begin();
579                if (hasStatusListeners)
580                    notifyStatusListeners (
581                        session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), context
582                    );
583
584                action = prepareOrAbort (p, id, context, pp, this::prepareForAbort);
585
586                if (evt != null && p instanceof AbortParticipant) {
587                    evt.addMessage(Trace.of("prepareForAbort", getName(p)));
588                    if (prof != null)
589                        prof.checkPoint ("prepareForAbort: " + getName(p));
590                }
591            } else {
592                if (hasStatusListeners)
593                    notifyStatusListeners (
594                        session, TransactionStatusEvent.State.PREPARING, id, getName(p), context
595                    );
596
597                jfr = new TMEvent.Prepare("%s:%s".formatted(getName(), p.getClass().getName()), id);
598                jfr.begin();
599
600                chronometer.lap();
601                action = prepareOrAbort (p, id, context, pp, this::prepare);
602                boolean timeout = pp.timeout > 0 && chronometer.partial() > pp.timeout;
603                boolean maxTime = pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime;
604                if (timeout || maxTime)
605                    action &= (PREPARED ^ 0xFFFF);
606
607                abort  = (action & PREPARED) == ABORTED;
608                retry  = (action & RETRY) == RETRY;
609
610                if (evt != null) {
611                    evt.addMessage (Trace.of("prepare", getName(p),
612                            (abort ? " ABORTED" : " PREPARED")
613                            + (timeout ? " TIMEOUT" : "")
614                            + (maxTime ? " MAX_TIMEOUT" : "")
615                            + (retry ? " RETRY" : "")
616                            + ((action & READONLY) == READONLY ? " READONLY" : "")
617                            + ((action & NO_JOIN) == NO_JOIN ? " NO_JOIN" : ""))
618                    );
619                    if (prof != null)
620                        prof.checkPoint ("prepare: " + getName(p));
621                }
622            }
623
624            if ((action & READONLY) == 0) {
625                Chronometer c = new Chronometer();
626                snapshot (id, context);
627                getParams(p).timers.snapshotTimer.record (c.elapsed(), TimeUnit.MILLISECONDS);
628                if (metrics != null)
629                    metrics.record(getName(p) + "-snapshot", c.elapsed());
630            }
631            if ((action & NO_JOIN) == 0) {
632                members.add (p);
633            }
634            if (p instanceof GroupSelector && ((action & PREPARED) == PREPARED || callSelectorOnAbort)) {
635                String groupName = null;
636                Chronometer c = new Chronometer();
637                try {
638                    groupName = ((GroupSelector)p).select (id, context);
639                } catch (Exception e) {
640                    if (evt != null) 
641                        evt.addMessage ("       selector: " + getName(p) + " " + e.getMessage());
642                    else 
643                        getLog().error ("       selector: " + getName(p) + " " + e.getMessage());
644                } finally {
645                    if (metrics != null)
646                        metrics.record(getName(p) + "-selector", c.lap());
647                }
648                if (evt != null) {
649                    evt.addMessage ("       selector: '" + groupName +"'");
650                }
651                if (groupName != null) {
652                    StringTokenizer st = new StringTokenizer (groupName, " ,");
653                    List<TransactionParticipant> participants = new ArrayList();
654                    while (st.hasMoreTokens ()) {
655                        String grp = st.nextToken();
656                        addGroup (id, grp);
657                        if (evt != null && groups.get(grp) == null) {
658                            evt.addMessage ("                 WARNING: group '" + grp + "' not configured");
659                            if (abortOnMisconfiguredGroups)
660                                abort = true;
661                        }
662                        participants.addAll (getParticipants (grp));
663                    }
664                    while (iter.hasNext())
665                        participants.add (iter.next());
666
667                    iter = participants.iterator();
668                }
669            }
670            jfr.commit();
671        }
672        return abort ? retry ? RETRY : ABORTED : PREPARED;
673    }
674    protected List<TransactionParticipant> getParticipants (String groupName) {
675        List<TransactionParticipant> participants = groups.get (groupName);
676        if (participants == null) {
677            participants = new ArrayList();
678        }
679        return participants;
680    }
681    protected List<TransactionParticipant> getParticipants (long id) {
682        // Use a local copy of participant to avoid adding the 
683        // GROUP participant to the DEFAULT_GROUP
684        List<TransactionParticipant> participantsChain = new ArrayList<>();
685        List<TransactionParticipant> participants = getParticipants (DEFAULT_GROUP);
686        // Add DEFAULT_GROUP participants 
687        participantsChain.addAll(participants);
688        String key = getKey(GROUPS, id);
689        String grp;
690        // now add participants of Group 
691        while ( (grp = (String) psp.inp (key)) != null) {
692            participantsChain.addAll (getParticipants (grp));
693        }
694        return participantsChain;
695    }
696
697    protected void initStatusListeners (Element config)  throws ConfigurationException{
698        final Iterator iter = config.getChildren ("status-listener").iterator();
699        while (iter.hasNext()) {
700            final Element e = (Element) iter.next();
701            final QFactory factory = getFactory();
702            final TransactionStatusListener listener = (TransactionStatusListener) factory.newInstance (QFactory.getAttributeValue (e, "class"));
703            factory.setConfiguration (listener, config);
704            addListener(listener);
705        }
706    }
707
708    protected void initParticipants (Element config) 
709        throws ConfigurationException
710    {
711        groups.put (DEFAULT_GROUP,  initGroup (config));
712        for (Element e : config.getChildren("group")) {
713            String name = QFactory.getAttributeValue (e, "name");
714            if (name == null) 
715                throw new ConfigurationException ("missing group name");
716            if (groups.containsKey(name)) {
717                throw new ConfigurationException (
718                    "Group '" + name + "' already defined"
719                );
720            }
721            groups.put (name, initGroup (e));
722        }
723    }
724    protected List<TransactionParticipant> initGroup (Element e) 
725        throws ConfigurationException
726    {
727        List<TransactionParticipant> group = new ArrayList<>();
728        for (Element el : e.getChildren ("participant")) {
729            if (QFactory.isEnabled(el)) {
730                group.add(createParticipant(el));
731            } else {
732                getLog().warn ("participant ignored (enabled='" + QFactory.getEnabledAttribute(el) + "'): " + el.getAttributeValue("class") + "/" + el.getAttributeValue("realm"));
733            }
734        }
735        return group;
736    }
737    public TransactionParticipant createParticipant (Element e)
738        throws ConfigurationException
739    {
740        QFactory factory = getFactory();
741        TransactionParticipant participant = factory.newInstance (QFactory.getAttributeValue (e, "class"));
742        factory.setLogger (participant, e);
743        QFactory.invoke (participant, "setTransactionManager", this, TransactionManager.class);
744        factory.setConfiguration (participant, e);
745        String realm = QFactory.getAttributeValue(e, "realm");
746
747        try {
748            String participantShortName = Caller.shortClassName(participant.getClass().getName());
749            params.put(participant, new ParticipantParams(
750                participantShortName + (realm != null && !realm.isEmpty() ? ":" + realm : ""),
751                getLong (e, "timeout", 0L),
752                getLong (e, "max-time", globalMaxTime),
753                getSet(e.getChild("requires")),
754                getSet(e.getChild("provides")),
755                getSet(e.getChild("optional")),
756                getOrCreateTimers(participant)
757              )
758            );
759        } catch (Exception ex) {
760            throw new ConfigurationException (ex);
761        }
762        if (participant instanceof Destroyable) {
763            destroyables.add((Destroyable) participant);
764        }
765        return participant;
766    }
767
768    @Override
769    public int getOutstandingTransactions() {
770        if (iisp instanceof LocalSpace)
771            return ((LocalSpace) iisp).size(queue);
772        return -1;
773    }
774    protected String getKey (String prefix, long id) {
775        StringBuilder sb = new StringBuilder (getName());
776        sb.append ('.');
777        sb.append (prefix);
778        sb.append (id);
779        return sb.toString ();
780    }
781    protected void commitOff (Space sp) {
782        if (sp instanceof JDBMSpace jsp) {
783            jsp.setAutoCommit(false);
784        }
785    }
786    protected void commitOn (Space sp) {
787        if (sp instanceof JDBMSpace jsp) {
788            jsp.commit ();
789            jsp.setAutoCommit(true);
790        }
791    }
792    protected void checkTail () {
793        tailLock.lock();
794        try {
795            while (tailDone()) {
796                tail.incrementAndGet();
797            }
798        } finally {
799            tailLock.unlock();
800        }
801    }
802    protected boolean tailDone () {
803        String stateKey = getKey(STATE, tail.get());
804        if (DONE.equals (psp.rdp (stateKey))) {
805            purge (tail.get(), true);
806            return true;
807        }
808        return false;
809    }
810    protected void snapshot (long id, Serializable context) {
811        snapshot (id, context, null);
812    }
813    protected void snapshot (long id, Serializable context, Integer status) {
814        if (!doRecover && status != DONE)
815            return; // nothing to do
816
817        var jfr = new TMEvent.Snapshot(getName()+":"+status, id);
818        jfr.begin();
819
820        String contextKey = getKey (CONTEXT, id);
821        synchronized (psp) {
822            commitOff (psp);
823            SpaceUtil.wipe(psp, contextKey);
824            if (context != null)
825                psp.out (contextKey, context);
826
827            if (status != null) {
828                String stateKey  = getKey (STATE, id);
829                psp.put (stateKey, status);
830            }
831            commitOn (psp);
832        }
833        jfr.commit();
834    }
835    protected void setState (long id, Integer state) {
836        String stateKey  = getKey (STATE, id);
837        synchronized (psp) {
838            commitOff (psp);
839            SpaceUtil.wipe(psp, stateKey);
840            if (state!= null)
841                psp.out (stateKey, state);
842            commitOn (psp);
843        }
844    }
845    protected void addGroup (long id, String groupName) {
846        if (groupName != null)
847            psp.out (getKey (GROUPS, id), groupName);
848    }
849    protected void purge (long id, boolean full) {
850        String stateKey   = getKey (STATE, id);
851        String contextKey = getKey (CONTEXT, id);
852        String groupsKey  = getKey (GROUPS, id);
853        synchronized (psp) {
854            commitOff (psp);
855            if (full)
856                SpaceUtil.wipe(psp, stateKey);
857            SpaceUtil.wipe(psp, contextKey);
858            SpaceUtil.wipe(psp, groupsKey);
859            commitOn (psp);
860        }
861    }
862
863    protected void recover () {
864        if (doRecover) {
865            if (tail.get() < head.get()) {
866                getLog().info ("recover - tail=" +tail.get()+", head="+head.get());
867            }
868            while (tail.get() < head.get()) {
869                recover (tail.getAndIncrement());
870            }
871        }
872    }
873    protected void recover (long id) {
874        LogEvent evt = getLog().createLogEvent ("recover");
875        Profiler prof = new Profiler();
876        evt.addMessage ("<id>" + id + "</id>");
877        try {
878            String stateKey   = getKey (STATE, id);
879            String contextKey = getKey (CONTEXT, id);
880            Integer state = (Integer) psp.rdp (stateKey);
881            if (state == null) {
882                evt.addMessage ("unknown stateKey " + stateKey);
883                SpaceUtil.wipe (psp, contextKey);   // just in case ...
884                return;
885            }
886            Serializable context = (Serializable) psp.rdp (contextKey);
887            if (context != null)
888                evt.addMessage (context);
889
890            if (DONE.equals (state)) {
891                evt.addMessage ("<done/>");
892            } else if (COMMITTING.equals (state)) {
893                commit (0, id, context, getParticipants (id), true, evt, prof);
894            } else if (PREPARING.equals (state)) {
895                abort (0, id, context, getParticipants (id), true, evt, prof);
896            }
897            purge (id, true);
898        } finally {
899            evt.addMessage (prof);
900            Logger.log (evt);
901        }
902    }
903    protected synchronized void checkRetryTask () {
904        if (retryTask == null) {
905            retryTask = new RetryTask();
906            Thread.ofVirtual().start(retryTask);
907        }
908    }
909
910    /**
911     * This method gives the opportunity to decorate a LogEvent right before
912     * it gets logged. When overriding it, unless you know what you're doing,
913     * you should return a FrozenLogEvent in order to prevent concurrency issues.
914     *
915     * @param context current Context
916     * @param evt current LogEvent
917     * @param prof profiler (may be null)
918     * @return FrozenLogEvent
919     */
920    protected LogEvent freeze(Serializable context, LogEvent evt, Profiler prof) {
921        return freezeLog ? new FrozenLogEvent(evt) : evt;
922    }
923
924    public class RetryTask implements Runnable {
925        @Override
926        public void run() {
927            Thread.currentThread().setName (getName()+"-retry-task");
928            while (running()) {
929                for (Serializable context; (context = (Serializable)psp.rdp (RETRY_QUEUE)) != null;) 
930                {
931                    iisp.out (queue, context, retryTimeout);
932                    psp.inp (RETRY_QUEUE);
933                }
934                ISOUtil.sleep(retryInterval);
935            }
936        }
937    }
938
939    public class InputQueueMonitor implements Runnable {
940        @Override
941        public void run() {
942            Thread.currentThread().setName (getName()+"-input-queue-monitor");
943            while (running()) {
944                while (getOutstandingTransactions() > getActiveSessions() + threshold && running()) {
945                    ISOUtil.sleep(100L);
946                }
947                if (!running())
948                    break;
949                try {
950                    Object context = isp.in(queue, 1000L);
951                    if (context != null) {
952                        if (!running()) {
953                            isp.out(queue, context); // place it back
954                            break;
955                        }
956                        iisp.out(queue, context);
957                    }
958                } catch (SpaceError e) {
959                    getLog().error(e);
960                    ISOUtil.sleep(1000L); // relax on error
961                }
962            }
963        }
964    }
965
966    /**
967     * This method returns the number of sessions that can be started at this point in time
968     * @return number of sessions
969     */
970    protected int getSessionsToStandUp() {
971        int outstandingTransactions = getOutstandingTransactions();
972        int activeSessions = getActiveSessions();
973        int count = 0;
974        if (activeSessions < maxSessions && outstandingTransactions > threshold) {
975            count = Math.min(outstandingTransactions, maxSessions - activeSessions);
976        }
977        return Math.min(1000, count); // reasonable value for virtual thread creation within one second
978    }
979  
980    /** 
981     * This method returns true if current session should stop working on more messages
982     * @return
983     */
984    protected boolean isSessionToStandDown() {
985        return false;
986    }
987
988    @Override
989    public int getActiveSessions() {
990        return activeSessions.intValue();
991    }
992    public int getMaxSessions() {
993        return maxSessions;
994    }
995    public static Serializable getSerializable() {
996        return tlContext.get();
997    }
998    public static <T extends Serializable> T getContext() {
999        return (T) tlContext.get();
1000    }
1001    public static Long getId() {
1002        return tlId.get();
1003    }
1004
1005
1006    private void notifyStatusListeners
1007            (int session, TransactionStatusEvent.State state, long id, String info, Serializable context)
1008    {
1009        TransactionStatusEvent e = new TransactionStatusEvent(session, state, id, info, context);
1010        synchronized (statusListeners) {
1011            for (TransactionStatusListener l : statusListeners) {
1012                l.update (e);
1013            }
1014        }
1015    }
1016    private void setThreadName (long id, String method, TransactionParticipant p) {
1017        Thread.currentThread().setName(
1018            String.format("%s:%d %s %s %s", getName(), id, method, p.getClass().getName(),
1019                LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()))
1020        );
1021    }
1022    private void setThreadLocal (long id, Serializable context) {
1023        tlId.set(id);
1024        tlContext.set(context);
1025    }
1026    private void removeThreadLocal() {
1027        tlId.remove();
1028        tlContext.remove();
1029    }
1030
1031    private String getName(TransactionParticipant p) {
1032        return p.getClass().getName();
1033    }
1034
1035    private ParticipantParams getParams (TransactionParticipant p) {
1036        return Optional.ofNullable(params.get(p)).orElseGet(() ->
1037          new ParticipantParams(p.getClass().getName(), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
1038            getOrCreateTimers(p))
1039        );
1040    }
1041
1042    private String tmInfo() {
1043        return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s",
1044          getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(),
1045          getActiveSessions(), maxSessions,
1046          (tps != null ? ", " + tps : "")
1047        );
1048    }
1049
1050    private long getLong (Element e, String attributeName, long defValue) {
1051        String s = QFactory.getAttributeValue (e, attributeName);
1052        if (s != null) {
1053            try {
1054                return Long.parseLong(s);
1055            } catch (NumberFormatException ignored) {}
1056        }
1057        return defValue;
1058    }
1059
1060    private Instant getStart (Serializable context) {
1061        if (context instanceof Context) {
1062            Object o = ((Context) context).get(TIMESTAMP);
1063            if (o instanceof Instant)
1064                return (Instant) o;
1065        }
1066        return Instant.now();
1067    }
1068
1069    private boolean heavyLoaded() {
1070        return getActiveSessions() >= maxSessions;
1071    }
1072
1073    private int pauseAndWait(Serializable context, int action) {
1074        if (context instanceof Pausable pausable) try {
1075            pausedSessions.incrementAndGet();
1076            Future<Integer> paused = pausable.pause();
1077            long timeout = pausable.getTimeout();
1078            timeout = timeout > 0 ? Math.min (timeout, pauseTimeout) : pauseTimeout;
1079            try {
1080                action = paused.get(timeout, TimeUnit.MILLISECONDS);
1081            } catch (InterruptedException | ExecutionException e) {
1082                if (context instanceof Context ctx)
1083                    ctx.log(e);
1084            } catch (TimeoutException e) {
1085                action &= (PREPARED ^ 0xFFFF); // turn off 'PREPARED' - we need to abort
1086            } finally {
1087                pausable.reset();
1088            }
1089        } finally {
1090            pausedSessions.decrementAndGet();
1091        }
1092        return action;
1093    }
1094
1095    private record ParticipantParams (
1096      String name,
1097      long timeout,
1098      long maxTime,
1099      Set<String> requires,
1100      Set<String> provides,
1101      Set<String> optional,
1102      Timers timers
1103    )
1104    {
1105        public boolean isConstrained() {
1106            return !requires.isEmpty() || !optional.isEmpty();
1107        }
1108    }
1109    private record Timers (
1110        io.micrometer.core.instrument.Timer prepareTimer,
1111        io.micrometer.core.instrument.Timer prepareForAbortTimer,
1112        io.micrometer.core.instrument.Timer commitTimer,
1113        io.micrometer.core.instrument.Timer abortTimer,
1114        io.micrometer.core.instrument.Timer snapshotTimer)
1115    { }
1116    public record Trace (String phase, String message, String info) {
1117        @Override
1118        public String toString() {
1119            return "%15s: %s%s".formatted(phase, message, info);
1120        }
1121        public static Trace of (String phase, String message) {
1122            return new Trace (phase, message, "");
1123        }
1124        public static Trace of (String phase, String message, String info) {
1125            return new Trace (phase, message, info);
1126        }
1127    }
1128
1129    private Set<String> getSet (Element e) {
1130        return e != null ? new HashSet<>(Arrays.asList(ISOUtil.commaDecode(e.getTextTrim()))) : Collections.emptySet();
1131    }
1132
1133    private int prepareOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriFunction<TransactionParticipant, Long, Serializable, Integer> preparationFunction) {
1134        int action;
1135
1136        if (context instanceof Context ctx && pp.isConstrained()) {
1137            if (!ctx.hasKeys(pp.requires.toArray())) {
1138                ctx.log ("missing.requires: '%s'".formatted(ctx.keysNotPresent(pp.requires.toArray())));
1139                action = ABORTED;
1140            } else {
1141                Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1142                action = preparationFunction.apply(p, id, c);
1143                if (!pp.requires.contains(LOGEVT.toString())) {
1144                    // if we are not inheriting parent's log event and there's a log event
1145                    // in the childs context, copy it.
1146                    LogEvent evt = c.get(LOGEVT.toString());
1147                    if (evt != null) {
1148                        LogEvent parentLogEvent = ctx.getLogEvent();
1149                        synchronized (parentLogEvent) {
1150                            parentLogEvent.getPayLoad().addAll(evt.getPayLoad());
1151                        }
1152                        c.remove(LOGEVT.toString());
1153                    }
1154                }
1155                ctx.merge(c.clone(pp.provides.toArray()));
1156            }
1157        } else {
1158            action = preparationFunction.apply(p, id, context);
1159        }
1160        if ((action & PAUSE) == PAUSE) {
1161            var jfrp = new TMEvent.Pause(getName(), id);
1162            jfrp.begin();
1163            action = pauseAndWait(context, action);
1164            jfrp.commit();
1165        }
1166        return action;
1167    }
1168
1169    private void commitOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriConsumer<TransactionParticipant, Long, Serializable> preparationFunction) {
1170        if (context instanceof Context ctx && pp.isConstrained()) {
1171            Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1172            preparationFunction.accept(p, id, c);
1173            ctx.merge(c.clone(pp.provides.toArray()));
1174        } else {
1175            preparationFunction.accept(p, id, context);
1176        }
1177    }
1178
1179    private Serializable recover (ContextRecovery p, long id, Serializable context, ParticipantParams pp, boolean commit) {
1180        var jfr = new TMEvent.Recover("%s:%s".formatted(getName(), p.getClass().getName()), id);
1181        jfr.begin();
1182        try {
1183            if (context instanceof Context ctx && pp.isConstrained()) {
1184                Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray());
1185                Serializable s = p.recover (id, c, commit);
1186                return (s instanceof Context rc) ?
1187                  rc.clone (pp.provides.toArray()) : s;
1188            } else {
1189                return p.recover (id, context, commit);
1190            }
1191        } finally {
1192            jfr.commit();
1193        }
1194    }
1195
1196    private Timers getOrCreateTimers(TransactionParticipant p) {
1197        return Optional.ofNullable(params.get(p)).map(ParticipantParams::timers).orElseGet(() -> {
1198            String participantShortName = Caller.shortClassName(p.getClass().getName());
1199            var mr = getServer().getMeterRegistry();
1200            var tags = Tags.of("name", getName(), "participant", participantShortName);
1201            String realm = (p instanceof LogSource ls) ? ls.getRealm() : null;
1202            tags = tags.and("realm", (realm != null && !realm.isEmpty()) ? realm.trim() : "");
1203
1204            return new Timers(
1205              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare"))),
1206              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare-for-abort"))),
1207              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "commit"))),
1208              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "abort"))),
1209              addTimer(MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "snapshot")))
1210            );
1211        });
1212    }
1213
1214    private Timer addTimer (Timer m) {
1215        meters.add (m);
1216        return m;
1217    }
1218
1219    private UUID getTraceId (long transactionId) {
1220        return new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ transactionId);
1221    }
1222}