/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2026 jPOS Software SRL
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.jpos.space;

import java.io.*;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

import com.sleepycat.je.*;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.StoreConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.SecondaryIndex;
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.Persistent;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.SecondaryKey;
import com.sleepycat.persist.model.Relationship;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

import org.jpos.iso.ISOUtil;
import org.jpos.util.Log;
import org.jpos.util.Loggeable;

/**
 * BerkeleyDB Java Edition based persistent space implementation.
 *
 * <p>Entries are stored as {@link Ref} entities indexed by their string key;
 * entries that carry an expiration are additionally tracked by a {@link GCRef}
 * so that the periodic {@link #gc()} pass can find and remove them.</p>
 *
 * <p>Blocking operations ({@code in}, {@code rd}, {@code nrd}) wait on this
 * instance's monitor and are woken by {@code notifyAll()} from
 * {@link #out(Object, Object, long)} / {@link #push(Object, Object, long)}.</p>
 *
 * @author Alejandro Revilla
 * @since 1.6.5
 */
@SuppressWarnings("unchecked")
public class JESpace<K,V> extends Log implements LocalSpace<K,V>, PersistentSpace, Loggeable, Runnable {
    Environment dbe = null;
    EntityStore store = null;
    PrimaryIndex<Long, Ref> pIndex = null;
    PrimaryIndex<Long,GCRef> gcpIndex = null;
    SecondaryIndex<String,Long, Ref> sIndex = null;
    SecondaryIndex<Long,Long,GCRef> gcsIndex = null;
    /** Single permit: held while a GC pass runs, and taken for good by {@link #close()}. */
    Semaphore gcSem = new Semaphore(1);
    /** Lazily created space holding the registered listeners, keyed like the entries. */
    LocalSpace<Object,SpaceListener> sl;
    /** Polling interval (ms) used by the {@code nrd} operations. */
    private static final long NRD_RESOLUTION = 500L;
    public static final long GC_DELAY = 15*1000L;
    public static final long DEFAULT_TXN_TIMEOUT = 30*1000L;
    public static final long DEFAULT_LOCK_TIMEOUT = 120*1000L;
    private Future<?> gcTask;

    static final Map<String,Space> spaceRegistrar =
        new HashMap<String,Space> ();

    /**
     * Opens (creating it if necessary) the environment and entity store and
     * schedules the periodic garbage collector.
     *
     * @param name   entity store name
     * @param params comma separated parameters; the first element is the
     *               environment directory, the remaining ones may be
     *               {@code lock.timeout=<millis>} and {@code txn.timeout=<millis>}
     * @throws SpaceError wrapping any failure raised during initialization
     */
    public JESpace(String name, String params) throws SpaceError {
        super();
        try {
            EnvironmentConfig envConfig = new EnvironmentConfig();
            StoreConfig storeConfig = new StoreConfig();
            String[] p = ISOUtil.commaDecode(params);
            String path = p[0];
            envConfig.setAllowCreate (true);
            envConfig.setTransactional(true);
            envConfig.setLockTimeout(getParam("lock.timeout", p, DEFAULT_LOCK_TIMEOUT), TimeUnit.MILLISECONDS);
            envConfig.setTxnTimeout(getParam("txn.timeout", p, DEFAULT_TXN_TIMEOUT), TimeUnit.MILLISECONDS);
            storeConfig.setAllowCreate (true);
            storeConfig.setTransactional (true);

            File dir = new File(path);
            dir.mkdirs();

            dbe = new Environment (dir, envConfig);
            store = new EntityStore (dbe, name, storeConfig);
            pIndex = store.getPrimaryIndex (Long.class, Ref.class);
            gcpIndex = store.getPrimaryIndex (Long.class, GCRef.class);
            sIndex = store.getSecondaryIndex (pIndex, String.class, "key");
            gcsIndex = store.getSecondaryIndex (gcpIndex, Long.class, "expires");
            gcTask = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GC_DELAY, GC_DELAY, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Do not leak a half-opened store/environment when a later step fails.
            releaseQuietly();
            throw new SpaceError (e);
        }
    }

    /**
     * Best-effort release of the store and environment after a failed
     * construction; secondary failures are deliberately ignored so that the
     * original cause is the one reported to the caller.
     */
    private void releaseQuietly () {
        try {
            if (store != null)
                store.close();
        } catch (Exception ignored) { }
        try {
            if (dbe != null)
                dbe.close();
        } catch (Exception ignored) { }
        store = null;
        dbe = null;
    }

