001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020
021import java.io.*;
022import java.time.Duration;
023import java.time.Instant;
024import java.util.Map;
025import java.util.HashMap;
026import java.util.Set;
027import java.util.concurrent.Future;
028import java.util.concurrent.Semaphore;
029
030import com.sleepycat.je.*;
031import com.sleepycat.persist.EntityStore; 
032import com.sleepycat.persist.StoreConfig; 
033import com.sleepycat.persist.EntityCursor;
034import com.sleepycat.persist.PrimaryIndex;
035import com.sleepycat.persist.SecondaryIndex;
036import com.sleepycat.persist.model.Entity;
037import com.sleepycat.persist.model.Persistent;
038import com.sleepycat.persist.model.PrimaryKey;
039import com.sleepycat.persist.model.SecondaryKey;
040import com.sleepycat.persist.model.Relationship;
041import java.util.HashSet;
042import java.util.concurrent.TimeUnit;
043
044import org.jpos.iso.ISOUtil;
045import org.jpos.util.Log;
046import org.jpos.util.Loggeable;
047
048/**
049 * BerkeleyDB Jave Edition based persistent space implementation
050 *
051 * @author Alejandro Revilla
052 * @since 1.6.5
053 */
054@SuppressWarnings("unchecked")
055public class JESpace<K,V> extends Log implements LocalSpace<K,V>, PersistentSpace, Loggeable, Runnable {
056    Environment dbe = null;
057    EntityStore store = null;
058    PrimaryIndex<Long, Ref> pIndex = null;
059    PrimaryIndex<Long,GCRef> gcpIndex = null;
060    SecondaryIndex<String,Long, Ref> sIndex = null;
061    SecondaryIndex<Long,Long,GCRef> gcsIndex = null;
062    Semaphore gcSem = new Semaphore(1);
063    LocalSpace<Object,SpaceListener> sl;
064    private static final long NRD_RESOLUTION = 500L;
065    public static final long GC_DELAY = 15*1000L;
066    public static final long DEFAULT_TXN_TIMEOUT = 30*1000L;
067    public static final long DEFAULT_LOCK_TIMEOUT = 120*1000L;
068    private Future gcTask;
069
070    static final Map<String,Space> spaceRegistrar = 
071        new HashMap<String,Space> ();
072
073    public JESpace(String name, String params) throws SpaceError {
074        super();
075        try {
076            EnvironmentConfig envConfig = new EnvironmentConfig();
077            StoreConfig storeConfig = new StoreConfig();
078            String[] p = ISOUtil.commaDecode(params);
079            String path = p[0];
080            envConfig.setAllowCreate (true);
081            envConfig.setTransactional(true);
082            envConfig.setLockTimeout(getParam("lock.timeout", p, DEFAULT_LOCK_TIMEOUT), TimeUnit.MILLISECONDS);
083            envConfig.setTxnTimeout(getParam("txn.timeout", p, DEFAULT_TXN_TIMEOUT), TimeUnit.MILLISECONDS);
084            storeConfig.setAllowCreate (true);
085            storeConfig.setTransactional (true);
086
087            File dir = new File(path);
088            dir.mkdirs();
089
090            dbe = new Environment (dir, envConfig);
091            store = new EntityStore (dbe, name, storeConfig);
092            pIndex = store.getPrimaryIndex (Long.class, Ref.class);
093            gcpIndex = store.getPrimaryIndex (Long.class, GCRef.class);
094            sIndex = store.getSecondaryIndex (pIndex, String.class, "key");
095            gcsIndex = store.getSecondaryIndex (gcpIndex, Long.class, "expires");
096            gcTask = SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GC_DELAY, GC_DELAY, TimeUnit.MILLISECONDS);
097        } catch (Exception e) {
098            throw new SpaceError (e);
099        }
100    }
101
102    public void out (K key, V value) {
103        out (key, value, 0L);
104    }
105    public void out (K key, V value, long timeout) {
106        Transaction txn = null;
107        try {
108            txn = dbe.beginTransaction (null, null);
109            Ref ref = new Ref(key.toString(), value, timeout);
110            pIndex.put (ref);
111            if (timeout > 0L)
112                gcpIndex.putNoReturn (
113                    new GCRef (ref.getId(), ref.getExpiration())
114                );
115            txn.commit();
116            txn = null;
117            synchronized (this) {
118                notifyAll ();
119            }
120            if (sl != null)
121                notifyListeners(key, value);
122        } catch (Exception e) {
123            throw new SpaceError (e);
124        } finally {
125            if (txn != null)
126                abort (txn);
127        }
128    }
129    public void push (K key, V value, long timeout) {
130        Transaction txn = null;
131        try {
132            txn = dbe.beginTransaction (null, null);
133            Ref ref = new Ref(key.toString(), value, timeout);
134            pIndex.put (ref);
135            pIndex.delete (ref.getId());
136            ref.reverseId();
137            pIndex.put (ref);
138            txn.commit();
139            txn = null;
140            synchronized (this) {
141                notifyAll ();
142            }
143            if (sl != null)
144                notifyListeners(key, value);
145        } catch (Exception e) {
146            throw new SpaceError (e);
147        } finally {
148            if (txn != null)
149                abort (txn);
150        }
151    }
152    public void push (K key, V value) {
153        push (key, value, 0L);
154    }
155    @SuppressWarnings("unchecked")
156    public V rdp (Object key) {
157        try {
158            return (V) getObject (key, false);
159        } catch (DatabaseException e) {
160            throw new SpaceError (e);
161        }
162    }
163
164    @SuppressWarnings("unchecked")
165    public synchronized V in (Object key) {
166        Object obj;
167        while ((obj = inp (key)) == null) {
168            try {
169                this.wait ();
170            } catch (InterruptedException ignored) { }
171        }
172        return (V) obj;
173    }
174    @SuppressWarnings("unchecked")
175    public synchronized V in (Object key, long timeout) {
176        Object obj;
177        Instant now = Instant.now();
178        long duration;
179        while ((obj = inp (key)) == null &&
180                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
181        {
182            try {
183                this.wait (timeout - duration);
184            } catch (InterruptedException ignored) { }
185        }
186        return (V) obj;
187    }
188
189    @SuppressWarnings("unchecked")
190    public synchronized V rd  (Object key) {
191        Object obj;
192        while ((obj = rdp (key)) == null) {
193            try {
194                this.wait ();
195            } catch (InterruptedException ignored) { }
196        }
197        return (V) obj;
198    }
199    @SuppressWarnings("unchecked")
200    public synchronized V rd  (Object key, long timeout) {
201        Object obj;
202        Instant now = Instant.now();
203        long duration;
204        while ((obj = rdp (key)) == null &&
205                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
206        {
207            try {
208                this.wait (timeout - duration);
209            } catch (InterruptedException ignored) { }
210        }
211        return (V) obj;
212    }
213    public synchronized void nrd  (Object key) {
214        while (rdp (key) != null) {
215            try {
216                this.wait (NRD_RESOLUTION);
217            } catch (InterruptedException ignored) { }
218        }
219    }
220    public synchronized V nrd  (Object key, long timeout) {
221        Object obj;
222        Instant now = Instant.now();
223        long duration;
224        while ((obj = rdp (key)) != null &&
225                (duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
226        {
227            try {
228                this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
229            } catch (InterruptedException ignored) { }
230        }
231        return (V) obj;
232    }
233    @SuppressWarnings("unchecked")
234    public V inp (Object key) {
235        try {
236            return (V) getObject (key, true);
237        } catch (DatabaseException e) {
238            throw new SpaceError (e);
239        }
240    }
241
242    public boolean existAny (Object[] keys) {
243        for (Object key : keys) {
244            if (rdp(key) != null) {
245                return true;
246            }
247        }
248        return false;
249    }
250    public boolean existAny (Object[] keys, long timeout) {
251        Instant now = Instant.now();
252        long duration;
253        while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
254            if (existAny (keys))
255                return true;
256            synchronized (this) {
257                try {
258                    wait (timeout - duration);
259                } catch (InterruptedException ignored) { }
260            }
261        }
262        return false;
263    }
264    public synchronized void put (K key, V value, long timeout) {
265        while (inp (key) != null)
266            ; // NOPMD
267        out (key, value, timeout);
268    }
269    public synchronized void put (K key, V value) {
270        while (inp (key) != null)
271            ; // NOPMD
272        out (key, value);
273    }
274    public void gc () throws DatabaseException {
275        Transaction txn = null;
276        EntityCursor<GCRef> cursor = null;
277        try {
278            if (!gcSem.tryAcquire())
279                return;
280            txn = dbe.beginTransaction (null, null);
281            cursor = gcsIndex.entities (
282                txn, 0L, true, Instant.now().toEpochMilli(), false, null
283            );
284            for (GCRef gcRef: cursor) {
285                pIndex.delete (gcRef.getId());
286                cursor.delete ();
287            }
288            cursor.close();
289            cursor = null;
290            txn.commit();
291            txn = null;
292            if (sl != null) {
293                synchronized (this) {
294                    if (sl != null && sl.getKeySet().isEmpty())
295                        sl = null;
296                }
297            }
298        } finally {
299            if (cursor != null)
300                cursor.close();
301            if (txn != null)
302                abort (txn);
303            gcSem.release();
304        }
305    }
306    public void run() {
307        try {
308            gc();
309        } catch (Exception e) {
310            warn(e);
311        }
312    }
313    public void close () throws DatabaseException {
314        gcSem.acquireUninterruptibly();
315        gcTask.cancel(false);
316        while (!gcTask.isDone()) {
317            try {
318                Thread.sleep(500L);
319            } catch (InterruptedException ignored) { }
320        }
321        store.close ();
322        dbe.close();
323    }
324
325    public synchronized static JESpace getSpace (String name, String path)
326    {
327        JESpace sp = (JESpace) spaceRegistrar.get (name);
328        if (sp == null) {
329            sp = new JESpace(name, path);
330            spaceRegistrar.put (name, sp);
331        }
332        return sp;
333    }
334    public static JESpace getSpace (String name) {
335        return getSpace (name, name);        
336    }
337    private Object getObject (Object key, boolean remove) throws DatabaseException {
338        Transaction txn = null;
339        EntityCursor<Ref> cursor = null;
340        Template tmpl = null;
341        if (key instanceof Template) {
342            tmpl = (Template) key;
343            key  = tmpl.getKey();
344        }
345        try {
346            txn = dbe.beginTransaction (null, null);
347            cursor = sIndex.subIndex(key.toString()).entities(txn, null);
348            for (Ref ref : cursor) {
349                if (ref.isActive()) {
350                    if (tmpl != null && !tmpl.equals (ref.getValue()))
351                        continue;
352                    if (remove) {
353                        cursor.delete();
354                        if (ref.hasExpiration()) 
355                            gcpIndex.delete (txn, ref.getId());
356                    }
357                    cursor.close(); cursor = null;
358                    txn.commit(); txn = null;
359                    return ref.getValue();
360                }
361                else {
362                    cursor.delete();
363                    if (ref.hasExpiration()) 
364                        gcpIndex.delete (txn, ref.getId());
365                }
366            }
367            cursor.close(); cursor = null;
368            txn.commit(); txn = null;
369            return null;
370        } finally {
371            if (cursor != null)
372                cursor.close ();
373            if (txn != null)
374                txn.abort();
375        }
376    }
377    private void abort (Transaction txn) throws SpaceError {
378        try {
379            txn.abort();
380        } catch (DatabaseException e) {
381            throw new SpaceError (e);
382        }
383    }
384
385    private LocalSpace<Object,SpaceListener> getSL() {
386        synchronized (this) {
387            if (sl == null)
388                sl = new TSpace<Object,SpaceListener>();
389        }
390        return sl;
391    }
392
393    private void notifyListeners (Object key, Object value) {
394        Set<SpaceListener> listeners = new HashSet<SpaceListener>();
395        synchronized (this) {
396            if (sl == null)
397                return;
398            SpaceListener s = null;
399            while ((s = sl.inp(key)) != null)
400                listeners.add(s);
401            for (SpaceListener spl: listeners)
402                sl.out(key, spl);
403        }
404        for (SpaceListener spl: listeners)
405            spl.notify (key, value);
406    }
407
408    public synchronized void addListener(Object key, SpaceListener listener) {
409        getSL().out (key, listener);
410    }
411
412    public synchronized void addListener(Object key, SpaceListener listener, long timeout) {
413        getSL().out (key, listener);
414    }
415
416    public synchronized void removeListener(Object key, SpaceListener listener) {
417        if (sl != null)
418            sl.inp (new ObjectTemplate (key, listener));
419    }
420
421    public Set getKeySet() {
422        Set res = new HashSet();
423        Transaction txn = null;
424        EntityCursor<Ref> cursor = null;
425        try {
426            txn = dbe.beginTransaction (null, null);
427            cursor = sIndex.entities(txn, null);
428            for (Ref ref : cursor)
429                res.add(ref.getKey());
430            cursor.close();
431            cursor = null;
432            txn.commit();
433            txn = null;
434        } catch (IllegalStateException ex) {
435            warn (ex);
436        } finally {
437            if (cursor != null)
438                cursor.close ();
439            if (txn != null)
440                txn.abort();
441        }
442
443        return res;
444    }
445
446  public int size(Object key) {
447      Transaction txn = null;
448      EntityCursor<Ref> cursor = null;
449      try {
450          txn = dbe.beginTransaction (null, null);
451          cursor = sIndex.subIndex(key.toString()).entities(txn, null);
452          int keyCount = 0;
453          for (Ref ref : cursor)
454              if (ref.isActive())
455                  keyCount++;
456          cursor.close();
457          cursor = null;
458          txn.commit();
459          txn = null;
460          return keyCount;
461      } catch (IllegalStateException e) {
462          return -1;
463      } finally {
464          if (cursor != null)
465              cursor.close ();
466          if (txn != null)
467              txn.abort();
468      }
469  }
470
471    @Entity
472    public static class Ref {
473        @PrimaryKey(sequence="Ref.id")
474        private long id;
475
476        @SecondaryKey(relate= Relationship.MANY_TO_ONE)
477        private String key;
478
479        private long expires;
480        private Object value;
481
482        public Ref () {
483            super();
484        }
485        public Ref (String key, Object value, long timeout) {
486            this.key = key;
487            this.value =  serialize (value);
488            if (timeout > 0L)
489                this.expires = Instant.now().toEpochMilli() + timeout;
490        }
491        public long getId() {
492            return id;
493        }
494        public void reverseId() {
495            this.id = -this.id;
496        }
497        public boolean isExpired () {
498            return expires > 0L && expires < Instant.now().toEpochMilli();
499        }
500        public boolean isActive () {
501            return !isExpired();
502        }
503        public Object getKey () {
504            return key;
505        }
506        public Object getValue () {
507            return deserialize(value);
508        }
509        public long getExpiration () {
510            return expires;
511        }
512        public boolean hasExpiration () {
513            return expires > 0L;
514        }
515        private boolean isPersistent (Class c) {
516            return
517                c.isPrimitive() ||
518                c.isAnnotationPresent(Entity.class) ||
519                c.isAnnotationPresent(Persistent.class);
520        }
521        private Object serialize (Object obj) {
522            Class cls = obj.getClass();
523            if (isPersistent (cls))
524                return obj;
525
526            ByteArrayOutputStream baos = new ByteArrayOutputStream();
527            try {
528                ObjectOutputStream os = new ObjectOutputStream(baos);
529                os.writeObject(obj);
530                obj = baos.toByteArray();
531            } catch (IOException e) {
532                throw new SpaceError (e);
533            }
534            return obj;
535        }
536        private Object deserialize (Object obj) {
537            Class cls = obj.getClass();
538            if (isPersistent (cls))
539                return obj;
540
541            ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) obj);
542            try {
543                ObjectInputStream is = new ObjectInputStream (bais);
544                return is.readObject();
545            } catch (Exception e) {
546                throw new SpaceError (e);
547            }
548
549        }
550    }
551    
552    public void dump(PrintStream p, String indent) {
553        Transaction txn = null;
554        EntityCursor<Ref> cursor = null;
555        int count = 0;
556        try {
557            txn = dbe.beginTransaction (null, null);
558            cursor = sIndex.entities(txn, null);
559            String key = null;
560            int keyCount = 0;
561            for (Ref ref : cursor) {
562                if (ref.getKey().equals(key)) {
563                    keyCount++;
564                } else {
565                    if (key != null) {
566                        dumpKey (p, indent, key, keyCount);
567                        count++;
568                    }
569                    keyCount = 1;
570                    key = ref.getKey().toString();
571                }
572            }
573            if (key != null) {
574                dumpKey (p, indent, key, keyCount);
575                count++;
576            }
577            p.println(indent+"<keycount>"+count+"</keycount>");
578            cursor.close(); cursor = null;
579            txn.commit(); txn = null;
580        } catch (IllegalStateException e) {
581            //Empty Cursor
582            p.println(indent+"<keycount>0</keycount>");
583        } finally {
584            if (cursor != null)
585                cursor.close ();
586            if (txn != null)
587                txn.abort();
588        }
589    }
590
591    private void dumpKey (PrintStream p, String indent, String key, int count) {
592        if (count > 0)
593            p.printf ("%s<key size='%d'>%s</key>\n", indent, count, key);
594        else
595            p.printf ("%s<key>%s</key>\n", indent, key);
596    }
597
598    private long getParam (String name, String[] params, long defaultValue) {
599        for (String s : params) {
600            if (s.contains(name)) {
601                int pos = s.indexOf('=');
602                if (pos >=0 && s.length() > pos)
603                    return Long.valueOf(s.substring(pos+1).trim());
604            }
605        }
606        return defaultValue;
607    }
608
609    @Entity
610    public static class GCRef {
611        @PrimaryKey
612        private long id;
613
614        @SecondaryKey(relate=Relationship.MANY_TO_ONE)
615        private long expires;
616        public GCRef () {
617            super();
618        }
619        public GCRef (long id, long expires) {
620            this.id = id;
621            this.expires = expires;
622        }
623        public long getId() {
624            return id;
625        }
626        public boolean isExpired () {
627            return expires > 0L && expires < Instant.now().toEpochMilli();
628        }
629        public long getExpiration () {
630            return expires;
631        }
632    }
633}