001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020 021import jdbm.RecordManager; 022import jdbm.RecordManagerFactory; 023import jdbm.RecordManagerOptions; 024import jdbm.helper.FastIterator; 025import jdbm.helper.Serializer; 026import jdbm.htree.HTree; 027import org.jpos.util.DefaultTimer; 028 029import java.io.Externalizable; 030import java.io.IOException; 031import java.io.ObjectInput; 032import java.io.ObjectOutput; 033import java.time.Duration; 034import java.time.Instant; 035import java.util.*; 036 037/** 038 * JDBM based persistent space implementation 039 * 040 * @param <K> key type stored in this space 041 * @param <V> value type stored in this space 042 * @author Alejandro Revilla 043 * @author Kris Leite 044 * @version $Revision$ $Date$ 045 * @since 1.4.7 046 */ 047@SuppressWarnings("unchecked") 048public class JDBMSpace<K,V> extends TimerTask implements Space<K,V>, PersistentSpace { 049 /** JDBM hash tree storing the space entries. */ 050 protected HTree htree; 051 /** Underlying JDBM record manager used to persist {@link #htree}. */ 052 protected RecordManager recman; 053 /** Shared serializer used for {@code Ref} values stored in {@link #htree}. */ 054 protected static final Serializer refSerializer = new Ref (); 055 /** Cache of named JDBM spaces, keyed by space name. */ 056 protected static final Map<String,Space> spaceRegistrar = new HashMap<String,Space> (); 057 /** Whether mutating operations should commit immediately. */ 058 protected boolean autoCommit = true; 059 /** Space name used for registration. */ 060 protected String name; 061 /** Periodic interval, in milliseconds, between background GC sweeps. */ 062 public static final long GCDELAY = 5*60*1000; 063 private static final long NRD_RESOLUTION = 500L; 064 065 /** 066 * protected constructor. 067 * @param name Space Name 068 * @param filename underlying JDBM filename 069 */ 070 protected JDBMSpace (String name, String filename) { 071 super(); 072 this.name = name; 073 try { 074 Properties props = new Properties(); 075 props.put (RecordManagerOptions.CACHE_SIZE, "512"); 076 recman = RecordManagerFactory.createRecordManager (filename, props); 077 long recid = recman.getNamedObject ("space"); 078 if (recid != 0) { 079 htree = HTree.load (recman, recid); 080 } else { 081 htree = HTree.createInstance (recman); 082 recman.setNamedObject ("space", htree.getRecid()); 083 } 084 recman.commit (); 085 } catch (IOException e) { 086 throw new SpaceError (e); 087 } 088 DefaultTimer.getTimer().schedule (this, GCDELAY, GCDELAY); 089 } 090 /** 091 * Returns a reference to the default JDBMSpace, backed by a file named {@code space}. 092 * 093 * @return reference to default JDBMSpace 094 */ 095 public static JDBMSpace getSpace() { 096 return getSpace ("space"); 097 } 098 /** 099 * creates a named JDBMSpace 100 * (filename used for storage is the same as the given name) 101 * @param name the Space name 102 * @return reference to named JDBMSpace 103 */ 104 public static JDBMSpace getSpace(String name) { 105 return getSpace(name, name); 106 } 107 /** 108 * creates a named JDBMSpace 109 * @param name the Space name 110 * @param filename the storage file name 111 * @return reference to named JDBMSpace 112 */ 113 public synchronized static JDBMSpace 114 getSpace (String name, String filename) 115 { 116 JDBMSpace sp = (JDBMSpace) spaceRegistrar.get (name); 117 if (sp == null) { 118 sp = new JDBMSpace (name, filename); 119 spaceRegistrar.put (name, sp); 120 } 121 return sp; 122 } 123 /** 124 * Use with utmost care and at your own risk. 125 * 126 * If you are to perform several operations on the space you 127 * should synchronize on the space, i.e: 128 * <pre> 129 * synchronized (sp) { 130 * sp.setAutoCommit (false); 131 * sp.out (..., ...) 132 * sp.out (..., ...) 133 * ... 134 * ... 135 * sp.inp (...); 136 * sp.commit (); // or sp.rollback (); 137 * sp.setAutoCommit (true); 138 * } 139 * </pre> 140 * @param b true or false 141 */ 142 public void setAutoCommit (boolean b) { 143 this.autoCommit = b; 144 } 145 /** 146 * force commit 147 */ 148 public void commit () { 149 try { 150 recman.commit (); 151 this.notifyAll (); 152 } catch (IOException e) { 153 throw new SpaceError (e); 154 } 155 } 156 /** 157 * force rollback 158 */ 159 public void rollback () { 160 try { 161 recman.rollback (); 162 } catch (IOException e) { 163 throw new SpaceError (e); 164 } 165 } 166 /** 167 * close this space - use with care 168 */ 169 public void close () { 170 synchronized (JDBMSpace.class) { 171 spaceRegistrar.remove (name); 172 } 173 synchronized (this) { 174 try { 175 recman.close (); 176 recman = null; 177 } catch (IOException e) { 178 throw new SpaceError (e); 179 } 180 } 181 } 182 /** 183 * Write a new entry into the Space 184 * @param key Entry's key 185 * @param value Object value 186 */ 187 public void out (K key, V value) { 188 out (key, value, -1); 189 } 190 /** 191 * Write a new entry into the Space 192 * The entry will timeout after the specified period 193 * @param key Entry's key 194 * @param value Object value 195 * @param timeout entry timeout in millis 196 */ 197 public void out (K key, V value, long timeout) { 198 if (key == null || value == null) 199 throw new NullPointerException ("key=" + key + ", value=" + value); 200 try { 201 synchronized (this) { 202 long recid = recman.insert (value); 203 204 long expiration = timeout == -1 ? Long.MAX_VALUE : 205 Instant.now().toEpochMilli() + timeout; 206 Ref dataRef = new Ref (recid, expiration); 207 long dataRefRecId = recman.insert (dataRef, refSerializer); 208 209 Head head = (Head) htree.get (key); 210 if (head == null) { 211 head = new Head (); 212 head.first = dataRefRecId; 213 head.last = dataRefRecId; 214 head.count = 1; 215 } else { 216 long previousLast = head.last; 217 Ref lastRef = 218 (Ref) recman.fetch (previousLast, refSerializer); 219 lastRef.next = dataRefRecId; 220 head.last = dataRefRecId; 221 head.count++; 222 recman.update (previousLast, lastRef, refSerializer); 223 } 224 htree.put (key, head); 225 if (autoCommit) { 226 recman.commit (); 227 this.notifyAll (); 228 } 229 } 230 } catch (IOException e) { 231 throw new SpaceError (e); 232 } 233 } 234 public void push (K key, V value) { 235 push (key, value, -1); 236 } 237 /** 238 * Write a new entry into the Space at the head of a queue 239 * The entry will timeout after the specified period 240 * @param key Entry's key 241 * @param value Object value 242 * @param timeout entry timeout in millis 243 */ 244 public void push (Object key, Object value, long timeout) { 245 if (key == null || value == null) 246 throw new NullPointerException ("key=" + key + ", value=" + value); 247 try { 248 synchronized (this) { 249 long recid = recman.insert (value); 250 long expiration = timeout == -1 ? Long.MAX_VALUE : 251 Instant.now().toEpochMilli() + timeout; 252 Ref dataRef = new Ref (recid, expiration); 253 254 Head head = (Head) htree.get (key); 255 if (head == null) { 256 head = new Head (); 257 head.first = head.last = recman.insert (dataRef, refSerializer); 258 } else { 259 dataRef.next = head.first; 260 head.first = recman.insert (dataRef, refSerializer); 261 } 262 head.count++; 263 htree.put (key, head); 264 if (autoCommit) { 265 recman.commit (); 266 this.notifyAll (); 267 } 268 } 269 } catch (IOException e) { 270 throw new SpaceError (e); 271 } 272 } 273 /** 274 * Read probe reads an entry from the space if one exists, 275 * return null otherwise. 276 * @param key Entry's key 277 * @return value or null 278 */ 279 public synchronized V rdp (Object key) { 280 try { 281 if (key instanceof Template) 282 return (V) getObject ((Template) key, false); 283 284 Object obj = null; 285 Ref ref = getFirst (key, false); 286 if (ref != null) 287 obj = recman.fetch (ref.recid); 288 if (autoCommit) 289 recman.commit (); 290 return (V) obj; 291 } catch (IOException e) { 292 throw new SpaceError (e); 293 } 294 } 295 296 /** 297 * In probe takes an entry from the space if one exists, 298 * return null otherwise. 299 * @param key Entry's key 300 * @return value or null 301 */ 302 public synchronized V inp (Object key) { 303 try { 304 if (key instanceof Template) 305 return (V) getObject ((Template) key, true); 306 307 Object obj = null; 308 Ref ref = getFirst (key, true); 309 if (ref != null) { 310 obj = recman.fetch (ref.recid); 311 recman.delete (ref.recid); 312 } 313 if (autoCommit) 314 recman.commit (); 315 return (V) obj; 316 } catch (IOException e) { 317 throw new SpaceError (e); 318 } 319 } 320 public synchronized V in (Object key) { 321 Object obj; 322 while ((obj = inp (key)) == null) { 323 try { 324 this.wait (); 325 } catch (InterruptedException ignored) { } 326 } 327 return (V) obj; 328 } 329 /** 330 * Take an entry from the space, waiting forever until one exists. 331 * @param key Entry's key 332 * @return value 333 */ 334 public synchronized V in (Object key, long timeout) { 335 Object obj; 336 Instant now = Instant.now(); 337 long duration; 338 while ((obj = inp (key)) == null && 339 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 340 { 341 try { 342 this.wait (timeout - duration); 343 } catch (InterruptedException ignored) { } 344 } 345 return (V) obj; 346 } 347 348 /** 349 * Read an entry from the space, waiting forever until one exists. 350 * @param key Entry's key 351 * @return value 352 */ 353 public synchronized V rd (Object key) { 354 Object obj; 355 while ((obj = rdp (key)) == null) { 356 try { 357 this.wait (); 358 } catch (InterruptedException ignored) { } 359 } 360 return (V) obj; 361 } 362 363 /** 364 * Read an entry from the space, waiting a limited amount of time 365 * until one exists. 366 * @param key Entry's key 367 * @param timeout millis to wait 368 * @return value or null 369 */ 370 public synchronized V rd (Object key, long timeout) { 371 Object obj; 372 Instant now = Instant.now(); 373 long duration; 374 while ((obj = rdp (key)) == null && 375 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 376 { 377 try { 378 this.wait (timeout - duration); 379 } catch (InterruptedException ignored) { } 380 } 381 return (V) obj; 382 } 383 public synchronized void nrd (Object key) { 384 while (rdp (key) != null) { 385 try { 386 this.wait (NRD_RESOLUTION); 387 } catch (InterruptedException ignored) { } 388 } 389 } 390 public synchronized V nrd (Object key, long timeout) { 391 Object obj; 392 Instant now = Instant.now(); 393 long duration; 394 while ((obj = rdp (key)) != null && 395 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 396 { 397 try { 398 this.wait (Math.min(NRD_RESOLUTION, timeout - duration)); 399 } catch (InterruptedException ignored) { } 400 } 401 return (V) obj; 402 } 403 404 /** 405 * Returns the approximate number of entries currently stored under {@code key}. 406 * 407 * @param key the Key 408 * @return aproximately queue size 409 */ 410 public long size (Object key) { 411 try { 412 Head head = (Head) htree.get (key); 413 return head != null ? head.count : 0; 414 } catch (IOException e) { 415 throw new SpaceError (e); 416 } 417 } 418 public boolean existAny (Object[] keys) { 419 for (Object key : keys) { 420 if (rdp(key) != null) 421 return true; 422 } 423 return false; 424 } 425 public boolean existAny (Object[] keys, long timeout) { 426 Instant now = Instant.now(); 427 long duration; 428 while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) { 429 if (existAny (keys)) 430 return true; 431 synchronized (this) { 432 try { 433 wait (timeout - duration); 434 } catch (InterruptedException ignored) { } 435 } 436 } 437 return false; 438 } 439 public synchronized void put (K key, V value, long timeout) { 440 while (inp (key) != null) 441 ; // NOPMD 442 out (key, value, timeout); 443 } 444 public synchronized void put (K key, V value) { 445 while (inp (key) != null) 446 ; // NOPMD 447 out (key, value); 448 } 449 private void purge (Object key) throws IOException { 450 Head head = (Head) htree.get (key); 451 Ref previousRef = null; 452 if (head != null) { 453 for (long recid = head.first; recid >= 0; ) { 454 Ref r = (Ref) recman.fetch (recid, refSerializer); 455 if (r.isExpired ()) { 456 recman.delete (r.recid); 457 recman.delete (recid); 458 head.count--; 459 if (previousRef == null) { 460 head.first = r.next; 461 } else { 462 previousRef.next = r.next; 463 recman.update ( 464 head.last, previousRef, refSerializer 465 ); 466 } 467 } else { 468 previousRef = r; 469 head.last = recid; 470 } 471 recid = r.next; 472 } 473 if (head.first == -1) { 474 htree.remove (key); 475 } 476 else { 477 htree.put (key, head); 478 } 479 } 480 } 481 482 @Override 483 public void run () { 484 try { 485 gc(); 486 } catch (Exception | SpaceError ex) { 487 // this happens when e.g. the jdbm file is corrupted 488 ex.printStackTrace(); 489 } 490 } 491 /** 492 * garbage collector. 493 * removes expired entries 494 */ 495 public void gc () { 496 final String GCKEY = "GC$" + Integer.toString (hashCode()); 497 final long TIMEOUT = 24 * 3600 * 1000; 498 Object obj; 499 try { 500 synchronized (this) { 501 // avoid concurrent gc 502 if (rdp (GCKEY) != null) 503 return; 504 ((Space)this).out (GCKEY, Boolean.TRUE, TIMEOUT); 505 } 506 FastIterator iter = htree.keys (); 507 508 try { 509 while ( (obj = iter.next()) != null) { 510 ((Space)this).out (GCKEY, obj, TIMEOUT); 511 Thread.yield (); 512 } 513 } catch (ConcurrentModificationException e) { 514 // ignore, we may have better luck on next try 515 } 516 while ( (obj = inp (GCKEY)) != null) { 517 synchronized (this) { 518 purge (obj); 519 recman.commit (); 520 } 521 Thread.yield (); 522 } 523 } catch (IOException e) { 524 throw new SpaceError (e); 525 } 526 } 527 /** 528 * Returns a space-separated list of every key currently stored in this space. 529 * 530 * @return all keys, joined by single-space separators 531 */ 532 public String getKeys () { 533 StringBuilder sb = new StringBuilder(); 534 try { 535 FastIterator iter = htree.keys (); 536 Object obj; 537 while ( (obj = iter.next()) != null) { 538 if (sb.length() > 0) 539 sb.append (' '); 540 sb.append (obj.toString()); 541 } 542 } catch (IOException e) { 543 throw new SpaceError (e); 544 } 545 return sb.toString(); 546 } 547 548 private Ref getFirst (Object key, boolean remove) throws IOException { 549 Head head = (Head) htree.get (key); 550 Ref ref = null; 551 if (head != null) { 552 long recid; 553 for (recid = head.first; recid >= 0; ) { 554 Ref r = (Ref) recman.fetch (recid, refSerializer); 555 if (r.isExpired ()) { 556 recman.delete (r.recid); 557 recman.delete (recid); 558 recid = r.next; 559 head.count--; 560 } else { 561 ref = r; 562 if (remove) { 563 recman.delete (recid); 564 recid = ref.next; 565 head.count--; 566 } 567 break; 568 } 569 } 570 if (head.first != recid) { 571 if (recid < 0) 572 htree.remove (key); 573 else { 574 head.first = recid; 575 htree.put (key, head); 576 } 577 } 578 } 579 return ref; 580 } 581 private void unlinkRef 582 (long recid, Head head, Ref r, Ref previousRef, long previousRecId) 583 throws IOException 584 { 585 recman.delete (r.recid); 586 recman.delete (recid); 587 head.count--; 588 if (previousRef == null) 589 head.first = r.next; 590 else { 591 previousRef.next = r.next; 592 recman.update ( 593 previousRecId, previousRef, refSerializer 594 ); 595 } 596 } 597 private Object getObject (Template tmpl, boolean remove) 598 throws IOException 599 { 600 Object obj = null; 601 Object key = tmpl.getKey(); 602 Head head = (Head) htree.get (key); 603 Ref previousRef = null; 604 long previousRecId = 0; 605 int unlinkCount = 0; 606 if (head != null) { 607 for (long recid = head.first; recid >= 0; ) { 608 Ref r = (Ref) recman.fetch (recid, refSerializer); 609 if (r.isExpired ()) { 610 unlinkRef (recid, head, r, previousRef, previousRecId); 611 unlinkCount++; 612 } else { 613 Object o = recman.fetch (r.recid); 614 if (o != null && tmpl.equals(o)) { 615 obj = o; 616 if (remove) { 617 unlinkRef ( 618 recid, head, r, previousRef, previousRecId 619 ); 620 unlinkCount++; 621 } 622 break; 623 } 624 previousRef = r; 625 previousRecId = recid; 626 } 627 recid = r.next; 628 } 629 if (unlinkCount > 0) { 630 if (head.first == -1) { 631 htree.remove (key); 632 } 633 else { 634 htree.put (key, head); 635 } 636 } 637 } 638 return obj; 639 } 640 static class Head implements Externalizable { 641 public long first; 642 public long last; 643 public long count; 644 static final long serialVersionUID = 2L; 645 646 public Head () { 647 super (); 648 first = -1; 649 last = -1; 650 } 651 public void writeExternal (ObjectOutput out) throws IOException { 652 out.writeLong (first); 653 out.writeLong (last); 654 out.writeLong (count); 655 } 656 public void readExternal (ObjectInput in) throws IOException { 657 first = in.readLong (); 658 last = in.readLong (); 659 count = in.readLong (); 660 } 661 public String toString() { 662 return getClass().getName() 663 + "@" + Integer.toHexString(hashCode()) 664 + ":[first=" + first 665 + ",last=" + last 666 + "]"; 667 } 668 } 669 static class Ref implements Serializer { 670 long recid; 671 long expires; 672 long next; 673 static final long serialVersionUID = 1L; 674 675 public Ref () { 676 super(); 677 } 678 public Ref (long recid, long expires) { 679 super(); 680 this.recid = recid; 681 this.expires = expires; 682 this.next = -1; 683 } 684 685 public boolean isExpired () { 686 return expires < Instant.now().toEpochMilli(); 687 } 688 public String toString() { 689 return getClass().getName() 690 + "@" + Integer.toHexString(hashCode()) 691 + ":[recid=" + recid 692 + ",next=" + next 693 + ",expired=" + isExpired () 694 + "]"; 695 } 696 public byte[] serialize (Object obj) 697 throws IOException 698 { 699 Ref d = (Ref) obj; 700 701 byte[] buf = new byte [24]; 702 putLong (buf, 0, d.recid); 703 putLong (buf, 8, d.next); 704 putLong (buf,16, d.expires); 705 return buf; 706 } 707 public Object deserialize (byte[] serialized) 708 throws IOException 709 { 710 Ref d = new Ref (); 711 d.recid = getLong (serialized, 0); 712 d.next = getLong (serialized, 8); 713 d.expires = getLong (serialized, 16); 714 return d; 715 } 716 } 717 static void putLong (byte[] b, int off, long val) { 718 b[off+7] = (byte) val; 719 b[off+6] = (byte) (val >>> 8); 720 b[off+5] = (byte) (val >>> 16); 721 b[off+4] = (byte) (val >>> 24); 722 b[off+3] = (byte) (val >>> 32); 723 b[off+2] = (byte) (val >>> 40); 724 b[off+1] = (byte) (val >>> 48); 725 b[off] = (byte) (val >>> 56); 726 } 727 static long getLong (byte[] b, int off) { 728 return (b[off+7] & 0xFFL) + 729 ((b[off+6] & 0xFFL) << 8) + 730 ((b[off+5] & 0xFFL) << 16) + 731 ((b[off+4] & 0xFFL) << 24) + 732 ((b[off+3] & 0xFFL) << 32) + 733 ((b[off+2] & 0xFFL) << 40) + 734 ((b[off+1] & 0xFFL) << 48) + 735 ((b[off] & 0xFFL) << 56); 736 } 737}