001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import org.jpos.jfr.SpaceEvent;
022import org.jpos.util.Loggeable;
023
024import java.io.PrintStream;
025import java.io.Serializable;
026import java.lang.ref.Cleaner;
027import java.util.*;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ScheduledFuture;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.LockSupport;
034import java.util.concurrent.locks.ReentrantLock;
035
036/**
037 * LSpace (Loom-optimized Space) implementation with per-key locking for Virtual Thread efficiency.
038 *
039 * <p>This implementation addresses the thundering herd problem in traditional Space implementations
040 * by using per-key {@link ReentrantLock} and {@link Condition} objects instead of global synchronization.
041 * With Virtual Threads (Project Loom), this prevents thousands of threads from being
042 * unnecessarily woken up when only one is relevant.</p>
043 *
 * <p>Key features:</p>
 * <ul>
 *   <li>Per-key isolation: Each key has its own lock and condition variables</li>
 *   <li>Targeted wakeups: Only threads waiting on a specific key are signaled</li>
 *   <li>Virtual Thread optimized: Scales efficiently with thousands of concurrent threads</li>
 *   <li>Full LocalSpace compatibility: Drop-in replacement with same API and behavior</li>
 *   <li>JFR instrumentation: All operations emit SpaceEvent for monitoring</li>
 * </ul>
053 *
 * <p>Concurrency notes (core safety invariants):</p>
 * <ul>
 *   <li>Never remove a KeyEntry from {@code entries} while threads are waiting on that entry's
 *       {@code hasValue} Condition; otherwise those waiters can be stranded forever, because a
 *       later {@code out()} creates a new KeyEntry and a new Condition.</li>
 *   <li>Blocking {@code rd()}/{@code in()} must never return null unless interrupted (or, for the
 *       timed variants, timed out).</li>
 * </ul>
058 *
059 * @author Alejandro Revilla
060 * @version $Revision$ $Date$
061 * @since 3.0
062 */
063@SuppressWarnings("unchecked")
064public class LSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
    // Per-key state: every key maps to its own lock, conditions and value queue (see KeyEntry).
    private final ConcurrentHashMap<K, KeyEntry> entries;
    // Listener space, created lazily by getSL(); gc(int) and close() reset it to null.
    private volatile LocalSpace<K, SpaceListener<K,V>> sl;
    // Handle of the periodic task scheduled in the constructor; cancelled by close().
    private final ScheduledFuture<?> gcFuture;
    // Monitors guarding expirables[0] and expirables[1] respectively.
    private final Object[] expLocks = new Object[] { new Object(), new Object() };

    // Initial delay and period, in milliseconds, of the scheduled run() -> gc() task.
    public static final long GCDELAY = 5 * 1000;
    // Milliseconds: timeouts above this register in generation 1; gc() sweeps generation 1
    // only when more than this much time has passed since the previous such sweep.
    private static final long GCLONG = 60_000L;
    // Milliseconds: longest single wait slice used by the nrd() polling loops.
    private static final long NRD_RESOLUTION = 500L;
    // dump() stops listing keys shortly after this many, and refuses to list at all
    // when the space holds more than 100 times this many keys.
    private static final int MAX_ENTRIES_IN_DUMP = 1000;

    private static final long ONE_MILLION = 1_000_000L; // millis -> nanos
    // Sentinel timeout: "no expiration" for producers, "wait indefinitely" for awaitValue().
    private static final long NO_TIMEOUT = -1L;

    // Keys that may hold expirable values, by GC generation:
    // [0] for timeouts <= GCLONG, [1] for longer ones (see registerExpirable()).
    private final Set<K>[] expirables;
    // System.nanoTime() of the last generation-1 sweep (initialised at construction).
    private long lastLongGC = System.nanoTime();
    // Flipped once by close(); consulted by ensureOpen(), run() and gc().
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private static final Cleaner CLEANER = Cleaner.create();
    // Registration of cleaningState with CLEANER; close() triggers it eagerly via clean().
    private final Cleaner.Cleanable cleanable;
    // Cleanup action registered with CLEANER; it is built from gcFuture, entries and expirables.
    // NOTE(review): CleaningState is declared outside this view; its exact behavior is not shown here.
    private final CleaningState cleaningState;
