001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import jdbm.RecordManager;
022import jdbm.RecordManagerFactory;
023import jdbm.RecordManagerOptions;
024import jdbm.helper.FastIterator;
025import jdbm.helper.Serializer;
026import jdbm.htree.HTree;
027import org.jpos.util.DefaultTimer;
028
029import java.io.Externalizable;
030import java.io.IOException;
031import java.io.ObjectInput;
032import java.io.ObjectOutput;
033import java.time.Duration;
034import java.time.Instant;
035import java.util.*;
036
037/**
038 * JDBM based persistent space implementation
039 *
040 * @author Alejandro Revilla
041 * @author Kris Leite
042 * @version $Revision$ $Date$
043 * @since 1.4.7
044 */
045@SuppressWarnings("unchecked")
046public class JDBMSpace<K,V> extends TimerTask implements Space<K,V>, PersistentSpace {
    /** JDBM hash tree holding one {@link Head} record per key. */
    protected HTree htree;
    /** JDBM record manager backing this space; set to null by {@link #close()}. */
    protected RecordManager recman;
    /** Shared {@link Ref} instance used as the JDBM serializer for Ref records. */
    protected static final Serializer refSerializer = new Ref ();
    /** Registry of spaces by name, consulted by {@link #getSpace(String, String)}. */
    protected static final Map<String,Space> spaceRegistrar = new HashMap<String,Space> ();
    /** When true, out, push, rdp and inp commit the record manager on each call. */
    protected boolean autoCommit = true;
    /** Name of this space (the key used in {@link #spaceRegistrar}). */
    protected String name;
    /** Initial delay and period, in milliseconds, of the scheduled gc task. */
    public static final long GCDELAY = 5*60*1000;
    /** Maximum time, in milliseconds, that nrd waits between successive checks. */
    private static final long NRD_RESOLUTION = 500L;
055
056    /**
057     * protected constructor.
058     * @param name Space Name
059     * @param filename underlying JDBM filename
060     */
061    protected JDBMSpace (String name, String filename) {
062        super();
063        this.name = name;
064        try {
065            Properties props = new Properties();
066            props.put (RecordManagerOptions.CACHE_SIZE, "512");
067            recman = RecordManagerFactory.createRecordManager (filename, props);
068            long recid = recman.getNamedObject ("space");
069            if (recid != 0) {
070                htree = HTree.load (recman, recid);
071            } else {
072                htree = HTree.createInstance (recman);
073                recman.setNamedObject ("space", htree.getRecid());
074            }
075            recman.commit ();
076        } catch (IOException e) {
077            throw new SpaceError (e);
078        }
079        DefaultTimer.getTimer().schedule (this, GCDELAY, GCDELAY);
080    }
081    /**
082     * @return reference to default JDBMSpace
083     */
084    public static JDBMSpace getSpace() {
085        return getSpace ("space");
086    }
087    /**
088     * creates a named JDBMSpace 
089     * (filename used for storage is the same as the given name)
090     * @param name the Space name
091     * @return reference to named JDBMSpace
092     */
093    public static JDBMSpace getSpace(String name) {
094        return getSpace(name, name);
095    }
096    /**
097     * creates a named JDBMSpace
098     * @param name the Space name
099     * @param filename the storage file name
100     * @return reference to named JDBMSpace
101     */
102    public synchronized static JDBMSpace
103        getSpace (String name, String filename) 
104    {
105        JDBMSpace sp = (JDBMSpace) spaceRegistrar.get (name);
106        if (sp == null) {
107            sp = new JDBMSpace (name, filename);
108            spaceRegistrar.put (name, sp);
109        }
110        return sp;
111    }
112    /**
113     * Use with utmost care and at your own risk.
114     *
115     * If you are to perform several operations on the space you
116     * should synchronize on the space, i.e:
117     * <pre>
118     *   synchronized (sp) {
119     *     sp.setAutoCommit (false);
120     *     sp.out (..., ...)
121     *     sp.out (..., ...)
122     *     ...
123     *     ...
124     *     sp.inp (...);
125     *     sp.commit ();    // or sp.rollback ();
126     *     sp.setAutoCommit (true);
127     *   }
128     * </pre>
129     * @param b true or false
130     */
131    public void setAutoCommit (boolean b) {
132        this.autoCommit = b;
133    }
134    /**
135     * force commit
136     */
137    public void commit () {
138        try {
139            recman.commit ();
140            this.notifyAll ();
141        } catch (IOException e) {
142            throw new SpaceError (e);
143        }
144    }
145    /**
146     * force rollback
147     */
148    public void rollback () {
149        try {
150            recman.rollback ();
151        } catch (IOException e) {
152            throw new SpaceError (e);
153        }
154    }
155    /**
156     * close this space - use with care
157     */
158    public void close () {
159        synchronized (JDBMSpace.class) {
160            spaceRegistrar.remove (name);
161        }
162        synchronized (this) {
163            try {
164                recman.close ();
165                recman = null;
166            } catch (IOException e) {
167                throw new SpaceError (e);
168            }
169        }
170    }
171    /**
172     * Write a new entry into the Space
173     * @param key Entry's key
174     * @param value Object value
175     */
176    public void out (K key, V value) {
177        out (key, value, -1);
178    }
179    /**
180     * Write a new entry into the Space
181     * The entry will timeout after the specified period
182     * @param key Entry's key
183     * @param value Object value
184     * @param timeout entry timeout in millis
185     */
186    public void out (K key, V value, long timeout) {
187        if (key == null || value == null)
188            throw new NullPointerException ("key=" + key + ", value=" + value);
189        try {
190            synchronized (this) {
191                long recid = recman.insert (value);
192
193                long expiration = timeout == -1 ? Long.MAX_VALUE :
194                        Instant.now().toEpochMilli() + timeout;
195                Ref dataRef = new Ref (recid, expiration);
196                long dataRefRecId = recman.insert (dataRef, refSerializer);
197
198                Head head = (Head) htree.get (key);
199                if (head == null) {
200                    head = new Head ();
201                    head.first = dataRefRecId;
202                    head.last  = dataRefRecId;
203                    head.count = 1;
204                } else {
205                    long previousLast = head.last;
206                    Ref lastRef   = 
207                        (Ref) recman.fetch (previousLast, refSerializer);
208                    lastRef.next      = dataRefRecId;
209                    head.last         = dataRefRecId;
210                    head.count++;
211                    recman.update (previousLast, lastRef, refSerializer);
212                }
213                htree.put (key, head);
214                if (autoCommit) {
215                    recman.commit ();
216                    this.notifyAll ();
217                }
218            }
219        } catch (IOException e) {
220            throw new SpaceError (e);
221        }
222    }
223    public void push (K key, V value) {
224        push (key, value, -1);
225    }
226    /**
227     * Write a new entry into the Space at the head of a queue
228     * The entry will timeout after the specified period
229     * @param key Entry's key
230     * @param value Object value
231     * @param timeout entry timeout in millis
232     */
233    public void push (Object key, Object value, long timeout) {
234        if (key == null || value == null)
235            throw new NullPointerException ("key=" + key + ", value=" + value);
236        try {
237            synchronized (this) {
238                long recid = recman.insert (value);
239                long expiration = timeout == -1 ? Long.MAX_VALUE :
240                        Instant.now().toEpochMilli() + timeout;
241                Ref dataRef = new Ref (recid, expiration);
242
243                Head head = (Head) htree.get (key);
244                if (head == null) {
245                    head = new Head ();
246                    head.first = head.last = recman.insert (dataRef, refSerializer);
247                } else {
248                    dataRef.next = head.first;
249                    head.first   = recman.insert (dataRef, refSerializer);
250                }
251                head.count++;
252                htree.put (key, head);
253                if (autoCommit) {
254                    recman.commit ();
255                    this.notifyAll ();
256                }
257            }
258        } catch (IOException e) {
259            throw new SpaceError (e);
260        }
261    }
262    /**
263     * Read probe reads an entry from the space if one exists, 
264     * return null otherwise.
265     * @param key Entry's key
266     * @return value or null
267     */
268    public synchronized V rdp (Object key) {
269        try {
270            if (key instanceof Template) 
271                return (V) getObject ((Template) key, false);
272
273            Object obj = null;
274            Ref ref = getFirst (key, false);
275            if (ref != null) 
276                obj = recman.fetch (ref.recid);
277            if (autoCommit)
278                recman.commit ();
279            return (V) obj;
280        } catch (IOException e) {
281            throw new SpaceError (e);
282        }
283    }
284
285    /**
286     * In probe takes an entry from the space if one exists, 
287     * return null otherwise.
288     * @param key Entry's key
289     * @return value or null
290     */
291    public synchronized V inp (Object key) {
292        try {
293            if (key instanceof Template) 
294                return (V) getObject ((Template) key, true);
295
296            Object obj = null;
297            Ref ref = getFirst (key, true);
298            if (ref != null) {
299                obj = recman.fetch (ref.recid);
300                recman.delete (ref.recid);
301            }
302            if (autoCommit)
303                recman.commit ();
304            return (V) obj;
305        } catch (IOException e) {
306            throw new SpaceError (e);
307        }
308    }
309    public synchronized V in (Object key) {
310        Object obj;
311        while ((obj = inp (key)) == null) {
312            try {
313                this.wait ();
314            } catch (InterruptedException ignored) { }
315        }
316        return (V) obj;
317    }
318    /**
319     * Take an entry from the space, waiting forever until one exists.
320     * @param key Entry's key
321     * @return value
322     */
323    public synchronized V in (Object key, long timeout) {
324        Object obj;
325        Instant now = Instant.now();
326        long duration;
327        while ((obj = inp (key)) == null &&
328                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
329        {
330            try {
331                this.wait (timeout - duration);
332            } catch (InterruptedException ignored) { }
333        }
334        return (V) obj;
335    }
336
337    /**
338     * Read an entry from the space, waiting forever until one exists.
339     * @param key Entry's key
340     * @return value
341     */
342    public synchronized V rd  (Object key) {
343        Object obj;
344        while ((obj = rdp (key)) == null) {
345            try {
346                this.wait ();
347            } catch (InterruptedException ignored) { }
348        }
349        return (V) obj;
350    }
351
352    /**
353     * Read an entry from the space, waiting a limited amount of time
354     * until one exists.
355     * @param key Entry's key
356     * @param timeout millis to wait
357     * @return value or null
358     */
359    public synchronized V rd  (Object key, long timeout) {
360        Object obj;
361        Instant now = Instant.now();
362        long duration;
363        while ((obj = rdp (key)) == null &&
364                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
365        {
366            try {
367                this.wait (timeout - duration);
368            } catch (InterruptedException ignored) { }
369        }
370        return (V) obj;
371    }
372    public synchronized void nrd  (Object key) {
373        while (rdp (key) != null) {
374            try {
375                this.wait (NRD_RESOLUTION);
376            } catch (InterruptedException ignored) { }
377        }
378    }
379    public synchronized V nrd  (Object key, long timeout) {
380        Object obj;
381        Instant now = Instant.now();
382        long duration;
383        while ((obj = rdp (key)) != null &&
384                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
385        {
386            try {
387                this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
388            } catch (InterruptedException ignored) { }
389        }
390        return (V) obj;
391    }
392
393    /**
394     * @param key the Key
395     * @return aproximately queue size
396     */
397    public long size (Object key) {
398        try {
399            Head head = (Head) htree.get (key);
400            return head != null ? head.count : 0;
401        } catch (IOException e) {
402            throw new SpaceError (e);
403        }
404    }
405    public boolean existAny (Object[] keys) {
406        for (Object key : keys) {
407            if (rdp(key) != null)
408                return true;
409        }
410        return false;
411    }
412    public boolean existAny (Object[] keys, long timeout) {
413        Instant now = Instant.now();
414        long duration;
415        while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
416            if (existAny (keys))
417                return true;
418            synchronized (this) {
419                try {
420                    wait (timeout - duration);
421                } catch (InterruptedException ignored) { }
422            }
423        }
424        return false;
425    }
426    public synchronized void put (K key, V value, long timeout) {
427        while (inp (key) != null)
428            ; // NOPMD
429        out (key, value, timeout);
430    }
431    public synchronized void put (K key, V value) {
432        while (inp (key) != null)
433            ; // NOPMD
434        out (key, value);
435    }
    /**
     * Walks the whole list of Refs queued under the key and deletes every
     * expired entry (both the Ref record and the value record it points to).
     * The Head is removed from the htree when no entry is left, otherwise
     * it is written back. The caller is in charge of committing.
     *
     * @param key Entry's key
     * @throws IOException on record manager / htree failure
     */
    private void purge (Object key) throws IOException {
        Head head = (Head) htree.get (key);
        Ref previousRef = null;
        if (head != null) {
            for (long recid = head.first; recid >= 0; ) {
                Ref r = (Ref) recman.fetch (recid, refSerializer);
                if (r.isExpired ()) {
                    recman.delete (r.recid);
                    recman.delete (recid);
                    head.count--;
                    if (previousRef == null) {
                        // no live entry seen so far, the list now starts after r
                        head.first = r.next;
                    } else {
                        // head.last holds the recid of previousRef at this
                        // point (see the else branch below)
                        previousRef.next = r.next;
                        recman.update (
                            head.last, previousRef, refSerializer
                        );
                    }
                } else {
                    // head.last doubles as "recid of the last live entry seen",
                    // so it ends up pointing to the surviving tail
                    previousRef   = r;
                    head.last     = recid;
                }
                recid = r.next;
            }
            if (head.first == -1)  {
                htree.remove (key);
            }
            else {
                htree.put (key, head);
            }
        }
    }
468
469    @Override
470    public void run () {
471        try {
472            gc();
473        } catch (Exception | SpaceError ex) {
474            // this happens when e.g. the jdbm file is corrupted
475            ex.printStackTrace();
476        }
477    }
    /**
     * garbage collector.
     * removes expired entries
     *
     * The space itself is used as a work queue: a marker followed by every
     * key currently in the htree is written under a private key (GCKEY),
     * then the queue is drained and each key is purged and committed.
     *
     * @throws SpaceError on record manager / htree failure
     */
    public void gc () {
        // private key for this instance, entries under it expire after a day
        final String GCKEY = "GC$" + Integer.toString (hashCode());
        final long TIMEOUT = 24 * 3600 * 1000;
        Object obj;
        try {
            synchronized (this) {
                // avoid concurrent gc
                // NOTE(review): entries left under GCKEY by an aborted run also
                // make this return early until they are taken or expire
                if (rdp (GCKEY) != null) 
                    return;
                ((Space)this).out (GCKEY, Boolean.TRUE, TIMEOUT);  
            }
            FastIterator iter = htree.keys ();

            try {
                // queue every key so that purging happens outside the iteration
                while ( (obj = iter.next()) != null) {
                    ((Space)this).out (GCKEY, obj, TIMEOUT);
                    Thread.yield ();
                }
            } catch (ConcurrentModificationException e) {
                // ignore, we may have better luck on next try
            }
            // drain the queue; the Boolean.TRUE marker written above is taken
            // here as well and passed to purge like any other key
            while ( (obj = inp (GCKEY)) != null) {
                synchronized (this) {
                    purge (obj);
                    recman.commit ();
                }
                Thread.yield ();
            }    
        } catch (IOException e) {
            throw new SpaceError (e);
        }
    }
514    public String getKeys () {
515        StringBuilder sb = new StringBuilder();
516        try {
517            FastIterator iter = htree.keys ();
518            Object obj;
519            while ( (obj = iter.next()) != null) {
520                if (sb.length() > 0)
521                    sb.append (' ');
522                sb.append (obj.toString());
523            }
524        } catch (IOException e) {
525            throw new SpaceError (e);
526        }
527        return sb.toString();
528    }
529    
    /**
     * Returns the first non-expired Ref queued under the key, deleting the
     * expired entries found in front of it.
     *
     * @param key Entry's key
     * @param remove if true, the returned Ref record is deleted and unlinked
     *        from the list; the value record it points to (ref.recid) is
     *        not deleted here
     * @return first live Ref or null if there is none
     * @throws IOException on record manager / htree failure
     */
    private Ref getFirst (Object key, boolean remove) throws IOException {
        Head head = (Head) htree.get (key);
        Ref ref = null;
        if (head != null) {
            long recid;
            for (recid = head.first; recid >= 0; ) {
                Ref r = (Ref) recman.fetch (recid, refSerializer);
                if (r.isExpired ()) {
                    // drop the expired entry (value and Ref) and move on
                    recman.delete (r.recid);
                    recman.delete (recid);
                    recid = r.next;
                    head.count--;
                } else  {
                    ref = r;
                    if (remove) {
                        // only the Ref record goes away here
                        recman.delete (recid);
                        recid = ref.next;
                        head.count--;
                    }
                    break;
                }
            } 
            // recid is now the new front of the list (negative if empty);
            // the Head is only rewritten when the front actually moved
            if (head.first != recid) {
                if (recid < 0)
                    htree.remove (key);
                else {
                    head.first = recid;
                    htree.put (key, head);
                }
            }
        }
        return ref;
    }
563    private void unlinkRef
564        (long recid, Head head, Ref r, Ref previousRef, long previousRecId) 
565        throws IOException
566    {
567        recman.delete (r.recid);
568        recman.delete (recid);
569        head.count--;
570        if (previousRef == null)
571            head.first = r.next;
572        else {
573            previousRef.next = r.next;
574            recman.update (
575                previousRecId, previousRef, refSerializer
576            );
577        }
578    }
    /**
     * Looks for the first non-expired entry, queued under the template's key,
     * whose value is matched by the template. Expired entries found while
     * scanning are unlinked and deleted.
     *
     * @param tmpl template providing the key and the equals() used for matching
     * @param remove if true, the matching entry is unlinked and deleted
     * @return matching value or null if there is none
     * @throws IOException on record manager / htree failure
     */
    private Object getObject (Template tmpl, boolean remove) 
        throws IOException 
    {
        Object obj = null;
        Object key = tmpl.getKey();
        Head head = (Head) htree.get (key);
        Ref previousRef = null;
        long previousRecId = 0;
        int unlinkCount = 0;
        if (head != null) {
            for (long recid = head.first; recid >= 0; ) {
                Ref r = (Ref) recman.fetch (recid, refSerializer);
                if (r.isExpired ()) {
                    unlinkRef (recid, head, r, previousRef, previousRecId);
                    unlinkCount++;
                } else  {
                    Object o = recman.fetch (r.recid);
                    if (o != null && tmpl.equals(o)) {
                        obj = o;
                        if (remove) {
                            unlinkRef (
                                recid, head, r, previousRef, previousRecId
                            );
                            unlinkCount++;
                        }
                        break;
                    }
                    // live but not matching: it becomes the predecessor of
                    // whatever comes next
                    previousRef = r;
                    previousRecId = recid;
                }
                recid = r.next;
            } 
            // the Head only needs to be stored if the list was modified
            if (unlinkCount > 0) {
                if (head.first == -1)  {
                    htree.remove (key);
                }
                else {
                    htree.put (key, head);
                }
            }
        }
        return obj;
    }
622    static class Head implements Externalizable {
623        public long first;
624        public long last;
625        public long count;
626        static final long serialVersionUID = 2L;
627
628        public Head () {
629            super ();
630            first = -1;
631            last  = -1;
632        }
633        public void writeExternal (ObjectOutput out) throws IOException {
634            out.writeLong (first);
635            out.writeLong (last);
636            out.writeLong (count);
637        }
638        public void readExternal (ObjectInput in) throws IOException {
639            first = in.readLong ();
640            last  = in.readLong ();
641            count = in.readLong ();
642        }
643        public String toString() {
644            return getClass().getName() 
645                + "@" + Integer.toHexString(hashCode())
646                + ":[first=" + first 
647                + ",last=" + last
648                + "]";
649        }
650    }
651    static class Ref implements Serializer {
652        long recid;
653        long expires;
654        long next;
655        static final long serialVersionUID = 1L;
656
657        public Ref () {
658            super();
659        }
660        public Ref (long recid, long expires) {
661            super();
662            this.recid   = recid;
663            this.expires = expires;
664            this.next    = -1;
665        }
666
667        public boolean isExpired () {
668            return expires < Instant.now().toEpochMilli();
669        }
670        public String toString() {
671            return getClass().getName() 
672                + "@" + Integer.toHexString(hashCode())
673                + ":[recid=" + recid
674                + ",next=" + next
675                + ",expired=" + isExpired ()
676                + "]";
677        }
678        public byte[] serialize (Object obj) 
679            throws IOException
680        {
681            Ref d = (Ref) obj;
682
683            byte[] buf = new byte [24];
684            putLong (buf, 0, d.recid);
685            putLong (buf, 8, d.next);
686            putLong (buf,16, d.expires);
687            return buf;
688        }
689        public Object deserialize (byte[] serialized) 
690            throws IOException
691        {
692            Ref d = new Ref ();
693            d.recid   = getLong (serialized,  0);
694            d.next    = getLong (serialized,  8);
695            d.expires = getLong (serialized, 16);
696            return d;
697        }
698    }
699    static void putLong (byte[] b, int off, long val) {
700        b[off+7] = (byte) val;
701        b[off+6] = (byte) (val >>>  8);
702        b[off+5] = (byte) (val >>> 16);
703        b[off+4] = (byte) (val >>> 24);
704        b[off+3] = (byte) (val >>> 32);
705        b[off+2] = (byte) (val >>> 40);
706        b[off+1] = (byte) (val >>> 48);
707        b[off] = (byte) (val >>> 56);
708    }
709    static long getLong (byte[] b, int off) {
710        return (b[off+7] & 0xFFL) +
711               ((b[off+6] & 0xFFL) << 8) +
712               ((b[off+5] & 0xFFL) << 16) +
713               ((b[off+4] & 0xFFL) << 24) +
714               ((b[off+3] & 0xFFL) << 32) +
715               ((b[off+2] & 0xFFL) << 40) +
716               ((b[off+1] & 0xFFL) << 48) +
717               ((b[off] & 0xFFL) << 56);
718    }
719}