/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.space;

import org.jpos.jfr.SpaceEvent;
import org.jpos.util.Loggeable;

import java.io.PrintStream;
import java.io.Serializable;
import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/**
 * LSpace (Loom-optimized Space) implementation with per-key locking for Virtual Thread efficiency.
 *
 * <p>This implementation addresses the thundering herd problem in traditional Space implementations
 * by using per-key {@link ReentrantLock} and {@link Condition} objects instead of global synchronization.
 * With Virtual Threads (Project Loom), this prevents thousands of threads from being
 * unnecessarily woken up when only one is relevant.</p>
 *
 * <p>Key features:
 * <ul>
 *   <li>Per-key isolation: Each key has its own lock and condition variables</li>
 *   <li>Targeted wakeups: Only threads waiting on a specific key are signaled</li>
 *   <li>Virtual Thread optimized: Scales efficiently with thousands of concurrent threads</li>
 *   <li>Full LocalSpace compatibility: Drop-in replacement with same API and behavior</li>
 *   <li>JFR instrumentation: All operations emit SpaceEvent for monitoring</li>
 * </ul>
 *
 * Concurrency notes (core safety invariants):
 *  - Never remove a KeyEntry from entries while threads are waiting on that entry's hasValue Condition,
 *    otherwise those waiters can be stranded forever (future out() creates a new KeyEntry+Condition).
 *    The one place that must replace entries wholesale ({@link #setEntries(Map)}) wakes the waiters
 *    so they re-bind to the current entry.
 *  - Blocking rd()/in() must never return null unless interrupted (or timed out for timed variants).
 *
 * @author Alejandro Revilla
 * @version $Revision$ $Date$
 * @since 3.0
 *
 * @param <K> the key type
 * @param <V> the value type
 */
