001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020 021import java.io.*; 022import java.time.Duration; 023import java.time.Instant; 024import java.util.Map; 025import java.util.HashMap; 026import java.util.Set; 027import java.util.concurrent.Future; 028import java.util.concurrent.Semaphore; 029 030import com.sleepycat.je.*; 031import com.sleepycat.persist.EntityStore; 032import com.sleepycat.persist.StoreConfig; 033import com.sleepycat.persist.EntityCursor; 034import com.sleepycat.persist.PrimaryIndex; 035import com.sleepycat.persist.SecondaryIndex; 036import com.sleepycat.persist.model.Entity; 037import com.sleepycat.persist.model.Persistent; 038import com.sleepycat.persist.model.PrimaryKey; 039import com.sleepycat.persist.model.SecondaryKey; 040import com.sleepycat.persist.model.Relationship; 041import java.util.HashSet; 042import java.util.concurrent.TimeUnit; 043 044import org.jpos.iso.ISOUtil; 045import org.jpos.util.Log; 046import org.jpos.util.Loggeable; 047 048/** 049 * BerkeleyDB Java Edition based persistent space implementation 050 * 051 * @author Alejandro Revilla 052 * @since 1.6.5 053 054 * @param <K> the key type 055 * @param <V> the value type 056 */ 
057@SuppressWarnings("unchecked") 058public class JESpace<K,V> extends Log implements LocalSpace<K,V>, PersistentSpace, Loggeable, Runnable { 059 /** BerkeleyDB JE environment instance. */ 060 Environment dbe = null; 061 /** BerkeleyDB JE entity store. */ 062 EntityStore store = null; 063 /** Primary index for Ref entities. */ 064 PrimaryIndex<Long, Ref> pIndex = null; 065 /** Primary index for GCRef entities. */ 066 PrimaryIndex<Long,GCRef> gcpIndex = null; 067 /** Secondary index for Ref entities by key. */ 068 SecondaryIndex<String,Long, Ref> sIndex = null; 069 /** Secondary index for GCRef entities by expiration time. */ 070 SecondaryIndex<Long,Long,GCRef> gcsIndex = null; 071 /** Semaphore used to prevent concurrent GC runs. */ 072 Semaphore gcSem = new Semaphore(1); 073 /** Local space used to manage space listeners. */ 074 LocalSpace<Object,SpaceListener> sl; 075 /** Resolution in milliseconds for non-blocking read polling. */ 076 private static final long NRD_RESOLUTION = 500L; 077 /** Delay in milliseconds between GC runs. */ 078 public static final long GC_DELAY = 15*1000L; 079 /** Default transaction timeout in milliseconds. */ 080 public static final long DEFAULT_TXN_TIMEOUT = 30*1000L; 081 /** Default lock timeout in milliseconds. */ 082 public static final long DEFAULT_LOCK_TIMEOUT = 120*1000L; 083 /** Future handle for the scheduled GC task. */ 084 private Future gcTask; 085 086 /** Registry mapping space names to their JESpace instances. */ 087 static final Map<String,Space> spaceRegistrar = 088 new HashMap<String,Space> (); 089 090 /** 091 * Constructs a JESpace with the given name and path/parameter string. 
092 * 093 * @param name the space name (also used as the entity store name) 094 * @param params comma-separated parameters; first element is the directory path 095 * @throws SpaceError if the BerkeleyDB environment or store cannot be opened 096 */ 097 public JESpace(String name, String params) throws SpaceError { 098 super(); 099 try { 100 EnvironmentConfig envConfig = new EnvironmentConfig(); 101 StoreConfig storeConfig = new StoreConfig(); 102 String[] p = ISOUtil.commaDecode(params); 103 String path = p[0]; 104 envConfig.setAllowCreate (true); 105 envConfig.setTransactional(true); 106 envConfig.setLockTimeout(getParam("lock.timeout", p, DEFAULT_LOCK_TIMEOUT), TimeUnit.MILLISECONDS); 107 envConfig.setTxnTimeout(getParam("txn.timeout", p, DEFAULT_TXN_TIMEOUT), TimeUnit.MILLISECONDS); 108 storeConfig.setAllowCreate (true); 109 storeConfig.setTransactional (true); 110 111 File dir = new File(path); 112 dir.mkdirs(); 113 114 dbe = new Environment (dir, envConfig); 115 store = new EntityStore (dbe, name, storeConfig); 116 pIndex = store.getPrimaryIndex (Long.class, Ref.class); 117 gcpIndex = store.getPrimaryIndex (Long.class, GCRef.class); 118 sIndex = store.getSecondaryIndex (pIndex, String.class, "key"); 119 gcsIndex = store.getSecondaryIndex (gcpIndex, Long.class, "expires"); 120 gcTask = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GC_DELAY, GC_DELAY, TimeUnit.MILLISECONDS); 121 } catch (Exception e) { 122 throw new SpaceError (e); 123 } 124 } 125 126 public void out (K key, V value) { 127 out (key, value, 0L); 128 } 129 public void out (K key, V value, long timeout) { 130 Transaction txn = null; 131 try { 132 txn = dbe.beginTransaction (null, null); 133 Ref ref = new Ref(key.toString(), value, timeout); 134 pIndex.put (ref); 135 if (timeout > 0L) 136 gcpIndex.putNoReturn ( 137 new GCRef (ref.getId(), ref.getExpiration()) 138 ); 139 txn.commit(); 140 txn = null; 141 synchronized (this) { 142 notifyAll (); 143 } 144 if (sl != null) 145 
notifyListeners(key, value); 146 } catch (Exception e) { 147 throw new SpaceError (e); 148 } finally { 149 if (txn != null) 150 abort (txn); 151 } 152 } 153 public void push (K key, V value, long timeout) { 154 Transaction txn = null; 155 try { 156 txn = dbe.beginTransaction (null, null); 157 Ref ref = new Ref(key.toString(), value, timeout); 158 pIndex.put (ref); 159 pIndex.delete (ref.getId()); 160 ref.reverseId(); 161 pIndex.put (ref); 162 txn.commit(); 163 txn = null; 164 synchronized (this) { 165 notifyAll (); 166 } 167 if (sl != null) 168 notifyListeners(key, value); 169 } catch (Exception e) { 170 throw new SpaceError (e); 171 } finally { 172 if (txn != null) 173 abort (txn); 174 } 175 } 176 public void push (K key, V value) { 177 push (key, value, 0L); 178 } 179 @SuppressWarnings("unchecked") 180 public V rdp (Object key) { 181 try { 182 return (V) getObject (key, false); 183 } catch (DatabaseException e) { 184 throw new SpaceError (e); 185 } 186 } 187 188 @SuppressWarnings("unchecked") 189 public synchronized V in (Object key) { 190 Object obj; 191 while ((obj = inp (key)) == null) { 192 try { 193 this.wait (); 194 } catch (InterruptedException ignored) { } 195 } 196 return (V) obj; 197 } 198 @SuppressWarnings("unchecked") 199 public synchronized V in (Object key, long timeout) { 200 Object obj; 201 Instant now = Instant.now(); 202 long duration; 203 while ((obj = inp (key)) == null && 204 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 205 { 206 try { 207 this.wait (timeout - duration); 208 } catch (InterruptedException ignored) { } 209 } 210 return (V) obj; 211 } 212 213 @SuppressWarnings("unchecked") 214 public synchronized V rd (Object key) { 215 Object obj; 216 while ((obj = rdp (key)) == null) { 217 try { 218 this.wait (); 219 } catch (InterruptedException ignored) { } 220 } 221 return (V) obj; 222 } 223 @SuppressWarnings("unchecked") 224 public synchronized V rd (Object key, long timeout) { 225 Object obj; 226 Instant now = 
Instant.now(); 227 long duration; 228 while ((obj = rdp (key)) == null && 229 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 230 { 231 try { 232 this.wait (timeout - duration); 233 } catch (InterruptedException ignored) { } 234 } 235 return (V) obj; 236 } 237 public synchronized void nrd (Object key) { 238 while (rdp (key) != null) { 239 try { 240 this.wait (NRD_RESOLUTION); 241 } catch (InterruptedException ignored) { } 242 } 243 } 244 public synchronized V nrd (Object key, long timeout) { 245 Object obj; 246 Instant now = Instant.now(); 247 long duration; 248 while ((obj = rdp (key)) != null && 249 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 250 { 251 try { 252 this.wait (Math.min(NRD_RESOLUTION, timeout - duration)); 253 } catch (InterruptedException ignored) { } 254 } 255 return (V) obj; 256 } 257 @SuppressWarnings("unchecked") 258 public V inp (Object key) { 259 try { 260 return (V) getObject (key, true); 261 } catch (DatabaseException e) { 262 throw new SpaceError (e); 263 } 264 } 265 266 public boolean existAny (Object[] keys) { 267 for (Object key : keys) { 268 if (rdp(key) != null) { 269 return true; 270 } 271 } 272 return false; 273 } 274 public boolean existAny (Object[] keys, long timeout) { 275 Instant now = Instant.now(); 276 long duration; 277 while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) { 278 if (existAny (keys)) 279 return true; 280 synchronized (this) { 281 try { 282 wait (timeout - duration); 283 } catch (InterruptedException ignored) { } 284 } 285 } 286 return false; 287 } 288 public synchronized void put (K key, V value, long timeout) { 289 while (inp (key) != null) 290 ; // NOPMD 291 out (key, value, timeout); 292 } 293 /** Removes all existing entries for the key then writes a single entry (head-of-queue replacement). 
294 * @param key the entry key 295 * @param value the new value 296 */ 297 public synchronized void put (K key, V value) { 298 while (inp (key) != null) 299 ; // NOPMD 300 out (key, value); 301 } 302 /** Runs a garbage-collection pass removing expired entries from the BDB JE store. 303 * @throws DatabaseException on BDB error 304 */ 305 public void gc () throws DatabaseException { 306 Transaction txn = null; 307 EntityCursor<GCRef> cursor = null; 308 try { 309 if (!gcSem.tryAcquire()) 310 return; 311 txn = dbe.beginTransaction (null, null); 312 cursor = gcsIndex.entities ( 313 txn, 0L, true, Instant.now().toEpochMilli(), false, null 314 ); 315 for (GCRef gcRef: cursor) { 316 pIndex.delete (gcRef.getId()); 317 cursor.delete (); 318 } 319 cursor.close(); 320 cursor = null; 321 txn.commit(); 322 txn = null; 323 if (sl != null) { 324 synchronized (this) { 325 if (sl != null && sl.getKeySet().isEmpty()) 326 sl = null; 327 } 328 } 329 } finally { 330 if (cursor != null) 331 cursor.close(); 332 if (txn != null) 333 abort (txn); 334 gcSem.release(); 335 } 336 } 337 public void run() { 338 try { 339 gc(); 340 } catch (Exception e) { 341 warn(e); 342 } 343 } 344 public void close () throws DatabaseException { 345 gcSem.acquireUninterruptibly(); 346 gcTask.cancel(false); 347 while (!gcTask.isDone()) { 348 try { 349 Thread.sleep(500L); 350 } catch (InterruptedException ignored) { } 351 } 352 store.close (); 353 dbe.close(); 354 } 355 356 /** Returns (or creates) the named JESpace stored at the given path. 357 * @param name space name 358 * @param path filesystem path for BDB JE environment 359 * @return the JESpace instance 360 */ 361 public synchronized static JESpace getSpace (String name, String path) 362 { 363 JESpace sp = (JESpace) spaceRegistrar.get (name); 364 if (sp == null) { 365 sp = new JESpace(name, path); 366 spaceRegistrar.put (name, sp); 367 } 368 return sp; 369 } 370 /** Returns (or creates) the named JESpace using the name as the storage path. 
371 * @param name space name and storage path 372 * @return the JESpace instance 373 */ 374 public static JESpace getSpace (String name) { 375 return getSpace (name, name); 376 } 377 private Object getObject (Object key, boolean remove) throws DatabaseException { 378 Transaction txn = null; 379 EntityCursor<Ref> cursor = null; 380 Template tmpl = null; 381 if (key instanceof Template) { 382 tmpl = (Template) key; 383 key = tmpl.getKey(); 384 } 385 try { 386 txn = dbe.beginTransaction (null, null); 387 cursor = sIndex.subIndex(key.toString()).entities(txn, null); 388 for (Ref ref : cursor) { 389 if (ref.isActive()) { 390 if (tmpl != null && !tmpl.equals (ref.getValue())) 391 continue; 392 if (remove) { 393 cursor.delete(); 394 if (ref.hasExpiration()) 395 gcpIndex.delete (txn, ref.getId()); 396 } 397 cursor.close(); cursor = null; 398 txn.commit(); txn = null; 399 return ref.getValue(); 400 } 401 else { 402 cursor.delete(); 403 if (ref.hasExpiration()) 404 gcpIndex.delete (txn, ref.getId()); 405 } 406 } 407 cursor.close(); cursor = null; 408 txn.commit(); txn = null; 409 return null; 410 } finally { 411 if (cursor != null) 412 cursor.close (); 413 if (txn != null) 414 txn.abort(); 415 } 416 } 417 private void abort (Transaction txn) throws SpaceError { 418 try { 419 txn.abort(); 420 } catch (DatabaseException e) { 421 throw new SpaceError (e); 422 } 423 } 424 425 private LocalSpace<Object,SpaceListener> getSL() { 426 synchronized (this) { 427 if (sl == null) 428 sl = new TSpace<Object,SpaceListener>(); 429 } 430 return sl; 431 } 432 433 private void notifyListeners (Object key, Object value) { 434 Set<SpaceListener> listeners = new HashSet<SpaceListener>(); 435 synchronized (this) { 436 if (sl == null) 437 return; 438 SpaceListener s = null; 439 while ((s = sl.inp(key)) != null) 440 listeners.add(s); 441 for (SpaceListener spl: listeners) 442 sl.out(key, spl); 443 } 444 for (SpaceListener spl: listeners) 445 spl.notify (key, value); 446 } 447 448 public synchronized 
void addListener(Object key, SpaceListener listener) { 449 getSL().out (key, listener); 450 } 451 452 public synchronized void addListener(Object key, SpaceListener listener, long timeout) { 453 getSL().out (key, listener); 454 } 455 456 public synchronized void removeListener(Object key, SpaceListener listener) { 457 if (sl != null) 458 sl.inp (new ObjectTemplate (key, listener)); 459 } 460 461 public Set getKeySet() { 462 Set res = new HashSet(); 463 Transaction txn = null; 464 EntityCursor<Ref> cursor = null; 465 try { 466 txn = dbe.beginTransaction (null, null); 467 cursor = sIndex.entities(txn, null); 468 for (Ref ref : cursor) 469 res.add(ref.getKey()); 470 cursor.close(); 471 cursor = null; 472 txn.commit(); 473 txn = null; 474 } catch (IllegalStateException ex) { 475 warn (ex); 476 } finally { 477 if (cursor != null) 478 cursor.close (); 479 if (txn != null) 480 txn.abort(); 481 } 482 483 return res; 484 } 485 486 public int size(Object key) { 487 Transaction txn = null; 488 EntityCursor<Ref> cursor = null; 489 try { 490 txn = dbe.beginTransaction (null, null); 491 cursor = sIndex.subIndex(key.toString()).entities(txn, null); 492 int keyCount = 0; 493 for (Ref ref : cursor) 494 if (ref.isActive()) 495 keyCount++; 496 cursor.close(); 497 cursor = null; 498 txn.commit(); 499 txn = null; 500 return keyCount; 501 } catch (IllegalStateException e) { 502 return -1; 503 } finally { 504 if (cursor != null) 505 cursor.close (); 506 if (txn != null) 507 txn.abort(); 508 } 509 } 510 511 /** 512 * Persistent entity representing a single space entry (key/value with optional expiration). 513 */ 514 @Entity 515 public static class Ref { 516 @PrimaryKey(sequence="Ref.id") 517 /** Auto-generated primary key for this Ref. */ 518 private long id; 519 520 @SecondaryKey(relate= Relationship.MANY_TO_ONE) 521 /** The space key associated with this Ref. */ 522 private String key; 523 524 /** Expiration timestamp in epoch milliseconds, or 0 if no expiration. 
*/ 525 private long expires; 526 /** The serialized or native value stored in the space. */ 527 private Object value; 528 529 /** Default constructor required by BerkeleyDB JE. */ 530 public Ref () { 531 super(); 532 } 533 534 /** 535 * Constructs a Ref for the given key, value and timeout. 536 * 537 * @param key the space key 538 * @param value the value to store 539 * @param timeout timeout in milliseconds, or 0 for no expiration 540 */ 541 public Ref (String key, Object value, long timeout) { 542 this.key = key; 543 this.value = serialize (value); 544 if (timeout > 0L) 545 this.expires = Instant.now().toEpochMilli() + timeout; 546 } 547 548 /** 549 * Returns the primary key id of this Ref. 550 * 551 * @return the primary key 552 */ 553 public long getId() { 554 return id; 555 } 556 557 /** 558 * Negates the id to push this entry to the front of an ordered scan (push semantics). 559 */ 560 public void reverseId() { 561 this.id = -this.id; 562 } 563 564 /** 565 * Returns {@code true} if this Ref has passed its expiration time. 566 * 567 * @return {@code true} if expired 568 */ 569 public boolean isExpired () { 570 return expires > 0L && expires < Instant.now().toEpochMilli(); 571 } 572 573 /** 574 * Returns {@code true} if this Ref has not yet expired. 575 * 576 * @return {@code true} if still active 577 */ 578 public boolean isActive () { 579 return !isExpired(); 580 } 581 582 /** 583 * Returns the space key of this Ref. 584 * 585 * @return the key string 586 */ 587 public Object getKey () { 588 return key; 589 } 590 591 /** 592 * Returns the deserialized value of this Ref. 593 * 594 * @return the stored value 595 */ 596 public Object getValue () { 597 return deserialize(value); 598 } 599 600 /** 601 * Returns the expiration timestamp in epoch milliseconds. 602 * 603 * @return expiration time, or 0 if no expiration is set 604 */ 605 public long getExpiration () { 606 return expires; 607 } 608 609 /** 610 * Returns {@code true} if this Ref has an expiration set. 
611 * 612 * @return {@code true} if an expiration time is set 613 */ 614 public boolean hasExpiration () { 615 return expires > 0L; 616 } 617 private boolean isPersistent (Class c) { 618 return 619 c.isPrimitive() || 620 c.isAnnotationPresent(Entity.class) || 621 c.isAnnotationPresent(Persistent.class); 622 } 623 private Object serialize (Object obj) { 624 Class cls = obj.getClass(); 625 if (isPersistent (cls)) 626 return obj; 627 628 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 629 try { 630 ObjectOutputStream os = new ObjectOutputStream(baos); 631 os.writeObject(obj); 632 obj = baos.toByteArray(); 633 } catch (IOException e) { 634 throw new SpaceError (e); 635 } 636 return obj; 637 } 638 private Object deserialize (Object obj) { 639 Class cls = obj.getClass(); 640 if (isPersistent (cls)) 641 return obj; 642 643 ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) obj); 644 try { 645 ObjectInputStream is = org.jpos.util.Serializer.createSafeObjectInputStream(bais); 646 return is.readObject(); 647 } catch (Exception e) { 648 throw new SpaceError (e); 649 } 650 651 } 652 } 653 654 public void dump(PrintStream p, String indent) { 655 Transaction txn = null; 656 EntityCursor<Ref> cursor = null; 657 int count = 0; 658 try { 659 txn = dbe.beginTransaction (null, null); 660 cursor = sIndex.entities(txn, null); 661 String key = null; 662 int keyCount = 0; 663 for (Ref ref : cursor) { 664 if (ref.getKey().equals(key)) { 665 keyCount++; 666 } else { 667 if (key != null) { 668 dumpKey (p, indent, key, keyCount); 669 count++; 670 } 671 keyCount = 1; 672 key = ref.getKey().toString(); 673 } 674 } 675 if (key != null) { 676 dumpKey (p, indent, key, keyCount); 677 count++; 678 } 679 p.println(indent+"<keycount>"+count+"</keycount>"); 680 cursor.close(); cursor = null; 681 txn.commit(); txn = null; 682 } catch (IllegalStateException e) { 683 //Empty Cursor 684 p.println(indent+"<keycount>0</keycount>"); 685 } finally { 686 if (cursor != null) 687 
cursor.close (); 688 if (txn != null) 689 txn.abort(); 690 } 691 } 692 693 private void dumpKey (PrintStream p, String indent, String key, int count) { 694 if (count > 0) 695 p.printf ("%s<key size='%d'>%s</key>\n", indent, count, key); 696 else 697 p.printf ("%s<key>%s</key>\n", indent, key); 698 } 699 700 private long getParam (String name, String[] params, long defaultValue) { 701 for (String s : params) { 702 if (s.contains(name)) { 703 int pos = s.indexOf('='); 704 if (pos >=0 && s.length() > pos) 705 return Long.valueOf(s.substring(pos+1).trim()); 706 } 707 } 708 return defaultValue; 709 } 710 711 /** 712 * Persistent entity used by the garbage collector to track expiring Ref entries. 713 */ 714 @Entity 715 public static class GCRef { 716 @PrimaryKey 717 /** The id of the corresponding Ref entry to be garbage collected. */ 718 private long id; 719 720 @SecondaryKey(relate=Relationship.MANY_TO_ONE) 721 /** Expiration timestamp in epoch milliseconds used to order GC candidates. */ 722 private long expires; 723 724 /** Default constructor required by BerkeleyDB JE. */ 725 public GCRef () { 726 super(); 727 } 728 729 /** 730 * Constructs a GCRef for the given Ref id and expiration time. 731 * 732 * @param id the id of the Ref to be collected 733 * @param expires the expiration timestamp in epoch milliseconds 734 */ 735 public GCRef (long id, long expires) { 736 this.id = id; 737 this.expires = expires; 738 } 739 740 /** 741 * Returns the id of the corresponding Ref entry. 742 * 743 * @return the Ref primary key 744 */ 745 public long getId() { 746 return id; 747 } 748 749 /** 750 * Returns {@code true} if the expiration time has passed. 751 * 752 * @return {@code true} if this GCRef is expired 753 */ 754 public boolean isExpired () { 755 return expires > 0L && expires < Instant.now().toEpochMilli(); 756 } 757 758 /** 759 * Returns the expiration timestamp in epoch milliseconds. 
760 * 761 * @return expiration time 762 */ 763 public long getExpiration () { 764 return expires; 765 } 766 } 767}