    /**
     * Writes an entry that never expires.
     *
     * @param key   entry key (its {@code toString()} is used as the stored key)
     * @param value entry value
     */
    public void out (K key, V value) {
        out (key, value, 0L);
    }

    /**
     * Writes an entry, wakes up blocked readers and notifies listeners.
     *
     * @param key     entry key (its {@code toString()} is used as the stored key)
     * @param value   entry value
     * @param timeout time to live in milliseconds; values &lt;= 0 mean no expiration
     * @throws SpaceError wrapping any exception raised while storing or notifying
     */
    public void out (K key, V value, long timeout) {
        Transaction txn = null;
        try {
            txn = dbe.beginTransaction (null, null);
            Ref ref = new Ref(key.toString(), value, timeout);
            // Pass the transaction explicitly so that the entry and its GC
            // record are committed (or rolled back) together.
            pIndex.put (txn, ref);
            if (timeout > 0L)
                gcpIndex.putNoReturn (
                    txn, new GCRef (ref.getId(), ref.getExpiration())
                );
            txn.commit();
            txn = null;
            synchronized (this) {
                notifyAll ();
            }
            if (sl != null)
                notifyListeners(key, value);
        } catch (Exception e) {
            throw new SpaceError (e);
        } finally {
            if (txn != null)
                abort (txn);
        }
    }

    /**
     * Writes an entry using a negated id, wakes up blocked readers and
     * notifies listeners.
     *
     * @param key     entry key (its {@code toString()} is used as the stored key)
     * @param value   entry value
     * @param timeout time to live in milliseconds; values &lt;= 0 mean no expiration
     * @throws SpaceError wrapping any exception raised while storing or notifying
     */
    public void push (K key, V value, long timeout) {
        Transaction txn = null;
        try {
            txn = dbe.beginTransaction (null, null);
            Ref ref = new Ref(key.toString(), value, timeout);
            // First put obtains an id from the sequence; the entry is then
            // re-inserted under the negated id. All steps share one transaction.
            pIndex.put (txn, ref);
            pIndex.delete (txn, ref.getId());
            ref.reverseId();
            pIndex.put (txn, ref);
            // Register the expiration like out() does, so gc() can reclaim it.
            if (timeout > 0L)
                gcpIndex.putNoReturn (
                    txn, new GCRef (ref.getId(), ref.getExpiration())
                );
            txn.commit();
            txn = null;
            synchronized (this) {
                notifyAll ();
            }
            if (sl != null)
                notifyListeners(key, value);
        } catch (Exception e) {
            throw new SpaceError (e);
        } finally {
            if (txn != null)
                abort (txn);
        }
    }

    /**
     * Pushes an entry that never expires.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void push (K key, V value) {
        push (key, value, 0L);
    }

    /**
     * Reads an entry without removing it and without blocking.
     *
     * @param key entry key or {@link Template}
     * @return the value, or {@code null} if no active entry matches
     * @throws SpaceError wrapping any {@link DatabaseException}
     */
    public V rdp (Object key) {
        try {
            return (V) getObject (key, false);
        } catch (DatabaseException e) {
            throw new SpaceError (e);
        }
    }

    /**
     * Takes an entry, blocking until one is available.
     *
     * @param key entry key or {@link Template}
     * @return the removed value (never {@code null})
     */
    public synchronized V in (Object key) {
        Object obj;
        while ((obj = inp (key)) == null) {
            try {
                this.wait ();
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the space
            }
        }
        return (V) obj;
    }