084
085    /**
086     * Per-key synchronization and queue structure.
087     */
088    private static class KeyEntry {
089        final ReentrantLock lock = new ReentrantLock();
090        final Condition hasValue = lock.newCondition();   // signaled when value added
091        final Condition isEmpty = lock.newCondition();    // signaled when queue becomes empty (for nrd)
092        final LinkedList<Object> queue = new LinkedList<>();
093        volatile boolean hasExpirable = false;
094    }
095
096
097    public LSpace() {
098        super();
099        this.entries = new ConcurrentHashMap<>(256);
100        this.expirables = new Set[] {
101          ConcurrentHashMap.newKeySet(),
102          ConcurrentHashMap.newKeySet()
103        };
104        this.gcFuture = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
105        this.cleaningState = new CleaningState(gcFuture, entries, expirables);
106        this.cleanable = CLEANER.register(this, cleaningState);
107    }
108
109    // -------------------------
110    // JFR tagging helper (patch)
111    // -------------------------
112    private String jfrTag(Object keyOrTemplate) {
113        if (keyOrTemplate instanceof Template) {
114            Object k = ((Template) keyOrTemplate).getKey();
115            return "" + k;
116        }
117        return "" + keyOrTemplate;
118    }
119
120
121    // ========== Producer (enqueuing) operations ==========
122
123    @FunctionalInterface
124    private interface Enqueuer {
125        void enqueue(KeyEntry entry, Object value);
126    }
127
128    @Override
129    public void out(K key, V value) {
130        out(key, value, NO_TIMEOUT);
131    }
132
133    @Override
134    public void out(K key, V value, long timeout) {
135        enqueueValue("out", key, value, timeout, (ent,v) -> ent.queue.addLast(v));
136    }
137
138    @Override
139    public void push(K key, V value) {
140        push(key, value, NO_TIMEOUT);
141    }
142
143    @Override
144    public void push(K key, V value, long timeout) {
145        enqueueValue("push", key, value, timeout, (ent,v) -> ent.queue.addFirst(v));
146    }
147
148    @Override
149    public void put(K key, V value) {
150        put(key, value, NO_TIMEOUT);
151    }
152
153    @Override
154    public void put(K key, V value, long timeout) {
155        enqueueValue("put", key, value, timeout, (ent,v) -> {
156            ent.queue.clear();
157            ent.queue.addLast(v);
158            ent.hasExpirable = timeout > 0;
159            if (timeout <= 0)
160                unregisterExpirable(key);
161        });
162    }
163
164    /**
165     * Common method for all enqueuing operations (out, push, put, with/out timeout)
166     */
167    private void enqueueValue(String opTag, K key, V value, long timeout, Enqueuer op) {
168        ensureOpen();
169        var jfr = new SpaceEvent(opTag + (timeout > 0 ? ":tim" : ""), "" + key);
170        jfr.begin();
171        try {
172            if (key == null || value == null)
173                throw new NullPointerException("key=" + key + ", value=" + value);
174
175            Object v = value;
176            if (timeout > 0)
177                v = new Expirable(value, System.nanoTime() + (timeout * ONE_MILLION));
178
179            while (true) {
180                KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());
181
182                entry.lock.lock();
183                try {
184                    if (entries.get(key) != entry) {
185                        continue;
186                    }
187
188                    op.enqueue(entry, v);
189
190                    if (timeout > 0) {
191                        entry.hasExpirable = true;
192                        registerExpirable(key, timeout);
193                    }
194
195                    if (entry.queue.size() == 1) {  // was empty (or became empty after clear)
196                        entry.hasValue.signalAll(); // Wake ALL readers (multiple rd() can read same value)
197                    }
198
199                    break;
200                } finally {
201                    entry.lock.unlock();
202                }
203            }
204
205            if (sl != null)
206                notifyListeners(key, value);
207        } finally {
208            jfr.commit();
209        }
210    }
211
212
213    @Override
214    public V rdp(Object key) {
215        ensureOpen();
216        var jfr = new SpaceEvent("rdp", "" + key);
217        jfr.begin();
218        try {
219            if (key instanceof Template)
220                return (V) getObjectNonBlocking((Template) key, false);
221            return (V) getHeadNonBlocking((K) key, false);
222        } finally {
223            jfr.commit();
224        }
225    }
226
227    @Override
228    public V inp(Object key) {
229        ensureOpen();
230        var jfr = new SpaceEvent("inp", "" + key);
231        jfr.begin();
232        try {
233            if (key instanceof Template)
234                return (V) getObjectNonBlocking((Template) key, true);
235            return (V) getHeadNonBlocking((K) key, true);
236        } finally {
237            jfr.commit();
238        }
239    }
240
241    @Override
242    public V in(Object key) {
243        ensureOpen();
244        String op = key instanceof Template ? "in:tmpl" : "in";
245        var jfr = new SpaceEvent(op, jfrTag(key));
246        jfr.begin();
247        try {
248            if (key instanceof Template)
249                return inTemplate((Template) key);
250            return inKey((K) key);
251        } finally {
252            jfr.commit();
253        }
254    }
255
256    @Override
257    public V in(Object key, long timeout) {
258        ensureOpen();
259        String op = key instanceof Template ? "in:tim:tmpl" : "in:tim";
260        var jfr = new SpaceEvent(op, jfrTag(key));
261        jfr.begin();
262        try {
263            if (key instanceof Template)
264                return inTemplate((Template) key, timeout);
265            return inKey((K) key, timeout);
266        } finally {
267            jfr.commit();
268        }
269    }
270
271    @Override
272    public V rd(Object key) {
273        ensureOpen();
274        String op = key instanceof Template ? "rd:tmpl" : "rd";
275        var jfr = new SpaceEvent(op, jfrTag(key));
276        jfr.begin();
277        try {
278            if (key instanceof Template)
279                return rdTemplate((Template) key);
280            return rdKey((K) key);
281        } finally {
282            jfr.commit();
283        }
284    }
285
286    @Override
287    public V rd(Object key, long timeout) {
288        ensureOpen();
289        String op = key instanceof Template ? "rd:tim:tmpl" : "rd:tim";
290        var jfr = new SpaceEvent(op, jfrTag(key));
291        jfr.begin();
292        try {
293            if (key instanceof Template)
294                return rdTemplate((Template) key, timeout);
295            return rdKey((K) key, timeout);
296        } finally {
297            jfr.commit();
298        }
299    }
300
301    @Override
302    public void nrd(Object key) {
303        ensureOpen();
304        var jfr = new SpaceEvent("nrd", "" + key);
305        jfr.begin();
306        try {
307            K k = (K) key;
308            while (true) {
309                KeyEntry entry = entries.get(k);
310                if (entry == null)
311                    return;
312
313                entry.lock.lock();
314                try {
315                    Object obj = getHead(entry, k, false);
316                    if (obj == null) {
317                        postFetchHousekeeping(k, entry);
318                        return;
319                    }
320                    try {
321                        entry.isEmpty.await(NRD_RESOLUTION, TimeUnit.MILLISECONDS);
322                    } catch (InterruptedException ignored) {
323                        Thread.currentThread().interrupt();
324                        return;
325                    }
326                } finally {
327                    entry.lock.unlock();
328                }
329            }
330        } finally {
331            jfr.commit();
332        }
333    }
334
335    @Override
336    public V nrd(Object key, long timeout) {
337        ensureOpen();
338        var jfr = new SpaceEvent("nrd:tim", "" + key);
339        jfr.begin();
340        try {
341            K k = (K) key;
342            long deadline = System.nanoTime() + timeout * ONE_MILLION;
343
344            while (true) {
345                KeyEntry entry = entries.get(k);
346                if (entry == null)
347                    return null;
348
349                entry.lock.lock();
350                try {
351                    V obj = (V) getHead(entry, k, false);
352                    if (obj == null) {
353                        postFetchHousekeeping(k, entry);
354                        return null;
355                    }
356                    long remaining = deadline - System.nanoTime();
357                    if (remaining <= 0)
358                        return obj;
359
360                    long waitTime = Math.min(NRD_RESOLUTION * ONE_MILLION, remaining);
361                    try {
362                        entry.isEmpty.awaitNanos(waitTime);
363                    } catch (InterruptedException ignored) {
364                        Thread.currentThread().interrupt();
365                        return obj;
366                    }
367                } finally {
368                    entry.lock.unlock();
369                }
370            }
371        } finally {
372            jfr.commit();
373        }
374    }
375
376    @Override
377    public boolean existAny(K[] keys) {
378        ensureOpen();
379        for (K key : keys) {
380            if (rdp(key) != null)
381                return true;
382        }
383        return false;
384    }
385
386    @Override
387    public boolean existAny(K[] keys, long timeout) {
388        ensureOpen();
389        var jfr = new SpaceEvent("existAny:tim", Integer.toString(keys != null ? keys.length : 0));
390        jfr.begin();
391        try {
392            long deadline = System.nanoTime() + timeout * ONE_MILLION;
393            long pollInterval = 10 * ONE_MILLION;
394
395            while (true) {
396                for (K key : keys) {
397                    if (rdp(key) != null)
398                        return true;
399                }
400
401                long remaining = deadline - System.nanoTime();
402                if (remaining <= 0)
403                    return false;
404
405                LockSupport.parkNanos(Math.min(pollInterval, remaining));
406            }
407        } finally {
408            jfr.commit();
409        }
410    }
411
412    @Override
413    public void run() {
414        // Scheduler ticks may race with close(); treat closed as a no-op.
415        if (closed.get())
416            return;
417        try {
418            gc();
419        } catch (Exception e) {
420            e.printStackTrace(); // should never happen
421        }
422    }
423
424    public void gc() {
425        // Avoid work after close if a scheduled tick slips through.
426        if (closed.get())
427            return;
428
429        gc(0);
430        if (System.nanoTime() - lastLongGC > GCLONG * ONE_MILLION) {
431            gc(1);
432            lastLongGC = System.nanoTime();
433        }
434    }
435
436    private void gc(int generation) {
437        // gc() already guards closed; keep gc(int) lean.
438        var jfr = new SpaceEvent("gc", Integer.toString(generation));
439        jfr.begin();
440
441        Set<K> keysToCheck;
442        synchronized (expLocks[generation]) {
443            keysToCheck = new HashSet<>(expirables[generation]);
444            expirables[generation].clear();
445        }
446        for (K key : keysToCheck) {
447            KeyEntry entry = entries.get(key);
448            if (entry == null)
449                continue;
450
451            entry.lock.lock();
452            try {
453                boolean stillHasExpirable = false;
454                boolean sawAnyExpirable = false;
455
456                Iterator<Object> iterator = entry.queue.iterator();
457                while (iterator.hasNext()) {
458                    Object obj = iterator.next();
459                    if (obj instanceof Expirable) {
460                        sawAnyExpirable = true;
461                        Object value = ((Expirable) obj).getValue();
462                        if (value == null) {
463                            iterator.remove();
464                        } else {
465                            stillHasExpirable = true;
466                        }
467                    }
468                }
469
470                entry.hasExpirable = stillHasExpirable;
471
472                if (stillHasExpirable) {
473                    synchronized (expLocks[generation]) {
474                        expirables[generation].add(key);
475                    }
476                    // Queue might have changed (expired items removed), wake any rd/in waiters.
477                    entry.hasValue.signalAll();
478                } else {
479                    // No longer has expirables anywhere; ensure we don't keep the key in either generation set.
480                    if (sawAnyExpirable)
481                        unregisterExpirable(key);
482                }
483
484                // Apply the same safe empty-entry policy used everywhere else.
485                if (entry.queue.isEmpty()) {
486                    postFetchHousekeeping(key, entry);
487                }
488            } finally {
489                entry.lock.unlock();
490            }
491
492            Thread.yield();
493        }
494
495        if (sl != null && sl.getKeySet().isEmpty()) {
496            sl = null;
497        }
498
499        jfr.commit();
500    }
501
502    @Override
503    public int size(Object key) {
504        ensureOpen();
505        var jfr = new SpaceEvent("size", "" + key);
506        jfr.begin();
507
508        int size = 0;
509        KeyEntry entry = entries.get((K) key);
510        if (entry != null) {
511            entry.lock.lock();
512            try {
513                size = entry.queue.size();
514            } finally {
515                entry.lock.unlock();
516            }
517        }
518
519        jfr.commit();
520        return size;
521    }
522
523    @Override
524    public void addListener(Object key, SpaceListener listener) {
525        ensureOpen();
526        getSL().out((K) key, listener);
527    }
528
529    @Override
530    public void addListener(Object key, SpaceListener listener, long timeout) {
531        ensureOpen();
532        getSL().out((K) key, listener, timeout);
533    }
534
535    @Override
536    public void removeListener(Object key, SpaceListener listener) {
537        ensureOpen();
538        if (sl != null) {
539            sl.inp((K) new ObjectTemplate(key, listener));
540        }
541    }
542
543    public boolean isEmpty() {
544        ensureOpen();
545        return entries.isEmpty();
546    }
547
548    @Override
549    public Set<K> getKeySet() {
550        ensureOpen();
551        return new HashSet<>(entries.keySet());
552    }
553
554    public String getKeysAsString() {
555        ensureOpen();
556        StringBuilder sb = new StringBuilder();
557        Object[] keys = entries.keySet().toArray();
558        for (int i = 0; i < keys.length; i++) {
559            if (i > 0)
560                sb.append(' ');
561            sb.append(keys[i]);
562        }
563        return sb.toString();
564    }
565
566    @Override
567    public void dump(PrintStream p, String indent) {
568        ensureOpen();
569        var jfr = new SpaceEvent("dump", "");
570        jfr.begin();
571
572        int size = entries.size();
573        if (size > MAX_ENTRIES_IN_DUMP * 100) {
574            p.printf("%sWARNING - space too big, size=%d%n", indent, size);
575            jfr.commit();
576            return;
577        }
578
579        Object[] keys = entries.keySet().toArray();
580
581        int i = 0;
582        for (Object key : keys) {
583            p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key);
584            if (i++ > MAX_ENTRIES_IN_DUMP) {
585                p.printf("%s...%n", indent);
586                p.printf("%s...%n", indent);
587                break;
588            }
589        }
590        p.printf("%s key-count: %d%n", indent, keys.length);
591
592        int exp0 = expirables[0].size();
593        int exp1 = expirables[1].size();
594        p.printf("%s    gcinfo: %d,%d%n", indent, exp0, exp1);
595
596        jfr.commit();
597    }
598
599    public void notifyListeners(Object key, Object value) {
600        ensureOpen();
601        var jfr = new SpaceEvent("notify", "" + key);
602        jfr.begin();
603        LocalSpace<K, SpaceListener<K,V>> localSl = sl;  // Capture volatile read once
604        if (localSl == null) {
605            jfr.commit();
606            return;
607        }
608        Object[] listeners = null;
609        LSpace<K, SpaceListener<K,V>> lsl = (LSpace<K, SpaceListener<K,V>>) localSl;
610        KeyEntry slEntry = lsl.entries.get((K) key);
611        if (slEntry != null) {
612            slEntry.lock.lock();
613            try {
614                listeners = slEntry.queue.toArray();
615            } finally {
616                slEntry.lock.unlock();
617            }
618        }
619
620        if (listeners != null) {
621            for (Object listener : listeners) {
622                Object o = listener;
623                if (o instanceof Expirable)
624                    o = ((Expirable) o).getValue();
625                if (o instanceof SpaceListener)
626                    ((SpaceListener) o).notify(key, value);
627            }
628        }
629
630        jfr.commit();
631    }
632
633    /**
634     * Non-standard method (required for space replication) - use with care.
635     */
636    public Map getEntries() {
637        ensureOpen();
638        Map<K, List> result = new HashMap<>();
639        for (var e : entries.entrySet()) {
640            KeyEntry entry = e.getValue();
641            entry.lock.lock();
642            try {
643                result.put(e.getKey(), new LinkedList<>(entry.queue));
644            } finally {
645                entry.lock.unlock();
646            }
647        }
648        return result;
649    }
650
651    /**
652     * Non-standard method (required for space replication) - use with care.
653     */
654    public void setEntries(Map entries) {
655        ensureOpen();
656        this.entries.clear();
657        for (var e : (Set<Map.Entry>)entries.entrySet()) {
658            K key =  (K)e.getKey();
659            List<V> list = (List<V>)e.getValue();
660            KeyEntry entry = this.entries.computeIfAbsent(key, k -> new KeyEntry());
661            entry.lock.lock();
662            try {
663                entry.queue.clear();
664                entry.queue.addAll(list);
665                // Conservatively: if replication injects Expirables, caller should also registerExpirable appropriately.
666                // We do not attempt to infer expirables here.
667            } finally {
668                entry.lock.unlock();
669            }
670        }
671    }
672
673    /**
674     * Cancels the periodic GC task so this instance can be garbage-collected.
675     * Safe to call multiple times.
676     */
677    @Override
678    public void close() {
679        if (!closed.compareAndSet(false, true))
680            return;
681
682        if (gcFuture != null) {
683            gcFuture.cancel(false);
684        }
685        // If sl is an LSpace, allow it to release resources as well.
686        LocalSpace<K, SpaceListener<K,V>> s = sl;
687        if (s instanceof LSpace<?,?>) {
688            ((LSpace<?,?>) s).close();
689        }
690        sl = null;
691        entries.clear();
692        expirables[0].clear();
693        expirables[1].clear();
694        cleanable.clean(); // Eager cleanup
695    }
696
697    // ========== Blocking (deduplicated) ==========
698
699    private V inKey(K key) {
700        return awaitValue(key, entry -> getHead(entry, key, true), NO_TIMEOUT);
701    }
702
703    private V inKey(K key, long timeout) {
704        return awaitValue(key, entry -> getHead(entry, key, true), timeout);
705    }
706
707    private V rdKey(K key) {
708        return awaitValue(key, entry -> getHead(entry, key, false), NO_TIMEOUT);
709    }
710
711    private V rdKey(K key, long timeout) {
712        return awaitValue(key, entry -> getHead(entry, key, false), timeout);
713    }
714
715    private V inTemplate(Template tmpl) {
716        K key = (K) tmpl.getKey();
717        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), NO_TIMEOUT);
718    }
719
720    private V inTemplate(Template tmpl, long timeout) {
721        K key = (K) tmpl.getKey();
722        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), timeout);
723    }
724
725    private V rdTemplate(Template tmpl) {
726        K key = (K) tmpl.getKey();
727        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), NO_TIMEOUT);
728    }
729
730    private V rdTemplate(Template tmpl, long timeout) {
731        K key = (K) tmpl.getKey();
732        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), timeout);
733    }
734
735    // ========== Non-blocking helpers ==========
736
737    /**
738     * Get head of queue (non-blocking version for rdp/inp).
739     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
740     */
741    private Object getHeadNonBlocking(K key, boolean remove) {
742        KeyEntry entry = entries.get(key);
743        if (entry == null)
744            return null;
745
746        entry.lock.lock();
747        try {
748            if (entries.get(key) != entry)
749                return null;
750
751            Object result = getHead(entry, key, remove);
752
753            if (remove) {
754                // If remove emptied the queue, postFetchHousekeeping will handle safe removal/signals.
755                postFetchHousekeeping(key, entry);
756            }
757            return result;
758        } finally {
759            entry.lock.unlock();
760        }
761    }
762
763    /**
764     * Get object matching template (non-blocking version for rdp/inp).
765     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
766     */
767    private Object getObjectNonBlocking(Template tmpl, boolean remove) {
768        K key = (K) tmpl.getKey();
769        KeyEntry entry = entries.get(key);
770        if (entry == null)
771            return null;
772
773        entry.lock.lock();
774        try {
775            if (entries.get(key) != entry)
776                return null;
777
778            Object result = getObject(entry, key, tmpl, remove);
779
780            if (remove) {
781                postFetchHousekeeping(key, entry);
782            }
783            return result;
784        } finally {
785            entry.lock.unlock();
786        }
787    }
788
789    /**
790     * Get head of queue.
791     * MUST be called with entry.lock held.
792     */
793    private Object getHead(KeyEntry entry, K key, boolean remove) {
794        Object result = null;
795        boolean wasExpirable = false;
796
797        while (result == null && !entry.queue.isEmpty()) {
798            Object obj = entry.queue.getFirst();
799
800            if (obj instanceof Expirable) {
801                Object value = ((Expirable) obj).getValue();
802                wasExpirable = true;
803
804                if (value == null) {
805                    entry.queue.removeFirst();
806                    continue;
807                } else {
808                    result = value;
809                }
810            } else {
811                result = obj;
812            }
813
814            if (remove && result != null) {
815                entry.queue.removeFirst();
816            }
817        }
818
819        if (entry.queue.isEmpty()) {
820            entry.hasExpirable = false;
821            if (wasExpirable)
822                unregisterExpirable(key);
823        }
824
825        return result;
826    }
827
828    /**
829     * Get object matching template.
830     * MUST be called with entry.lock held.
831     */
832    private Object getObject(KeyEntry entry, K key, Template tmpl, boolean remove) {
833        Object result = null;
834        Iterator<Object> iterator = entry.queue.iterator();
835        boolean wasExpirable = false;
836
837        while (iterator.hasNext()) {
838            Object obj = iterator.next();
839
840            if (obj instanceof Expirable) {
841                Object value = ((Expirable) obj).getValue();
842                if (value == null) {
843                    iterator.remove();
844                    wasExpirable = true;
845                    continue;
846                } else {
847                    obj = value;
848                }
849            }
850
851            if (tmpl.equals(obj)) {
852                result = obj;
853                if (remove)
854                    iterator.remove();
855                break;
856            }
857        }
858
859        if (entry.queue.isEmpty()) {
860            entry.hasExpirable = false;
861            if (wasExpirable)
862                unregisterExpirable(key);
863        }
864
865        return result;
866    }
867
868    private void ensureOpen() {
869        if (closed.get())
870            throw new IllegalStateException("LSpace is closed");
871    }
872
873    // ========== Listener-space helpers ==========
874
875    private LocalSpace<K, SpaceListener<K,V>> getSL() {
876        ensureOpen();
877        if (sl == null) {
878            synchronized (this) {
879                ensureOpen();
880                if (sl == null) {
881                    sl = new LSpace<>();
882                    cleaningState.sl = (AutoCloseable) sl;
883                }
884            }
885        }
886        return sl;
887    }
888
889    private void registerExpirable(K k, long t) {
890        int g = (t > GCLONG) ? 1 : 0;
891        synchronized (expLocks[g]) {
892            expirables[g].add(k);
893        }
894    }
895
896    private void unregisterExpirable(K k) {
897        synchronized (expLocks[0]) {
898            synchronized (expLocks[1]) {
899                expirables[0].remove(k);
900                expirables[1].remove(k);
901            }
902        }
903    }
904
905    // ========== Blocking core (shared) ==========
906
907    @FunctionalInterface
908    private interface Fetcher {
909        Object fetch(KeyEntry entry);
910    }
911
912    /**
913     * Common blocking wait-loop for rd/in operations (key or template).
914     *
915     * Ensures:
916     * - No premature null returns due to entry replacement (retries outer loop).
917     * - Timed variants remove empty computeIfAbsent-created entries on timeout/interrupt (postFetchHousekeeping).
918     * - Does not orphan waiters because postFetchHousekeeping refuses to remove if hasValue waiters exist.
919     */
920    @SuppressWarnings("unchecked")
921    private V awaitValue(K key, Fetcher fetcher, long timeoutMillis) {
922        ensureOpen();
923
924        final boolean timed = timeoutMillis != NO_TIMEOUT;
925        final long deadlineNanos = timed ? System.nanoTime() + timeoutMillis * ONE_MILLION : 0L;
926
927        for (;;) {
928            final KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());
929
930            entry.lock.lock();
931            try {
932                if (entries.get(key) != entry)
933                    continue;
934
935                for (;;) {
936                    Object obj = fetcher.fetch(entry);
937                    if (obj != null) {
938                        postFetchHousekeeping(key, entry);
939                        return (V) obj;
940                    }
941
942                    if (!timed) {
943                        try {
944                            entry.hasValue.await();
945                        } catch (InterruptedException ie) {
946                            Thread.currentThread().interrupt();
947                            // Avoid leaking an empty entry created by computeIfAbsent for a waiter that got interrupted.
948                            postFetchHousekeeping(key, entry);
949                            break;
950                        }
951                    } else {
952                        try {
953                            long remaining = entry.hasValue.awaitNanos(deadlineNanos - System.nanoTime());
954                            if (remaining <= 0) {
955                                // Avoid leaking empty entries created by computeIfAbsent when timing out.
956                                postFetchHousekeeping(key, entry);
957                                return null;
958                            }
959                        } catch (InterruptedException ie) {
960                            Thread.currentThread().interrupt();
961                            postFetchHousekeeping(key, entry);
962                            return null;
963                        }
964                    }
965
966                    // If someone removed/replaced the entry, restart outer loop to bind to the current entry.
967                    if (entries.get(key) != entry)
968                        break;
969                } // inner loop
970            } finally {
971                entry.lock.unlock();
972            }
973        } // outer loop
974    }
975
976    /**
977     * Housekeeping that must run under entry.lock:
978     * - Wake nrd waiters when queue becomes empty.
979     * - Remove entry only if queue is empty AND no waiters are parked on hasValue (prevents orphaning).
980     */
981    private void postFetchHousekeeping(K key, KeyEntry entry) {
982        if (!entry.queue.isEmpty())
983            return;
984
985        // Always wake nrd waiters when empty.
986        entry.isEmpty.signalAll();
987
988        // Remove only when safe (no hasValue waiters).
989        if (entries.get(key) == entry && !entry.lock.hasWaiters(entry.hasValue)) {
990            entries.remove(key, entry);
991        }
992    }
993
994    /**
995     * Expirable wrapper for values with timeout.
996     */
997    static class Expirable implements Comparable, Serializable {
998        private static final long serialVersionUID = 0xA7F22BF5;
999
1000        Object value;
1001        long expires;
1002
1003        Expirable(Object value, long expires) {
1004            super();
1005            this.value = value;
1006            this.expires = expires;
1007        }
1008
1009        boolean isExpired() {
1010            return (System.nanoTime() - expires) > 0;
1011        }
1012
1013        @Override
1014        public String toString() {
1015            return getClass().getName()
1016              + "@" + Integer.toHexString(hashCode())
1017              + ",value=" + value.toString()
1018              + ",expired=" + isExpired();
1019        }
1020
1021        Object getValue() {
1022            return isExpired() ? null : value;
1023        }
1024
1025        @Override
1026        public int compareTo(Object other) {
1027            long diff = this.expires - ((Expirable) other).expires;
1028            return diff > 0 ? 1 : diff < 0 ? -1 : 0;
1029        }
1030    }
1031
1032    private static final class CleaningState implements Runnable {
1033        private final ScheduledFuture<?> gcFuture;
1034        private final ConcurrentHashMap<?,?> entries;
1035        private final Set<?>[] expirables;
1036
1037        // We keep a reference to sl so we can cancel its scheduler too.
1038        // This does not introduce a new retention path; it already hangs off the parent space.
1039        private volatile AutoCloseable sl; // store as AutoCloseable to avoid generics pain
1040
1041        private final AtomicBoolean cleaned = new AtomicBoolean(false);
1042
1043        private CleaningState(ScheduledFuture<?> gcFuture,
1044                              ConcurrentHashMap<?,?> entries,
1045                              Set<?>[] expirables) {
1046            this.gcFuture = gcFuture;
1047            this.entries = entries;
1048            this.expirables = expirables;
1049        }
1050
1051        @Override
1052        public void run() {
1053            if (!cleaned.compareAndSet(false, true))
1054                return;
1055
1056            try {
1057                if (gcFuture != null)
1058                    gcFuture.cancel(false);
1059            } catch (Throwable ignored) { }
1060
1061            // Best-effort close of the listener space.
1062            AutoCloseable s = sl;
1063            if (s != null) {
1064                try {
1065                    s.close();
1066                } catch (Throwable ignored) { }
1067                sl = null;
1068            }
1069
1070            try {
1071                entries.clear();
1072            } catch (Throwable ignored) { }
1073
1074            try {
1075                expirables[0].clear();
1076            } catch (Throwable ignored) { }
1077            try {
1078                expirables[1].clear();
1079            } catch (Throwable ignored) { }
1080        }
1081    }
1082
1083    // =========================
1084    // Test-only visibility hooks
1085    // =========================
1086    // Package-private on purpose (same package as tests).
1087    // These methods are intended strictly for unit tests that validate internal invariants.
1088    boolean isExpirableTrackedForTest(K key) {
1089        synchronized (expLocks[0]) {
1090            synchronized (expLocks[1]) {
1091                return expirables[0].contains(key) || expirables[1].contains(key);
1092            }
1093        }
1094    }
1095
1096    boolean isExpirableTrackedForTest(K key, int generation) {
1097        synchronized (expLocks[generation]) {
1098            return expirables[generation].contains(key);
1099        }
1100    }
1101
1102    void forceTrackExpirableForTest(K key, int generation) {
1103        synchronized (expLocks[generation]) {
1104            expirables[generation].add(key);
1105        }
1106    }
1107}