001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020import org.jpos.jfr.SpaceEvent;
021import org.jpos.util.Loggeable;
022import java.io.PrintStream;
023import java.io.Serializable;
024import java.time.Duration;
025import java.time.Instant;
026import java.util.*;
027import java.util.concurrent.TimeUnit;
028
029/**
030 * TSpace implementation
031 * @author Alejandro Revilla
032 * @version $Revision$ $Date$
033 * @since !.4.9
034
035 * @param <K> the key type
036 * @param <V> the value type
037 */
038
039@SuppressWarnings("unchecked")
040public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
041    /** Backing map keyed by user-supplied keys; values are entries or lists of entries. */
042    protected Map entries;
043    /** Per-key space listeners; itself a {@link TSpace} so it inherits the dispatch model. */
044    protected TSpace sl;    // space listeners
045    /** Periodic interval, in milliseconds, between background GC sweeps. */
046    public static final long GCDELAY = 5*1000;
047    private static final long GCLONG = 60_000L;
048    private static final long NRD_RESOLUTION = 500L;
049    private static final int MAX_ENTRIES_IN_DUMP = 1000;
050    private static final long ONE_MILLION = 1_000_000L;         // multiplier millis --> nanos
051    private final Set[] expirables;
052    private long lastLongGC = System.nanoTime();
053
054    /** Default constructor. */
055    public TSpace () {
056        super();
057        entries = new HashMap ();
058        expirables = new Set[] { new HashSet<K>(), new HashSet<K>() };
059        SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
060    }
061
062    @Override
063    public void out (K key, V value) {
064        var jfr = new SpaceEvent("out", "" + key);
065        jfr.begin();
066        if (key == null || value == null)
067            throw new NullPointerException ("key=" + key + ", value=" + value);
068        synchronized(this) {
069            List l = getList(key);
070            l.add (value);
071            if (l.size() == 1)
072                this.notifyAll ();
073        }
074        if (sl != null)
075            notifyListeners(key, value);
076        jfr.commit();
077    }
078
079    @Override
080    public void out (K key, V value, long timeout) {
081        var jfr = new SpaceEvent("out:tim", "" + key);
082        jfr.begin();
083
084        if (key == null || value == null) {
085            jfr.commit();
086            throw new NullPointerException("key=" + key + ", value=" + value);
087        }
088        Object v = value;
089        if (timeout > 0) {
090            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
091        }
092        synchronized (this) {
093            List l = getList(key);
094            l.add(v);
095            if (l.size() == 1)
096                this.notifyAll ();
097            if (timeout > 0) {
098                registerExpirable(key, timeout);
099            }
100        }
101        if (sl != null)
102            notifyListeners(key, value);
103        jfr.commit();
104    }
105
106    @Override
107    public synchronized V rdp (Object key) {
108        var jfr = new SpaceEvent("rdp", "" + key);
109        jfr.begin();
110        try {
111            if (key instanceof Template)
112                return (V) getObject ((Template) key, false);
113            return (V) getHead (key, false);
114        } finally {
115            jfr.commit();
116        }
117    }
118
119    @Override
120    public synchronized V inp (Object key) {
121        var jfr = new SpaceEvent("inp", "" + key);
122        jfr.begin();
123        try {
124            if (key instanceof Template)
125                return (V) getObject ((Template) key, true);
126            return (V) getHead (key, true);
127        } finally {
128            jfr.commit();
129        }
130    }
131
132    @Override
133    public synchronized V in (Object key) {
134        Object obj;
135        while ((obj = inp (key)) == null) {
136            try {
137                this.wait ();
138            } catch (InterruptedException e) { }
139        }
140        return (V) obj;
141    }
142
143    @Override
144    public synchronized V in  (Object key, long timeout) {
145        V obj;
146        long now = System.nanoTime();
147        long to = now + timeout * ONE_MILLION;
148        long waitFor;
149        while ( (obj = inp (key)) == null &&
150                (waitFor = (to - System.nanoTime())) >= 0 )
151        {
152            try {
153                this.wait(Math.max(waitFor / ONE_MILLION, 1L));
154            } catch (InterruptedException e) { }
155        }
156        return obj;
157    }
158
159    @Override
160    public synchronized V rd  (Object key) {
161        Object obj;
162        while ((obj = rdp (key)) == null) {
163            try {
164                this.wait ();
165            } catch (InterruptedException e) { }
166        }
167        return (V) obj;
168    }
169
170    @Override
171    public synchronized V rd  (Object key, long timeout) {
172        V obj;
173        long now = System.nanoTime();
174        long to = now + (timeout * ONE_MILLION);
175        long waitFor;
176        while ( (obj = rdp (key)) == null &&
177                (waitFor = (to - System.nanoTime())) >= 0 )
178        {
179            try {
180                this.wait(Math.max(waitFor / ONE_MILLION, 1L));
181            } catch (InterruptedException e) { }
182        }
183        return obj;
184    }
185
186    @Override
187    public synchronized void nrd  (Object key) {
188        while (rdp (key) != null) {
189            try {
190                this.wait (NRD_RESOLUTION);
191            } catch (InterruptedException ignored) { }
192        }
193    }
194
195    @Override
196    public synchronized V nrd  (Object key, long timeout) {
197        V obj;
198        long now = System.nanoTime();
199        long to = now + (timeout * ONE_MILLION);
200        long waitFor;
201        while ( (obj = rdp (key)) != null &&
202                (waitFor = (to - System.nanoTime())) >= 0 )
203        {
204            try {
205                this.wait(Math.min(NRD_RESOLUTION,
206                                   Math.max(waitFor / ONE_MILLION, 1L)));
207            } catch (InterruptedException ignored) { }
208        }
209        return obj;
210    }
211
212    @Override
213    public void run () {
214        try {
215            gc();
216        } catch (Exception e) {
217            e.printStackTrace(); // this should never happen
218        }
219    }
220
221    /**
222     * Sweeps the short-lived expirable set, and the long-lived set when its
223     * sweep interval has elapsed.
224     */
225    public void gc () {
226        gc(0);
227        if (System.nanoTime() - lastLongGC > GCLONG*ONE_MILLION) {
228            gc(1);
229            lastLongGC = System.nanoTime();
230        }
231    }
232
233    private void gc (int generation) {
234        var jfr = new SpaceEvent("gc", Integer.toString(generation));
235        jfr.begin();
236
237        Set<K> exps;
238        synchronized (this) {
239            exps = expirables[generation];
240            expirables[generation] = new HashSet<K>();
241        }
242        for (K k : exps) {
243            if (rdp(k) != null) {
244                synchronized (this) {
245                    expirables[generation].add(k);
246                }
247            }
248            Thread.yield ();
249        }
250        if (sl != null) {
251            synchronized (this) {
252                if (sl != null && sl.isEmpty())
253                    sl = null;
254            }
255        }
256        jfr.commit();
257    }
258
259    @Override
260    public synchronized int size (Object key) {
261        var jfr = new SpaceEvent("size", "" + key);
262        jfr.begin();
263
264        int size = 0;
265        List l = (List) entries.get (key);
266        if (l != null) 
267            size = l.size();
268        jfr.commit();
269        return size;
270    }
271
272    @Override
273    public synchronized void addListener (Object key, SpaceListener listener) {
274        getSL().out (key, listener);
275    }
276
277    @Override
278    public synchronized void addListener 
279        (Object key, SpaceListener listener, long timeout) 
280    {
281        getSL().out (key, listener, timeout);
282    }
283
284    @Override
285    public synchronized void removeListener 
286        (Object key, SpaceListener listener) 
287    {
288        if (sl != null) {
289            sl.inp (new ObjectTemplate (key, listener));
290        }
291    }
292    /**
293     * Indicates whether the space currently holds any entries.
294     *
295     * @return {@code true} if no entries are stored
296     */
297    public boolean isEmpty() {
298        return entries.isEmpty();
299    }
300
301    @Override
302    public synchronized Set<K> getKeySet() {
303        return new HashSet<K>(entries.keySet());
304    }
305
306    /**
307     * Returns a space-separated list of every key currently stored.
308     *
309     * @return all keys, joined by single-space separators
310     */
311    public String getKeysAsString () {
312        StringBuilder sb = new StringBuilder();
313        Object[] keys;
314        synchronized (this) {
315            keys = entries.keySet().toArray();
316        }
317        for (int i=0; i<keys.length; i++) {
318            if (i > 0)
319                sb.append (' ');
320            sb.append (keys[i]);
321        }
322        return sb.toString();
323    }
324
325    @Override
326    public void dump(PrintStream p, String indent) {
327        var jfr = new SpaceEvent("dump", "");
328        jfr.begin();
329
330        Object[] keys;
331        int size = entries.size();
332        if (size > MAX_ENTRIES_IN_DUMP * 100) {
333            p.printf ("%sWARNING - space too big, size=%d%n", indent, size);
334            jfr.commit();
335            return;
336        }
337        synchronized (this) {
338            keys = entries.keySet().toArray();
339        }
340        int i=0;
341        for (Object key : keys) {
342            p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key);
343            if (i++ > MAX_ENTRIES_IN_DUMP) {
344                p.printf ("%s...%n", indent);
345                p.printf ("%s...%n", indent);
346                break;
347            }
348        }
349        p.printf("%s key-count: %d%n", indent, keys.length);
350        int exp0, exp1;
351        synchronized (this) {
352            exp0 = expirables[0].size();
353            exp1 = expirables[1].size();
354        }
355        p.printf("%s    gcinfo: %d,%d%n", indent, exp0, exp1);
356        jfr.commit();
357    }
358
359    /**
360     * Notifies every listener registered against {@code key} of an entry change.
361     *
362     * @param key entry key
363     * @param value the value just written or {@code null} when entries were removed
364     */
365    public void notifyListeners (Object key, Object value) {
366        var jfr = new SpaceEvent("notify", "" + key);
367        jfr.begin();
368
369        Object[] listeners = null;
370        synchronized (this) {
371            if (sl == null)
372                return;
373            List l = (List) sl.entries.get (key);
374            if (l != null)
375                listeners = l.toArray();
376        }
377        if (listeners != null) {
378            for (Object listener : listeners) {
379                Object o = listener;
380                if (o instanceof Expirable)
381                    o = ((Expirable) o).getValue();
382                if (o instanceof SpaceListener)
383                    ((SpaceListener) o).notify(key, value);
384            }
385        }
386        jfr.commit();
387    }
388
389    @Override
390    public void push (K key, V value) {
391        if (key == null || value == null)
392            throw new NullPointerException ("key=" + key + ", value=" + value);
393        var jfr = new SpaceEvent("push", "" + key);
394        jfr.begin();
395        synchronized(this) {
396            List l = getList(key);
397            boolean wasEmpty = l.isEmpty();
398            l.add (0, value);
399            if (wasEmpty)
400                this.notifyAll ();
401        }
402        if (sl != null)
403            notifyListeners(key, value);
404        jfr.commit();
405    }
406
407    @Override
408    public void push (K key, V value, long timeout) {
409        if (key == null || value == null)
410            throw new NullPointerException ("key=" + key + ", value=" + value);
411        var jfr = new SpaceEvent("push:tim", "" + key);
412        jfr.begin();
413        Object v = value;
414        if (timeout > 0) {
415            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
416        }
417        synchronized (this) {
418            List l = getList(key);
419            boolean wasEmpty = l.isEmpty();
420            l.add (0, v);
421            if (wasEmpty)
422                this.notifyAll ();
423            if (timeout > 0) {
424                registerExpirable(key, timeout);
425            }
426        }
427        if (sl != null)
428            notifyListeners(key, value);
429        jfr.commit();
430    }
431
432    @Override
433    public void put (K key, V value) {
434        if (key == null || value == null)
435            throw new NullPointerException ("key=" + key + ", value=" + value);
436
437        var jfr = new SpaceEvent("put", "" + key);
438        jfr.begin();
439        synchronized (this) {
440            List l = new LinkedList();
441            l.add (value);
442            entries.put (key, l);
443            this.notifyAll ();
444        }
445        if (sl != null)
446            notifyListeners(key, value);
447        jfr.commit();
448    }
449
450    @Override
451    public void put (K key, V value, long timeout) {
452        if (key == null || value == null)
453            throw new NullPointerException ("key=" + key + ", value=" + value);
454        var jfr = new SpaceEvent("put:tim", "" + key);
455        jfr.begin();
456
457        Object v = value;
458        if (timeout > 0) {
459            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
460        }
461        synchronized (this) {
462            List l = new LinkedList();
463            l.add (v);
464            entries.put (key, l);
465            this.notifyAll ();
466            if (timeout > 0) {
467                registerExpirable(key, timeout);
468            }
469        }
470        if (sl != null)
471            notifyListeners(key, value);
472        jfr.commit();
473    }
474
475    @Override
476    public boolean existAny (K[] keys) {
477        for (K key : keys) {
478            if (rdp(key) != null)
479                return true;
480        }
481        return false;
482    }
483
484    @Override
485    public boolean existAny (K[] keys, long timeout) {
486        long now = System.nanoTime();
487        long to = now + (timeout * ONE_MILLION);
488        long waitFor;
489        while ((waitFor = (to - System.nanoTime())) >= 0) {
490            if (existAny (keys))
491                return true;
492            synchronized (this) {
493                try {
494                    this.wait(Math.max(waitFor / ONE_MILLION, 1L));
495                } catch (InterruptedException e) { }
496            }
497        }
498        return false;
499    }
500
501    /**
502     * unstandard method (required for space replication) - use with care
503     * @return underlying entry map
504     */
505    public Map getEntries () {
506        return entries;
507    }
508
509    /**
510     * unstandard method (required for space replication) - use with care
511     * @param entries underlying entry map
512     */
513    public void setEntries (Map entries) {
514        this.entries = entries;
515    }
516
517    private List getList (Object key) {
518        List l = (List) entries.get (key);
519        if (l == null) 
520            entries.put (key, l = new LinkedList());
521        return l;
522    }
523
524    private Object getHead (Object key, boolean remove) {
525        Object obj = null;
526        List l = (List) entries.get (key);
527        boolean wasExpirable = false;
528        while (obj == null && l != null && l.size() > 0) {
529            obj = l.get(0);
530            if (obj instanceof Expirable) { 
531                obj = ((Expirable) obj).getValue();
532                wasExpirable = true;
533            }
534            if (obj == null) {
535                l.remove (0);
536                if (l.isEmpty()) {
537                    entries.remove (key);
538                }
539            }
540        }
541        if (l != null) {
542            if (remove && obj != null)
543                l.remove (0);
544            if (l.isEmpty()) {
545                entries.remove (key);
546                if (wasExpirable)
547                    unregisterExpirable(key);
548            }
549        }
550        return obj;
551    }
552
553    private Object getObject (Template tmpl, boolean remove) {
554        Object obj = null;
555        Object key = tmpl.getKey();
556        List l = (List) entries.get (key);
557        if (l != null) {
558            Iterator iter = l.iterator();
559            boolean wasExpirable = false;
560            while (iter.hasNext()) {
561                obj = iter.next();
562                if (obj instanceof Expirable) {
563                    obj = ((Expirable) obj).getValue();
564                    if (obj == null) {
565                        iter.remove();
566                        wasExpirable = true;
567                        continue;
568                    }
569                }
570                if (tmpl.equals (obj)) {
571                    if (remove)
572                        iter.remove();
573                    break;
574                } else
575                    obj = null;
576            }
577            if (l.isEmpty()) {
578                entries.remove (key);
579                if (wasExpirable)
580                    unregisterExpirable(key);
581            }
582        }
583        return obj;
584    }
585
586    private TSpace getSL() {
587        synchronized (this) {
588            if (sl == null)
589                sl = new TSpace();
590        }
591        return sl;
592    }
593
594    private void registerExpirable(K k, long t) {
595        expirables[t > GCLONG ? 1 : 0].add(k);
596    }
597
598    private void unregisterExpirable(Object k) {
599        for (Set<K> s : expirables)
600            s.remove(k);
601    }
602
603    static class Expirable implements Comparable, Serializable {
604
605        private static final long serialVersionUID = 0xA7F22BF5;
606
607        Object value;
608
609        /**
610         *  When to expire, in the future, as given by monotonic System.nanoTime().<br>
611         *  IMPORTANT: always use a nanosec offset from System.nanoTime()!
612         */
613        long expires;
614
615        Expirable (Object value, long expires) {
616            super();
617            this.value = value;
618            this.expires = expires;
619        }
620
621        boolean isExpired () {
622            return (System.nanoTime() - expires) > 0;
623        }
624
625        @Override
626        public String toString() {
627            return getClass().getName() 
628                + "@" + Integer.toHexString(hashCode())
629                + ",value=" + value.toString()
630                + ",expired=" + isExpired ();
631        }
632
633        Object getValue() {
634            return isExpired() ? null : value;
635        }
636
637        @Override
638        public int compareTo (Object other) {
639            long diff = this.expires - ((Expirable)other).expires;
640            return  diff > 0 ?  1 :
641                    diff < 0 ? -1 :
642                    0;
643        }
644    }
645}