001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020import org.jpos.jfr.SpaceEvent; 021import org.jpos.util.Loggeable; 022import java.io.PrintStream; 023import java.io.Serializable; 024import java.time.Duration; 025import java.time.Instant; 026import java.util.*; 027import java.util.concurrent.TimeUnit; 028 029/** 030 * TSpace implementation 031 * @author Alejandro Revilla 032 * @version $Revision$ $Date$ 033 * @since !.4.9 034 */ 035 036@SuppressWarnings("unchecked") 037public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable { 038 protected Map entries; 039 protected TSpace sl; // space listeners 040 public static final long GCDELAY = 5*1000; 041 private static final long GCLONG = 60_000L; 042 private static final long NRD_RESOLUTION = 500L; 043 private static final int MAX_ENTRIES_IN_DUMP = 1000; 044 private static final long ONE_MILLION = 1_000_000L; // multiplier millis --> nanos 045 private final Set[] expirables; 046 private long lastLongGC = System.nanoTime(); 047 048 public TSpace () { 049 super(); 050 entries = new HashMap (); 051 expirables = new Set[] { new HashSet<K>(), new HashSet<K>() }; 052 SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS); 053 } 054 055 @Override 056 public void out (K key, V value) { 057 var jfr = new SpaceEvent("out", "" + key); 058 jfr.begin(); 059 if (key == null || value == null) 060 throw new NullPointerException ("key=" + key + ", value=" + value); 061 synchronized(this) { 062 List l = getList(key); 063 l.add (value); 064 if (l.size() == 1) 065 this.notifyAll (); 066 } 067 if (sl != null) 068 notifyListeners(key, value); 069 jfr.commit(); 070 } 071 072 @Override 073 public void out (K key, V value, long timeout) { 074 var jfr = new SpaceEvent("out:tim", "" + key); 075 jfr.begin(); 076 077 if (key == null || value == null) { 078 jfr.commit(); 079 throw new NullPointerException("key=" + key + ", value=" + value); 080 } 081 Object v = value; 082 if (timeout > 0) { 083 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 084 } 085 synchronized (this) { 086 List l = getList(key); 087 l.add(v); 088 if (l.size() == 1) 089 this.notifyAll (); 090 if (timeout > 0) { 091 registerExpirable(key, timeout); 092 } 093 } 094 if (sl != null) 095 notifyListeners(key, value); 096 jfr.commit(); 097 } 098 099 @Override 100 public synchronized V rdp (Object key) { 101 var jfr = new SpaceEvent("rdp", "" + key); 102 jfr.begin(); 103 try { 104 if (key instanceof Template) 105 return (V) getObject ((Template) key, false); 106 return (V) getHead (key, false); 107 } finally { 108 jfr.commit(); 109 } 110 } 111 112 @Override 113 public synchronized V inp (Object key) { 114 var jfr = new SpaceEvent("inp", "" + key); 115 jfr.begin(); 116 try { 117 if (key instanceof Template) 118 return (V) getObject ((Template) key, true); 119 return (V) getHead (key, true); 120 } finally { 121 jfr.commit(); 122 } 123 } 124 125 @Override 126 public synchronized V in (Object key) { 127 Object obj; 128 while ((obj = inp (key)) == null) { 129 try { 130 this.wait (); 131 } catch (InterruptedException e) { } 132 } 133 return (V) obj; 134 } 135 136 @Override 137 public synchronized V in (Object key, long timeout) { 138 V obj; 139 long now = System.nanoTime(); 140 long to = now + timeout * ONE_MILLION; 141 long waitFor; 142 while ( (obj = inp (key)) == null && 143 (waitFor = (to - System.nanoTime())) >= 0 ) 144 { 145 try { 146 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 147 } catch (InterruptedException e) { } 148 } 149 return obj; 150 } 151 152 @Override 153 public synchronized V rd (Object key) { 154 Object obj; 155 while ((obj = rdp (key)) == null) { 156 try { 157 this.wait (); 158 } catch (InterruptedException e) { } 159 } 160 return (V) obj; 161 } 162 163 @Override 164 public synchronized V rd (Object key, long timeout) { 165 V obj; 166 long now = System.nanoTime(); 167 long to = now + (timeout * ONE_MILLION); 168 long waitFor; 169 while ( (obj = rdp (key)) == null && 170 (waitFor = (to - System.nanoTime())) >= 0 ) 171 { 172 try { 173 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 174 } catch (InterruptedException e) { } 175 } 176 return obj; 177 } 178 179 @Override 180 public synchronized void nrd (Object key) { 181 while (rdp (key) != null) { 182 try { 183 this.wait (NRD_RESOLUTION); 184 } catch (InterruptedException ignored) { } 185 } 186 } 187 188 @Override 189 public synchronized V nrd (Object key, long timeout) { 190 V obj; 191 long now = System.nanoTime(); 192 long to = now + (timeout * ONE_MILLION); 193 long waitFor; 194 while ( (obj = rdp (key)) != null && 195 (waitFor = (to - System.nanoTime())) >= 0 ) 196 { 197 try { 198 this.wait(Math.min(NRD_RESOLUTION, 199 Math.max(waitFor / ONE_MILLION, 1L))); 200 } catch (InterruptedException ignored) { } 201 } 202 return obj; 203 } 204 205 @Override 206 public void run () { 207 try { 208 gc(); 209 } catch (Exception e) { 210 e.printStackTrace(); // this should never happen 211 } 212 } 213 214 public void gc () { 215 gc(0); 216 if (System.nanoTime() - lastLongGC > GCLONG*ONE_MILLION) { 217 gc(1); 218 lastLongGC = System.nanoTime(); 219 } 220 } 221 222 private void gc (int generation) { 223 var jfr = new SpaceEvent("gc", Integer.toString(generation)); 224 jfr.begin(); 225 226 Set<K> exps; 227 synchronized (this) { 228 exps = expirables[generation]; 229 expirables[generation] = new HashSet<K>(); 230 } 231 for (K k : exps) { 232 if (rdp(k) != null) { 233 synchronized (this) { 234 expirables[generation].add(k); 235 } 236 } 237 Thread.yield (); 238 } 239 if (sl != null) { 240 synchronized (this) { 241 if (sl != null && sl.isEmpty()) 242 sl = null; 243 } 244 } 245 jfr.commit(); 246 } 247 248 @Override 249 public synchronized int size (Object key) { 250 var jfr = new SpaceEvent("size", "" + key); 251 jfr.begin(); 252 253 int size = 0; 254 List l = (List) entries.get (key); 255 if (l != null) 256 size = l.size(); 257 jfr.commit(); 258 return size; 259 } 260 261 @Override 262 public synchronized void addListener (Object key, SpaceListener listener) { 263 getSL().out (key, listener); 264 } 265 266 @Override 267 public synchronized void addListener 268 (Object key, SpaceListener listener, long timeout) 269 { 270 getSL().out (key, listener, timeout); 271 } 272 273 @Override 274 public synchronized void removeListener 275 (Object key, SpaceListener listener) 276 { 277 if (sl != null) { 278 sl.inp (new ObjectTemplate (key, listener)); 279 } 280 } 281 public boolean isEmpty() { 282 return entries.isEmpty(); 283 } 284 285 @Override 286 public synchronized Set<K> getKeySet() { 287 return new HashSet<K>(entries.keySet()); 288 } 289 290 public String getKeysAsString () { 291 StringBuilder sb = new StringBuilder(); 292 Object[] keys; 293 synchronized (this) { 294 keys = entries.keySet().toArray(); 295 } 296 for (int i=0; i<keys.length; i++) { 297 if (i > 0) 298 sb.append (' '); 299 sb.append (keys[i]); 300 } 301 return sb.toString(); 302 } 303 304 @Override 305 public void dump(PrintStream p, String indent) { 306 var jfr = new SpaceEvent("dump", ""); 307 jfr.begin(); 308 309 Object[] keys; 310 int size = entries.size(); 311 if (size > MAX_ENTRIES_IN_DUMP * 100) { 312 p.printf ("%sWARNING - space too big, size=%d%n", indent, size); 313 jfr.commit(); 314 return; 315 } 316 synchronized (this) { 317 keys = entries.keySet().toArray(); 318 } 319 int i=0; 320 for (Object key : keys) { 321 p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key); 322 if (i++ > MAX_ENTRIES_IN_DUMP) { 323 p.printf ("%s...%n", indent); 324 p.printf ("%s...%n", indent); 325 break; 326 } 327 } 328 p.printf("%s key-count: %d%n", indent, keys.length); 329 int exp0, exp1; 330 synchronized (this) { 331 exp0 = expirables[0].size(); 332 exp1 = expirables[1].size(); 333 } 334 p.printf("%s gcinfo: %d,%d%n", indent, exp0, exp1); 335 jfr.commit(); 336 } 337 338 public void notifyListeners (Object key, Object value) { 339 var jfr = new SpaceEvent("notify", "" + key); 340 jfr.begin(); 341 342 Object[] listeners = null; 343 synchronized (this) { 344 if (sl == null) 345 return; 346 List l = (List) sl.entries.get (key); 347 if (l != null) 348 listeners = l.toArray(); 349 } 350 if (listeners != null) { 351 for (Object listener : listeners) { 352 Object o = listener; 353 if (o instanceof Expirable) 354 o = ((Expirable) o).getValue(); 355 if (o instanceof SpaceListener) 356 ((SpaceListener) o).notify(key, value); 357 } 358 } 359 jfr.commit(); 360 } 361 362 @Override 363 public void push (K key, V value) { 364 if (key == null || value == null) 365 throw new NullPointerException ("key=" + key + ", value=" + value); 366 var jfr = new SpaceEvent("push", "" + key); 367 jfr.begin(); 368 synchronized(this) { 369 List l = getList(key); 370 boolean wasEmpty = l.isEmpty(); 371 l.add (0, value); 372 if (wasEmpty) 373 this.notifyAll (); 374 } 375 if (sl != null) 376 notifyListeners(key, value); 377 jfr.commit(); 378 } 379 380 @Override 381 public void push (K key, V value, long timeout) { 382 if (key == null || value == null) 383 throw new NullPointerException ("key=" + key + ", value=" + value); 384 var jfr = new SpaceEvent("push:tim", "" + key); 385 jfr.begin(); 386 Object v = value; 387 if (timeout > 0) { 388 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 389 } 390 synchronized (this) { 391 List l = getList(key); 392 boolean wasEmpty = l.isEmpty(); 393 l.add (0, v); 394 if (wasEmpty) 395 this.notifyAll (); 396 if (timeout > 0) { 397 registerExpirable(key, timeout); 398 } 399 } 400 if (sl != null) 401 notifyListeners(key, value); 402 jfr.commit(); 403 } 404 405 @Override 406 public void put (K key, V value) { 407 if (key == null || value == null) 408 throw new NullPointerException ("key=" + key + ", value=" + value); 409 410 var jfr = new SpaceEvent("put", "" + key); 411 jfr.begin(); 412 synchronized (this) { 413 List l = new LinkedList(); 414 l.add (value); 415 entries.put (key, l); 416 this.notifyAll (); 417 } 418 if (sl != null) 419 notifyListeners(key, value); 420 jfr.commit(); 421 } 422 423 @Override 424 public void put (K key, V value, long timeout) { 425 if (key == null || value == null) 426 throw new NullPointerException ("key=" + key + ", value=" + value); 427 var jfr = new SpaceEvent("put:tim", "" + key); 428 jfr.begin(); 429 430 Object v = value; 431 if (timeout > 0) { 432 v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION)); 433 } 434 synchronized (this) { 435 List l = new LinkedList(); 436 l.add (v); 437 entries.put (key, l); 438 this.notifyAll (); 439 if (timeout > 0) { 440 registerExpirable(key, timeout); 441 } 442 } 443 if (sl != null) 444 notifyListeners(key, value); 445 jfr.commit(); 446 } 447 448 @Override 449 public boolean existAny (K[] keys) { 450 for (K key : keys) { 451 if (rdp(key) != null) 452 return true; 453 } 454 return false; 455 } 456 457 @Override 458 public boolean existAny (K[] keys, long timeout) { 459 long now = System.nanoTime(); 460 long to = now + (timeout * ONE_MILLION); 461 long waitFor; 462 while ((waitFor = (to - System.nanoTime())) >= 0) { 463 if (existAny (keys)) 464 return true; 465 synchronized (this) { 466 try { 467 this.wait(Math.max(waitFor / ONE_MILLION, 1L)); 468 } catch (InterruptedException e) { } 469 } 470 } 471 return false; 472 } 473 474 /** 475 * unstandard method (required for space replication) - use with care 476 * @return underlying entry map 477 */ 478 public Map getEntries () { 479 return entries; 480 } 481 482 /** 483 * unstandard method (required for space replication) - use with care 484 * @param entries underlying entry map 485 */ 486 public void setEntries (Map entries) { 487 this.entries = entries; 488 } 489 490 private List getList (Object key) { 491 List l = (List) entries.get (key); 492 if (l == null) 493 entries.put (key, l = new LinkedList()); 494 return l; 495 } 496 497 private Object getHead (Object key, boolean remove) { 498 Object obj = null; 499 List l = (List) entries.get (key); 500 boolean wasExpirable = false; 501 while (obj == null && l != null && l.size() > 0) { 502 obj = l.get(0); 503 if (obj instanceof Expirable) { 504 obj = ((Expirable) obj).getValue(); 505 wasExpirable = true; 506 } 507 if (obj == null) { 508 l.remove (0); 509 if (l.isEmpty()) { 510 entries.remove (key); 511 } 512 } 513 } 514 if (l != null) { 515 if (remove && obj != null) 516 l.remove (0); 517 if (l.isEmpty()) { 518 entries.remove (key); 519 if (wasExpirable) 520 unregisterExpirable(key); 521 } 522 } 523 return obj; 524 } 525 526 private Object getObject (Template tmpl, boolean remove) { 527 Object obj = null; 528 Object key = tmpl.getKey(); 529 List l = (List) entries.get (key); 530 if (l != null) { 531 Iterator iter = l.iterator(); 532 boolean wasExpirable = false; 533 while (iter.hasNext()) { 534 obj = iter.next(); 535 if (obj instanceof Expirable) { 536 obj = ((Expirable) obj).getValue(); 537 if (obj == null) { 538 iter.remove(); 539 wasExpirable = true; 540 continue; 541 } 542 } 543 if (tmpl.equals (obj)) { 544 if (remove) 545 iter.remove(); 546 break; 547 } else 548 obj = null; 549 } 550 if (l.isEmpty()) { 551 entries.remove (key); 552 if (wasExpirable) 553 unregisterExpirable(key); 554 } 555 } 556 return obj; 557 } 558 559 private TSpace getSL() { 560 synchronized (this) { 561 if (sl == null) 562 sl = new TSpace(); 563 } 564 return sl; 565 } 566 567 private void registerExpirable(K k, long t) { 568 expirables[t > GCLONG ? 1 : 0].add(k); 569 } 570 571 private void unregisterExpirable(Object k) { 572 for (Set<K> s : expirables) 573 s.remove(k); 574 } 575 576 static class Expirable implements Comparable, Serializable { 577 578 private static final long serialVersionUID = 0xA7F22BF5; 579 580 Object value; 581 582 /** 583 * When to expire, in the future, as given by monotonic System.nanoTime().<br> 584 * IMPORTANT: always use a nanosec offset from System.nanoTime()! 585 */ 586 long expires; 587 588 Expirable (Object value, long expires) { 589 super(); 590 this.value = value; 591 this.expires = expires; 592 } 593 594 boolean isExpired () { 595 return (System.nanoTime() - expires) > 0; 596 } 597 598 @Override 599 public String toString() { 600 return getClass().getName() 601 + "@" + Integer.toHexString(hashCode()) 602 + ",value=" + value.toString() 603 + ",expired=" + isExpired (); 604 } 605 606 Object getValue() { 607 return isExpired() ? null : value; 608 } 609 610 @Override 611 public int compareTo (Object other) { 612 long diff = this.expires - ((Expirable)other).expires; 613 return diff > 0 ? 1 : 614 diff < 0 ? -1 : 615 0; 616 } 617 } 618}