001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.space; 020 021import jdbm.RecordManager; 022import jdbm.RecordManagerFactory; 023import jdbm.RecordManagerOptions; 024import jdbm.helper.FastIterator; 025import jdbm.helper.Serializer; 026import jdbm.htree.HTree; 027import org.jpos.util.DefaultTimer; 028 029import java.io.Externalizable; 030import java.io.IOException; 031import java.io.ObjectInput; 032import java.io.ObjectOutput; 033import java.time.Duration; 034import java.time.Instant; 035import java.util.*; 036 037/** 038 * JDBM based persistent space implementation 039 * 040 * @author Alejandro Revilla 041 * @author Kris Leite 042 * @version $Revision$ $Date$ 043 * @since 1.4.7 044 */ 045@SuppressWarnings("unchecked") 046public class JDBMSpace<K,V> extends TimerTask implements Space<K,V>, PersistentSpace { 047 protected HTree htree; 048 protected RecordManager recman; 049 protected static final Serializer refSerializer = new Ref (); 050 protected static final Map<String,Space> spaceRegistrar = new HashMap<String,Space> (); 051 protected boolean autoCommit = true; 052 protected String name; 053 public static final long GCDELAY = 5*60*1000; 054 private static final long NRD_RESOLUTION = 500L; 055 056 /** 057 * protected constructor. 058 * @param name Space Name 059 * @param filename underlying JDBM filename 060 */ 061 protected JDBMSpace (String name, String filename) { 062 super(); 063 this.name = name; 064 try { 065 Properties props = new Properties(); 066 props.put (RecordManagerOptions.CACHE_SIZE, "512"); 067 recman = RecordManagerFactory.createRecordManager (filename, props); 068 long recid = recman.getNamedObject ("space"); 069 if (recid != 0) { 070 htree = HTree.load (recman, recid); 071 } else { 072 htree = HTree.createInstance (recman); 073 recman.setNamedObject ("space", htree.getRecid()); 074 } 075 recman.commit (); 076 } catch (IOException e) { 077 throw new SpaceError (e); 078 } 079 DefaultTimer.getTimer().schedule (this, GCDELAY, GCDELAY); 080 } 081 /** 082 * @return reference to default JDBMSpace 083 */ 084 public static JDBMSpace getSpace() { 085 return getSpace ("space"); 086 } 087 /** 088 * creates a named JDBMSpace 089 * (filename used for storage is the same as the given name) 090 * @param name the Space name 091 * @return reference to named JDBMSpace 092 */ 093 public static JDBMSpace getSpace(String name) { 094 return getSpace(name, name); 095 } 096 /** 097 * creates a named JDBMSpace 098 * @param name the Space name 099 * @param filename the storage file name 100 * @return reference to named JDBMSpace 101 */ 102 public synchronized static JDBMSpace 103 getSpace (String name, String filename) 104 { 105 JDBMSpace sp = (JDBMSpace) spaceRegistrar.get (name); 106 if (sp == null) { 107 sp = new JDBMSpace (name, filename); 108 spaceRegistrar.put (name, sp); 109 } 110 return sp; 111 } 112 /** 113 * Use with utmost care and at your own risk. 114 * 115 * If you are to perform several operations on the space you 116 * should synchronize on the space, i.e: 117 * <pre> 118 * synchronized (sp) { 119 * sp.setAutoCommit (false); 120 * sp.out (..., ...) 121 * sp.out (..., ...) 122 * ... 123 * ... 124 * sp.inp (...); 125 * sp.commit (); // or sp.rollback (); 126 * sp.setAutoCommit (true); 127 * } 128 * </pre> 129 * @param b true or false 130 */ 131 public void setAutoCommit (boolean b) { 132 this.autoCommit = b; 133 } 134 /** 135 * force commit 136 */ 137 public void commit () { 138 try { 139 recman.commit (); 140 this.notifyAll (); 141 } catch (IOException e) { 142 throw new SpaceError (e); 143 } 144 } 145 /** 146 * force rollback 147 */ 148 public void rollback () { 149 try { 150 recman.rollback (); 151 } catch (IOException e) { 152 throw new SpaceError (e); 153 } 154 } 155 /** 156 * close this space - use with care 157 */ 158 public void close () { 159 synchronized (JDBMSpace.class) { 160 spaceRegistrar.remove (name); 161 } 162 synchronized (this) { 163 try { 164 recman.close (); 165 recman = null; 166 } catch (IOException e) { 167 throw new SpaceError (e); 168 } 169 } 170 } 171 /** 172 * Write a new entry into the Space 173 * @param key Entry's key 174 * @param value Object value 175 */ 176 public void out (K key, V value) { 177 out (key, value, -1); 178 } 179 /** 180 * Write a new entry into the Space 181 * The entry will timeout after the specified period 182 * @param key Entry's key 183 * @param value Object value 184 * @param timeout entry timeout in millis 185 */ 186 public void out (K key, V value, long timeout) { 187 if (key == null || value == null) 188 throw new NullPointerException ("key=" + key + ", value=" + value); 189 try { 190 synchronized (this) { 191 long recid = recman.insert (value); 192 193 long expiration = timeout == -1 ? Long.MAX_VALUE : 194 Instant.now().toEpochMilli() + timeout; 195 Ref dataRef = new Ref (recid, expiration); 196 long dataRefRecId = recman.insert (dataRef, refSerializer); 197 198 Head head = (Head) htree.get (key); 199 if (head == null) { 200 head = new Head (); 201 head.first = dataRefRecId; 202 head.last = dataRefRecId; 203 head.count = 1; 204 } else { 205 long previousLast = head.last; 206 Ref lastRef = 207 (Ref) recman.fetch (previousLast, refSerializer); 208 lastRef.next = dataRefRecId; 209 head.last = dataRefRecId; 210 head.count++; 211 recman.update (previousLast, lastRef, refSerializer); 212 } 213 htree.put (key, head); 214 if (autoCommit) { 215 recman.commit (); 216 this.notifyAll (); 217 } 218 } 219 } catch (IOException e) { 220 throw new SpaceError (e); 221 } 222 } 223 public void push (K key, V value) { 224 push (key, value, -1); 225 } 226 /** 227 * Write a new entry into the Space at the head of a queue 228 * The entry will timeout after the specified period 229 * @param key Entry's key 230 * @param value Object value 231 * @param timeout entry timeout in millis 232 */ 233 public void push (Object key, Object value, long timeout) { 234 if (key == null || value == null) 235 throw new NullPointerException ("key=" + key + ", value=" + value); 236 try { 237 synchronized (this) { 238 long recid = recman.insert (value); 239 long expiration = timeout == -1 ? Long.MAX_VALUE : 240 Instant.now().toEpochMilli() + timeout; 241 Ref dataRef = new Ref (recid, expiration); 242 243 Head head = (Head) htree.get (key); 244 if (head == null) { 245 head = new Head (); 246 head.first = head.last = recman.insert (dataRef, refSerializer); 247 } else { 248 dataRef.next = head.first; 249 head.first = recman.insert (dataRef, refSerializer); 250 } 251 head.count++; 252 htree.put (key, head); 253 if (autoCommit) { 254 recman.commit (); 255 this.notifyAll (); 256 } 257 } 258 } catch (IOException e) { 259 throw new SpaceError (e); 260 } 261 } 262 /** 263 * Read probe reads an entry from the space if one exists, 264 * return null otherwise. 265 * @param key Entry's key 266 * @return value or null 267 */ 268 public synchronized V rdp (Object key) { 269 try { 270 if (key instanceof Template) 271 return (V) getObject ((Template) key, false); 272 273 Object obj = null; 274 Ref ref = getFirst (key, false); 275 if (ref != null) 276 obj = recman.fetch (ref.recid); 277 if (autoCommit) 278 recman.commit (); 279 return (V) obj; 280 } catch (IOException e) { 281 throw new SpaceError (e); 282 } 283 } 284 285 /** 286 * In probe takes an entry from the space if one exists, 287 * return null otherwise. 288 * @param key Entry's key 289 * @return value or null 290 */ 291 public synchronized V inp (Object key) { 292 try { 293 if (key instanceof Template) 294 return (V) getObject ((Template) key, true); 295 296 Object obj = null; 297 Ref ref = getFirst (key, true); 298 if (ref != null) { 299 obj = recman.fetch (ref.recid); 300 recman.delete (ref.recid); 301 } 302 if (autoCommit) 303 recman.commit (); 304 return (V) obj; 305 } catch (IOException e) { 306 throw new SpaceError (e); 307 } 308 } 309 public synchronized V in (Object key) { 310 Object obj; 311 while ((obj = inp (key)) == null) { 312 try { 313 this.wait (); 314 } catch (InterruptedException ignored) { } 315 } 316 return (V) obj; 317 } 318 /** 319 * Take an entry from the space, waiting forever until one exists. 320 * @param key Entry's key 321 * @return value 322 */ 323 public synchronized V in (Object key, long timeout) { 324 Object obj; 325 Instant now = Instant.now(); 326 long duration; 327 while ((obj = inp (key)) == null && 328 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 329 { 330 try { 331 this.wait (timeout - duration); 332 } catch (InterruptedException ignored) { } 333 } 334 return (V) obj; 335 } 336 337 /** 338 * Read an entry from the space, waiting forever until one exists. 339 * @param key Entry's key 340 * @return value 341 */ 342 public synchronized V rd (Object key) { 343 Object obj; 344 while ((obj = rdp (key)) == null) { 345 try { 346 this.wait (); 347 } catch (InterruptedException ignored) { } 348 } 349 return (V) obj; 350 } 351 352 /** 353 * Read an entry from the space, waiting a limited amount of time 354 * until one exists. 355 * @param key Entry's key 356 * @param timeout millis to wait 357 * @return value or null 358 */ 359 public synchronized V rd (Object key, long timeout) { 360 Object obj; 361 Instant now = Instant.now(); 362 long duration; 363 while ((obj = rdp (key)) == null && 364 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 365 { 366 try { 367 this.wait (timeout - duration); 368 } catch (InterruptedException ignored) { } 369 } 370 return (V) obj; 371 } 372 public synchronized void nrd (Object key) { 373 while (rdp (key) != null) { 374 try { 375 this.wait (NRD_RESOLUTION); 376 } catch (InterruptedException ignored) { } 377 } 378 } 379 public synchronized V nrd (Object key, long timeout) { 380 Object obj; 381 Instant now = Instant.now(); 382 long duration; 383 while ((obj = rdp (key)) != null && 384 (duration = Duration.between(now, Instant.now()).toMillis()) < timeout) 385 { 386 try { 387 this.wait (Math.min(NRD_RESOLUTION, timeout - duration)); 388 } catch (InterruptedException ignored) { } 389 } 390 return (V) obj; 391 } 392 393 /** 394 * @param key the Key 395 * @return aproximately queue size 396 */ 397 public long size (Object key) { 398 try { 399 Head head = (Head) htree.get (key); 400 return head != null ? head.count : 0; 401 } catch (IOException e) { 402 throw new SpaceError (e); 403 } 404 } 405 public boolean existAny (Object[] keys) { 406 for (Object key : keys) { 407 if (rdp(key) != null) 408 return true; 409 } 410 return false; 411 } 412 public boolean existAny (Object[] keys, long timeout) { 413 Instant now = Instant.now(); 414 long duration; 415 while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) { 416 if (existAny (keys)) 417 return true; 418 synchronized (this) { 419 try { 420 wait (timeout - duration); 421 } catch (InterruptedException ignored) { } 422 } 423 } 424 return false; 425 } 426 public synchronized void put (K key, V value, long timeout) { 427 while (inp (key) != null) 428 ; // NOPMD 429 out (key, value, timeout); 430 } 431 public synchronized void put (K key, V value) { 432 while (inp (key) != null) 433 ; // NOPMD 434 out (key, value); 435 } 436 private void purge (Object key) throws IOException { 437 Head head = (Head) htree.get (key); 438 Ref previousRef = null; 439 if (head != null) { 440 for (long recid = head.first; recid >= 0; ) { 441 Ref r = (Ref) recman.fetch (recid, refSerializer); 442 if (r.isExpired ()) { 443 recman.delete (r.recid); 444 recman.delete (recid); 445 head.count--; 446 if (previousRef == null) { 447 head.first = r.next; 448 } else { 449 previousRef.next = r.next; 450 recman.update ( 451 head.last, previousRef, refSerializer 452 ); 453 } 454 } else { 455 previousRef = r; 456 head.last = recid; 457 } 458 recid = r.next; 459 } 460 if (head.first == -1) { 461 htree.remove (key); 462 } 463 else { 464 htree.put (key, head); 465 } 466 } 467 } 468 469 @Override 470 public void run () { 471 try { 472 gc(); 473 } catch (Exception | SpaceError ex) { 474 // this happens when e.g. the jdbm file is corrupted 475 ex.printStackTrace(); 476 } 477 } 478 /** 479 * garbage collector. 480 * removes expired entries 481 */ 482 public void gc () { 483 final String GCKEY = "GC$" + Integer.toString (hashCode()); 484 final long TIMEOUT = 24 * 3600 * 1000; 485 Object obj; 486 try { 487 synchronized (this) { 488 // avoid concurrent gc 489 if (rdp (GCKEY) != null) 490 return; 491 ((Space)this).out (GCKEY, Boolean.TRUE, TIMEOUT); 492 } 493 FastIterator iter = htree.keys (); 494 495 try { 496 while ( (obj = iter.next()) != null) { 497 ((Space)this).out (GCKEY, obj, TIMEOUT); 498 Thread.yield (); 499 } 500 } catch (ConcurrentModificationException e) { 501 // ignore, we may have better luck on next try 502 } 503 while ( (obj = inp (GCKEY)) != null) { 504 synchronized (this) { 505 purge (obj); 506 recman.commit (); 507 } 508 Thread.yield (); 509 } 510 } catch (IOException e) { 511 throw new SpaceError (e); 512 } 513 } 514 public String getKeys () { 515 StringBuilder sb = new StringBuilder(); 516 try { 517 FastIterator iter = htree.keys (); 518 Object obj; 519 while ( (obj = iter.next()) != null) { 520 if (sb.length() > 0) 521 sb.append (' '); 522 sb.append (obj.toString()); 523 } 524 } catch (IOException e) { 525 throw new SpaceError (e); 526 } 527 return sb.toString(); 528 } 529 530 private Ref getFirst (Object key, boolean remove) throws IOException { 531 Head head = (Head) htree.get (key); 532 Ref ref = null; 533 if (head != null) { 534 long recid; 535 for (recid = head.first; recid >= 0; ) { 536 Ref r = (Ref) recman.fetch (recid, refSerializer); 537 if (r.isExpired ()) { 538 recman.delete (r.recid); 539 recman.delete (recid); 540 recid = r.next; 541 head.count--; 542 } else { 543 ref = r; 544 if (remove) { 545 recman.delete (recid); 546 recid = ref.next; 547 head.count--; 548 } 549 break; 550 } 551 } 552 if (head.first != recid) { 553 if (recid < 0) 554 htree.remove (key); 555 else { 556 head.first = recid; 557 htree.put (key, head); 558 } 559 } 560 } 561 return ref; 562 } 563 private void unlinkRef 564 (long recid, Head head, Ref r, Ref previousRef, long previousRecId) 565 throws IOException 566 { 567 recman.delete (r.recid); 568 recman.delete (recid); 569 head.count--; 570 if (previousRef == null) 571 head.first = r.next; 572 else { 573 previousRef.next = r.next; 574 recman.update ( 575 previousRecId, previousRef, refSerializer 576 ); 577 } 578 } 579 private Object getObject (Template tmpl, boolean remove) 580 throws IOException 581 { 582 Object obj = null; 583 Object key = tmpl.getKey(); 584 Head head = (Head) htree.get (key); 585 Ref previousRef = null; 586 long previousRecId = 0; 587 int unlinkCount = 0; 588 if (head != null) { 589 for (long recid = head.first; recid >= 0; ) { 590 Ref r = (Ref) recman.fetch (recid, refSerializer); 591 if (r.isExpired ()) { 592 unlinkRef (recid, head, r, previousRef, previousRecId); 593 unlinkCount++; 594 } else { 595 Object o = recman.fetch (r.recid); 596 if (o != null && tmpl.equals(o)) { 597 obj = o; 598 if (remove) { 599 unlinkRef ( 600 recid, head, r, previousRef, previousRecId 601 ); 602 unlinkCount++; 603 } 604 break; 605 } 606 previousRef = r; 607 previousRecId = recid; 608 } 609 recid = r.next; 610 } 611 if (unlinkCount > 0) { 612 if (head.first == -1) { 613 htree.remove (key); 614 } 615 else { 616 htree.put (key, head); 617 } 618 } 619 } 620 return obj; 621 } 622 static class Head implements Externalizable { 623 public long first; 624 public long last; 625 public long count; 626 static final long serialVersionUID = 2L; 627 628 public Head () { 629 super (); 630 first = -1; 631 last = -1; 632 } 633 public void writeExternal (ObjectOutput out) throws IOException { 634 out.writeLong (first); 635 out.writeLong (last); 636 out.writeLong (count); 637 } 638 public void readExternal (ObjectInput in) throws IOException { 639 first = in.readLong (); 640 last = in.readLong (); 641 count = in.readLong (); 642 } 643 public String toString() { 644 return getClass().getName() 645 + "@" + Integer.toHexString(hashCode()) 646 + ":[first=" + first 647 + ",last=" + last 648 + "]"; 649 } 650 } 651 static class Ref implements Serializer { 652 long recid; 653 long expires; 654 long next; 655 static final long serialVersionUID = 1L; 656 657 public Ref () { 658 super(); 659 } 660 public Ref (long recid, long expires) { 661 super(); 662 this.recid = recid; 663 this.expires = expires; 664 this.next = -1; 665 } 666 667 public boolean isExpired () { 668 return expires < Instant.now().toEpochMilli(); 669 } 670 public String toString() { 671 return getClass().getName() 672 + "@" + Integer.toHexString(hashCode()) 673 + ":[recid=" + recid 674 + ",next=" + next 675 + ",expired=" + isExpired () 676 + "]"; 677 } 678 public byte[] serialize (Object obj) 679 throws IOException 680 { 681 Ref d = (Ref) obj; 682 683 byte[] buf = new byte [24]; 684 putLong (buf, 0, d.recid); 685 putLong (buf, 8, d.next); 686 putLong (buf,16, d.expires); 687 return buf; 688 } 689 public Object deserialize (byte[] serialized) 690 throws IOException 691 { 692 Ref d = new Ref (); 693 d.recid = getLong (serialized, 0); 694 d.next = getLong (serialized, 8); 695 d.expires = getLong (serialized, 16); 696 return d; 697 } 698 } 699 static void putLong (byte[] b, int off, long val) { 700 b[off+7] = (byte) val; 701 b[off+6] = (byte) (val >>> 8); 702 b[off+5] = (byte) (val >>> 16); 703 b[off+4] = (byte) (val >>> 24); 704 b[off+3] = (byte) (val >>> 32); 705 b[off+2] = (byte) (val >>> 40); 706 b[off+1] = (byte) (val >>> 48); 707 b[off] = (byte) (val >>> 56); 708 } 709 static long getLong (byte[] b, int off) { 710 return (b[off+7] & 0xFFL) + 711 ((b[off+6] & 0xFFL) << 8) + 712 ((b[off+5] & 0xFFL) << 16) + 713 ((b[off+4] & 0xFFL) << 24) + 714 ((b[off+3] & 0xFFL) << 32) + 715 ((b[off+2] & 0xFFL) << 40) + 716 ((b[off+1] & 0xFFL) << 48) + 717 ((b[off] & 0xFFL) << 56); 718 } 719}