001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020import org.jpos.jfr.SpaceEvent; 021import org.jpos.util.Loggeable; 022import java.io.PrintStream; 023import java.io.Serializable; 024import java.time.Duration; 025import java.time.Instant; 026import java.util.*; 027import java.util.concurrent.TimeUnit; 028 029/** 030 * TSpace implementation 031 * @author Alejandro Revilla 032 * @version $Revision$ $Date$ 033 * @since !.4.9 034 035 * @param <K> the key type 036 * @param <V> the value type 037 */ 038 039@SuppressWarnings("unchecked") 040public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable { 041 /** Backing map keyed by user-supplied keys; values are entries or lists of entries. */ 042 protected Map entries; 043 /** Per-key space listeners; itself a {@link TSpace} so it inherits the dispatch model. */ 044 protected TSpace sl; // space listeners 045 /** Periodic interval, in milliseconds, between background GC sweeps. */ 046 public static final long GCDELAY = 5*1000; 047 private static final long GCLONG = 60_000L; 048 private static final long NRD_RESOLUTION = 500L; 049 private static final int MAX_ENTRIES_IN_DUMP = 1000; 050 private static final long ONE_MILLION = 1_000_000L; // multiplier millis --> nanos 051 private final Set[] expirables; 052 private long lastLongGC = System.nanoTime(); 053 054 /** Default constructor. */ 055 public TSpace () { 056 super(); 057 entries = new HashMap (); 058 expirables = new Set[] { new HashSet<K>(), new HashSet<K>() }; 059 SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS); 060 } 061 062 @Override 063 public void out (K key, V value) { 064 var jfr = new SpaceEvent("out", "" + key); 065 jfr.begin(); 066 if (key == null || value == null) 067 throw new NullPointerException ("key=" + key + ", value=" + value); 068 synchronized(this) { 069 List l = getList(key); 070 l.add (value); 071 if (l.size() == 1) 072 this.notifyAll (); 073 } 074 if (sl != null) 075 notifyListeners(key, value); 076 jfr.commit(); 077 } 078 079 @Override 080 public void out (K key, V value, long timeout) { 081 var jfr = new SpaceEvent("out:tim", "" + key); 082 jfr.begin(); 083 084 if (key == null || value == null) { 085 jfr.commit(); 086 throw new NullPointerException("key=" + key + ", value=" + value); 087 } 088 Object v = value; 089 if (timeout > 0) { 090 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 091 } 092 synchronized (this) { 093 List l = getList(key); 094 l.add(v); 095 if (l.size() == 1) 096 this.notifyAll (); 097 if (timeout > 0) { 098 registerExpirable(key, timeout); 099 } 100 } 101 if (sl != null) 102 notifyListeners(key, value); 103 jfr.commit(); 104 } 105 106 @Override 107 public synchronized V rdp (Object key) { 108 var jfr = new SpaceEvent("rdp", "" + key); 109 jfr.begin(); 110 try { 111 if (key instanceof Template) 112 return (V) getObject ((Template) key, false); 113 return (V) getHead (key, false); 114 } finally { 115 jfr.commit(); 116 } 117 } 118 119 @Override 120 public synchronized V inp (Object key) { 121 var jfr = new SpaceEvent("inp", "" + key); 122 jfr.begin(); 123 try { 124 if (key instanceof Template) 125 return (V) getObject ((Template) key, true); 126 return (V) getHead (key, true); 127 } finally { 128 jfr.commit(); 129 } 130 } 131 132 @Override 133 public synchronized V in (Object key) { 134 Object obj; 135 while ((obj = inp (key)) == null) { 136 try { 137 this.wait (); 138 } catch (InterruptedException e) { } 139 } 140 return (V) obj; 141 } 142 143 @Override 144 public synchronized V in (Object key, long timeout) { 145 V obj; 146 long now = System.nanoTime(); 147 long to = now + timeout * ONE_MILLION; 148 long waitFor; 149 while ( (obj = inp (key)) == null && 150 (waitFor = (to - System.nanoTime())) >= 0 ) 151 { 152 try { 153 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 154 } catch (InterruptedException e) { } 155 } 156 return obj; 157 } 158 159 @Override 160 public synchronized V rd (Object key) { 161 Object obj; 162 while ((obj = rdp (key)) == null) { 163 try { 164 this.wait (); 165 } catch (InterruptedException e) { } 166 } 167 return (V) obj; 168 } 169 170 @Override 171 public synchronized V rd (Object key, long timeout) { 172 V obj; 173 long now = System.nanoTime(); 174 long to = now + (timeout * ONE_MILLION); 175 long waitFor; 176 while ( (obj = rdp (key)) == null && 177 (waitFor = (to - System.nanoTime())) >= 0 ) 178 { 179 try { 180 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 181 } catch (InterruptedException e) { } 182 } 183 return obj; 184 } 185 186 @Override 187 public synchronized void nrd (Object key) { 188 while (rdp (key) != null) { 189 try { 190 this.wait (NRD_RESOLUTION); 191 } catch (InterruptedException ignored) { } 192 } 193 } 194 195 @Override 196 public synchronized V nrd (Object key, long timeout) { 197 V obj; 198 long now = System.nanoTime(); 199 long to = now + (timeout * ONE_MILLION); 200 long waitFor; 201 while ( (obj = rdp (key)) != null && 202 (waitFor = (to - System.nanoTime())) >= 0 ) 203 { 204 try { 205 this.wait(Math.min(NRD_RESOLUTION, 206 Math.max(waitFor / ONE_MILLION, 1L))); 207 } catch (InterruptedException ignored) { } 208 } 209 return obj; 210 } 211 212 @Override 213 public void run () { 214 try { 215 gc(); 216 } catch (Exception e) { 217 e.printStackTrace(); // this should never happen 218 } 219 } 220 221 /** 222 * Sweeps the short-lived expirable set, and the long-lived set when its 223 * sweep interval has elapsed. 224 */ 225 public void gc () { 226 gc(0); 227 if (System.nanoTime() - lastLongGC > GCLONG*ONE_MILLION) { 228 gc(1); 229 lastLongGC = System.nanoTime(); 230 } 231 } 232 233 private void gc (int generation) { 234 var jfr = new SpaceEvent("gc", Integer.toString(generation)); 235 jfr.begin(); 236 237 Set<K> exps; 238 synchronized (this) { 239 exps = expirables[generation]; 240 expirables[generation] = new HashSet<K>(); 241 } 242 for (K k : exps) { 243 if (rdp(k) != null) { 244 synchronized (this) { 245 expirables[generation].add(k); 246 } 247 } 248 Thread.yield (); 249 } 250 if (sl != null) { 251 synchronized (this) { 252 if (sl != null && sl.isEmpty()) 253 sl = null; 254 } 255 } 256 jfr.commit(); 257 } 258 259 @Override 260 public synchronized int size (Object key) { 261 var jfr = new SpaceEvent("size", "" + key); 262 jfr.begin(); 263 264 int size = 0; 265 List l = (List) entries.get (key); 266 if (l != null) 267 size = l.size(); 268 jfr.commit(); 269 return size; 270 } 271 272 @Override 273 public synchronized void addListener (Object key, SpaceListener listener) { 274 getSL().out (key, listener); 275 } 276 277 @Override 278 public synchronized void addListener 279 (Object key, SpaceListener listener, long timeout) 280 { 281 getSL().out (key, listener, timeout); 282 } 283 284 @Override 285 public synchronized void removeListener 286 (Object key, SpaceListener listener) 287 { 288 if (sl != null) { 289 sl.inp (new ObjectTemplate (key, listener)); 290 } 291 } 292 /** 293 * Indicates whether the space currently holds any entries. 294 * 295 * @return {@code true} if no entries are stored 296 */ 297 public boolean isEmpty() { 298 return entries.isEmpty(); 299 } 300 301 @Override 302 public synchronized Set<K> getKeySet() { 303 return new HashSet<K>(entries.keySet()); 304 } 305 306 /** 307 * Returns a space-separated list of every key currently stored. 308 * 309 * @return all keys, joined by single-space separators 310 */ 311 public String getKeysAsString () { 312 StringBuilder sb = new StringBuilder(); 313 Object[] keys; 314 synchronized (this) { 315 keys = entries.keySet().toArray(); 316 } 317 for (int i=0; i<keys.length; i++) { 318 if (i > 0) 319 sb.append (' '); 320 sb.append (keys[i]); 321 } 322 return sb.toString(); 323 } 324 325 @Override 326 public void dump(PrintStream p, String indent) { 327 var jfr = new SpaceEvent("dump", ""); 328 jfr.begin(); 329 330 Object[] keys; 331 int size = entries.size(); 332 if (size > MAX_ENTRIES_IN_DUMP * 100) { 333 p.printf ("%sWARNING - space too big, size=%d%n", indent, size); 334 jfr.commit(); 335 return; 336 } 337 synchronized (this) { 338 keys = entries.keySet().toArray(); 339 } 340 int i=0; 341 for (Object key : keys) { 342 p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key); 343 if (i++ > MAX_ENTRIES_IN_DUMP) { 344 p.printf ("%s...%n", indent); 345 p.printf ("%s...%n", indent); 346 break; 347 } 348 } 349 p.printf("%s key-count: %d%n", indent, keys.length); 350 int exp0, exp1; 351 synchronized (this) { 352 exp0 = expirables[0].size(); 353 exp1 = expirables[1].size(); 354 } 355 p.printf("%s gcinfo: %d,%d%n", indent, exp0, exp1); 356 jfr.commit(); 357 } 358 359 /** 360 * Notifies every listener registered against {@code key} of an entry change. 361 * 362 * @param key entry key 363 * @param value the value just written or {@code null} when entries were removed 364 */ 365 public void notifyListeners (Object key, Object value) { 366 var jfr = new SpaceEvent("notify", "" + key); 367 jfr.begin(); 368 369 Object[] listeners = null; 370 synchronized (this) { 371 if (sl == null) 372 return; 373 List l = (List) sl.entries.get (key); 374 if (l != null) 375 listeners = l.toArray(); 376 } 377 if (listeners != null) { 378 for (Object listener : listeners) { 379 Object o = listener; 380 if (o instanceof Expirable) 381 o = ((Expirable) o).getValue(); 382 if (o instanceof SpaceListener) 383 ((SpaceListener) o).notify(key, value); 384 } 385 } 386 jfr.commit(); 387 } 388 389 @Override 390 public void push (K key, V value) { 391 if (key == null || value == null) 392 throw new NullPointerException ("key=" + key + ", value=" + value); 393 var jfr = new SpaceEvent("push", "" + key); 394 jfr.begin(); 395 synchronized(this) { 396 List l = getList(key); 397 boolean wasEmpty = l.isEmpty(); 398 l.add (0, value); 399 if (wasEmpty) 400 this.notifyAll (); 401 } 402 if (sl != null) 403 notifyListeners(key, value); 404 jfr.commit(); 405 } 406 407 @Override 408 public void push (K key, V value, long timeout) { 409 if (key == null || value == null) 410 throw new NullPointerException ("key=" + key + ", value=" + value); 411 var jfr = new SpaceEvent("push:tim", "" + key); 412 jfr.begin(); 413 Object v = value; 414 if (timeout > 0) { 415 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 416 } 417 synchronized (this) { 418 List l = getList(key); 419 boolean wasEmpty = l.isEmpty(); 420 l.add (0, v); 421 if (wasEmpty) 422 this.notifyAll (); 423 if (timeout > 0) { 424 registerExpirable(key, timeout); 425 } 426 } 427 if (sl != null) 428 notifyListeners(key, value); 429 jfr.commit(); 430 } 431 432 @Override 433 public void put (K key, V value) { 434 if (key == null || value == null) 435 throw new NullPointerException ("key=" + key + ", value=" + value); 436 437 var jfr = new SpaceEvent("put", "" + key); 438 jfr.begin(); 439 synchronized (this) { 440 List l = new LinkedList(); 441 l.add (value); 442 entries.put (key, l); 443 this.notifyAll (); 444 } 445 if (sl != null) 446 notifyListeners(key, value); 447 jfr.commit(); 448 } 449 450 @Override 451 public void put (K key, V value, long timeout) { 452 if (key == null || value == null) 453 throw new NullPointerException ("key=" + key + ", value=" + value); 454 var jfr = new SpaceEvent("put:tim", "" + key); 455 jfr.begin(); 456 457 Object v = value; 458 if (timeout > 0) { 459 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 460 } 461 synchronized (this) { 462 List l = new LinkedList(); 463 l.add (v); 464 entries.put (key, l); 465 this.notifyAll (); 466 if (timeout > 0) { 467 registerExpirable(key, timeout); 468 } 469 } 470 if (sl != null) 471 notifyListeners(key, value); 472 jfr.commit(); 473 } 474 475 @Override 476 public boolean existAny (K[] keys) { 477 for (K key : keys) { 478 if (rdp(key) != null) 479 return true; 480 } 481 return false; 482 } 483 484 @Override 485 public boolean existAny (K[] keys, long timeout) { 486 long now = System.nanoTime(); 487 long to = now + (timeout * ONE_MILLION); 488 long waitFor; 489 while ((waitFor = (to - System.nanoTime())) >= 0) { 490 if (existAny (keys)) 491 return true; 492 synchronized (this) { 493 try { 494 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 495 } catch (InterruptedException e) { } 496 } 497 } 498 return false; 499 } 500 501 /** 502 * unstandard method (required for space replication) - use with care 503 * @return underlying entry map 504 */ 505 public Map getEntries () { 506 return entries; 507 } 508 509 /** 510 * unstandard method (required for space replication) - use with care 511 * @param entries underlying entry map 512 */ 513 public void setEntries (Map entries) { 514 this.entries = entries; 515 } 516 517 private List getList (Object key) { 518 List l = (List) entries.get (key); 519 if (l == null) 520 entries.put (key, l = new LinkedList()); 521 return l; 522 } 523 524 private Object getHead (Object key, boolean remove) { 525 Object obj = null; 526 List l = (List) entries.get (key); 527 boolean wasExpirable = false; 528 while (obj == null && l != null && l.size() > 0) { 529 obj = l.get(0); 530 if (obj instanceof Expirable) { 531 obj = ((Expirable) obj).getValue(); 532 wasExpirable = true; 533 } 534 if (obj == null) { 535 l.remove (0); 536 if (l.isEmpty()) { 537 entries.remove (key); 538 } 539 } 540 } 541 if (l != null) { 542 if (remove && obj != null) 543 l.remove (0); 544 if (l.isEmpty()) { 545 entries.remove (key); 546 if (wasExpirable) 547 unregisterExpirable(key); 548 } 549 } 550 return obj; 551 } 552 553 private Object getObject (Template tmpl, boolean remove) { 554 Object obj = null; 555 Object key = tmpl.getKey(); 556 List l = (List) entries.get (key); 557 if (l != null) { 558 Iterator iter = l.iterator(); 559 boolean wasExpirable = false; 560 while (iter.hasNext()) { 561 obj = iter.next(); 562 if (obj instanceof Expirable) { 563 obj = ((Expirable) obj).getValue(); 564 if (obj == null) { 565 iter.remove(); 566 wasExpirable = true; 567 continue; 568 } 569 } 570 if (tmpl.equals (obj)) { 571 if (remove) 572 iter.remove(); 573 break; 574 } else 575 obj = null; 576 } 577 if (l.isEmpty()) { 578 entries.remove (key); 579 if (wasExpirable) 580 unregisterExpirable(key); 581 } 582 } 583 return obj; 584 } 585 586 private TSpace getSL() { 587 synchronized (this) { 588 if (sl == null) 589 sl = new TSpace(); 590 } 591 return sl; 592 } 593 594 private void registerExpirable(K k, long t) { 595 expirables[t > GCLONG ? 1 : 0].add(k); 596 } 597 598 private void unregisterExpirable(Object k) { 599 for (Set<K> s : expirables) 600 s.remove(k); 601 } 602 603 static class Expirable implements Comparable, Serializable { 604 605 private static final long serialVersionUID = 0xA7F22BF5; 606 607 Object value; 608 609 /** 610 * When to expire, in the future, as given by monotonic System.nanoTime().<br> 611 * IMPORTANT: always use a nanosec offset from System.nanoTime()! 612 */ 613 long expires; 614 615 Expirable (Object value, long expires) { 616 super(); 617 this.value = value; 618 this.expires = expires; 619 } 620 621 boolean isExpired () { 622 return (System.nanoTime() - expires) > 0; 623 } 624 625 @Override 626 public String toString() { 627 return getClass().getName() 628 + "@" + Integer.toHexString(hashCode()) 629 + ",value=" + value.toString() 630 + ",expired=" + isExpired (); 631 } 632 633 Object getValue() { 634 return isExpired() ? null : value; 635 } 636 637 @Override 638 public int compareTo (Object other) { 639 long diff = this.expires - ((Expirable)other).expires; 640 return diff > 0 ? 1 : 641 diff < 0 ? -1 : 642 0; 643 } 644 } 645}