@SuppressWarnings("unchecked")
public class LSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
    /** Live per-key state; a key is present only while it has values or parked readers. */
    private final ConcurrentHashMap<K, KeyEntry> entries;
    /** Lazily created listener space (see {@link #getSL()}); null when no listeners are registered. */
    private volatile LocalSpace<K, SpaceListener<K,V>> sl;
    private final ScheduledFuture<?> gcFuture;
    /** Monitors guarding the two generations of {@link #expirables}. */
    private final Object[] expLocks = new Object[] { new Object(), new Object() };

    /** GC sweep delay in milliseconds. */
    public static final long GCDELAY = 5 * 1000;
    /** Timeouts above this many milliseconds are tracked in generation 1 (swept at most once per GCLONG). */
    private static final long GCLONG = 60_000L;
    /** Maximum time (ms) an nrd waiter sleeps before re-checking the head, so expirations are noticed. */
    private static final long NRD_RESOLUTION = 500L;
    private static final int MAX_ENTRIES_IN_DUMP = 1000;

    private static final long ONE_MILLION = 1_000_000L;  // millis -> nanos
    /** Internal sentinel meaning "wait indefinitely"; never forwarded from caller-supplied timeouts. */
    private static final long NO_TIMEOUT = -1L;

    /** Keys that may hold expirable values: [0] short timeouts, [1] timeouts longer than GCLONG. */
    private final Set<K>[] expirables;
    // volatile: written by the scheduler thread and by any caller of the public gc().
    private volatile long lastLongGC = System.nanoTime();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private static final Cleaner CLEANER = Cleaner.create();
    private final Cleaner.Cleanable cleanable;
    private final CleaningState cleaningState;

    /**
     * Per-key synchronization and queue structure.
     */
    private static class KeyEntry {
        final ReentrantLock lock = new ReentrantLock();
        final Condition hasValue = lock.newCondition();  // signaled when value added
        final Condition isEmpty = lock.newCondition();   // signaled when queue becomes empty (for nrd)
        final LinkedList<Object> queue = new LinkedList<>();
        volatile boolean hasExpirable = false;
    }


    /** Default constructor. */
    public LSpace() {
        super();
        this.entries = new ConcurrentHashMap<>(256);
        this.expirables = new Set[] {
            ConcurrentHashMap.newKeySet(),
            ConcurrentHashMap.newKeySet()
        };
        // NOTE(review): the scheduled task is this instance itself, so the executor presumably keeps
        // the space strongly reachable until close() cancels it; the Cleaner registered below is then
        // only a fallback. Confirm against SpaceFactory.getGCExecutor().
        this.gcFuture = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
        this.cleaningState = new CleaningState(gcFuture, entries, expirables);
        this.cleanable = CLEANER.register(this, cleaningState);
    }

    // -------------------------
    // JFR tagging helper (patch)
    // -------------------------
    /** Returns the JFR tag for an operation: the template's key for templates, else the key itself. */
    private String jfrTag(Object keyOrTemplate) {
        if (keyOrTemplate instanceof Template) {
            Object k = ((Template) keyOrTemplate).getKey();
            return "" + k;
        }
        return "" + keyOrTemplate;
    }

    /**
     * Converts milliseconds to nanoseconds without overflowing.
     * {@link TimeUnit#toNanos(long)} saturates at {@code Long.MAX_VALUE}/{@code Long.MIN_VALUE},
     * whereas a plain {@code millis * 1_000_000} wraps around for very large timeouts.
     */
    private static long millisToNanos(long millis) {
        return TimeUnit.MILLISECONDS.toNanos(millis);
    }


    // ========== Producer (enqueuing) operations ==========

    @FunctionalInterface
    private interface Enqueuer {
        void enqueue(KeyEntry entry, Object value);
    }

    @Override
    public void out(K key, V value) {
        out(key, value, NO_TIMEOUT);
    }

    @Override
    public void out(K key, V value, long timeout) {
        enqueueValue("out", key, value, timeout, (ent,v) -> ent.queue.addLast(v));
    }

    @Override
    public void push(K key, V value) {
        push(key, value, NO_TIMEOUT);
    }

    @Override
    public void push(K key, V value, long timeout) {
        enqueueValue("push", key, value, timeout, (ent,v) -> ent.queue.addFirst(v));
    }

    @Override
    public void put(K key, V value) {
        put(key, value, NO_TIMEOUT);
    }

    @Override
    public void put(K key, V value, long timeout) {
        enqueueValue("put", key, value, timeout, (ent,v) -> {
            ent.queue.clear();
            ent.queue.addLast(v);
            ent.hasExpirable = timeout > 0;
            if (timeout <= 0)
                unregisterExpirable(key);
        });
    }

    /**
     * Common method for all enqueuing operations (out, push, put, with/out timeout).
     *
     * @param opTag   JFR operation name
     * @param key     the space key (must not be null)
     * @param value   the value to store (must not be null)
     * @param timeout lifetime in milliseconds; values {@code <= 0} mean the value never expires
     * @param op      how the (possibly wrapped) value is placed into the key's queue
     * @throws NullPointerException if key or value is null
     */
    private void enqueueValue(String opTag, K key, V value, long timeout, Enqueuer op) {
        ensureOpen();
        var jfr = new SpaceEvent(opTag + (timeout > 0 ? ":tim" : ""), "" + key);
        jfr.begin();
        try {
            if (key == null || value == null)
                throw new NullPointerException("key=" + key + ", value=" + value);

            Object v = value;
            if (timeout > 0)
                v = new Expirable(value, System.nanoTime() + millisToNanos(timeout));

            while (true) {
                KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());

                entry.lock.lock();
                try {
                    // The entry may have been removed between computeIfAbsent and lock(); retry.
                    if (entries.get(key) != entry) {
                        continue;
                    }

                    op.enqueue(entry, v);

                    if (timeout > 0) {
                        entry.hasExpirable = true;
                        registerExpirable(key, timeout);
                    }

                    // Always wake the readers of this key. Signalling only on the empty -> non-empty
                    // transition would strand template readers parked on a non-empty queue whose
                    // current values do not match their template. Waiters re-check in a loop, so an
                    // extra wake-up is harmless.
                    entry.hasValue.signalAll();

                    break;
                } finally {
                    entry.lock.unlock();
                }
            }

            if (sl != null)
                notifyListeners(key, value);
        } finally {
            jfr.commit();
        }
    }


    @Override
    public V rdp(Object key) {
        ensureOpen();
        var jfr = new SpaceEvent("rdp", "" + key);
        jfr.begin();
        try {
            if (key instanceof Template)
                return (V) getObjectNonBlocking((Template) key, false);
            return (V) getHeadNonBlocking((K) key, false);
        } finally {
            jfr.commit();
        }
    }

    @Override
    public V inp(Object key) {
        ensureOpen();
        var jfr = new SpaceEvent("inp", "" + key);
        jfr.begin();
        try {
            if (key instanceof Template)
                return (V) getObjectNonBlocking((Template) key, true);
            return (V) getHeadNonBlocking((K) key, true);
        } finally {
            jfr.commit();
        }
    }

    @Override
    public V in(Object key) {
        ensureOpen();
        String op = key instanceof Template ? "in:tmpl" : "in";
        var jfr = new SpaceEvent(op, jfrTag(key));
        jfr.begin();
        try {
            if (key instanceof Template)
                return inTemplate((Template) key);
            return inKey((K) key);
        } finally {
            jfr.commit();
        }
    }

    @Override
    public V in(Object key, long timeout) {
        ensureOpen();
        String op = key instanceof Template ? "in:tim:tmpl" : "in:tim";
        var jfr = new SpaceEvent(op, jfrTag(key));
        jfr.begin();
        try {
            if (key instanceof Template)
                return inTemplate((Template) key, timeout);
            return inKey((K) key, timeout);
        } finally {
            jfr.commit();
        }
    }

    @Override
    public V rd(Object key) {
        ensureOpen();
        String op = key instanceof Template ? "rd:tmpl" : "rd";
        var jfr = new SpaceEvent(op, jfrTag(key));
        jfr.begin();
        try {
            if (key instanceof Template)
                return rdTemplate((Template) key);
            return rdKey((K) key);
        } finally {
            jfr.commit();
        }
    }

    @Override
    public V rd(Object key, long timeout) {
        ensureOpen();
        String op = key instanceof Template ? "rd:tim:tmpl" : "rd:tim";
        var jfr = new SpaceEvent(op, jfrTag(key));
        jfr.begin();
        try {
            if (key instanceof Template)
                return rdTemplate((Template) key, timeout);
            return rdKey((K) key, timeout);
        } finally {
            jfr.commit();
        }
    }

    /**
     * Blocks until no (unexpired) value is present under the key, or the thread is interrupted.
     * The head is re-checked at least every NRD_RESOLUTION milliseconds so expirations are noticed.
     */
    @Override
    public void nrd(Object key) {
        ensureOpen();
        var jfr = new SpaceEvent("nrd", "" + key);
        jfr.begin();
        try {
            K k = (K) key;
            while (true) {
                KeyEntry entry = entries.get(k);
                if (entry == null)
                    return;

                entry.lock.lock();
                try {
                    Object obj = getHead(entry, k, false);
                    if (obj == null) {
                        postFetchHousekeeping(k, entry);
                        return;
                    }
                    try {
                        entry.isEmpty.await(NRD_RESOLUTION, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                } finally {
                    entry.lock.unlock();
                }
            }
        } finally {
            jfr.commit();
        }
    }

    /**
     * Waits up to {@code timeout} milliseconds for the key to have no (unexpired) value.
     *
     * @return null if the key became (or already was) empty; otherwise the value still present
     *         when the timeout elapsed or the thread was interrupted
     */
    @Override
    public V nrd(Object key, long timeout) {
        ensureOpen();
        var jfr = new SpaceEvent("nrd:tim", "" + key);
        jfr.begin();
        try {
            K k = (K) key;
            long deadline = System.nanoTime() + millisToNanos(timeout);

            while (true) {
                KeyEntry entry = entries.get(k);
                if (entry == null)
                    return null;

                entry.lock.lock();
                try {
                    V obj = (V) getHead(entry, k, false);
                    if (obj == null) {
                        postFetchHousekeeping(k, entry);
                        return null;
                    }
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0)
                        return obj;

                    long waitTime = Math.min(NRD_RESOLUTION * ONE_MILLION, remaining);
                    try {
                        entry.isEmpty.awaitNanos(waitTime);
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                        return obj;
                    }
                } finally {
                    entry.lock.unlock();
                }
            }
        } finally {
            jfr.commit();
        }
    }

    @Override
    public boolean existAny(K[] keys) {
        ensureOpen();
        for (K key : keys) {
            if (rdp(key) != null)
                return true;
        }
        return false;
    }

    /**
     * Polls the given keys until one of them has a value, the timeout elapses, or the calling
     * thread is interrupted (the interrupt status is left set).
     *
     * @param keys    keys to probe
     * @param timeout maximum wait in milliseconds
     * @return true as soon as any key has a value, false otherwise
     */
    @Override
    public boolean existAny(K[] keys, long timeout) {
        ensureOpen();
        var jfr = new SpaceEvent("existAny:tim", Integer.toString(keys != null ? keys.length : 0));
        jfr.begin();
        try {
            long deadline = System.nanoTime() + millisToNanos(timeout);
            long pollInterval = 10 * ONE_MILLION;

            while (true) {
                for (K key : keys) {
                    if (rdp(key) != null)
                        return true;
                }

                long remaining = deadline - System.nanoTime();
                // parkNanos returns immediately while the thread is interrupted; without this exit
                // the loop would spin at full speed until the deadline.
                if (remaining <= 0 || Thread.currentThread().isInterrupted())
                    return false;

                LockSupport.parkNanos(Math.min(pollInterval, remaining));
            }
        } finally {
            jfr.commit();
        }
    }

    @Override
    public void run() {
        // Scheduler ticks may race with close(); treat closed as a no-op.
        if (closed.get())
            return;
        try {
            gc();
        } catch (Exception e) {
            e.printStackTrace(); // should never happen
        }
    }

    /** Runs a garbage-collection sweep to remove expired space entries. */
    public void gc() {
        // Avoid work after close if a scheduled tick slips through.
        if (closed.get())
            return;

        gc(0);
        if (System.nanoTime() - lastLongGC > GCLONG * ONE_MILLION) {
            gc(1);
            lastLongGC = System.nanoTime();
        }
    }

    /**
     * Sweeps one generation of expirable keys, dropping expired values and releasing entries
     * that became empty, then releases the listener space if it no longer holds listeners.
     */
    private void gc(int generation) {
        // gc() already guards closed; keep gc(int) lean.
        var jfr = new SpaceEvent("gc", Integer.toString(generation));
        jfr.begin();
        try {
            Set<K> keysToCheck;
            synchronized (expLocks[generation]) {
                keysToCheck = new HashSet<>(expirables[generation]);
                expirables[generation].clear();
            }
            for (K key : keysToCheck) {
                KeyEntry entry = entries.get(key);
                if (entry == null)
                    continue;

                entry.lock.lock();
                try {
                    boolean stillHasExpirable = false;
                    boolean sawAnyExpirable = false;

                    Iterator<Object> iterator = entry.queue.iterator();
                    while (iterator.hasNext()) {
                        Object obj = iterator.next();
                        if (obj instanceof Expirable) {
                            sawAnyExpirable = true;
                            Object value = ((Expirable) obj).getValue();
                            if (value == null) {
                                iterator.remove();
                            } else {
                                stillHasExpirable = true;
                            }
                        }
                    }

                    entry.hasExpirable = stillHasExpirable;

                    if (stillHasExpirable) {
                        synchronized (expLocks[generation]) {
                            expirables[generation].add(key);
                        }
                        // Queue might have changed (expired items removed), wake any rd/in waiters.
                        entry.hasValue.signalAll();
                    } else {
                        // No longer has expirables anywhere; ensure we don't keep the key in either generation set.
                        if (sawAnyExpirable)
                            unregisterExpirable(key);
                    }

                    // Apply the same safe empty-entry policy used everywhere else.
                    if (entry.queue.isEmpty()) {
                        postFetchHousekeeping(key, entry);
                    }
                } finally {
                    entry.lock.unlock();
                }

                Thread.yield();
            }

            releaseIdleListenerSpace();
        } finally {
            // Commit even if the sweep throws, like every other instrumented operation.
            jfr.commit();
        }
    }

    /**
     * Drops the listener space once it holds no listeners, closing it so its periodic GC task is
     * cancelled instead of being left scheduled. Runs under the same monitor as {@link #getSL()},
     * {@code addListener} and {@code removeListener}, so a listener can never be added to (or
     * removed from) a space that is being discarded.
     */
    private void releaseIdleListenerSpace() {
        LocalSpace<K, SpaceListener<K,V>> current = sl;
        if (current == null || closed.get() || !current.getKeySet().isEmpty())
            return;

        synchronized (this) {
            if (sl == current && !closed.get() && current.getKeySet().isEmpty()) {
                sl = null;
                cleaningState.sl = null;
                if (current instanceof LSpace<?,?>) {
                    ((LSpace<?,?>) current).close();
                }
            }
        }
    }

    @Override
    public int size(Object key) {
        ensureOpen();
        var jfr = new SpaceEvent("size", "" + key);
        jfr.begin();
        try {
            int size = 0;
            KeyEntry entry = entries.get((K) key);
            if (entry != null) {
                entry.lock.lock();
                try {
                    size = entry.queue.size();
                } finally {
                    entry.lock.unlock();
                }
            }
            return size;
        } finally {
            jfr.commit();
        }
    }

    @Override
    public void addListener(Object key, SpaceListener listener) {
        ensureOpen();
        // Same monitor as releaseIdleListenerSpace(): the listener space cannot be closed mid-add.
        synchronized (this) {
            getSL().out((K) key, listener);
        }
    }

    @Override
    public void addListener(Object key, SpaceListener listener, long timeout) {
        ensureOpen();
        synchronized (this) {
            getSL().out((K) key, listener, timeout);
        }
    }

    @Override
    public void removeListener(Object key, SpaceListener listener) {
        ensureOpen();
        synchronized (this) {
            // Read the volatile once; it may be nulled concurrently by close().
            LocalSpace<K, SpaceListener<K,V>> current = sl;
            if (current != null) {
                current.inp((K) new ObjectTemplate(key, listener));
            }
        }
    }

    /** Returns true if this space contains no entries.
     * @return true if empty
     */
    public boolean isEmpty() {
        ensureOpen();
        return entries.isEmpty();
    }

    @Override
    public Set<K> getKeySet() {
        ensureOpen();
        return new HashSet<>(entries.keySet());
    }

    /** Returns all current keys as a space-separated string.
     * @return space-separated key list
     */
    public String getKeysAsString() {
        ensureOpen();
        StringBuilder sb = new StringBuilder();
        Object[] keys = entries.keySet().toArray();
        for (int i = 0; i < keys.length; i++) {
            if (i > 0)
                sb.append(' ');
            sb.append(keys[i]);
        }
        return sb.toString();
    }

    @Override
    public void dump(PrintStream p, String indent) {
        ensureOpen();
        var jfr = new SpaceEvent("dump", "");
        jfr.begin();
        try {
            int size = entries.size();
            if (size > MAX_ENTRIES_IN_DUMP * 100) {
                p.printf("%sWARNING - space too big, size=%d%n", indent, size);
                return;
            }

            Object[] keys = entries.keySet().toArray();

            int i = 0;
            for (Object key : keys) {
                p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key);
                if (i++ > MAX_ENTRIES_IN_DUMP) {
                    p.printf("%s...%n", indent);
                    p.printf("%s...%n", indent);
                    break;
                }
            }
            p.printf("%s key-count: %d%n", indent, keys.length);

            int exp0 = expirables[0].size();
            int exp1 = expirables[1].size();
            p.printf("%s gcinfo: %d,%d%n", indent, exp0, exp1);
        } finally {
            jfr.commit();
        }
    }

    /** Notifies all registered listeners for the given key/value pair.
     * @param key the space key
     * @param value the new value
     */
    public void notifyListeners(Object key, Object value) {
        ensureOpen();
        var jfr = new SpaceEvent("notify", "" + key);
        jfr.begin();
        try {
            LocalSpace<K, SpaceListener<K,V>> localSl = sl;  // Capture volatile read once
            if (localSl == null)
                return;

            Object[] listeners = null;
            LSpace<K, SpaceListener<K,V>> lsl = (LSpace<K, SpaceListener<K,V>>) localSl;
            KeyEntry slEntry = lsl.entries.get((K) key);
            if (slEntry != null) {
                // Snapshot under the lock; the callbacks below run without holding it.
                slEntry.lock.lock();
                try {
                    listeners = slEntry.queue.toArray();
                } finally {
                    slEntry.lock.unlock();
                }
            }

            if (listeners != null) {
                for (Object listener : listeners) {
                    Object o = listener;
                    if (o instanceof Expirable)
                        o = ((Expirable) o).getValue();
                    if (o instanceof SpaceListener)
                        ((SpaceListener) o).notify(key, value);
                }
            }
        } finally {
            jfr.commit();
        }
    }

    /**
     * Non-standard method (required for space replication) - use with care.
     * @return snapshot map of all entries
     */
    public Map getEntries() {
        ensureOpen();
        Map<K, List> result = new HashMap<>();
        for (var e : entries.entrySet()) {
            KeyEntry entry = e.getValue();
            entry.lock.lock();
            try {
                result.put(e.getKey(), new LinkedList<>(entry.queue));
            } finally {
                entry.lock.unlock();
            }
        }
        return result;
    }

    /**
     * Non-standard method (required for space replication) - use with care.
     *
     * <p>Replaces the current content with the given map. Threads parked on a replaced entry are
     * woken so they re-bind to the current entry instead of waiting on a detached Condition, and
     * readers of each loaded key are signalled once its values are in place.</p>
     *
     * @param entries the entries map to load into this space
     */
    public void setEntries(Map entries) {
        ensureOpen();

        // Detach the current entries one by one, under each entry's lock, waking its waiters.
        for (Map.Entry<K, KeyEntry> current : this.entries.entrySet()) {
            KeyEntry stale = current.getValue();
            stale.lock.lock();
            try {
                this.entries.remove(current.getKey(), stale);
                stale.hasValue.signalAll();
                stale.isEmpty.signalAll();
            } finally {
                stale.lock.unlock();
            }
        }

        for (var e : (Set<Map.Entry>)entries.entrySet()) {
            K key = (K)e.getKey();
            List<V> list = (List<V>)e.getValue();
            while (true) {
                KeyEntry entry = this.entries.computeIfAbsent(key, k -> new KeyEntry());
                entry.lock.lock();
                try {
                    // Entry may have been removed by housekeeping before we got the lock; retry.
                    if (this.entries.get(key) != entry)
                        continue;

                    entry.queue.clear();
                    entry.queue.addAll(list);
                    // Conservatively: if replication injects Expirables, caller should also registerExpirable appropriately.
                    // We do not attempt to infer expirables here.
                    if (!entry.queue.isEmpty())
                        entry.hasValue.signalAll();
                    break;
                } finally {
                    entry.lock.unlock();
                }
            }
        }
    }

    /**
     * Cancels the periodic GC task so this instance can be garbage-collected.
     * Safe to call multiple times.
     *
     * <p>NOTE(review): threads parked in rd()/in()/nrd() when close() runs are not signalled here.</p>
     */
    @Override
    public void close() {
        if (!closed.compareAndSet(false, true))
            return;

        if (gcFuture != null) {
            gcFuture.cancel(false);
        }
        // If sl is an LSpace, allow it to release resources as well.
        LocalSpace<K, SpaceListener<K,V>> s = sl;
        if (s instanceof LSpace<?,?>) {
            ((LSpace<?,?>) s).close();
        }
        sl = null;
        entries.clear();
        expirables[0].clear();
        expirables[1].clear();
        cleanable.clean(); // Eager cleanup
    }

    // ========== Blocking (deduplicated) ==========

    /**
     * Normalizes a caller-supplied timeout for the timed variants. Negative values mean
     * "do not wait"; clamping them to zero also keeps a caller's {@code -1} from being mistaken
     * for the internal NO_TIMEOUT sentinel, which would block indefinitely.
     */
    private static long callerTimeout(long timeout) {
        return Math.max(0L, timeout);
    }

    private V inKey(K key) {
        return awaitValue(key, entry -> getHead(entry, key, true), NO_TIMEOUT);
    }

    private V inKey(K key, long timeout) {
        return awaitValue(key, entry -> getHead(entry, key, true), callerTimeout(timeout));
    }

    private V rdKey(K key) {
        return awaitValue(key, entry -> getHead(entry, key, false), NO_TIMEOUT);
    }

    private V rdKey(K key, long timeout) {
        return awaitValue(key, entry -> getHead(entry, key, false), callerTimeout(timeout));
    }

    private V inTemplate(Template tmpl) {
        K key = (K) tmpl.getKey();
        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), NO_TIMEOUT);
    }

    private V inTemplate(Template tmpl, long timeout) {
        K key = (K) tmpl.getKey();
        return awaitValue(key, entry -> getObject(entry, key, tmpl, true), callerTimeout(timeout));
    }

    private V rdTemplate(Template tmpl) {
        K key = (K) tmpl.getKey();
        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), NO_TIMEOUT);
    }

    private V rdTemplate(Template tmpl, long timeout) {
        K key = (K) tmpl.getKey();
        return awaitValue(key, entry -> getObject(entry, key, tmpl, false), callerTimeout(timeout));
    }

    // ========== Non-blocking helpers ==========

    /**
     * Get head of queue (non-blocking version for rdp/inp).
     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
     */
    private Object getHeadNonBlocking(K key, boolean remove) {
        KeyEntry entry = entries.get(key);
        if (entry == null)
            return null;

        entry.lock.lock();
        try {
            if (entries.get(key) != entry)
                return null;

            Object result = getHead(entry, key, remove);

            if (remove) {
                // If remove emptied the queue, postFetchHousekeeping will handle safe removal/signals.
                postFetchHousekeeping(key, entry);
            }
            return result;
        } finally {
            entry.lock.unlock();
        }
    }

    /**
     * Get object matching template (non-blocking version for rdp/inp).
     * Uses safe empty-entry cleanup via postFetchHousekeeping to avoid orphaning waiters.
     */
    private Object getObjectNonBlocking(Template tmpl, boolean remove) {
        K key = (K) tmpl.getKey();
        KeyEntry entry = entries.get(key);
        if (entry == null)
            return null;

        entry.lock.lock();
        try {
            if (entries.get(key) != entry)
                return null;

            Object result = getObject(entry, key, tmpl, remove);

            if (remove) {
                postFetchHousekeeping(key, entry);
            }
            return result;
        } finally {
            entry.lock.unlock();
        }
    }

    /**
     * Get head of queue, discarding expired heads along the way.
     * MUST be called with entry.lock held.
     *
     * @return the first unexpired value (unwrapped), or null if none is left
     */
    private Object getHead(KeyEntry entry, K key, boolean remove) {
        Object result = null;
        boolean wasExpirable = false;

        while (result == null && !entry.queue.isEmpty()) {
            Object obj = entry.queue.getFirst();

            if (obj instanceof Expirable) {
                Object value = ((Expirable) obj).getValue();
                wasExpirable = true;

                if (value == null) {
                    entry.queue.removeFirst();
                    continue;
                } else {
                    result = value;
                }
            } else {
                result = obj;
            }

            if (remove && result != null) {
                entry.queue.removeFirst();
            }
        }

        if (entry.queue.isEmpty()) {
            entry.hasExpirable = false;
            if (wasExpirable)
                unregisterExpirable(key);
        }

        return result;
    }

    /**
     * Get object matching template, discarding expired values met while scanning.
     * MUST be called with entry.lock held.
     *
     * @return the first unexpired value (unwrapped) the template equals, or null if none matches
     */
    private Object getObject(KeyEntry entry, K key, Template tmpl, boolean remove) {
        Object result = null;
        Iterator<Object> iterator = entry.queue.iterator();
        boolean wasExpirable = false;

        while (iterator.hasNext()) {
            Object obj = iterator.next();

            if (obj instanceof Expirable) {
                Object value = ((Expirable) obj).getValue();
                if (value == null) {
                    iterator.remove();
                    wasExpirable = true;
                    continue;
                } else {
                    obj = value;
                }
            }

            if (tmpl.equals(obj)) {
                result = obj;
                if (remove)
                    iterator.remove();
                break;
            }
        }

        if (entry.queue.isEmpty()) {
            entry.hasExpirable = false;
            if (wasExpirable)
                unregisterExpirable(key);
        }

        return result;
    }

    /** @throws IllegalStateException if {@link #close()} has been called */
    private void ensureOpen() {
        if (closed.get())
            throw new IllegalStateException("LSpace is closed");
    }

    // ========== Listener-space helpers ==========

    /** Returns the listener space, creating it under this instance's monitor on first use. */
    private LocalSpace<K, SpaceListener<K,V>> getSL() {
        ensureOpen();
        if (sl == null) {
            synchronized (this) {
                ensureOpen();
                if (sl == null) {
                    sl = new LSpace<>();
                    cleaningState.sl = (AutoCloseable) sl;
                }
            }
        }
        return sl;
    }

    /** Tracks the key in generation 1 when the timeout exceeds GCLONG, otherwise in generation 0. */
    private void registerExpirable(K k, long t) {
        int g = (t > GCLONG) ? 1 : 0;
        synchronized (expLocks[g]) {
            expirables[g].add(k);
        }
    }

    /** Stops tracking the key in both generations (locks are always taken in order 0, then 1). */
    private void unregisterExpirable(K k) {
        synchronized (expLocks[0]) {
            synchronized (expLocks[1]) {
                expirables[0].remove(k);
                expirables[1].remove(k);
            }
        }
    }

    // ========== Blocking core (shared) ==========

    @FunctionalInterface
    private interface Fetcher {
        Object fetch(KeyEntry entry);
    }

    /**
     * Common blocking wait-loop for rd/in operations (key or template).
     *
     * Ensures:
     *  - No premature null returns due to entry replacement (retries outer loop).
     *  - Null is returned only on timeout (timed variants) or when the thread is interrupted; the
     *    interrupt status is restored before returning.
     *  - Empty computeIfAbsent-created entries are removed on timeout/interrupt (postFetchHousekeeping).
     *  - Does not orphan waiters because postFetchHousekeeping refuses to remove if hasValue waiters exist.
     *
     * @param key           the key to wait on
     * @param fetcher       extracts (and possibly removes) a value from the entry; called with entry.lock held
     * @param timeoutMillis maximum wait in milliseconds, or NO_TIMEOUT to wait indefinitely
     * @return the fetched value, or null on timeout/interrupt
     */
    @SuppressWarnings("unchecked")
    private V awaitValue(K key, Fetcher fetcher, long timeoutMillis) {
        ensureOpen();

        final boolean timed = timeoutMillis != NO_TIMEOUT;
        final long deadlineNanos = timed ? System.nanoTime() + millisToNanos(timeoutMillis) : 0L;

        for (;;) {
            final KeyEntry entry = entries.computeIfAbsent(key, k -> new KeyEntry());

            entry.lock.lock();
            try {
                if (entries.get(key) != entry)
                    continue;

                for (;;) {
                    Object obj = fetcher.fetch(entry);
                    if (obj != null) {
                        postFetchHousekeeping(key, entry);
                        return (V) obj;
                    }

                    try {
                        if (timed) {
                            long remaining = entry.hasValue.awaitNanos(deadlineNanos - System.nanoTime());
                            if (remaining <= 0) {
                                // A value may have been enqueued right before the deadline (awaitNanos
                                // can report "no time left" even after a signal): look one last time,
                                // but only if this entry is still the live one for the key.
                                Object last = entries.get(key) == entry ? fetcher.fetch(entry) : null;
                                // Avoid leaking empty entries created by computeIfAbsent when timing out.
                                postFetchHousekeeping(key, entry);
                                return (V) last;
                            }
                        } else {
                            entry.hasValue.await();
                        }
                    } catch (InterruptedException ie) {
                        // Return instead of retrying: with the interrupt status restored, await()
                        // would throw again immediately and this loop would spin without ever parking.
                        Thread.currentThread().interrupt();
                        // Avoid leaking an empty entry created by computeIfAbsent for a waiter that got interrupted.
                        postFetchHousekeeping(key, entry);
                        return null;
                    }

                    // If someone removed/replaced the entry, restart outer loop to bind to the current entry.
                    if (entries.get(key) != entry)
                        break;
                } // inner loop
            } finally {
                entry.lock.unlock();
            }
        } // outer loop
    }

    /**
     * Housekeeping that must run under entry.lock:
     *  - Wake nrd waiters when queue becomes empty.
     *  - Remove entry only if queue is empty AND no waiters are parked on hasValue (prevents orphaning).
     */
    private void postFetchHousekeeping(K key, KeyEntry entry) {
        if (!entry.queue.isEmpty())
            return;

        // Always wake nrd waiters when empty.
        entry.isEmpty.signalAll();

        // Remove only when safe (no hasValue waiters).
        if (entries.get(key) == entry && !entry.lock.hasWaiters(entry.hasValue)) {
            entries.remove(key, entry);
        }
    }

    /**
     * Expirable wrapper for values with timeout.
     * The expiry instant is expressed on the {@link System#nanoTime()} timeline.
     */
    static class Expirable implements Comparable, Serializable {
        private static final long serialVersionUID = 0xA7F22BF5;

        Object value;
        long expires;

        Expirable(Object value, long expires) {
            super();
            this.value = value;
            this.expires = expires;
        }

        boolean isExpired() {
            // Difference-based comparison stays correct if nanoTime values wrap around.
            return (System.nanoTime() - expires) > 0;
        }

        @Override
        public String toString() {
            return getClass().getName()
                + "@" + Integer.toHexString(hashCode())
                + ",value=" + value
                + ",expired=" + isExpired();
        }

        /** Returns the wrapped value, or null once it has expired. */
        Object getValue() {
            return isExpired() ? null : value;
        }

        @Override
        public int compareTo(Object other) {
            long diff = this.expires - ((Expirable) other).expires;
            return diff > 0 ? 1 : diff < 0 ? -1 : 0;
        }
    }

    /**
     * Cleanup action shared by {@link #close()} and the Cleaner. Holds only the state it needs
     * (never the LSpace itself) and runs its body at most once.
     */
    private static final class CleaningState implements Runnable {
        private final ScheduledFuture<?> gcFuture;
        private final ConcurrentHashMap<?,?> entries;
        private final Set<?>[] expirables;

        // We keep a reference to sl so we can cancel its scheduler too.
        // This does not introduce a new retention path; it already hangs off the parent space.
        private volatile AutoCloseable sl; // store as AutoCloseable to avoid generics pain

        private final AtomicBoolean cleaned = new AtomicBoolean(false);

        private CleaningState(ScheduledFuture<?> gcFuture,
                              ConcurrentHashMap<?,?> entries,
                              Set<?>[] expirables) {
            this.gcFuture = gcFuture;
            this.entries = entries;
            this.expirables = expirables;
        }

        @Override
        public void run() {
            if (!cleaned.compareAndSet(false, true))
                return;

            // Every step is best-effort: a failure in one must not prevent the others.
            try {
                if (gcFuture != null)
                    gcFuture.cancel(false);
            } catch (Throwable ignored) { }

            // Best-effort close of the listener space.
            AutoCloseable s = sl;
            if (s != null) {
                try {
                    s.close();
                } catch (Throwable ignored) { }
                sl = null;
            }

            try {
                entries.clear();
            } catch (Throwable ignored) { }

            try {
                expirables[0].clear();
            } catch (Throwable ignored) { }
            try {
                expirables[1].clear();
            } catch (Throwable ignored) { }
        }
    }

    // =========================
    // Test-only visibility hooks
    // =========================
    // Package-private on purpose (same package as tests).
    // These methods are intended strictly for unit tests that validate internal invariants.
    boolean isExpirableTrackedForTest(K key) {
        synchronized (expLocks[0]) {
            synchronized (expLocks[1]) {
                return expirables[0].contains(key) || expirables[1].contains(key);
            }
        }
    }

    boolean isExpirableTrackedForTest(K key, int generation) {
        synchronized (expLocks[generation]) {
            return expirables[generation].contains(key);
        }
    }

    void forceTrackExpirableForTest(K key, int generation) {
        synchronized (expLocks[generation]) {
            expirables[generation].add(key);
        }
    }
}