001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020 021import org.jpos.jfr.SpaceEvent; 022import org.jpos.util.Loggeable; 023 024import java.io.PrintStream; 025import java.io.Serializable; 026import java.lang.ref.Cleaner; 027import java.util.*; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ScheduledFuture; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.locks.Condition; 033import java.util.concurrent.locks.LockSupport; 034import java.util.concurrent.locks.ReentrantLock; 035 036/** 037 * LSpace (Loom-optimized Space) implementation with per-key locking for Virtual Thread efficiency. 038 * 039 * <p>This implementation addresses the thundering herd problem in traditional Space implementations 040 * by using per-key {@link ReentrantLock} and {@link Condition} objects instead of global synchronization. 041 * With Virtual Threads (Project Loom), this prevents thousands of threads from being 042 * unnecessarily woken up when only one is relevant.</p> 043 * 044 * <p>Key features: 045 * <ul> 046 * <li>Per-key isolation: Each key has its own lock and condition variables</li> 047 * <li>Targeted wakeups: Only threads waiting on a specific key are signaled</li> 048 * <li>Virtual Thread optimized: Scales efficiently with thousands of concurrent threads</li> 049 * <li>Full LocalSpace compatibility: Drop-in replacement with same API and behavior</li> 050 * <li>JFR instrumentation: All operations emit SpaceEvent for monitoring</li> 051 * </ul> 052 * </p> 053 * 054 * Concurrency notes (core safety invariants): 055 * - Never remove a KeyEntry from entries while threads are waiting on that entry's hasValue Condition, 056 * otherwise those waiters can be stranded forever (future out() creates a new KeyEntry+Condition). 057 * - Blocking rd()/in() must never return null unless interrupted (or timed out for timed variants). 058 * 059 * @author Alejandro Revilla 060 * @version $Revision$ $Date$ 061 * @since 3.0 062 */ 063@SuppressWarnings("unchecked") 064public class LSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable { 065 private final ConcurrentHashMap<K, KeyEntry> entries; 066 private volatile LocalSpace<K, SpaceListener<K,V>> sl; 067 private final ScheduledFuture<?> gcFuture; 068 private final Object[] expLocks = new Object[] { new Object(), new Object() }; 069 070 public static final long GCDELAY = 5 * 1000; 071 private static final long GCLONG = 60_000L; 072 private static final long NRD_RESOLUTION = 500L; 073 private static final int MAX_ENTRIES_IN_DUMP = 1000; 074 075 private static final long ONE_MILLION = 1_000_000L; // millis -> nanos 076 private static final long NO_TIMEOUT = -1L; 077 078 private final Set<K>[] expirables; 079 private long lastLongGC = System.nanoTime(); 080 private final AtomicBoolean closed = new AtomicBoolean(false); 081 private static final Cleaner CLEANER = Cleaner.create(); 082 private final Cleaner.Cleanable cleanable; 083 private final CleaningState cleaningState; 084 085 /** 086 * Per-key synchronization and queue structure. 087 */ 088 private static class KeyEntry { 089 final ReentrantLock lock = new ReentrantLock(); 090 final Condition hasValue = lock.newCondition(); // signaled when value added 091 final Condition isEmpty = lock.newCondition(); // signaled when queue becomes empty (for nrd) 092 final LinkedList<Object> queue = new LinkedList<>(); 093 volatile boolean hasExpirable = false; 094 } 095 096 097 public LSpace() { 098 super(); 099 this.entries = new ConcurrentHashMap<>(256); 100 this.expirables = new Set[] { 101 ConcurrentHashMap.newKeySet(), 102 ConcurrentHashMap.newKeySet() 103 }; 104 this.gcFuture = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS); 105 this.cleaningState = new CleaningState(gcFuture, entries, expirables); 106 this.cleanable = CLEANER.register(this, cleaningState); 107 } 108 109 // ------------------------- 110 // JFR tagging helper (patch) 111 // ------------------------- 112 private String jfrTag(Object keyOrTemplate) { 113 if (keyOrTemplate instanceof Template) { 114 Object k = ((Template) keyOrTemplate).getKey(); 115 return "" + k; 116 } 117 return "" + keyOrTemplate; 118 } 119 120 121 // ========== Producer (enqueuing) operations ========== 122 123 @FunctionalInterface 124 private interface Enqueuer { 125 void enqueue(KeyEntry entry, Object value); 126 } 127 128 @Override 129 public void out(K key, V value) { 130 out(key, value, NO_TIMEOUT); 131 } 132 133 @Override 134 public void out(K key, V value, long timeout) { 135 enqueueValue("out", key, value, timeout, (ent,v) -> ent.queue.addLast(v)); 136 } 137 138 @Override 139 public void push(K key, V value) { 140 push(key, value, NO_TIMEOUT); 141 } 142 143 @Override 144 public void push(K key, V value, long timeout) { 145 enqueueValue("push", key, value, timeout, (ent,v) -> ent.queue.addFirst(v)); 146 } 147 148 @Override 149 public void put(K key, V value) { 150 put(key, value, NO_TIMEOUT); 151 } 152 153 @Override 154 public void put(K key, V value, long timeout) { 155 enqueueValue("put", key, value, timeout, (ent,v) -> { 156 ent.queue.clear(); 157 ent.queue.addLast(v); 158 ent.hasExpirable = timeout > 0; 159 if (timeout <= 0) 160 unregisterExpirable(key); 161 }); 162 } 163 164 /** 165 * Common method for all enqueuing operations (out, push, put, with/out timeout) 166 */ 167 private void enqueueValue(String opTag, K key, V value, long timeout, Enqueuer op) { 168 ensureOpen(); 169 var jfr = new SpaceEvent(opTag + (timeout > 0 ? ":tim" : ""), "" + key); 170 jfr.begin(); 171 try { 172 if (key == null || value == null) 173 throw new NullPointerException("key=" + key + ", value=" + value); 174 175 Object v = value; 176 if (timeout > 0) 177 v = new Expirable(value, System.nanoTime() + (timeout * ONE_MILLION)); 178 179 while (true) { 180 KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry()); 181 182 entry.lock.lock(); 183 try { 184 if (entries.get(key) != entry) { 185 continue; 186 } 187 188 op.enqueue(entry, v); 189 190 if (timeout > 0) { 191 entry.hasExpirable = true; 192 registerExpirable(key, timeout); 193 } 194 195 if (entry.queue.size() == 1) { // was empty (or became empty after clear) 196 entry.hasValue.signalAll(); // Wake ALL readers (multiple rd() can read same value) 197 } 198 199 break; 200 } finally { 201 entry.lock.unlock(); 202 } 203 } 204 205 if (sl != null) 206 notifyListeners(key, value); 207 } finally { 208 jfr.commit(); 209 } 210 } 211 212 213 @Override 214 public V rdp(Object key) { 215 ensureOpen(); 216 var jfr = new SpaceEvent("rdp", "" + key); 217 jfr.begin(); 218 try { 219 if (key instanceof Template) 220 return (V) getObjectNonBlocking((Template) key, false); 221 return (V) getHeadNonBlocking((K) key, false); 222 } finally { 223 jfr.commit(); 224 } 225 } 226 227 @Override 228 public V inp(Object key) { 229 ensureOpen(); 230 var jfr = new SpaceEvent("inp", "" + key); 231 jfr.begin(); 232 try { 233 if (key instanceof Template) 234 return (V) getObjectNonBlocking((Template) key, true); 235 return (V) getHeadNonBlocking((K) key, true); 236 } finally { 237 jfr.commit(); 238 } 239 } 240 241 @Override 242 public V in(Object key) { 243 ensureOpen(); 244 String op = key instanceof Template ? "in:tmpl" : "in"; 245 var jfr = new SpaceEvent(op, jfrTag(key)); 246 jfr.begin(); 247 try { 248 if (key instanceof Template) 249 return inTemplate((Template) key); 250 return inKey((K) key); 251 } finally { 252 jfr.commit(); 253 } 254 } 255 256 @Override 257 public V in(Object key, long timeout) { 258 ensureOpen(); 259 String op = key instanceof Template ? "in:tim:tmpl" : "in:tim"; 260 var jfr = new SpaceEvent(op, jfrTag(key)); 261 jfr.begin(); 262 try { 263 if (key instanceof Template) 264 return inTemplate((Template) key, timeout); 265 return inKey((K) key, timeout); 266 } finally { 267 jfr.commit(); 268 } 269 } 270 271 @Override 272 public V rd(Object key) { 273 ensureOpen(); 274 String op = key instanceof Template ? "rd:tmpl" : "rd"; 275 var jfr = new SpaceEvent(op, jfrTag(key)); 276 jfr.begin(); 277 try { 278 if (key instanceof Template) 279 return rdTemplate((Template) key); 280 return rdKey((K) key); 281 } finally { 282 jfr.commit(); 283 } 284 } 285 286 @Override 287 public V rd(Object key, long timeout) { 288 ensureOpen(); 289 String op = key instanceof Template ? "rd:tim:tmpl" : "rd:tim"; 290 var jfr = new SpaceEvent(op, jfrTag(key)); 291 jfr.begin(); 292 try { 293 if (key instanceof Template) 294 return rdTemplate((Template) key, timeout); 295 return rdKey((K) key, timeout); 296 } finally { 297 jfr.commit(); 298 } 299 } 300 301 @Override 302 public void nrd(Object key) { 303 ensureOpen(); 304 var jfr = new SpaceEvent("nrd", "" + key); 305 jfr.begin(); 306 try { 307 K k = (K) key; 308 while (true) { 309 KeyEntry entry = entries.get(k); 310 if (entry == null) 311 return; 312 313 entry.lock.lock(); 314 try { 315 Object obj = getHead(entry, k, false); 316 if (obj == null) { 317 postFetchHousekeeping(k, entry); 318 return; 319 } 320 try { 321 entry.isEmpty.await(NRD_RESOLUTION, TimeUnit.MILLISECONDS); 322 } catch (InterruptedException ignored) { 323 Thread.currentThread().interrupt(); 324 return; 325 } 326 } finally { 327 entry.lock.unlock(); 328 } 329 } 330 } finally { 331 jfr.commit(); 332 } 333 } 334 335 @Override 336 public V nrd(Object key, long timeout) { 337 ensureOpen(); 338 var jfr = new SpaceEvent("nrd:tim", "" + key); 339 jfr.begin(); 340 try { 341 K k = (K) key; 342 long deadline = System.nanoTime() + timeout * ONE_MILLION; 343 344 while (true) { 345 KeyEntry entry = entries.get(k); 346 if (entry == null) 347 return null; 348 349 entry.lock.lock(); 350 try { 351 V obj = (V) getHead(entry, k, false); 352 if (obj == null) { 353 postFetchHousekeeping(k, entry); 354 return null; 355 } 356 long remaining = deadline - System.nanoTime(); 357 if (remaining <= 0) 358 return obj; 359 360 long waitTime = Math.min(NRD_RESOLUTION * ONE_MILLION, remaining); 361 try { 362 entry.isEmpty.awaitNanos(waitTime); 363 } catch (InterruptedException ignored) { 364 Thread.currentThread().interrupt(); 365 return obj; 366 } 367 } finally { 368 entry.lock.unlock(); 369 } 370 } 371 } finally { 372 jfr.commit(); 373 } 374 } 375 376 @Override 377 public boolean existAny(K[] keys) { 378 ensureOpen(); 379 for (K key : keys) { 380 if (rdp(key) != null) 381 return true; 382 } 383 return false; 384 } 385 386 @Override 387 public boolean existAny(K[] keys, long timeout) { 388 ensureOpen(); 389 var jfr = new SpaceEvent("existAny:tim", Integer.toString(keys != null ? keys.length : 0)); 390 jfr.begin(); 391 try { 392 long deadline = System.nanoTime() + timeout * ONE_MILLION; 393 long pollInterval = 10 * ONE_MILLION; 394 395 while (true) { 396 for (K key : keys) { 397 if (rdp(key) != null) 398 return true; 399 } 400 401 long remaining = deadline - System.nanoTime(); 402 if (remaining <= 0) 403 return false; 404 405 LockSupport.parkNanos(Math.min(pollInterval, remaining)); 406 } 407 } finally { 408 jfr.commit(); 409 } 410 } 411 412 @Override 413 public void run() { 414 // Scheduler ticks may race with close(); treat closed as a no-op. 415 if (closed.get()) 416 return; 417 try { 418 gc(); 419 } catch (Exception e) { 420 e.printStackTrace(); // should never happen 421 } 422 } 423 424 public void gc() { 425 // Avoid work after close if a scheduled tick slips through. 426 if (closed.get()) 427 return; 428 429 gc(0); 430 if (System.nanoTime() - lastLongGC > GCLONG * ONE_MILLION) { 431 gc(1); 432 lastLongGC = System.nanoTime(); 433 } 434 } 435 436 private void gc(int generation) { 437 // gc() already guards closed; keep gc(int) lean. 438 var jfr = new SpaceEvent("gc", Integer.toString(generation)); 439 jfr.begin(); 440 441 Set<K> keysToCheck; 442 synchronized (expLocks[generation]) { 443 keysToCheck = new HashSet<>(expirables[generation]); 444 expirables[generation].clear(); 445 } 446 for (K key : keysToCheck) { 447 KeyEntry entry = entries.get(key); 448 if (entry == null) 449 continue; 450 451 entry.lock.lock(); 452 try { 453 boolean stillHasExpirable = false; 454 boolean sawAnyExpirable = false; 455 456 Iterator<Object> iterator = entry.queue.iterator(); 457 while (iterator.hasNext()) { 458 Object obj = iterator.next(); 459 if (obj instanceof Expirable) { 460 sawAnyExpirable = true; 461 Object value = ((Expirable) obj).getValue(); 462 if (value == null) { 463 iterator.remove(); 464 } else { 465 stillHasExpirable = true; 466 } 467 } 468 } 469 470 entry.hasExpirable = stillHasExpirable; 471 472 if (stillHasExpirable) { 473 synchronized (expLocks[generation]) { 474 expirables[generation].add(key); 475 } 476 // Queue might have changed (expired items removed), wake any rd/in waiters. 477 entry.hasValue.signalAll(); 478 } else { 479 // No longer has expirables anywhere; ensure we don't keep the key in either generation set. 480 if (sawAnyExpirable) 481 unregisterExpirable(key); 482 } 483 484 // Apply the same safe empty-entry policy used everywhere else. 485 if (entry.queue.isEmpty()) { 486 postFetchHousekeeping(key, entry); 487 } 488 } finally { 489 entry.lock.unlock(); 490 } 491 492 Thread.yield(); 493 } 494 495 if (sl != null && sl.getKeySet().isEmpty()) { 496 sl = null; 497 } 498 499 jfr.commit(); 500 } 501 502 @Override 503 public int size(Object key) { 504 ensureOpen(); 505 var jfr = new SpaceEvent("size", "" + key); 506 jfr.begin(); 507 508 int size = 0; 509 KeyEntry entry = entries.get((K) key); 510 if (entry != null) { 511 entry.lock.lock(); 512 try { 513 size = entry.queue.size(); 514 } finally { 515 entry.lock.unlock(); 516 } 517 } 518 519 jfr.commit(); 520 return size; 521 } 522 523 @Override 524 public void addListener(Object key, SpaceListener listener) { 525 ensureOpen(); 526 getSL().out((K) key, listener); 527 } 528 529 @Override 530 public void addListener(Object key, SpaceListener listener, long timeout) { 531 ensureOpen(); 532 getSL().out((K) key, listener, timeout); 533 } 534 535 @Override 536 public void removeListener(Object key, SpaceListener listener) { 537 ensureOpen(); 538 if (sl != null) { 539 sl.inp((K) new ObjectTemplate(key, listener)); 540 } 541 } 542 543 public boolean isEmpty() { 544 ensureOpen(); 545 return entries.isEmpty(); 546 } 547 548 @Override 549 public Set<K> getKeySet() { 550 ensureOpen(); 551 return new HashSet<>(entries.keySet()); 552 } 553 554 public String getKeysAsString() { 555 ensureOpen(); 556 StringBuilder sb = new StringBuilder(); 557 Object[] keys = entries.keySet().toArray(); 558 for (int i = 0; i < keys.length; i++) { 559 if (i > 0) 560 sb.append(' '); 561 sb.append(keys[i]); 562 } 563 return sb.toString(); 564 } 565 566 @Override 567 public void dump(PrintStream p, String indent) { 568 ensureOpen(); 569 var jfr = new SpaceEvent("dump", ""); 570 jfr.begin(); 571 572 int size = entries.size(); 573 if (size > MAX_ENTRIES_IN_DUMP * 100) { 574 p.printf("%sWARNING - space too big, size=%d%n", indent, size); 575 jfr.commit(); 576 return; 577 } 578 579 Object[] keys = entries.keySet().toArray(); 580 581 int i = 0; 582 for (Object key : keys) { 583 p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key); 584 if (i++ > MAX_ENTRIES_IN_DUMP) { 585 p.printf("%s...%n", indent); 586 p.printf("%s...%n", indent); 587 break; 588 } 589 } 590 p.printf("%s key-count: %d%n", indent, keys.length); 591 592 int exp0 = expirables[0].size(); 593 int exp1 = expirables[1].size(); 594 p.printf("%s gcinfo: %d,%d%n", indent, exp0, exp1); 595 596 jfr.commit(); 597 } 598 599 public void notifyListeners(Object key, Object value) { 600 ensureOpen(); 601 var jfr = new SpaceEvent("notify", "" + key); 602 jfr.begin(); 603 LocalSpace<K, SpaceListener<K,V>> localSl = sl; // Capture volatile read once 604 if (localSl == null) { 605 jfr.commit(); 606 return; 607 } 608 Object[] listeners = null; 609 LSpace<K, SpaceListener<K,V>> lsl = (LSpace<K, SpaceListener<K,V>>) localSl; 610 KeyEntry slEntry = lsl.entries.get((K) key); 611 if (slEntry != null) { 612 slEntry.lock.lock(); 613 try { 614 listeners = slEntry.queue.toArray(); 615 } finally { 616 slEntry.lock.unlock(); 617 } 618 } 619 620 if (listeners != null) { 621 for (Object listener : listeners) { 622 Object o = listener; 623 if (o instanceof Expirable) 624 o = ((Expirable) o).getValue(); 625 if (o instanceof SpaceListener) 626 ((SpaceListener) o).notify(key, value); 627 } 628 } 629 630 jfr.commit(); 631 } 632 633 /** 634 * Non-standard method (required for space replication) - use with care. 635 */ 636 public Map getEntries() { 637 ensureOpen(); 638 Map<K, List> result = new HashMap<>(); 639 for (var e : entries.entrySet()) { 640 KeyEntry entry = e.getValue(); 641 entry.lock.lock(); 642 try { 643 result.put(e.getKey(), new LinkedList<>(entry.queue)); 644 } finally { 645 entry.lock.unlock(); 646 } 647 } 648 return result; 649 } 650 651 /** 652 * Non-standard method (required for space replication) - use with care. 653 */ 654 public void setEntries(Map entries) { 655 ensureOpen(); 656 this.entries.clear(); 657 for (var e : (Set<Map.Entry>)entries.entrySet()) { 658 K key = (K)e.getKey(); 659 List<V> list = (List<V>)e.getValue(); 660 KeyEntry entry = this.entries.computeIfAbsent(key, k -> new KeyEntry()); 661 entry.lock.lock(); 662 try { 663 entry.queue.clear(); 664 entry.queue.addAll(list); 665 // Conservatively: if replication injects Expirables, caller should also registerExpirable appropriately. 666 // We do not attempt to infer expirables here. 667 } finally { 668 entry.lock.unlock(); 669 } 670 } 671 } 672 673 /** 674 * Cancels the periodic GC task so this instance can be garbage-collected. 675 * Safe to call multiple times. 676 */ 677 @Override 678 public void close() { 679 if (!closed.compareAndSet(false, true)) 680 return; 681 682 if (gcFuture != null) { 683 gcFuture.cancel(false); 684 } 685 // If sl is an LSpace, allow it to release resources as well. 686 LocalSpace<K, SpaceListener<K,V>> s = sl; 687 if (s instanceof LSpace<?,?>) { 688 ((LSpace<?,?>) s).close(); 689 } 690 sl = null; 691 entries.clear(); 692 expirables[0].clear(); 693 expirables[1].clear(); 694 cleanable.clean(); // Eager cleanup 695 } 696 697 // ========== Blocking (deduplicated) ========== 698 699 private V inKey(K key) { 700 return awaitValue(key, entry -> getHead(entry, key, true), NO_TIMEOUT); 701 } 702 703 private V inKey(K key, long timeout) { 704 return awaitValue(key, entry -> getHead(entry, key, true), timeout); 705 } 706 707 private V rdKey(K key) { 708 return awaitValue(key, entry -> getHead(entry, key, false), NO_TIMEOUT); 709 } 710 711 private V rdKey(K key, long timeout) { 712 return awaitValue(key, entry -> getHead(entry, key, false), timeout); 713 } 714 715 private V inTemplate(Template tmpl) { 716 K key = (K) tmpl.getKey(); 717 return awaitValue(key, entry -> getObject(entry, key, tmpl, true), NO_TIMEOUT); 718 } 719 720 private V inTemplate(Template tmpl, long timeout) { 721 K key = (K) tmpl.getKey(); 722 return awaitValue(key, entry -> getObject(entry, key, tmpl, true), timeout); 723 } 724 725 private V rdTemplate(Template tmpl) { 726 K key = (K) tmpl.getKey(); 727 return awaitValue(key, entry -> getObject(entry, key, tmpl, false), NO_TIMEOUT); 728 } 729 730 private V rdTemplate(Template tmpl, long timeout) { 731 K key = (K) tmpl.getKey(); 732 return awaitValue(key, entry -> getObject(entry, key, tmpl, false), timeout); 733 } 734 735 // ========== Non-blocking helpers ========== 736 737 /** 738 * Get head of queue (non-blocking version for rdp/inp). 739 * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters. 740 */ 741 private Object getHeadNonBlocking(K key, boolean remove) { 742 KeyEntry entry = entries.get(key); 743 if (entry == null) 744 return null; 745 746 entry.lock.lock(); 747 try { 748 if (entries.get(key) != entry) 749 return null; 750 751 Object result = getHead(entry, key, remove); 752 753 if (remove) { 754 // If remove emptied the queue, postFetchHousekeeping will handle safe removal/signals. 755 postFetchHousekeeping(key, entry); 756 } 757 return result; 758 } finally { 759 entry.lock.unlock(); 760 } 761 } 762 763 /** 764 * Get object matching template (non-blocking version for rdp/inp). 765 * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters. 766 */ 767 private Object getObjectNonBlocking(Template tmpl, boolean remove) { 768 K key = (K) tmpl.getKey(); 769 KeyEntry entry = entries.get(key); 770 if (entry == null) 771 return null; 772 773 entry.lock.lock(); 774 try { 775 if (entries.get(key) != entry) 776 return null; 777 778 Object result = getObject(entry, key, tmpl, remove); 779 780 if (remove) { 781 postFetchHousekeeping(key, entry); 782 } 783 return result; 784 } finally { 785 entry.lock.unlock(); 786 } 787 } 788 789 /** 790 * Get head of queue. 791 * MUST be called with entry.lock held. 792 */ 793 private Object getHead(KeyEntry entry, K key, boolean remove) { 794 Object result = null; 795 boolean wasExpirable = false; 796 797 while (result == null && !entry.queue.isEmpty()) { 798 Object obj = entry.queue.getFirst(); 799 800 if (obj instanceof Expirable) { 801 Object value = ((Expirable) obj).getValue(); 802 wasExpirable = true; 803 804 if (value == null) { 805 entry.queue.removeFirst(); 806 continue; 807 } else { 808 result = value; 809 } 810 } else { 811 result = obj; 812 } 813 814 if (remove && result != null) { 815 entry.queue.removeFirst(); 816 } 817 } 818 819 if (entry.queue.isEmpty()) { 820 entry.hasExpirable = false; 821 if (wasExpirable) 822 unregisterExpirable(key); 823 } 824 825 return result; 826 } 827 828 /** 829 * Get object matching template. 830 * MUST be called with entry.lock held. 831 */ 832 private Object getObject(KeyEntry entry, K key, Template tmpl, boolean remove) { 833 Object result = null; 834 Iterator<Object> iterator = entry.queue.iterator(); 835 boolean wasExpirable = false; 836 837 while (iterator.hasNext()) { 838 Object obj = iterator.next(); 839 840 if (obj instanceof Expirable) { 841 Object value = ((Expirable) obj).getValue(); 842 if (value == null) { 843 iterator.remove(); 844 wasExpirable = true; 845 continue; 846 } else { 847 obj = value; 848 } 849 } 850 851 if (tmpl.equals(obj)) { 852 result = obj; 853 if (remove) 854 iterator.remove(); 855 break; 856 } 857 } 858 859 if (entry.queue.isEmpty()) { 860 entry.hasExpirable = false; 861 if (wasExpirable) 862 unregisterExpirable(key); 863 } 864 865 return result; 866 } 867 868 private void ensureOpen() { 869 if (closed.get()) 870 throw new IllegalStateException("LSpace is closed"); 871 } 872 873 // ========== Listener-space helpers ========== 874 875 private LocalSpace<K, SpaceListener<K,V>> getSL() { 876 ensureOpen(); 877 if (sl == null) { 878 synchronized (this) { 879 ensureOpen(); 880 if (sl == null) { 881 sl = new LSpace<>(); 882 cleaningState.sl = (AutoCloseable) sl; 883 } 884 } 885 } 886 return sl; 887 } 888 889 private void registerExpirable(K k, long t) { 890 int g = (t > GCLONG) ? 1 : 0; 891 synchronized (expLocks[g]) { 892 expirables[g].add(k); 893 } 894 } 895 896 private void unregisterExpirable(K k) { 897 synchronized (expLocks[0]) { 898 synchronized (expLocks[1]) { 899 expirables[0].remove(k); 900 expirables[1].remove(k); 901 } 902 } 903 } 904 905 // ========== Blocking core (shared) ========== 906 907 @FunctionalInterface 908 private interface Fetcher { 909 Object fetch(KeyEntry entry); 910 } 911 912 /** 913 * Common blocking wait-loop for rd/in operations (key or template). 914 * 915 * Ensures: 916 * - No premature null returns due to entry replacement (retries outer loop). 917 * - Timed variants remove empty computeIfAbsent-created entries on timeout/interrupt (postFetchHousekeeping). 918 * - Does not orphan waiters because postFetchHousekeeping refuses to remove if hasValue waiters exist. 919 */ 920 @SuppressWarnings("unchecked") 921 private V awaitValue(K key, Fetcher fetcher, long timeoutMillis) { 922 ensureOpen(); 923 924 final boolean timed = timeoutMillis != NO_TIMEOUT; 925 final long deadlineNanos = timed ? System.nanoTime() + timeoutMillis * ONE_MILLION : 0L; 926 927 for (;;) { 928 final KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry()); 929 930 entry.lock.lock(); 931 try { 932 if (entries.get(key) != entry) 933 continue; 934 935 for (;;) { 936 Object obj = fetcher.fetch(entry); 937 if (obj != null) { 938 postFetchHousekeeping(key, entry); 939 return (V) obj; 940 } 941 942 if (!timed) { 943 try { 944 entry.hasValue.await(); 945 } catch (InterruptedException ie) { 946 Thread.currentThread().interrupt(); 947 // Avoid leaking an empty entry created by computeIfAbsent for a waiter that got interrupted. 948 postFetchHousekeeping(key, entry); 949 break; 950 } 951 } else { 952 try { 953 long remaining = entry.hasValue.awaitNanos(deadlineNanos - System.nanoTime()); 954 if (remaining <= 0) { 955 // Avoid leaking empty entries created by computeIfAbsent when timing out. 956 postFetchHousekeeping(key, entry); 957 return null; 958 } 959 } catch (InterruptedException ie) { 960 Thread.currentThread().interrupt(); 961 postFetchHousekeeping(key, entry); 962 return null; 963 } 964 } 965 966 // If someone removed/replaced the entry, restart outer loop to bind to the current entry. 967 if (entries.get(key) != entry) 968 break; 969 } // inner loop 970 } finally { 971 entry.lock.unlock(); 972 } 973 } // outer loop 974 } 975 976 /** 977 * Housekeeping that must run under entry.lock: 978 * - Wake nrd waiters when queue becomes empty. 979 * - Remove entry only if queue is empty AND no waiters are parked on hasValue (prevents orphaning). 980 */ 981 private void postFetchHousekeeping(K key, KeyEntry entry) { 982 if (!entry.queue.isEmpty()) 983 return; 984 985 // Always wake nrd waiters when empty. 986 entry.isEmpty.signalAll(); 987 988 // Remove only when safe (no hasValue waiters). 989 if (entries.get(key) == entry && !entry.lock.hasWaiters(entry.hasValue)) { 990 entries.remove(key, entry); 991 } 992 } 993 994 /** 995 * Expirable wrapper for values with timeout. 996 */ 997 static class Expirable implements Comparable, Serializable { 998 private static final long serialVersionUID = 0xA7F22BF5; 999 1000 Object value; 1001 long expires; 1002 1003 Expirable(Object value, long expires) { 1004 super(); 1005 this.value = value; 1006 this.expires = expires; 1007 } 1008 1009 boolean isExpired() { 1010 return (System.nanoTime() - expires) > 0; 1011 } 1012 1013 @Override 1014 public String toString() { 1015 return getClass().getName() 1016 + "@" + Integer.toHexString(hashCode()) 1017 + ",value=" + value.toString() 1018 + ",expired=" + isExpired(); 1019 } 1020 1021 Object getValue() { 1022 return isExpired() ? null : value; 1023 } 1024 1025 @Override 1026 public int compareTo(Object other) { 1027 long diff = this.expires - ((Expirable) other).expires; 1028 return diff > 0 ? 1 : diff < 0 ? -1 : 0; 1029 } 1030 } 1031 1032 private static final class CleaningState implements Runnable { 1033 private final ScheduledFuture<?> gcFuture; 1034 private final ConcurrentHashMap<?,?> entries; 1035 private final Set<?>[] expirables; 1036 1037 // We keep a reference to sl so we can cancel its scheduler too. 1038 // This does not introduce a new retention path; it already hangs off the parent space. 1039 private volatile AutoCloseable sl; // store as AutoCloseable to avoid generics pain 1040 1041 private final AtomicBoolean cleaned = new AtomicBoolean(false); 1042 1043 private CleaningState(ScheduledFuture<?> gcFuture, 1044 ConcurrentHashMap<?,?> entries, 1045 Set<?>[] expirables) { 1046 this.gcFuture = gcFuture; 1047 this.entries = entries; 1048 this.expirables = expirables; 1049 } 1050 1051 @Override 1052 public void run() { 1053 if (!cleaned.compareAndSet(false, true)) 1054 return; 1055 1056 try { 1057 if (gcFuture != null) 1058 gcFuture.cancel(false); 1059 } catch (Throwable ignored) { } 1060 1061 // Best-effort close of the listener space. 1062 AutoCloseable s = sl; 1063 if (s != null) { 1064 try { 1065 s.close(); 1066 } catch (Throwable ignored) { } 1067 sl = null; 1068 } 1069 1070 try { 1071 entries.clear(); 1072 } catch (Throwable ignored) { } 1073 1074 try { 1075 expirables[0].clear(); 1076 } catch (Throwable ignored) { } 1077 try { 1078 expirables[1].clear(); 1079 } catch (Throwable ignored) { } 1080 } 1081 } 1082 1083 // ========================= 1084 // Test-only visibility hooks 1085 // ========================= 1086 // Package-private on purpose (same package as tests). 1087 // These methods are intended strictly for unit tests that validate internal invariants. 1088 boolean isExpirableTrackedForTest(K key) { 1089 synchronized (expLocks[0]) { 1090 synchronized (expLocks[1]) { 1091 return expirables[0].contains(key) || expirables[1].contains(key); 1092 } 1093 } 1094 } 1095 1096 boolean isExpirableTrackedForTest(K key, int generation) { 1097 synchronized (expLocks[generation]) { 1098 return expirables[generation].contains(key); 1099 } 1100 } 1101 1102 void forceTrackExpirableForTest(K key, int generation) { 1103 synchronized (expLocks[generation]) { 1104 expirables[generation].add(key); 1105 } 1106 } 1107}