001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import java.io.*;
022import java.time.Duration;
023import java.time.Instant;
024import java.util.Map;
025import java.util.HashMap;
026import java.util.Set;
027import java.util.concurrent.Future;
028import java.util.concurrent.Semaphore;
029
030import com.sleepycat.je.*;
031import com.sleepycat.persist.EntityStore; 
032import com.sleepycat.persist.StoreConfig; 
033import com.sleepycat.persist.EntityCursor;
034import com.sleepycat.persist.PrimaryIndex;
035import com.sleepycat.persist.SecondaryIndex;
036import com.sleepycat.persist.model.Entity;
037import com.sleepycat.persist.model.Persistent;
038import com.sleepycat.persist.model.PrimaryKey;
039import com.sleepycat.persist.model.SecondaryKey;
040import com.sleepycat.persist.model.Relationship;
041import java.util.HashSet;
042import java.util.concurrent.TimeUnit;
043
044import org.jpos.iso.ISOUtil;
045import org.jpos.util.Log;
046import org.jpos.util.Loggeable;
047
048/**
049 * BerkeleyDB Jave Edition based persistent space implementation
050 *
051 * @author Alejandro Revilla
052 * @since 1.6.5
053
054 * @param <K> the key type
055 * @param <V> the value type
056 */
057@SuppressWarnings("unchecked")
058public class JESpace<K,V> extends Log implements LocalSpace<K,V>, PersistentSpace, Loggeable, Runnable {
059    /** BerkeleyDB JE environment instance. */
060    Environment dbe = null;
061    /** BerkeleyDB JE entity store. */
062    EntityStore store = null;
063    /** Primary index for Ref entities. */
064    PrimaryIndex<Long, Ref> pIndex = null;
065    /** Primary index for GCRef entities. */
066    PrimaryIndex<Long,GCRef> gcpIndex = null;
067    /** Secondary index for Ref entities by key. */
068    SecondaryIndex<String,Long, Ref> sIndex = null;
069    /** Secondary index for GCRef entities by expiration time. */
070    SecondaryIndex<Long,Long,GCRef> gcsIndex = null;
071    /** Semaphore used to prevent concurrent GC runs. */
072    Semaphore gcSem = new Semaphore(1);
073    /** Local space used to manage space listeners. */
074    LocalSpace<Object,SpaceListener> sl;
075    /** Resolution in milliseconds for non-blocking read polling. */
076    private static final long NRD_RESOLUTION = 500L;
077    /** Delay in milliseconds between GC runs. */
078    public static final long GC_DELAY = 15*1000L;
079    /** Default transaction timeout in milliseconds. */
080    public static final long DEFAULT_TXN_TIMEOUT = 30*1000L;
081    /** Default lock timeout in milliseconds. */
082    public static final long DEFAULT_LOCK_TIMEOUT = 120*1000L;
083    /** Future handle for the scheduled GC task. */
084    private Future gcTask;
085
086    /** Registry mapping space names to their JESpace instances. */
087    static final Map<String,Space> spaceRegistrar = 
088        new HashMap<String,Space> ();
089
090    /**
091     * Constructs a JESpace with the given name and path/parameter string.
092     *
093     * @param name   the space name (also used as the entity store name)
094     * @param params comma-separated parameters; first element is the directory path
095     * @throws SpaceError if the BerkeleyDB environment or store cannot be opened
096     */
097    public JESpace(String name, String params) throws SpaceError {
098        super();
099        try {
100            EnvironmentConfig envConfig = new EnvironmentConfig();
101            StoreConfig storeConfig = new StoreConfig();
102            String[] p = ISOUtil.commaDecode(params);
103            String path = p[0];
104            envConfig.setAllowCreate (true);
105            envConfig.setTransactional(true);
106            envConfig.setLockTimeout(getParam("lock.timeout", p, DEFAULT_LOCK_TIMEOUT), TimeUnit.MILLISECONDS);
107            envConfig.setTxnTimeout(getParam("txn.timeout", p, DEFAULT_TXN_TIMEOUT), TimeUnit.MILLISECONDS);
108            storeConfig.setAllowCreate (true);
109            storeConfig.setTransactional (true);
110
111            File dir = new File(path);
112            dir.mkdirs();
113
114            dbe = new Environment (dir, envConfig);
115            store = new EntityStore (dbe, name, storeConfig);
116            pIndex = store.getPrimaryIndex (Long.class, Ref.class);
117            gcpIndex = store.getPrimaryIndex (Long.class, GCRef.class);
118            sIndex = store.getSecondaryIndex (pIndex, String.class, "key");
119            gcsIndex = store.getSecondaryIndex (gcpIndex, Long.class, "expires");
120            gcTask = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GC_DELAY, GC_DELAY, TimeUnit.MILLISECONDS);
121        } catch (Exception e) {
122            throw new SpaceError (e);
123        }
124    }
125
126    public void out (K key, V value) {
127        out (key, value, 0L);
128    }
129    public void out (K key, V value, long timeout) {
130        Transaction txn = null;
131        try {
132            txn = dbe.beginTransaction (null, null);
133            Ref ref = new Ref(key.toString(), value, timeout);
134            pIndex.put (ref);
135            if (timeout > 0L)
136                gcpIndex.putNoReturn (
137                    new GCRef (ref.getId(), ref.getExpiration())
138                );
139            txn.commit();
140            txn = null;
141            synchronized (this) {
142                notifyAll ();
143            }
144            if (sl != null)
145                notifyListeners(key, value);
146        } catch (Exception e) {
147            throw new SpaceError (e);
148        } finally {
149            if (txn != null)
150                abort (txn);
151        }
152    }
153    public void push (K key, V value, long timeout) {
154        Transaction txn = null;
155        try {
156            txn = dbe.beginTransaction (null, null);
157            Ref ref = new Ref(key.toString(), value, timeout);
158            pIndex.put (ref);
159            pIndex.delete (ref.getId());
160            ref.reverseId();
161            pIndex.put (ref);
162            txn.commit();
163            txn = null;
164            synchronized (this) {
165                notifyAll ();
166            }
167            if (sl != null)
168                notifyListeners(key, value);
169        } catch (Exception e) {
170            throw new SpaceError (e);
171        } finally {
172            if (txn != null)
173                abort (txn);
174        }
175    }
176    public void push (K key, V value) {
177        push (key, value, 0L);
178    }
179    @SuppressWarnings("unchecked")
180    public V rdp (Object key) {
181        try {
182            return (V) getObject (key, false);
183        } catch (DatabaseException e) {
184            throw new SpaceError (e);
185        }
186    }
187
188    @SuppressWarnings("unchecked")
189    public synchronized V in (Object key) {
190        Object obj;
191        while ((obj = inp (key)) == null) {
192            try {
193                this.wait ();
194            } catch (InterruptedException ignored) { }
195        }
196        return (V) obj;
197    }
198    @SuppressWarnings("unchecked")
199    public synchronized V in (Object key, long timeout) {
200        Object obj;
201        Instant now = Instant.now();
202        long duration;
203        while ((obj = inp (key)) == null &&
204                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
205        {
206            try {
207                this.wait (timeout - duration);
208            } catch (InterruptedException ignored) { }
209        }
210        return (V) obj;
211    }
212
213    @SuppressWarnings("unchecked")
214    public synchronized V rd  (Object key) {
215        Object obj;
216        while ((obj = rdp (key)) == null) {
217            try {
218                this.wait ();
219            } catch (InterruptedException ignored) { }
220        }
221        return (V) obj;
222    }
223    @SuppressWarnings("unchecked")
224    public synchronized V rd  (Object key, long timeout) {
225        Object obj;
226        Instant now = Instant.now();
227        long duration;
228        while ((obj = rdp (key)) == null &&
229                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
230        {
231            try {
232                this.wait (timeout - duration);
233            } catch (InterruptedException ignored) { }
234        }
235        return (V) obj;
236    }
237    public synchronized void nrd  (Object key) {
238        while (rdp (key) != null) {
239            try {
240                this.wait (NRD_RESOLUTION);
241            } catch (InterruptedException ignored) { }
242        }
243    }
244    public synchronized V nrd  (Object key, long timeout) {
245        Object obj;
246        Instant now = Instant.now();
247        long duration;
248        while ((obj = rdp (key)) != null &&
249                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
250        {
251            try {
252                this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
253            } catch (InterruptedException ignored) { }
254        }
255        return (V) obj;
256    }
257    @SuppressWarnings("unchecked")
258    public V inp (Object key) {
259        try {
260            return (V) getObject (key, true);
261        } catch (DatabaseException e) {
262            throw new SpaceError (e);
263        }
264    }
265
266    public boolean existAny (Object[] keys) {
267        for (Object key : keys) {
268            if (rdp(key) != null) {
269                return true;
270            }
271        }
272        return false;
273    }
274    public boolean existAny (Object[] keys, long timeout) {
275        Instant now = Instant.now();
276        long duration;
277        while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
278            if (existAny (keys))
279                return true;
280            synchronized (this) {
281                try {
282                    wait (timeout - duration);
283                } catch (InterruptedException ignored) { }
284            }
285        }
286        return false;
287    }
288    public synchronized void put (K key, V value, long timeout) {
289        while (inp (key) != null)
290            ; // NOPMD
291        out (key, value, timeout);
292    }
293    /** Removes all existing entries for the key then writes a single entry (head-of-queue replacement).
294     * @param key the entry key
295     * @param value the new value
296     */
297    public synchronized void put (K key, V value) {
298        while (inp (key) != null)
299            ; // NOPMD
300        out (key, value);
301    }
302    /** Runs a garbage-collection pass removing expired entries from the BDB JE store.
303     * @throws DatabaseException on BDB error
304     */
305    public void gc () throws DatabaseException {
306        Transaction txn = null;
307        EntityCursor<GCRef> cursor = null;
308        try {
309            if (!gcSem.tryAcquire())
310                return;
311            txn = dbe.beginTransaction (null, null);
312            cursor = gcsIndex.entities (
313                txn, 0L, true, Instant.now().toEpochMilli(), false, null
314            );
315            for (GCRef gcRef: cursor) {
316                pIndex.delete (gcRef.getId());
317                cursor.delete ();
318            }
319            cursor.close();
320            cursor = null;
321            txn.commit();
322            txn = null;
323            if (sl != null) {
324                synchronized (this) {
325                    if (sl != null && sl.getKeySet().isEmpty())
326                        sl = null;
327                }
328            }
329        } finally {
330            if (cursor != null)
331                cursor.close();
332            if (txn != null)
333                abort (txn);
334            gcSem.release();
335        }
336    }
337    public void run() {
338        try {
339            gc();
340        } catch (Exception e) {
341            warn(e);
342        }
343    }
344    public void close () throws DatabaseException {
345        gcSem.acquireUninterruptibly();
346        gcTask.cancel(false);
347        while (!gcTask.isDone()) {
348            try {
349                Thread.sleep(500L);
350            } catch (InterruptedException ignored) { }
351        }
352        store.close ();
353        dbe.close();
354    }
355
356    /** Returns (or creates) the named JESpace stored at the given path.
357     * @param name space name
358     * @param path filesystem path for BDB JE environment
359     * @return the JESpace instance
360     */
361    public synchronized static JESpace getSpace (String name, String path)
362    {
363        JESpace sp = (JESpace) spaceRegistrar.get (name);
364        if (sp == null) {
365            sp = new JESpace(name, path);
366            spaceRegistrar.put (name, sp);
367        }
368        return sp;
369    }
370    /** Returns (or creates) the named JESpace using the name as the storage path.
371     * @param name space name and storage path
372     * @return the JESpace instance
373     */
374    public static JESpace getSpace (String name) {
375        return getSpace (name, name);        
376    }
377    private Object getObject (Object key, boolean remove) throws DatabaseException {
378        Transaction txn = null;
379        EntityCursor<Ref> cursor = null;
380        Template tmpl = null;
381        if (key instanceof Template) {
382            tmpl = (Template) key;
383            key  = tmpl.getKey();
384        }
385        try {
386            txn = dbe.beginTransaction (null, null);
387            cursor = sIndex.subIndex(key.toString()).entities(txn, null);
388            for (Ref ref : cursor) {
389                if (ref.isActive()) {
390                    if (tmpl != null && !tmpl.equals (ref.getValue()))
391                        continue;
392                    if (remove) {
393                        cursor.delete();
394                        if (ref.hasExpiration()) 
395                            gcpIndex.delete (txn, ref.getId());
396                    }
397                    cursor.close(); cursor = null;
398                    txn.commit(); txn = null;
399                    return ref.getValue();
400                }
401                else {
402                    cursor.delete();
403                    if (ref.hasExpiration()) 
404                        gcpIndex.delete (txn, ref.getId());
405                }
406            }
407            cursor.close(); cursor = null;
408            txn.commit(); txn = null;
409            return null;
410        } finally {
411            if (cursor != null)
412                cursor.close ();
413            if (txn != null)
414                txn.abort();
415        }
416    }
417    private void abort (Transaction txn) throws SpaceError {
418        try {
419            txn.abort();
420        } catch (DatabaseException e) {
421            throw new SpaceError (e);
422        }
423    }
424
425    private LocalSpace<Object,SpaceListener> getSL() {
426        synchronized (this) {
427            if (sl == null)
428                sl = new TSpace<Object,SpaceListener>();
429        }
430        return sl;
431    }
432
433    private void notifyListeners (Object key, Object value) {
434        Set<SpaceListener> listeners = new HashSet<SpaceListener>();
435        synchronized (this) {
436            if (sl == null)
437                return;
438            SpaceListener s = null;
439            while ((s = sl.inp(key)) != null)
440                listeners.add(s);
441            for (SpaceListener spl: listeners)
442                sl.out(key, spl);
443        }
444        for (SpaceListener spl: listeners)
445            spl.notify (key, value);
446    }
447
448    public synchronized void addListener(Object key, SpaceListener listener) {
449        getSL().out (key, listener);
450    }
451
452    public synchronized void addListener(Object key, SpaceListener listener, long timeout) {
453        getSL().out (key, listener);
454    }
455
456    public synchronized void removeListener(Object key, SpaceListener listener) {
457        if (sl != null)
458            sl.inp (new ObjectTemplate (key, listener));
459    }
460
461    public Set getKeySet() {
462        Set res = new HashSet();
463        Transaction txn = null;
464        EntityCursor<Ref> cursor = null;
465        try {
466            txn = dbe.beginTransaction (null, null);
467            cursor = sIndex.entities(txn, null);
468            for (Ref ref : cursor)
469                res.add(ref.getKey());
470            cursor.close();
471            cursor = null;
472            txn.commit();
473            txn = null;
474        } catch (IllegalStateException ex) {
475            warn (ex);
476        } finally {
477            if (cursor != null)
478                cursor.close ();
479            if (txn != null)
480                txn.abort();
481        }
482
483        return res;
484    }
485
486  public int size(Object key) {
487      Transaction txn = null;
488      EntityCursor<Ref> cursor = null;
489      try {
490          txn = dbe.beginTransaction (null, null);
491          cursor = sIndex.subIndex(key.toString()).entities(txn, null);
492          int keyCount = 0;
493          for (Ref ref : cursor)
494              if (ref.isActive())
495                  keyCount++;
496          cursor.close();
497          cursor = null;
498          txn.commit();
499          txn = null;
500          return keyCount;
501      } catch (IllegalStateException e) {
502          return -1;
503      } finally {
504          if (cursor != null)
505              cursor.close ();
506          if (txn != null)
507              txn.abort();
508      }
509  }
510
511    /**
512     * Persistent entity representing a single space entry (key/value with optional expiration).
513     */
514    @Entity
515    public static class Ref {
516        @PrimaryKey(sequence="Ref.id")
517        /** Auto-generated primary key for this Ref. */
518        private long id;
519
520        @SecondaryKey(relate= Relationship.MANY_TO_ONE)
521        /** The space key associated with this Ref. */
522        private String key;
523
524        /** Expiration timestamp in epoch milliseconds, or 0 if no expiration. */
525        private long expires;
526        /** The serialized or native value stored in the space. */
527        private Object value;
528
529        /** Default constructor required by BerkeleyDB JE. */
530        public Ref () {
531            super();
532        }
533
534        /**
535         * Constructs a Ref for the given key, value and timeout.
536         *
537         * @param key     the space key
538         * @param value   the value to store
539         * @param timeout timeout in milliseconds, or 0 for no expiration
540         */
541        public Ref (String key, Object value, long timeout) {
542            this.key = key;
543            this.value =  serialize (value);
544            if (timeout > 0L)
545                this.expires = Instant.now().toEpochMilli() + timeout;
546        }
547
548        /**
549         * Returns the primary key id of this Ref.
550         *
551         * @return the primary key
552         */
553        public long getId() {
554            return id;
555        }
556
557        /**
558         * Negates the id to push this entry to the front of an ordered scan (push semantics).
559         */
560        public void reverseId() {
561            this.id = -this.id;
562        }
563
564        /**
565         * Returns {@code true} if this Ref has passed its expiration time.
566         *
567         * @return {@code true} if expired
568         */
569        public boolean isExpired () {
570            return expires > 0L && expires < Instant.now().toEpochMilli();
571        }
572
573        /**
574         * Returns {@code true} if this Ref has not yet expired.
575         *
576         * @return {@code true} if still active
577         */
578        public boolean isActive () {
579            return !isExpired();
580        }
581
582        /**
583         * Returns the space key of this Ref.
584         *
585         * @return the key string
586         */
587        public Object getKey () {
588            return key;
589        }
590
591        /**
592         * Returns the deserialized value of this Ref.
593         *
594         * @return the stored value
595         */
596        public Object getValue () {
597            return deserialize(value);
598        }
599
600        /**
601         * Returns the expiration timestamp in epoch milliseconds.
602         *
603         * @return expiration time, or 0 if no expiration is set
604         */
605        public long getExpiration () {
606            return expires;
607        }
608
609        /**
610         * Returns {@code true} if this Ref has an expiration set.
611         *
612         * @return {@code true} if an expiration time is set
613         */
614        public boolean hasExpiration () {
615            return expires > 0L;
616        }
617        private boolean isPersistent (Class c) {
618            return
619                c.isPrimitive() ||
620                c.isAnnotationPresent(Entity.class) ||
621                c.isAnnotationPresent(Persistent.class);
622        }
623        private Object serialize (Object obj) {
624            Class cls = obj.getClass();
625            if (isPersistent (cls))
626                return obj;
627
628            ByteArrayOutputStream baos = new ByteArrayOutputStream();
629            try {
630                ObjectOutputStream os = new ObjectOutputStream(baos);
631                os.writeObject(obj);
632                obj = baos.toByteArray();
633            } catch (IOException e) {
634                throw new SpaceError (e);
635            }
636            return obj;
637        }
638        private Object deserialize (Object obj) {
639            Class cls = obj.getClass();
640            if (isPersistent (cls))
641                return obj;
642
643            ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) obj);
644            try {
645                ObjectInputStream is = org.jpos.util.Serializer.createSafeObjectInputStream(bais);
646                return is.readObject();
647            } catch (Exception e) {
648                throw new SpaceError (e);
649            }
650
651        }
652    }
653    
654    public void dump(PrintStream p, String indent) {
655        Transaction txn = null;
656        EntityCursor<Ref> cursor = null;
657        int count = 0;
658        try {
659            txn = dbe.beginTransaction (null, null);
660            cursor = sIndex.entities(txn, null);
661            String key = null;
662            int keyCount = 0;
663            for (Ref ref : cursor) {
664                if (ref.getKey().equals(key)) {
665                    keyCount++;
666                } else {
667                    if (key != null) {
668                        dumpKey (p, indent, key, keyCount);
669                        count++;
670                    }
671                    keyCount = 1;
672                    key = ref.getKey().toString();
673                }
674            }
675            if (key != null) {
676                dumpKey (p, indent, key, keyCount);
677                count++;
678            }
679            p.println(indent+"<keycount>"+count+"</keycount>");
680            cursor.close(); cursor = null;
681            txn.commit(); txn = null;
682        } catch (IllegalStateException e) {
683            //Empty Cursor
684            p.println(indent+"<keycount>0</keycount>");
685        } finally {
686            if (cursor != null)
687                cursor.close ();
688            if (txn != null)
689                txn.abort();
690        }
691    }
692
693    private void dumpKey (PrintStream p, String indent, String key, int count) {
694        if (count > 0)
695            p.printf ("%s<key size='%d'>%s</key>\n", indent, count, key);
696        else
697            p.printf ("%s<key>%s</key>\n", indent, key);
698    }
699
700    private long getParam (String name, String[] params, long defaultValue) {
701        for (String s : params) {
702            if (s.contains(name)) {
703                int pos = s.indexOf('=');
704                if (pos >=0 && s.length() > pos)
705                    return Long.valueOf(s.substring(pos+1).trim());
706            }
707        }
708        return defaultValue;
709    }
710
711    /**
712     * Persistent entity used by the garbage collector to track expiring Ref entries.
713     */
714    @Entity
715    public static class GCRef {
716        @PrimaryKey
717        /** The id of the corresponding Ref entry to be garbage collected. */
718        private long id;
719
720        @SecondaryKey(relate=Relationship.MANY_TO_ONE)
721        /** Expiration timestamp in epoch milliseconds used to order GC candidates. */
722        private long expires;
723
724        /** Default constructor required by BerkeleyDB JE. */
725        public GCRef () {
726            super();
727        }
728
729        /**
730         * Constructs a GCRef for the given Ref id and expiration time.
731         *
732         * @param id      the id of the Ref to be collected
733         * @param expires the expiration timestamp in epoch milliseconds
734         */
735        public GCRef (long id, long expires) {
736            this.id = id;
737            this.expires = expires;
738        }
739
740        /**
741         * Returns the id of the corresponding Ref entry.
742         *
743         * @return the Ref primary key
744         */
745        public long getId() {
746            return id;
747        }
748
749        /**
750         * Returns {@code true} if the expiration time has passed.
751         *
752         * @return {@code true} if this GCRef is expired
753         */
754        public boolean isExpired () {
755            return expires > 0L && expires < Instant.now().toEpochMilli();
756        }
757
758        /**
759         * Returns the expiration timestamp in epoch milliseconds.
760         *
761         * @return expiration time
762         */
763        public long getExpiration () {
764            return expires;
765        }
766    }
767}