001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import org.jpos.jfr.SpaceEvent;
022import org.jpos.util.Loggeable;
023
024import java.io.PrintStream;
025import java.io.Serializable;
026import java.lang.ref.Cleaner;
027import java.util.*;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ScheduledFuture;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.locks.Condition;
033import java.util.concurrent.locks.LockSupport;
034import java.util.concurrent.locks.ReentrantLock;
035
036/**
037 * LSpace (Loom-optimized Space) implementation with per-key locking for Virtual Thread efficiency.
038 *
039 * <p>This implementation addresses the thundering herd problem in traditional Space implementations
040 * by using per-key {@link ReentrantLock} and {@link Condition} objects instead of global synchronization.
041 * With Virtual Threads (Project Loom), this prevents thousands of threads from being
042 * unnecessarily woken up when only one is relevant.</p>
043 *
044 * <p>Key features:
045 * <ul>
046 *   <li>Per-key isolation: Each key has its own lock and condition variables</li>
047 *   <li>Targeted wakeups: Only threads waiting on a specific key are signaled</li>
048 *   <li>Virtual Thread optimized: Scales efficiently with thousands of concurrent threads</li>
049 *   <li>Full LocalSpace compatibility: Drop-in replacement with same API and behavior</li>
050 *   <li>JFR instrumentation: All operations emit SpaceEvent for monitoring</li>
051 * </ul>
052 *
053 * Concurrency notes (core safety invariants):
054 * - Never remove a KeyEntry from entries while threads are waiting on that entry's hasValue Condition,
055 *   otherwise those waiters can be stranded forever (future out() creates a new KeyEntry+Condition).
056 * - Blocking rd()/in() must never return null unless interrupted (or timed out for timed variants).
057 *
058 * @author Alejandro Revilla
059 * @version $Revision$ $Date$
060 * @since 3.0
061
062 * @param <K> the key type
063 * @param <V> the value type
064 */
065@SuppressWarnings("unchecked")
066public class LSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
067    private final ConcurrentHashMap<K, KeyEntry> entries;
068    private volatile LocalSpace<K, SpaceListener<K,V>> sl;
069    private final ScheduledFuture<?> gcFuture;
070    private final Object[] expLocks = new Object[] { new Object(), new Object() };
071
072    /** GC sweep delay in milliseconds. */
073    public static final long GCDELAY = 5 * 1000;
074    private static final long GCLONG = 60_000L;
075    private static final long NRD_RESOLUTION = 500L;
076    private static final int MAX_ENTRIES_IN_DUMP = 1000;
077
078    private static final long ONE_MILLION = 1_000_000L; // millis -> nanos
079    private static final long NO_TIMEOUT = -1L;
080
081    private final Set<K>[] expirables;
082    private long lastLongGC = System.nanoTime();
083    private final AtomicBoolean closed = new AtomicBoolean(false);
084    private static final Cleaner CLEANER = Cleaner.create();
085    private final Cleaner.Cleanable cleanable;
086    private final CleaningState cleaningState;
087
088    /**
089     * Per-key synchronization and queue structure.
090     */
091    private static class KeyEntry {
092        final ReentrantLock lock = new ReentrantLock();
093        final Condition hasValue = lock.newCondition();   // signaled when value added
094        final Condition isEmpty = lock.newCondition();    // signaled when queue becomes empty (for nrd)
095        final LinkedList<Object> queue = new LinkedList<>();
096        volatile boolean hasExpirable = false;
097    }
098
099
100    /** Default constructor. */
101    public LSpace() {
102        super();
103        this.entries = new ConcurrentHashMap<>(256);
104        this.expirables = new Set[] {
105          ConcurrentHashMap.newKeySet(),
106          ConcurrentHashMap.newKeySet()
107        };
108        this.gcFuture = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
109        this.cleaningState = new CleaningState(gcFuture, entries, expirables);
110        this.cleanable = CLEANER.register(this, cleaningState);
111    }
112
113    // -------------------------
114    // JFR tagging helper (patch)
115    // -------------------------
116    private String jfrTag(Object keyOrTemplate) {
117        if (keyOrTemplate instanceof Template) {
118            Object k = ((Template) keyOrTemplate).getKey();
119            return "" + k;
120        }
121        return "" + keyOrTemplate;
122    }
123
124
125    // ========== Producer (enqueuing) operations ==========
126
127    @FunctionalInterface
128    private interface Enqueuer {
129        void enqueue(KeyEntry entry, Object value);
130    }
131
132    @Override
133    public void out(K key, V value) {
134        out(key, value, NO_TIMEOUT);
135    }
136
137    @Override
138    public void out(K key, V value, long timeout) {
139        enqueueValue("out", key, value, timeout, (ent,v) -> ent.queue.addLast(v));
140    }
141
142    @Override
143    public void push(K key, V value) {
144        push(key, value, NO_TIMEOUT);
145    }
146
147    @Override
148    public void push(K key, V value, long timeout) {
149        enqueueValue("push", key, value, timeout, (ent,v) -> ent.queue.addFirst(v));
150    }
151
152    @Override
153    public void put(K key, V value) {
154        put(key, value, NO_TIMEOUT);
155    }
156
157    @Override
158    public void put(K key, V value, long timeout) {
159        enqueueValue("put", key, value, timeout, (ent,v) -> {
160            ent.queue.clear();
161            ent.queue.addLast(v);
162            ent.hasExpirable = timeout > 0;
163            if (timeout <= 0)
164                unregisterExpirable(key);
165        });
166    }
167
168    /**
169     * Common method for all enqueuing operations (out, push, put, with/out timeout)
170     */
171    private void enqueueValue(String opTag, K key, V value, long timeout, Enqueuer op) {
172        ensureOpen();
173        var jfr = new SpaceEvent(opTag + (timeout > 0 ? ":tim" : ""), "" + key);
174        jfr.begin();
175        try {
176            if (key == null || value == null)
177                throw new NullPointerException("key=" + key + ", value=" + value);
178
179            Object v = value;
180            if (timeout > 0)
181                v = new Expirable(value, System.nanoTime() + (timeout * ONE_MILLION));
182
183            while (true) {
184                KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());
185
186                entry.lock.lock();
187                try {
188                    if (entries.get(key) != entry) {
189                        continue;
190                    }
191
192                    op.enqueue(entry, v);
193
194                    if (timeout > 0) {
195                        entry.hasExpirable = true;
196                        registerExpirable(key, timeout);
197                    }
198
199                    if (entry.queue.size() == 1) {  // was empty (or became empty after clear)
200                        entry.hasValue.signalAll(); // Wake ALL readers (multiple rd() can read same value)
201                    }
202
203                    break;
204                } finally {
205                    entry.lock.unlock();
206                }
207            }
208
209            if (sl != null)
210                notifyListeners(key, value);
211        } finally {
212            jfr.commit();
213        }
214    }
215
216
217    @Override
218    public V rdp(Object key) {
219        ensureOpen();
220        var jfr = new SpaceEvent("rdp", "" + key);
221        jfr.begin();
222        try {
223            if (key instanceof Template)
224                return (V) getObjectNonBlocking((Template) key, false);
225            return (V) getHeadNonBlocking((K) key, false);
226        } finally {
227            jfr.commit();
228        }
229    }
230
231    @Override
232    public V inp(Object key) {
233        ensureOpen();
234        var jfr = new SpaceEvent("inp", "" + key);
235        jfr.begin();
236        try {
237            if (key instanceof Template)
238                return (V) getObjectNonBlocking((Template) key, true);
239            return (V) getHeadNonBlocking((K) key, true);
240        } finally {
241            jfr.commit();
242        }
243    }
244
245    @Override
246    public V in(Object key) {
247        ensureOpen();
248        String op = key instanceof Template ? "in:tmpl" : "in";
249        var jfr = new SpaceEvent(op, jfrTag(key));
250        jfr.begin();
251        try {
252            if (key instanceof Template)
253                return inTemplate((Template) key);
254            return inKey((K) key);
255        } finally {
256            jfr.commit();
257        }
258    }
259
260    @Override
261    public V in(Object key, long timeout) {
262        ensureOpen();
263        String op = key instanceof Template ? "in:tim:tmpl" : "in:tim";
264        var jfr = new SpaceEvent(op, jfrTag(key));
265        jfr.begin();
266        try {
267            if (key instanceof Template)
268                return inTemplate((Template) key, timeout);
269            return inKey((K) key, timeout);
270        } finally {
271            jfr.commit();
272        }
273    }
274
275    @Override
276    public V rd(Object key) {
277        ensureOpen();
278        String op = key instanceof Template ? "rd:tmpl" : "rd";
279        var jfr = new SpaceEvent(op, jfrTag(key));
280        jfr.begin();
281        try {
282            if (key instanceof Template)
283                return rdTemplate((Template) key);
284            return rdKey((K) key);
285        } finally {
286            jfr.commit();
287        }
288    }
289
290    @Override
291    public V rd(Object key, long timeout) {
292        ensureOpen();
293        String op = key instanceof Template ? "rd:tim:tmpl" : "rd:tim";
294        var jfr = new SpaceEvent(op, jfrTag(key));
295        jfr.begin();
296        try {
297            if (key instanceof Template)
298                return rdTemplate((Template) key, timeout);
299            return rdKey((K) key, timeout);
300        } finally {
301            jfr.commit();
302        }
303    }
304
305    @Override
306    public void nrd(Object key) {
307        ensureOpen();
308        var jfr = new SpaceEvent("nrd", "" + key);
309        jfr.begin();
310        try {
311            K k = (K) key;
312            while (true) {
313                KeyEntry entry = entries.get(k);
314                if (entry == null)
315                    return;
316
317                entry.lock.lock();
318                try {
319                    Object obj = getHead(entry, k, false);
320                    if (obj == null) {
321                        postFetchHousekeeping(k, entry);
322                        return;
323                    }
324                    try {
325                        entry.isEmpty.await(NRD_RESOLUTION, TimeUnit.MILLISECONDS);
326                    } catch (InterruptedException ignored) {
327                        Thread.currentThread().interrupt();
328                        return;
329                    }
330                } finally {
331                    entry.lock.unlock();
332                }
333            }
334        } finally {
335            jfr.commit();
336        }
337    }
338
339    @Override
340    public V nrd(Object key, long timeout) {
341        ensureOpen();
342        var jfr = new SpaceEvent("nrd:tim", "" + key);
343        jfr.begin();
344        try {
345            K k = (K) key;
346            long deadline = System.nanoTime() + timeout * ONE_MILLION;
347
348            while (true) {
349                KeyEntry entry = entries.get(k);
350                if (entry == null)
351                    return null;
352
353                entry.lock.lock();
354                try {
355                    V obj = (V) getHead(entry, k, false);
356                    if (obj == null) {
357                        postFetchHousekeeping(k, entry);
358                        return null;
359                    }
360                    long remaining = deadline - System.nanoTime();
361                    if (remaining <= 0)
362                        return obj;
363
364                    long waitTime = Math.min(NRD_RESOLUTION * ONE_MILLION, remaining);
365                    try {
366                        entry.isEmpty.awaitNanos(waitTime);
367                    } catch (InterruptedException ignored) {
368                        Thread.currentThread().interrupt();
369                        return obj;
370                    }
371                } finally {
372                    entry.lock.unlock();
373                }
374            }
375        } finally {
376            jfr.commit();
377        }
378    }
379
380    @Override
381    public boolean existAny(K[] keys) {
382        ensureOpen();
383        for (K key : keys) {
384            if (rdp(key) != null)
385                return true;
386        }
387        return false;
388    }
389
390    @Override
391    public boolean existAny(K[] keys, long timeout) {
392        ensureOpen();
393        var jfr = new SpaceEvent("existAny:tim", Integer.toString(keys != null ? keys.length : 0));
394        jfr.begin();
395        try {
396            long deadline = System.nanoTime() + timeout * ONE_MILLION;
397            long pollInterval = 10 * ONE_MILLION;
398
399            while (true) {
400                for (K key : keys) {
401                    if (rdp(key) != null)
402                        return true;
403                }
404
405                long remaining = deadline - System.nanoTime();
406                if (remaining <= 0)
407                    return false;
408
409                LockSupport.parkNanos(Math.min(pollInterval, remaining));
410            }
411        } finally {
412            jfr.commit();
413        }
414    }
415
416    @Override
417    public void run() {
418        // Scheduler ticks may race with close(); treat closed as a no-op.
419        if (closed.get())
420            return;
421        try {
422            gc();
423        } catch (Exception e) {
424            e.printStackTrace(); // should never happen
425        }
426    }
427
428    /** Runs a garbage-collection sweep to remove expired space entries. */
429    public void gc() {
430        // Avoid work after close if a scheduled tick slips through.
431        if (closed.get())
432            return;
433
434        gc(0);
435        if (System.nanoTime() - lastLongGC > GCLONG * ONE_MILLION) {
436            gc(1);
437            lastLongGC = System.nanoTime();
438        }
439    }
440
441    private void gc(int generation) {
442        // gc() already guards closed; keep gc(int) lean.
443        var jfr = new SpaceEvent("gc", Integer.toString(generation));
444        jfr.begin();
445
446        Set<K> keysToCheck;
447        synchronized (expLocks[generation]) {
448            keysToCheck = new HashSet<>(expirables[generation]);
449            expirables[generation].clear();
450        }
451        for (K key : keysToCheck) {
452            KeyEntry entry = entries.get(key);
453            if (entry == null)
454                continue;
455
456            entry.lock.lock();
457            try {
458                boolean stillHasExpirable = false;
459                boolean sawAnyExpirable = false;
460
461                Iterator<Object> iterator = entry.queue.iterator();
462                while (iterator.hasNext()) {
463                    Object obj = iterator.next();
464                    if (obj instanceof Expirable) {
465                        sawAnyExpirable = true;
466                        Object value = ((Expirable) obj).getValue();
467                        if (value == null) {
468                            iterator.remove();
469                        } else {
470                            stillHasExpirable = true;
471                        }
472                    }
473                }
474
475                entry.hasExpirable = stillHasExpirable;
476
477                if (stillHasExpirable) {
478                    synchronized (expLocks[generation]) {
479                        expirables[generation].add(key);
480                    }
481                    // Queue might have changed (expired items removed), wake any rd/in waiters.
482                    entry.hasValue.signalAll();
483                } else {
484                    // No longer has expirables anywhere; ensure we don't keep the key in either generation set.
485                    if (sawAnyExpirable)
486                        unregisterExpirable(key);
487                }
488
489                // Apply the same safe empty-entry policy used everywhere else.
490                if (entry.queue.isEmpty()) {
491                    postFetchHousekeeping(key, entry);
492                }
493            } finally {
494                entry.lock.unlock();
495            }
496
497            Thread.yield();
498        }
499
500        if (sl != null && sl.getKeySet().isEmpty()) {
501            sl = null;
502        }
503
504        jfr.commit();
505    }
506
507    @Override
508    public int size(Object key) {
509        ensureOpen();
510        var jfr = new SpaceEvent("size", "" + key);
511        jfr.begin();
512
513        int size = 0;
514        KeyEntry entry = entries.get((K) key);
515        if (entry != null) {
516            entry.lock.lock();
517            try {
518                size = entry.queue.size();
519            } finally {
520                entry.lock.unlock();
521            }
522        }
523
524        jfr.commit();
525        return size;
526    }
527
528    @Override
529    public void addListener(Object key, SpaceListener listener) {
530        ensureOpen();
531        getSL().out((K) key, listener);
532    }
533
534    @Override
535    public void addListener(Object key, SpaceListener listener, long timeout) {
536        ensureOpen();
537        getSL().out((K) key, listener, timeout);
538    }
539
540    @Override
541    public void removeListener(Object key, SpaceListener listener) {
542        ensureOpen();
543        if (sl != null) {
544            sl.inp((K) new ObjectTemplate(key, listener));
545        }
546    }
547
548    /** Returns true if this space contains no entries.
549     * @return true if empty
550     */
551    public boolean isEmpty() {
552        ensureOpen();
553        return entries.isEmpty();
554    }
555
556    @Override
557    public Set<K> getKeySet() {
558        ensureOpen();
559        return new HashSet<>(entries.keySet());
560    }
561
562    /** Returns all current keys as a space-separated string.
563     * @return space-separated key list
564     */
565    public String getKeysAsString() {
566        ensureOpen();
567        StringBuilder sb = new StringBuilder();
568        Object[] keys = entries.keySet().toArray();
569        for (int i = 0; i < keys.length; i++) {
570            if (i > 0)
571                sb.append(' ');
572            sb.append(keys[i]);
573        }
574        return sb.toString();
575    }
576
577    @Override
578    public void dump(PrintStream p, String indent) {
579        ensureOpen();
580        var jfr = new SpaceEvent("dump", "");
581        jfr.begin();
582
583        int size = entries.size();
584        if (size > MAX_ENTRIES_IN_DUMP * 100) {
585            p.printf("%sWARNING - space too big, size=%d%n", indent, size);
586            jfr.commit();
587            return;
588        }
589
590        Object[] keys = entries.keySet().toArray();
591
592        int i = 0;
593        for (Object key : keys) {
594            p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key);
595            if (i++ > MAX_ENTRIES_IN_DUMP) {
596                p.printf("%s...%n", indent);
597                p.printf("%s...%n", indent);
598                break;
599            }
600        }
601        p.printf("%s key-count: %d%n", indent, keys.length);
602
603        int exp0 = expirables[0].size();
604        int exp1 = expirables[1].size();
605        p.printf("%s    gcinfo: %d,%d%n", indent, exp0, exp1);
606
607        jfr.commit();
608    }
609
610    /** Notifies all registered listeners for the given key/value pair.
611     * @param key the space key
612     * @param value the new value
613     */
614    public void notifyListeners(Object key, Object value) {
615        ensureOpen();
616        var jfr = new SpaceEvent("notify", "" + key);
617        jfr.begin();
618        LocalSpace<K, SpaceListener<K,V>> localSl = sl;  // Capture volatile read once
619        if (localSl == null) {
620            jfr.commit();
621            return;
622        }
623        Object[] listeners = null;
624        LSpace<K, SpaceListener<K,V>> lsl = (LSpace<K, SpaceListener<K,V>>) localSl;
625        KeyEntry slEntry = lsl.entries.get((K) key);
626        if (slEntry != null) {
627            slEntry.lock.lock();
628            try {
629                listeners = slEntry.queue.toArray();
630            } finally {
631                slEntry.lock.unlock();
632            }
633        }
634
635        if (listeners != null) {
636            for (Object listener : listeners) {
637                Object o = listener;
638                if (o instanceof Expirable)
639                    o = ((Expirable) o).getValue();
640                if (o instanceof SpaceListener)
641                    ((SpaceListener) o).notify(key, value);
642            }
643        }
644
645        jfr.commit();
646    }
647
648    /**
649     * Non-standard method (required for space replication) - use with care.
650     * @return snapshot map of all entries
651     */
652    public Map getEntries() {
653        ensureOpen();
654        Map<K, List> result = new HashMap<>();
655        for (var e : entries.entrySet()) {
656            KeyEntry entry = e.getValue();
657            entry.lock.lock();
658            try {
659                result.put(e.getKey(), new LinkedList<>(entry.queue));
660            } finally {
661                entry.lock.unlock();
662            }
663        }
664        return result;
665    }
666
667    /**
668     * Non-standard method (required for space replication) - use with care.
669     * @param entries the entries map to load into this space
670     */
671    public void setEntries(Map entries) {
672        ensureOpen();
673        this.entries.clear();
674        for (var e : (Set<Map.Entry>)entries.entrySet()) {
675            K key =  (K)e.getKey();
676            List<V> list = (List<V>)e.getValue();
677            KeyEntry entry = this.entries.computeIfAbsent(key, k -> new KeyEntry());
678            entry.lock.lock();
679            try {
680                entry.queue.clear();
681                entry.queue.addAll(list);
682                // Conservatively: if replication injects Expirables, caller should also registerExpirable appropriately.
683                // We do not attempt to infer expirables here.
684            } finally {
685                entry.lock.unlock();
686            }
687        }
688    }
689
690    /**
691     * Cancels the periodic GC task so this instance can be garbage-collected.
692     * Safe to call multiple times.
693     */
694    @Override
695    public void close() {
696        if (!closed.compareAndSet(false, true))
697            return;
698
699        if (gcFuture != null) {
700            gcFuture.cancel(false);
701        }
702        // If sl is an LSpace, allow it to release resources as well.
703        LocalSpace<K, SpaceListener<K,V>> s = sl;
704        if (s instanceof LSpace<?,?>) {
705            ((LSpace<?,?>) s).close();
706        }
707        sl = null;
708        entries.clear();
709        expirables[0].clear();
710        expirables[1].clear();
711        cleanable.clean(); // Eager cleanup
712    }
713
714    // ========== Blocking (deduplicated) ==========
715
716    private V inKey(K key) {
717        return awaitValue(key, entry -> getHead(entry, key, true), NO_TIMEOUT);
718    }
719
720    private V inKey(K key, long timeout) {
721        return awaitValue(key, entry -> getHead(entry, key, true), timeout);
722    }
723
724    private V rdKey(K key) {
725        return awaitValue(key, entry -> getHead(entry, key, false), NO_TIMEOUT);
726    }
727
728    private V rdKey(K key, long timeout) {
729        return awaitValue(key, entry -> getHead(entry, key, false), timeout);
730    }
731
732    private V inTemplate(Template tmpl) {
733        K key = (K) tmpl.getKey();
734        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), NO_TIMEOUT);
735    }
736
737    private V inTemplate(Template tmpl, long timeout) {
738        K key = (K) tmpl.getKey();
739        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), timeout);
740    }
741
742    private V rdTemplate(Template tmpl) {
743        K key = (K) tmpl.getKey();
744        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), NO_TIMEOUT);
745    }
746
747    private V rdTemplate(Template tmpl, long timeout) {
748        K key = (K) tmpl.getKey();
749        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), timeout);
750    }
751
752    // ========== Non-blocking helpers ==========
753
754    /**
755     * Get head of queue (non-blocking version for rdp/inp).
756     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
757     */
758    private Object getHeadNonBlocking(K key, boolean remove) {
759        KeyEntry entry = entries.get(key);
760        if (entry == null)
761            return null;
762
763        entry.lock.lock();
764        try {
765            if (entries.get(key) != entry)
766                return null;
767
768            Object result = getHead(entry, key, remove);
769
770            if (remove) {
771                // If remove emptied the queue, postFetchHousekeeping will handle safe removal/signals.
772                postFetchHousekeeping(key, entry);
773            }
774            return result;
775        } finally {
776            entry.lock.unlock();
777        }
778    }
779
780    /**
781     * Get object matching template (non-blocking version for rdp/inp).
782     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
783     */
784    private Object getObjectNonBlocking(Template tmpl, boolean remove) {
785        K key = (K) tmpl.getKey();
786        KeyEntry entry = entries.get(key);
787        if (entry == null)
788            return null;
789
790        entry.lock.lock();
791        try {
792            if (entries.get(key) != entry)
793                return null;
794
795            Object result = getObject(entry, key, tmpl, remove);
796
797            if (remove) {
798                postFetchHousekeeping(key, entry);
799            }
800            return result;
801        } finally {
802            entry.lock.unlock();
803        }
804    }
805
806    /**
807     * Get head of queue.
808     * MUST be called with entry.lock held.
809     */
810    private Object getHead(KeyEntry entry, K key, boolean remove) {
811        Object result = null;
812        boolean wasExpirable = false;
813
814        while (result == null && !entry.queue.isEmpty()) {
815            Object obj = entry.queue.getFirst();
816
817            if (obj instanceof Expirable) {
818                Object value = ((Expirable) obj).getValue();
819                wasExpirable = true;
820
821                if (value == null) {
822                    entry.queue.removeFirst();
823                    continue;
824                } else {
825                    result = value;
826                }
827            } else {
828                result = obj;
829            }
830
831            if (remove && result != null) {
832                entry.queue.removeFirst();
833            }
834        }
835
836        if (entry.queue.isEmpty()) {
837            entry.hasExpirable = false;
838            if (wasExpirable)
839                unregisterExpirable(key);
840        }
841
842        return result;
843    }
844
845    /**
846     * Get object matching template.
847     * MUST be called with entry.lock held.
848     */
849    private Object getObject(KeyEntry entry, K key, Template tmpl, boolean remove) {
850        Object result = null;
851        Iterator<Object> iterator = entry.queue.iterator();
852        boolean wasExpirable = false;
853
854        while (iterator.hasNext()) {
855            Object obj = iterator.next();
856
857            if (obj instanceof Expirable) {
858                Object value = ((Expirable) obj).getValue();
859                if (value == null) {
860                    iterator.remove();
861                    wasExpirable = true;
862                    continue;
863                } else {
864                    obj = value;
865                }
866            }
867
868            if (tmpl.equals(obj)) {
869                result = obj;
870                if (remove)
871                    iterator.remove();
872                break;
873            }
874        }
875
876        if (entry.queue.isEmpty()) {
877            entry.hasExpirable = false;
878            if (wasExpirable)
879                unregisterExpirable(key);
880        }
881
882        return result;
883    }
884
885    private void ensureOpen() {
886        if (closed.get())
887            throw new IllegalStateException("LSpace is closed");
888    }
889
890    // ========== Listener-space helpers ==========
891
892    private LocalSpace<K, SpaceListener<K,V>> getSL() {
893        ensureOpen();
894        if (sl == null) {
895            synchronized (this) {
896                ensureOpen();
897                if (sl == null) {
898                    sl = new LSpace<>();
899                    cleaningState.sl = (AutoCloseable) sl;
900                }
901            }
902        }
903        return sl;
904    }
905
906    private void registerExpirable(K k, long t) {
907        int g = (t > GCLONG) ? 1 : 0;
908        synchronized (expLocks[g]) {
909            expirables[g].add(k);
910        }
911    }
912
913    private void unregisterExpirable(K k) {
914        synchronized (expLocks[0]) {
915            synchronized (expLocks[1]) {
916                expirables[0].remove(k);
917                expirables[1].remove(k);
918            }
919        }
920    }
921
922    // ========== Blocking core (shared) ==========
923
924    @FunctionalInterface
925    private interface Fetcher {
926        Object fetch(KeyEntry entry);
927    }
928
929    /**
930     * Common blocking wait-loop for rd/in operations (key or template).
931     *
932     * Ensures:
933     * - No premature null returns due to entry replacement (retries outer loop).
934     * - Timed variants remove empty computeIfAbsent-created entries on timeout/interrupt (postFetchHousekeeping).
935     * - Does not orphan waiters because postFetchHousekeeping refuses to remove if hasValue waiters exist.
936     */
937    @SuppressWarnings("unchecked")
938    private V awaitValue(K key, Fetcher fetcher, long timeoutMillis) {
939        ensureOpen();
940
941        final boolean timed = timeoutMillis != NO_TIMEOUT;
942        final long deadlineNanos = timed ? System.nanoTime() + timeoutMillis * ONE_MILLION : 0L;
943
944        for (;;) {
945            final KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());
946
947            entry.lock.lock();
948            try {
949                if (entries.get(key) != entry)
950                    continue;
951
952                for (;;) {
953                    Object obj = fetcher.fetch(entry);
954                    if (obj != null) {
955                        postFetchHousekeeping(key, entry);
956                        return (V) obj;
957                    }
958
959                    if (!timed) {
960                        try {
961                            entry.hasValue.await();
962                        } catch (InterruptedException ie) {
963                            Thread.currentThread().interrupt();
964                            // Avoid leaking an empty entry created by computeIfAbsent for a waiter that got interrupted.
965                            postFetchHousekeeping(key, entry);
966                            break;
967                        }
968                    } else {
969                        try {
970                            long remaining = entry.hasValue.awaitNanos(deadlineNanos - System.nanoTime());
971                            if (remaining <= 0) {
972                                // Avoid leaking empty entries created by computeIfAbsent when timing out.
973                                postFetchHousekeeping(key, entry);
974                                return null;
975                            }
976                        } catch (InterruptedException ie) {
977                            Thread.currentThread().interrupt();
978                            postFetchHousekeeping(key, entry);
979                            return null;
980                        }
981                    }
982
983                    // If someone removed/replaced the entry, restart outer loop to bind to the current entry.
984                    if (entries.get(key) != entry)
985                        break;
986                } // inner loop
987            } finally {
988                entry.lock.unlock();
989            }
990        } // outer loop
991    }
992
993    /**
994     * Housekeeping that must run under entry.lock:
995     * - Wake nrd waiters when queue becomes empty.
996     * - Remove entry only if queue is empty AND no waiters are parked on hasValue (prevents orphaning).
997     */
998    private void postFetchHousekeeping(K key, KeyEntry entry) {
999        if (!entry.queue.isEmpty())
1000            return;
1001
1002        // Always wake nrd waiters when empty.
1003        entry.isEmpty.signalAll();
1004
1005        // Remove only when safe (no hasValue waiters).
1006        if (entries.get(key) == entry && !entry.lock.hasWaiters(entry.hasValue)) {
1007            entries.remove(key, entry);
1008        }
1009    }
1010
1011    /**
1012     * Expirable wrapper for values with timeout.
1013     */
1014    static class Expirable implements Comparable, Serializable {
1015        private static final long serialVersionUID = 0xA7F22BF5;
1016
1017        Object value;
1018        long expires;
1019
1020        Expirable(Object value, long expires) {
1021            super();
1022            this.value = value;
1023            this.expires = expires;
1024        }
1025
1026        boolean isExpired() {
1027            return (System.nanoTime() - expires) > 0;
1028        }
1029
1030        @Override
1031        public String toString() {
1032            return getClass().getName()
1033              + "@" + Integer.toHexString(hashCode())
1034              + ",value=" + value.toString()
1035              + ",expired=" + isExpired();
1036        }
1037
1038        Object getValue() {
1039            return isExpired() ? null : value;
1040        }
1041
1042        @Override
1043        public int compareTo(Object other) {
1044            long diff = this.expires - ((Expirable) other).expires;
1045            return diff > 0 ? 1 : diff < 0 ? -1 : 0;
1046        }
1047    }
1048
1049    private static final class CleaningState implements Runnable {
1050        private final ScheduledFuture<?> gcFuture;
1051        private final ConcurrentHashMap<?,?> entries;
1052        private final Set<?>[] expirables;
1053
1054        // We keep a reference to sl so we can cancel its scheduler too.
1055        // This does not introduce a new retention path; it already hangs off the parent space.
1056        private volatile AutoCloseable sl; // store as AutoCloseable to avoid generics pain
1057
1058        private final AtomicBoolean cleaned = new AtomicBoolean(false);
1059
1060        private CleaningState(ScheduledFuture<?> gcFuture,
1061                              ConcurrentHashMap<?,?> entries,
1062                              Set<?>[] expirables) {
1063            this.gcFuture = gcFuture;
1064            this.entries = entries;
1065            this.expirables = expirables;
1066        }
1067
1068        @Override
1069        public void run() {
1070            if (!cleaned.compareAndSet(false, true))
1071                return;
1072
1073            try {
1074                if (gcFuture != null)
1075                    gcFuture.cancel(false);
1076            } catch (Throwable ignored) { }
1077
1078            // Best-effort close of the listener space.
1079            AutoCloseable s = sl;
1080            if (s != null) {
1081                try {
1082                    s.close();
1083                } catch (Throwable ignored) { }
1084                sl = null;
1085            }
1086
1087            try {
1088                entries.clear();
1089            } catch (Throwable ignored) { }
1090
1091            try {
1092                expirables[0].clear();
1093            } catch (Throwable ignored) { }
1094            try {
1095                expirables[1].clear();
1096            } catch (Throwable ignored) { }
1097        }
1098    }
1099
1100    // =========================
1101    // Test-only visibility hooks
1102    // =========================
1103    // Package-private on purpose (same package as tests).
1104    // These methods are intended strictly for unit tests that validate internal invariants.
1105    boolean isExpirableTrackedForTest(K key) {
1106        synchronized (expLocks[0]) {
1107            synchronized (expLocks[1]) {
1108                return expirables[0].contains(key) || expirables[1].contains(key);
1109            }
1110        }
1111    }
1112
1113    boolean isExpirableTrackedForTest(K key, int generation) {
1114        synchronized (expLocks[generation]) {
1115            return expirables[generation].contains(key);
1116        }
1117    }
1118
1119    void forceTrackExpirableForTest(K key, int generation) {
1120        synchronized (expLocks[generation]) {
1121            expirables[generation].add(key);
1122        }
1123    }
1124}