001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import jdbm.RecordManager;
022import jdbm.RecordManagerFactory;
023import jdbm.RecordManagerOptions;
024import jdbm.helper.FastIterator;
025import jdbm.helper.Serializer;
026import jdbm.htree.HTree;
027import org.jpos.util.DefaultTimer;
028
029import java.io.Externalizable;
030import java.io.IOException;
031import java.io.ObjectInput;
032import java.io.ObjectOutput;
033import java.time.Duration;
034import java.time.Instant;
035import java.util.*;
036
037/**
038 * JDBM based persistent space implementation
039 *
040 * @param <K> key type stored in this space
041 * @param <V> value type stored in this space
042 * @author Alejandro Revilla
043 * @author Kris Leite
044 * @version $Revision$ $Date$
045 * @since 1.4.7
046 */
047@SuppressWarnings("unchecked")
048public class JDBMSpace<K,V> extends TimerTask implements Space<K,V>, PersistentSpace {
049    /** JDBM hash tree storing the space entries. */
050    protected HTree htree;
051    /** Underlying JDBM record manager used to persist {@link #htree}. */
052    protected RecordManager recman;
053    /** Shared serializer used for {@code Ref} values stored in {@link #htree}. */
054    protected static final Serializer refSerializer = new Ref ();
055    /** Cache of named JDBM spaces, keyed by space name. */
056    protected static final Map<String,Space> spaceRegistrar = new HashMap<String,Space> ();
057    /** Whether mutating operations should commit immediately. */
058    protected boolean autoCommit = true;
059    /** Space name used for registration. */
060    protected String name;
061    /** Periodic interval, in milliseconds, between background GC sweeps. */
062    public static final long GCDELAY = 5*60*1000;
063    private static final long NRD_RESOLUTION = 500L;
064
065    /**
066     * protected constructor.
067     * @param name Space Name
068     * @param filename underlying JDBM filename
069     */
070    protected JDBMSpace (String name, String filename) {
071        super();
072        this.name = name;
073        try {
074            Properties props = new Properties();
075            props.put (RecordManagerOptions.CACHE_SIZE, "512");
076            recman = RecordManagerFactory.createRecordManager (filename, props);
077            long recid = recman.getNamedObject ("space");
078            if (recid != 0) {
079                htree = HTree.load (recman, recid);
080            } else {
081                htree = HTree.createInstance (recman);
082                recman.setNamedObject ("space", htree.getRecid());
083            }
084            recman.commit ();
085        } catch (IOException e) {
086            throw new SpaceError (e);
087        }
088        DefaultTimer.getTimer().schedule (this, GCDELAY, GCDELAY);
089    }
090    /**
091     * Returns a reference to the default JDBMSpace, backed by a file named {@code space}.
092     *
093     * @return reference to default JDBMSpace
094     */
095    public static JDBMSpace getSpace() {
096        return getSpace ("space");
097    }
098    /**
099     * creates a named JDBMSpace 
100     * (filename used for storage is the same as the given name)
101     * @param name the Space name
102     * @return reference to named JDBMSpace
103     */
104    public static JDBMSpace getSpace(String name) {
105        return getSpace(name, name);
106    }
107    /**
108     * creates a named JDBMSpace
109     * @param name the Space name
110     * @param filename the storage file name
111     * @return reference to named JDBMSpace
112     */
113    public synchronized static JDBMSpace
114        getSpace (String name, String filename) 
115    {
116        JDBMSpace sp = (JDBMSpace) spaceRegistrar.get (name);
117        if (sp == null) {
118            sp = new JDBMSpace (name, filename);
119            spaceRegistrar.put (name, sp);
120        }
121        return sp;
122    }
123    /**
124     * Use with utmost care and at your own risk.
125     *
126     * If you are to perform several operations on the space you
127     * should synchronize on the space, i.e:
128     * <pre>
129     *   synchronized (sp) {
130     *     sp.setAutoCommit (false);
131     *     sp.out (..., ...)
132     *     sp.out (..., ...)
133     *     ...
134     *     ...
135     *     sp.inp (...);
136     *     sp.commit ();    // or sp.rollback ();
137     *     sp.setAutoCommit (true);
138     *   }
139     * </pre>
140     * @param b true or false
141     */
142    public void setAutoCommit (boolean b) {
143        this.autoCommit = b;
144    }
145    /**
146     * force commit
147     */
148    public void commit () {
149        try {
150            recman.commit ();
151            this.notifyAll ();
152        } catch (IOException e) {
153            throw new SpaceError (e);
154        }
155    }
156    /**
157     * force rollback
158     */
159    public void rollback () {
160        try {
161            recman.rollback ();
162        } catch (IOException e) {
163            throw new SpaceError (e);
164        }
165    }
166    /**
167     * close this space - use with care
168     */
169    public void close () {
170        synchronized (JDBMSpace.class) {
171            spaceRegistrar.remove (name);
172        }
173        synchronized (this) {
174            try {
175                recman.close ();
176                recman = null;
177            } catch (IOException e) {
178                throw new SpaceError (e);
179            }
180        }
181    }
182    /**
183     * Write a new entry into the Space
184     * @param key Entry's key
185     * @param value Object value
186     */
187    public void out (K key, V value) {
188        out (key, value, -1);
189    }
190    /**
191     * Write a new entry into the Space
192     * The entry will timeout after the specified period
193     * @param key Entry's key
194     * @param value Object value
195     * @param timeout entry timeout in millis
196     */
197    public void out (K key, V value, long timeout) {
198        if (key == null || value == null)
199            throw new NullPointerException ("key=" + key + ", value=" + value);
200        try {
201            synchronized (this) {
202                long recid = recman.insert (value);
203
204                long expiration = timeout == -1 ? Long.MAX_VALUE :
205                        Instant.now().toEpochMilli() + timeout;
206                Ref dataRef = new Ref (recid, expiration);
207                long dataRefRecId = recman.insert (dataRef, refSerializer);
208
209                Head head = (Head) htree.get (key);
210                if (head == null) {
211                    head = new Head ();
212                    head.first = dataRefRecId;
213                    head.last  = dataRefRecId;
214                    head.count = 1;
215                } else {
216                    long previousLast = head.last;
217                    Ref lastRef   = 
218                        (Ref) recman.fetch (previousLast, refSerializer);
219                    lastRef.next      = dataRefRecId;
220                    head.last         = dataRefRecId;
221                    head.count++;
222                    recman.update (previousLast, lastRef, refSerializer);
223                }
224                htree.put (key, head);
225                if (autoCommit) {
226                    recman.commit ();
227                    this.notifyAll ();
228                }
229            }
230        } catch (IOException e) {
231            throw new SpaceError (e);
232        }
233    }
234    public void push (K key, V value) {
235        push (key, value, -1);
236    }
237    /**
238     * Write a new entry into the Space at the head of a queue
239     * The entry will timeout after the specified period
240     * @param key Entry's key
241     * @param value Object value
242     * @param timeout entry timeout in millis
243     */
244    public void push (Object key, Object value, long timeout) {
245        if (key == null || value == null)
246            throw new NullPointerException ("key=" + key + ", value=" + value);
247        try {
248            synchronized (this) {
249                long recid = recman.insert (value);
250                long expiration = timeout == -1 ? Long.MAX_VALUE :
251                        Instant.now().toEpochMilli() + timeout;
252                Ref dataRef = new Ref (recid, expiration);
253
254                Head head = (Head) htree.get (key);
255                if (head == null) {
256                    head = new Head ();
257                    head.first = head.last = recman.insert (dataRef, refSerializer);
258                } else {
259                    dataRef.next = head.first;
260                    head.first   = recman.insert (dataRef, refSerializer);
261                }
262                head.count++;
263                htree.put (key, head);
264                if (autoCommit) {
265                    recman.commit ();
266                    this.notifyAll ();
267                }
268            }
269        } catch (IOException e) {
270            throw new SpaceError (e);
271        }
272    }
273    /**
274     * Read probe reads an entry from the space if one exists, 
275     * return null otherwise.
276     * @param key Entry's key
277     * @return value or null
278     */
279    public synchronized V rdp (Object key) {
280        try {
281            if (key instanceof Template) 
282                return (V) getObject ((Template) key, false);
283
284            Object obj = null;
285            Ref ref = getFirst (key, false);
286            if (ref != null) 
287                obj = recman.fetch (ref.recid);
288            if (autoCommit)
289                recman.commit ();
290            return (V) obj;
291        } catch (IOException e) {
292            throw new SpaceError (e);
293        }
294    }
295
296    /**
297     * In probe takes an entry from the space if one exists, 
298     * return null otherwise.
299     * @param key Entry's key
300     * @return value or null
301     */
302    public synchronized V inp (Object key) {
303        try {
304            if (key instanceof Template) 
305                return (V) getObject ((Template) key, true);
306
307            Object obj = null;
308            Ref ref = getFirst (key, true);
309            if (ref != null) {
310                obj = recman.fetch (ref.recid);
311                recman.delete (ref.recid);
312            }
313            if (autoCommit)
314                recman.commit ();
315            return (V) obj;
316        } catch (IOException e) {
317            throw new SpaceError (e);
318        }
319    }
320    public synchronized V in (Object key) {
321        Object obj;
322        while ((obj = inp (key)) == null) {
323            try {
324                this.wait ();
325            } catch (InterruptedException ignored) { }
326        }
327        return (V) obj;
328    }
329    /**
330     * Take an entry from the space, waiting forever until one exists.
331     * @param key Entry's key
332     * @return value
333     */
334    public synchronized V in (Object key, long timeout) {
335        Object obj;
336        Instant now = Instant.now();
337        long duration;
338        while ((obj = inp (key)) == null &&
339                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
340        {
341            try {
342                this.wait (timeout - duration);
343            } catch (InterruptedException ignored) { }
344        }
345        return (V) obj;
346    }
347
348    /**
349     * Read an entry from the space, waiting forever until one exists.
350     * @param key Entry's key
351     * @return value
352     */
353    public synchronized V rd  (Object key) {
354        Object obj;
355        while ((obj = rdp (key)) == null) {
356            try {
357                this.wait ();
358            } catch (InterruptedException ignored) { }
359        }
360        return (V) obj;
361    }
362
363    /**
364     * Read an entry from the space, waiting a limited amount of time
365     * until one exists.
366     * @param key Entry's key
367     * @param timeout millis to wait
368     * @return value or null
369     */
370    public synchronized V rd  (Object key, long timeout) {
371        Object obj;
372        Instant now = Instant.now();
373        long duration;
374        while ((obj = rdp (key)) == null &&
375                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
376        {
377            try {
378                this.wait (timeout - duration);
379            } catch (InterruptedException ignored) { }
380        }
381        return (V) obj;
382    }
383    public synchronized void nrd  (Object key) {
384        while (rdp (key) != null) {
385            try {
386                this.wait (NRD_RESOLUTION);
387            } catch (InterruptedException ignored) { }
388        }
389    }
390    public synchronized V nrd  (Object key, long timeout) {
391        Object obj;
392        Instant now = Instant.now();
393        long duration;
394        while ((obj = rdp (key)) != null &&
395                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
396        {
397            try {
398                this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
399            } catch (InterruptedException ignored) { }
400        }
401        return (V) obj;
402    }
403
404    /**
405     * Returns the approximate number of entries currently stored under {@code key}.
406     *
407     * @param key the Key
408     * @return aproximately queue size
409     */
410    public long size (Object key) {
411        try {
412            Head head = (Head) htree.get (key);
413            return head != null ? head.count : 0;
414        } catch (IOException e) {
415            throw new SpaceError (e);
416        }
417    }
418    public boolean existAny (Object[] keys) {
419        for (Object key : keys) {
420            if (rdp(key) != null)
421                return true;
422        }
423        return false;
424    }
425    public boolean existAny (Object[] keys, long timeout) {
426        Instant now = Instant.now();
427        long duration;
428        while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
429            if (existAny (keys))
430                return true;
431            synchronized (this) {
432                try {
433                    wait (timeout - duration);
434                } catch (InterruptedException ignored) { }
435            }
436        }
437        return false;
438    }
439    public synchronized void put (K key, V value, long timeout) {
440        while (inp (key) != null)
441            ; // NOPMD
442        out (key, value, timeout);
443    }
444    public synchronized void put (K key, V value) {
445        while (inp (key) != null)
446            ; // NOPMD
447        out (key, value);
448    }
449    private void purge (Object key) throws IOException {
450        Head head = (Head) htree.get (key);
451        Ref previousRef = null;
452        if (head != null) {
453            for (long recid = head.first; recid >= 0; ) {
454                Ref r = (Ref) recman.fetch (recid, refSerializer);
455                if (r.isExpired ()) {
456                    recman.delete (r.recid);
457                    recman.delete (recid);
458                    head.count--;
459                    if (previousRef == null) {
460                        head.first = r.next;
461                    } else {
462                        previousRef.next = r.next;
463                        recman.update (
464                            head.last, previousRef, refSerializer
465                        );
466                    }
467                } else {
468                    previousRef   = r;
469                    head.last     = recid;
470                }
471                recid = r.next;
472            }
473            if (head.first == -1)  {
474                htree.remove (key);
475            }
476            else {
477                htree.put (key, head);
478            }
479        }
480    }
481
482    @Override
483    public void run () {
484        try {
485            gc();
486        } catch (Exception | SpaceError ex) {
487            // this happens when e.g. the jdbm file is corrupted
488            ex.printStackTrace();
489        }
490    }
491    /**
492     * garbage collector.
493     * removes expired entries
494     */
495    public void gc () {
496        final String GCKEY = "GC$" + Integer.toString (hashCode());
497        final long TIMEOUT = 24 * 3600 * 1000;
498        Object obj;
499        try {
500            synchronized (this) {
501                // avoid concurrent gc
502                if (rdp (GCKEY) != null) 
503                    return;
504                ((Space)this).out (GCKEY, Boolean.TRUE, TIMEOUT);  
505            }
506            FastIterator iter = htree.keys ();
507
508            try {
509                while ( (obj = iter.next()) != null) {
510                    ((Space)this).out (GCKEY, obj, TIMEOUT);
511                    Thread.yield ();
512                }
513            } catch (ConcurrentModificationException e) {
514                // ignore, we may have better luck on next try
515            }
516            while ( (obj = inp (GCKEY)) != null) {
517                synchronized (this) {
518                    purge (obj);
519                    recman.commit ();
520                }
521                Thread.yield ();
522            }    
523        } catch (IOException e) {
524            throw new SpaceError (e);
525        }
526    }
527    /**
528     * Returns a space-separated list of every key currently stored in this space.
529     *
530     * @return all keys, joined by single-space separators
531     */
532    public String getKeys () {
533        StringBuilder sb = new StringBuilder();
534        try {
535            FastIterator iter = htree.keys ();
536            Object obj;
537            while ( (obj = iter.next()) != null) {
538                if (sb.length() > 0)
539                    sb.append (' ');
540                sb.append (obj.toString());
541            }
542        } catch (IOException e) {
543            throw new SpaceError (e);
544        }
545        return sb.toString();
546    }
547    
548    private Ref getFirst (Object key, boolean remove) throws IOException {
549        Head head = (Head) htree.get (key);
550        Ref ref = null;
551        if (head != null) {
552            long recid;
553            for (recid = head.first; recid >= 0; ) {
554                Ref r = (Ref) recman.fetch (recid, refSerializer);
555                if (r.isExpired ()) {
556                    recman.delete (r.recid);
557                    recman.delete (recid);
558                    recid = r.next;
559                    head.count--;
560                } else  {
561                    ref = r;
562                    if (remove) {
563                        recman.delete (recid);
564                        recid = ref.next;
565                        head.count--;
566                    }
567                    break;
568                }
569            } 
570            if (head.first != recid) {
571                if (recid < 0)
572                    htree.remove (key);
573                else {
574                    head.first = recid;
575                    htree.put (key, head);
576                }
577            }
578        }
579        return ref;
580    }
581    private void unlinkRef
582        (long recid, Head head, Ref r, Ref previousRef, long previousRecId) 
583        throws IOException
584    {
585        recman.delete (r.recid);
586        recman.delete (recid);
587        head.count--;
588        if (previousRef == null)
589            head.first = r.next;
590        else {
591            previousRef.next = r.next;
592            recman.update (
593                previousRecId, previousRef, refSerializer
594            );
595        }
596    }
597    private Object getObject (Template tmpl, boolean remove) 
598        throws IOException 
599    {
600        Object obj = null;
601        Object key = tmpl.getKey();
602        Head head = (Head) htree.get (key);
603        Ref previousRef = null;
604        long previousRecId = 0;
605        int unlinkCount = 0;
606        if (head != null) {
607            for (long recid = head.first; recid >= 0; ) {
608                Ref r = (Ref) recman.fetch (recid, refSerializer);
609                if (r.isExpired ()) {
610                    unlinkRef (recid, head, r, previousRef, previousRecId);
611                    unlinkCount++;
612                } else  {
613                    Object o = recman.fetch (r.recid);
614                    if (o != null && tmpl.equals(o)) {
615                        obj = o;
616                        if (remove) {
617                            unlinkRef (
618                                recid, head, r, previousRef, previousRecId
619                            );
620                            unlinkCount++;
621                        }
622                        break;
623                    }
624                    previousRef = r;
625                    previousRecId = recid;
626                }
627                recid = r.next;
628            } 
629            if (unlinkCount > 0) {
630                if (head.first == -1)  {
631                    htree.remove (key);
632                }
633                else {
634                    htree.put (key, head);
635                }
636            }
637        }
638        return obj;
639    }
640    static class Head implements Externalizable {
641        public long first;
642        public long last;
643        public long count;
644        static final long serialVersionUID = 2L;
645
646        public Head () {
647            super ();
648            first = -1;
649            last  = -1;
650        }
651        public void writeExternal (ObjectOutput out) throws IOException {
652            out.writeLong (first);
653            out.writeLong (last);
654            out.writeLong (count);
655        }
656        public void readExternal (ObjectInput in) throws IOException {
657            first = in.readLong ();
658            last  = in.readLong ();
659            count = in.readLong ();
660        }
661        public String toString() {
662            return getClass().getName() 
663                + "@" + Integer.toHexString(hashCode())
664                + ":[first=" + first 
665                + ",last=" + last
666                + "]";
667        }
668    }
669    static class Ref implements Serializer {
670        long recid;
671        long expires;
672        long next;
673        static final long serialVersionUID = 1L;
674
675        public Ref () {
676            super();
677        }
678        public Ref (long recid, long expires) {
679            super();
680            this.recid   = recid;
681            this.expires = expires;
682            this.next    = -1;
683        }
684
685        public boolean isExpired () {
686            return expires < Instant.now().toEpochMilli();
687        }
688        public String toString() {
689            return getClass().getName() 
690                + "@" + Integer.toHexString(hashCode())
691                + ":[recid=" + recid
692                + ",next=" + next
693                + ",expired=" + isExpired ()
694                + "]";
695        }
696        public byte[] serialize (Object obj) 
697            throws IOException
698        {
699            Ref d = (Ref) obj;
700
701            byte[] buf = new byte [24];
702            putLong (buf, 0, d.recid);
703            putLong (buf, 8, d.next);
704            putLong (buf,16, d.expires);
705            return buf;
706        }
707        public Object deserialize (byte[] serialized) 
708            throws IOException
709        {
710            Ref d = new Ref ();
711            d.recid   = getLong (serialized,  0);
712            d.next    = getLong (serialized,  8);
713            d.expires = getLong (serialized, 16);
714            return d;
715        }
716    }
717    static void putLong (byte[] b, int off, long val) {
718        b[off+7] = (byte) val;
719        b[off+6] = (byte) (val >>>  8);
720        b[off+5] = (byte) (val >>> 16);
721        b[off+4] = (byte) (val >>> 24);
722        b[off+3] = (byte) (val >>> 32);
723        b[off+2] = (byte) (val >>> 40);
724        b[off+1] = (byte) (val >>> 48);
725        b[off] = (byte) (val >>> 56);
726    }
727    static long getLong (byte[] b, int off) {
728        return (b[off+7] & 0xFFL) +
729               ((b[off+6] & 0xFFL) << 8) +
730               ((b[off+5] & 0xFFL) << 16) +
731               ((b[off+4] & 0xFFL) << 24) +
732               ((b[off+3] & 0xFFL) << 32) +
733               ((b[off+2] & 0xFFL) << 40) +
734               ((b[off+1] & 0xFFL) << 48) +
735               ((b[off] & 0xFFL) << 56);
736    }
737}