    /**
     * Takes an entry, blocking for at most {@code timeout} milliseconds.
     *
     * @param key     entry key or {@link Template}
     * @param timeout maximum wait in milliseconds
     * @return the removed value, or {@code null} if none became available in time
     */
    public synchronized V in (Object key, long timeout) {
        Object obj;
        Instant now = Instant.now();
        long duration;
        while ((obj = inp (key)) == null &&
                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
        {
            try {
                this.wait (timeout - duration);
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the deadline
            }
        }
        return (V) obj;
    }

    /**
     * Reads an entry without removing it, blocking until one is available.
     *
     * @param key entry key or {@link Template}
     * @return the value (never {@code null})
     */
    public synchronized V rd (Object key) {
        Object obj;
        while ((obj = rdp (key)) == null) {
            try {
                this.wait ();
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the space
            }
        }
        return (V) obj;
    }

    /**
     * Reads an entry without removing it, blocking for at most
     * {@code timeout} milliseconds.
     *
     * @param key     entry key or {@link Template}
     * @param timeout maximum wait in milliseconds
     * @return the value, or {@code null} if none became available in time
     */
    public synchronized V rd (Object key, long timeout) {
        Object obj;
        Instant now = Instant.now();
        long duration;
        while ((obj = rdp (key)) == null &&
                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
        {
            try {
                this.wait (timeout - duration);
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the deadline
            }
        }
        return (V) obj;
    }

    /**
     * Blocks until no active entry is present under {@code key}, polling
     * every {@code NRD_RESOLUTION} milliseconds.
     *
     * @param key entry key or {@link Template}
     */
    public synchronized void nrd (Object key) {
        while (rdp (key) != null) {
            try {
                this.wait (NRD_RESOLUTION);
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the space
            }
        }
    }

    /**
     * Blocks until no active entry is present under {@code key} or the
     * timeout elapses.
     *
     * @param key     entry key or {@link Template}
     * @param timeout maximum wait in milliseconds
     * @return {@code null} once the entry is absent, otherwise the value
     *         that was still present when the timeout elapsed
     */
    public synchronized V nrd (Object key, long timeout) {
        Object obj;
        Instant now = Instant.now();
        long duration;
        while ((obj = rdp (key)) != null &&
                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
        {
            try {
                this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
            } catch (InterruptedException ignored) {
                // interruption does not end the wait; the loop re-checks the deadline
            }
        }
        return (V) obj;
    }

    /**
     * Takes an entry without blocking.
     *
     * @param key entry key or {@link Template}
     * @return the removed value, or {@code null} if no active entry matches
     * @throws SpaceError wrapping any {@link DatabaseException}
     */
    public V inp (Object key) {
        try {
            return (V) getObject (key, true);
        } catch (DatabaseException e) {
            throw new SpaceError (e);
        }
    }

    /**
     * @param keys keys to probe
     * @return {@code true} if an active entry exists under at least one key
     */
    public boolean existAny (Object[] keys) {
        for (Object key : keys) {
            if (rdp(key) != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Like {@link #existAny(Object[])} but waits up to {@code timeout}
     * milliseconds for one of the keys to show up.
     *
     * @param keys    keys to probe
     * @param timeout maximum wait in milliseconds
     * @return {@code true} if an active entry was found before the timeout
     */
    public boolean existAny (Object[] keys, long timeout) {
        Instant now = Instant.now();
        long duration;
        while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
            if (existAny (keys))
                return true;
            synchronized (this) {
                try {
                    wait (timeout - duration);
                } catch (InterruptedException ignored) {
                    // interruption does not end the wait; the loop re-checks the deadline
                }
            }
        }
        return false;
    }

    /**
     * Replaces every entry under {@code key} with a single new one.
     *
     * @param key     entry key
     * @param value   entry value
     * @param timeout time to live in milliseconds; values &lt;= 0 mean no expiration
     */
    public synchronized void put (K key, V value, long timeout) {
        while (inp (key) != null)
            ; // NOPMD
        out (key, value, timeout);
    }

    /**
     * Replaces every entry under {@code key} with a single non-expiring one.
     *
     * @param key   entry key
     * @param value entry value
     */
    public synchronized void put (K key, V value) {
        while (inp (key) != null)
            ; // NOPMD
        out (key, value);
    }

    /**
     * Removes the entries whose {@link GCRef} expiration lies in the past and
     * drops the listener space once it is empty. Returns immediately when
     * another pass is running or the space has been closed.
     *
     * @throws DatabaseException if the underlying store fails
     */
    public void gc () throws DatabaseException {
        // Acquire BEFORE entering the try block: the finally clause releases
        // the permit, so it must only run when the permit is actually held.
        if (!gcSem.tryAcquire())
            return;
        Transaction txn = null;
        EntityCursor<GCRef> cursor = null;
        try {
            txn = dbe.beginTransaction (null, null);
            cursor = gcsIndex.entities (
                txn, 0L, true, Instant.now().toEpochMilli(), false, null
            );
            for (GCRef gcRef: cursor) {
                pIndex.delete (gcRef.getId());
                cursor.delete ();
            }
            cursor.close();
            cursor = null;
            txn.commit();
            txn = null;
            if (sl != null) {
                synchronized (this) {
                    if (sl != null && sl.getKeySet().isEmpty())
                        sl = null;
                }
            }
        } finally {
            // Nested so that the permit is returned even if cleanup itself fails.
            try {
                if (cursor != null)
                    cursor.close();
            } finally {
                try {
                    if (txn != null)
                        abort (txn);
                } finally {
                    gcSem.release();
                }
            }
        }
    }

    /**
     * Scheduled entry point: runs {@link #gc()} and logs (instead of
     * propagating) any failure.
     */
    public void run() {
        try {
            gc();
        } catch (Exception e) {
            warn(e);
        }
    }

    /**
     * Stops the garbage collector, unregisters this instance and closes the
     * store and environment. The GC permit is intentionally never released,
     * so no further GC pass can start on the closed store.
     *
     * @throws DatabaseException if closing the store or environment fails
     */
    public void close () throws DatabaseException {
        gcSem.acquireUninterruptibly();
        gcTask.cancel(false);
        while (!gcTask.isDone()) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException ignored) {
                // keep waiting for the task to finish
            }
        }
        // Make sure getSpace() no longer hands out this closed instance.
        // Same lock as the static synchronized getSpace(String, String).
        synchronized (JESpace.class) {
            spaceRegistrar.values().remove(this);
        }
        store.close ();
        dbe.close();
    }

    /**
     * Returns the registered space called {@code name}, creating and
     * registering it on first use.
     *
     * @param name space (entity store) name
     * @param path parameters handed to the constructor (directory first)
     * @return the shared instance
     */
    public static synchronized JESpace getSpace (String name, String path)
    {
        JESpace sp = (JESpace) spaceRegistrar.get (name);
        if (sp == null) {
            sp = new JESpace(name, path);
            spaceRegistrar.put (name, sp);
        }
        return sp;
    }

    /**
     * Shortcut for {@code getSpace(name, name)}.
     *
     * @param name space name, also used as the directory
     * @return the shared instance
     */
    public static JESpace getSpace (String name) {
        return getSpace (name, name);
    }

    /**
     * Finds the first active entry for {@code key}, discarding any expired
     * entries met on the way.
     *
     * @param key    entry key, or a {@link Template} whose key selects the
     *               candidates and whose {@code equals} filters them
     * @param remove {@code true} to delete the returned entry
     * @return the entry value, or {@code null} if nothing matched
     * @throws DatabaseException if the underlying store fails
     */
    private Object getObject (Object key, boolean remove) throws DatabaseException {
        Transaction txn = null;
        EntityCursor<Ref> cursor = null;
        Template tmpl = null;
        if (key instanceof Template) {
            tmpl = (Template) key;
            key = tmpl.getKey();
        }
        try {
            txn = dbe.beginTransaction (null, null);
            cursor = sIndex.subIndex(key.toString()).entities(txn, null);
            for (Ref ref : cursor) {
                if (ref.isActive()) {
                    if (tmpl != null && !tmpl.equals (ref.getValue()))
                        continue;
                    if (remove) {
                        cursor.delete();
                        if (ref.hasExpiration())
                            gcpIndex.delete (txn, ref.getId());
                    }
                    cursor.close(); cursor = null;
                    txn.commit(); txn = null;
                    return ref.getValue();
                }
                else {
                    // expired: drop the entry and its GC record right away
                    cursor.delete();
                    if (ref.hasExpiration())
                        gcpIndex.delete (txn, ref.getId());
                }
            }
            cursor.close(); cursor = null;
            txn.commit(); txn = null;
            return null;
        } finally {
            if (cursor != null)
                cursor.close ();
            if (txn != null)
                txn.abort();
        }
    }

    /**
     * Aborts {@code txn}, converting a {@link DatabaseException} into a
     * {@link SpaceError}.
     */
    private void abort (Transaction txn) throws SpaceError {
        try {
            txn.abort();
        } catch (DatabaseException e) {
            throw new SpaceError (e);
        }
    }

    /** Returns the listener space, creating it on first use. */
    private LocalSpace<Object,SpaceListener> getSL() {
        synchronized (this) {
            if (sl == null)
                sl = new TSpace<Object,SpaceListener>();
        }
        return sl;
    }

    /**
     * Invokes every listener registered under {@code key}. The listeners are
     * collected while holding the monitor and called after releasing it.
     */
    private void notifyListeners (Object key, Object value) {
        Set<SpaceListener> listeners = new HashSet<SpaceListener>();
        synchronized (this) {
            if (sl == null)
                return;
            SpaceListener s = null;
            // drain the listeners for this key, then put them back
            while ((s = sl.inp(key)) != null)
                listeners.add(s);
            for (SpaceListener spl: listeners)
                sl.out(key, spl);
        }
        for (SpaceListener spl: listeners)
            spl.notify (key, value);
    }

    /**
     * Registers a listener for {@code key}.
     *
     * @param key      key to observe
     * @param listener listener to notify on {@code out}/{@code push}
     */
    public synchronized void addListener(Object key, SpaceListener listener) {
        getSL().out (key, listener);
    }

    /**
     * Registers a listener for {@code key} that expires after
     * {@code timeout} milliseconds.
     *
     * @param key      key to observe
     * @param listener listener to notify on {@code out}/{@code push}
     * @param timeout  registration time to live in milliseconds
     */
    public synchronized void addListener(Object key, SpaceListener listener, long timeout) {
        getSL().out (key, listener, timeout);
    }

    /**
     * Removes one registration of {@code listener} under {@code key}, if any.
     *
     * @param key      observed key
     * @param listener listener to remove
     */
    public synchronized void removeListener(Object key, SpaceListener listener) {
        if (sl != null)
            sl.inp (new ObjectTemplate (key, listener));
    }

    /**
     * @return the distinct keys of all stored entries; an
     *         {@link IllegalStateException} raised while scanning is logged
     *         and the keys collected so far are returned
     */
    public Set getKeySet() {
        Set res = new HashSet();
        Transaction txn = null;
        EntityCursor<Ref> cursor = null;
        try {
            txn = dbe.beginTransaction (null, null);
            cursor = sIndex.entities(txn, null);
            for (Ref ref : cursor)
                res.add(ref.getKey());
            cursor.close();
            cursor = null;
            txn.commit();
            txn = null;
        } catch (IllegalStateException ex) {
            warn (ex);
        } finally {
            if (cursor != null)
                cursor.close ();
            if (txn != null)
                txn.abort();
        }

        return res;
    }

    /**
     * @param key entry key
     * @return number of active entries under {@code key}, or {@code -1} if
     *         an {@link IllegalStateException} is raised while counting
     */
    public int size(Object key) {
        Transaction txn = null;
        EntityCursor<Ref> cursor = null;
        try {
            txn = dbe.beginTransaction (null, null);
            cursor = sIndex.subIndex(key.toString()).entities(txn, null);
            int keyCount = 0;
            for (Ref ref : cursor)
                if (ref.isActive())
                    keyCount++;
            cursor.close();
            cursor = null;
            txn.commit();
            txn = null;
            return keyCount;
        } catch (IllegalStateException e) {
            return -1;
        } finally {
            if (cursor != null)
                cursor.close ();
            if (txn != null)
                txn.abort();
        }
    }

    /**
     * Persistent entry: sequence-assigned id, string key, optional absolute
     * expiration (epoch millis, 0 meaning none) and the stored value.
     */
    @Entity
    public static class Ref {
        @PrimaryKey(sequence="Ref.id")
        private long id;

        @SecondaryKey(relate= Relationship.MANY_TO_ONE)
        private String key;

        private long expires;
        private Object value;

        public Ref () {
            super();
        }

        /**
         * @param key     entry key
         * @param value   entry value; stored as is when its class is
         *                persistent, otherwise Java-serialized to a byte array
         * @param timeout time to live in milliseconds; values &lt;= 0 mean no expiration
         */
        public Ref (String key, Object value, long timeout) {
            this.key = key;
            this.value = serialize (value);
            if (timeout > 0L)
                this.expires = Instant.now().toEpochMilli() + timeout;
        }
        public long getId() {
            return id;
        }
        /** Negates the id. */
        public void reverseId() {
            this.id = -this.id;
        }
        public boolean isExpired () {
            return expires > 0L && expires < Instant.now().toEpochMilli();
        }
        public boolean isActive () {
            return !isExpired();
        }
        public Object getKey () {
            return key;
        }
        /** @return the value, deserialized on every call when it was stored serialized */
        public Object getValue () {
            return deserialize(value);
        }
        public long getExpiration () {
            return expires;
        }
        public boolean hasExpiration () {
            return expires > 0L;
        }
        private boolean isPersistent (Class<?> c) {
            return
                c.isPrimitive() ||
                c.isAnnotationPresent(Entity.class) ||
                c.isAnnotationPresent(Persistent.class);
        }
        private Object serialize (Object obj) {
            if (isPersistent (obj.getClass()))
                return obj;

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream os = new ObjectOutputStream(baos)) {
                os.writeObject(obj);
                os.flush();
                return baos.toByteArray();
            } catch (IOException e) {
                throw new SpaceError (e);
            }
        }
        private Object deserialize (Object obj) {
            if (isPersistent (obj.getClass()))
                return obj;

            // NOTE(review): native Java deserialization of stored bytes; this is
            // only safe as long as the store contents are trusted.
            ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) obj);
            try (ObjectInputStream is = new ObjectInputStream (bais)) {
                return is.readObject();
            } catch (Exception e) {
                throw new SpaceError (e);
            }
        }
    }

    /**
     * Prints one {@code <key>} element per distinct key (with the number of
     * entries stored under it) followed by a {@code <keycount>} element.
     *
     * @param p      destination stream
     * @param indent prefix written before every line
     */
    public void dump(PrintStream p, String indent) {
        Transaction txn = null;
        EntityCursor<Ref> cursor = null;
        int count = 0;
        try {
            txn = dbe.beginTransaction (null, null);
            cursor = sIndex.entities(txn, null);
            String key = null;
            int keyCount = 0;
            // The loop relies on entries with the same key being adjacent.
            for (Ref ref : cursor) {
                if (ref.getKey().equals(key)) {
                    keyCount++;
                } else {
                    if (key != null) {
                        dumpKey (p, indent, key, keyCount);
                        count++;
                    }
                    keyCount = 1;
                    key = ref.getKey().toString();
                }
            }
            if (key != null) {
                dumpKey (p, indent, key, keyCount);
                count++;
            }
            p.println(indent+"<keycount>"+count+"</keycount>");
            cursor.close(); cursor = null;
            txn.commit(); txn = null;
        } catch (IllegalStateException e) {
            //Empty Cursor
            p.println(indent+"<keycount>0</keycount>");
        } finally {
            if (cursor != null)
                cursor.close ();
            if (txn != null)
                txn.abort();
        }
    }

    private void dumpKey (PrintStream p, String indent, String key, int count) {
        if (count > 0)
            p.printf ("%s<key size='%d'>%s</key>\n", indent, count, key);
        else
            p.printf ("%s<key>%s</key>\n", indent, key);
    }

    /**
     * Looks up a numeric {@code name=value} parameter.
     *
     * @param name         parameter name; compared against the trimmed text
     *                     before the first {@code '='} of each element
     * @param params       decoded parameter list
     * @param defaultValue value returned when the parameter is absent
     * @return the parsed value or {@code defaultValue}
     * @throws NumberFormatException if the value is not a valid long
     */
    private long getParam (String name, String[] params, long defaultValue) {
        for (String s : params) {
            int pos = s.indexOf('=');
            if (pos >= 0 && name.equals(s.substring(0, pos).trim()))
                return Long.parseLong(s.substring(pos+1).trim());
        }
        return defaultValue;
    }

    /**
     * Garbage collection record: the id of a {@link Ref} together with its
     * expiration, indexed by expiration so expired entries can be range-scanned.
     */
    @Entity
    public static class GCRef {
        @PrimaryKey
        private long id;

        @SecondaryKey(relate=Relationship.MANY_TO_ONE)
        private long expires;
        public GCRef () {
            super();
        }
        public GCRef (long id, long expires) {
            this.id = id;
            this.expires = expires;
        }
        public long getId() {
            return id;
        }
        public boolean isExpired () {
            return expires > 0L && expires < Instant.now().toEpochMilli();
        }
        public long getExpiration () {
            return expires;
        }
    }
}