001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.space;
020import org.jpos.jfr.SpaceEvent;
021import org.jpos.util.Loggeable;
022import java.io.PrintStream;
023import java.io.Serializable;
024import java.time.Duration;
025import java.time.Instant;
026import java.util.*;
027import java.util.concurrent.TimeUnit;
028
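/**
 * TSpace is an in-memory {@link LocalSpace} implementation: entries are kept
 * in a map of per-key lists, and entries written with a timeout are purged
 * lazily on access and by a periodically scheduled GC task.
 * @author Alejandro Revilla
 * @version $Revision$ $Date$
 * @since 1.4.9
 */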
035
036@SuppressWarnings("unchecked")
037public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
038    protected Map entries;
039    protected TSpace sl;    // space listeners
040    public static final long GCDELAY = 5*1000;
041    private static final long GCLONG = 60_000L;
042    private static final long NRD_RESOLUTION = 500L;
043    private static final int MAX_ENTRIES_IN_DUMP = 1000;
044    private static final long ONE_MILLION = 1_000_000L;         // multiplier millis --> nanos
045    private final Set[] expirables;
046    private long lastLongGC = System.nanoTime();
047
048    public TSpace () {
049        super();
050        entries = new HashMap ();
051        expirables = new Set[] { new HashSet<K>(), new HashSet<K>() };
052        SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
053    }
054
055    @Override
056    public void out (K key, V value) {
057        var jfr = new SpaceEvent("out", "" + key);
058        jfr.begin();
059        if (key == null || value == null)
060            throw new NullPointerException ("key=" + key + ", value=" + value);
061        synchronized(this) {
062            List l = getList(key);
063            l.add (value);
064            if (l.size() == 1)
065                this.notifyAll ();
066        }
067        if (sl != null)
068            notifyListeners(key, value);
069        jfr.commit();
070    }
071
072    @Override
073    public void out (K key, V value, long timeout) {
074        var jfr = new SpaceEvent("out:tim", "" + key);
075        jfr.begin();
076
077        if (key == null || value == null) {
078            jfr.commit();
079            throw new NullPointerException("key=" + key + ", value=" + value);
080        }
081        Object v = value;
082        if (timeout > 0) {
083            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
084        }
085        synchronized (this) {
086            List l = getList(key);
087            l.add(v);
088            if (l.size() == 1)
089                this.notifyAll ();
090            if (timeout > 0) {
091                registerExpirable(key, timeout);
092            }
093        }
094        if (sl != null)
095            notifyListeners(key, value);
096        jfr.commit();
097    }
098
099    @Override
100    public synchronized V rdp (Object key) {
101        var jfr = new SpaceEvent("rdp", "" + key);
102        jfr.begin();
103        try {
104            if (key instanceof Template)
105                return (V) getObject ((Template) key, false);
106            return (V) getHead (key, false);
107        } finally {
108            jfr.commit();
109        }
110    }
111
112    @Override
113    public synchronized V inp (Object key) {
114        var jfr = new SpaceEvent("inp", "" + key);
115        jfr.begin();
116        try {
117            if (key instanceof Template)
118                return (V) getObject ((Template) key, true);
119            return (V) getHead (key, true);
120        } finally {
121            jfr.commit();
122        }
123    }
124
125    @Override
126    public synchronized V in (Object key) {
127        Object obj;
128        while ((obj = inp (key)) == null) {
129            try {
130                this.wait ();
131            } catch (InterruptedException e) { }
132        }
133        return (V) obj;
134    }
135
136    @Override
137    public synchronized V in  (Object key, long timeout) {
138        V obj;
139        long now = System.nanoTime();
140        long to = now + timeout * ONE_MILLION;
141        long waitFor;
142        while ( (obj = inp (key)) == null &&
143                (waitFor = (to - System.nanoTime())) >= 0 )
144        {
145            try {
146                this.wait(Math.max(waitFor / ONE_MILLION, 1L));
147            } catch (InterruptedException e) { }
148        }
149        return obj;
150    }
151
152    @Override
153    public synchronized V rd  (Object key) {
154        Object obj;
155        while ((obj = rdp (key)) == null) {
156            try {
157                this.wait ();
158            } catch (InterruptedException e) { }
159        }
160        return (V) obj;
161    }
162
163    @Override
164    public synchronized V rd  (Object key, long timeout) {
165        V obj;
166        long now = System.nanoTime();
167        long to = now + (timeout * ONE_MILLION);
168        long waitFor;
169        while ( (obj = rdp (key)) == null &&
170                (waitFor = (to - System.nanoTime())) >= 0 )
171        {
172            try {
173                this.wait(Math.max(waitFor / ONE_MILLION, 1L));
174            } catch (InterruptedException e) { }
175        }
176        return obj;
177    }
178
179    @Override
180    public synchronized void nrd  (Object key) {
181        while (rdp (key) != null) {
182            try {
183                this.wait (NRD_RESOLUTION);
184            } catch (InterruptedException ignored) { }
185        }
186    }
187
188    @Override
189    public synchronized V nrd  (Object key, long timeout) {
190        V obj;
191        long now = System.nanoTime();
192        long to = now + (timeout * ONE_MILLION);
193        long waitFor;
194        while ( (obj = rdp (key)) != null &&
195                (waitFor = (to - System.nanoTime())) >= 0 )
196        {
197            try {
198                this.wait(Math.min(NRD_RESOLUTION,
199                                   Math.max(waitFor / ONE_MILLION, 1L)));
200            } catch (InterruptedException ignored) { }
201        }
202        return obj;
203    }
204
205    @Override
206    public void run () {
207        try {
208            gc();
209        } catch (Exception e) {
210            e.printStackTrace(); // this should never happen
211        }
212    }
213
214    public void gc () {
215        gc(0);
216        if (System.nanoTime() - lastLongGC > GCLONG*ONE_MILLION) {
217            gc(1);
218            lastLongGC = System.nanoTime();
219        }
220    }
221
222    private void gc (int generation) {
223        var jfr = new SpaceEvent("gc", Integer.toString(generation));
224        jfr.begin();
225
226        Set<K> exps;
227        synchronized (this) {
228            exps = expirables[generation];
229            expirables[generation] = new HashSet<K>();
230        }
231        for (K k : exps) {
232            if (rdp(k) != null) {
233                synchronized (this) {
234                    expirables[generation].add(k);
235                }
236            }
237            Thread.yield ();
238        }
239        if (sl != null) {
240            synchronized (this) {
241                if (sl != null && sl.isEmpty())
242                    sl = null;
243            }
244        }
245        jfr.commit();
246    }
247
248    @Override
249    public synchronized int size (Object key) {
250        var jfr = new SpaceEvent("size", "" + key);
251        jfr.begin();
252
253        int size = 0;
254        List l = (List) entries.get (key);
255        if (l != null) 
256            size = l.size();
257        jfr.commit();
258        return size;
259    }
260
261    @Override
262    public synchronized void addListener (Object key, SpaceListener listener) {
263        getSL().out (key, listener);
264    }
265
266    @Override
267    public synchronized void addListener 
268        (Object key, SpaceListener listener, long timeout) 
269    {
270        getSL().out (key, listener, timeout);
271    }
272
273    @Override
274    public synchronized void removeListener 
275        (Object key, SpaceListener listener) 
276    {
277        if (sl != null) {
278            sl.inp (new ObjectTemplate (key, listener));
279        }
280    }
281    public boolean isEmpty() {
282        return entries.isEmpty();
283    }
284
285    @Override
286    public synchronized Set<K> getKeySet() {
287        return new HashSet<K>(entries.keySet());
288    }
289
290    public String getKeysAsString () {
291        StringBuilder sb = new StringBuilder();
292        Object[] keys;
293        synchronized (this) {
294            keys = entries.keySet().toArray();
295        }
296        for (int i=0; i<keys.length; i++) {
297            if (i > 0)
298                sb.append (' ');
299            sb.append (keys[i]);
300        }
301        return sb.toString();
302    }
303
304    @Override
305    public void dump(PrintStream p, String indent) {
306        var jfr = new SpaceEvent("dump", "");
307        jfr.begin();
308
309        Object[] keys;
310        int size = entries.size();
311        if (size > MAX_ENTRIES_IN_DUMP * 100) {
312            p.printf ("%sWARNING - space too big, size=%d%n", indent, size);
313            jfr.commit();
314            return;
315        }
316        synchronized (this) {
317            keys = entries.keySet().toArray();
318        }
319        int i=0;
320        for (Object key : keys) {
321            p.printf("%s<key count='%d'>%s</key>%n", indent, size(key), key);
322            if (i++ > MAX_ENTRIES_IN_DUMP) {
323                p.printf ("%s...%n", indent);
324                p.printf ("%s...%n", indent);
325                break;
326            }
327        }
328        p.printf("%s key-count: %d%n", indent, keys.length);
329        int exp0, exp1;
330        synchronized (this) {
331            exp0 = expirables[0].size();
332            exp1 = expirables[1].size();
333        }
334        p.printf("%s    gcinfo: %d,%d%n", indent, exp0, exp1);
335        jfr.commit();
336    }
337
338    public void notifyListeners (Object key, Object value) {
339        var jfr = new SpaceEvent("notify", "" + key);
340        jfr.begin();
341
342        Object[] listeners = null;
343        synchronized (this) {
344            if (sl == null)
345                return;
346            List l = (List) sl.entries.get (key);
347            if (l != null)
348                listeners = l.toArray();
349        }
350        if (listeners != null) {
351            for (Object listener : listeners) {
352                Object o = listener;
353                if (o instanceof Expirable)
354                    o = ((Expirable) o).getValue();
355                if (o instanceof SpaceListener)
356                    ((SpaceListener) o).notify(key, value);
357            }
358        }
359        jfr.commit();
360    }
361
362    @Override
363    public void push (K key, V value) {
364        if (key == null || value == null)
365            throw new NullPointerException ("key=" + key + ", value=" + value);
366        var jfr = new SpaceEvent("push", "" + key);
367        jfr.begin();
368        synchronized(this) {
369            List l = getList(key);
370            boolean wasEmpty = l.isEmpty();
371            l.add (0, value);
372            if (wasEmpty)
373                this.notifyAll ();
374        }
375        if (sl != null)
376            notifyListeners(key, value);
377        jfr.commit();
378    }
379
380    @Override
381    public void push (K key, V value, long timeout) {
382        if (key == null || value == null)
383            throw new NullPointerException ("key=" + key + ", value=" + value);
384        var jfr = new SpaceEvent("push:tim", "" + key);
385        jfr.begin();
386        Object v = value;
387        if (timeout > 0) {
388            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
389        }
390        synchronized (this) {
391            List l = getList(key);
392            boolean wasEmpty = l.isEmpty();
393            l.add (0, v);
394            if (wasEmpty)
395                this.notifyAll ();
396            if (timeout > 0) {
397                registerExpirable(key, timeout);
398            }
399        }
400        if (sl != null)
401            notifyListeners(key, value);
402        jfr.commit();
403    }
404
405    @Override
406    public void put (K key, V value) {
407        if (key == null || value == null)
408            throw new NullPointerException ("key=" + key + ", value=" + value);
409
410        var jfr = new SpaceEvent("put", "" + key);
411        jfr.begin();
412        synchronized (this) {
413            List l = new LinkedList();
414            l.add (value);
415            entries.put (key, l);
416            this.notifyAll ();
417        }
418        if (sl != null)
419            notifyListeners(key, value);
420        jfr.commit();
421    }
422
423    @Override
424    public void put (K key, V value, long timeout) {
425        if (key == null || value == null)
426            throw new NullPointerException ("key=" + key + ", value=" + value);
427        var jfr = new SpaceEvent("put:tim", "" + key);
428        jfr.begin();
429
430        Object v = value;
431        if (timeout > 0) {
432            v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
433        }
434        synchronized (this) {
435            List l = new LinkedList();
436            l.add (v);
437            entries.put (key, l);
438            this.notifyAll ();
439            if (timeout > 0) {
440                registerExpirable(key, timeout);
441            }
442        }
443        if (sl != null)
444            notifyListeners(key, value);
445        jfr.commit();
446    }
447
448    @Override
449    public boolean existAny (K[] keys) {
450        for (K key : keys) {
451            if (rdp(key) != null)
452                return true;
453        }
454        return false;
455    }
456
457    @Override
458    public boolean existAny (K[] keys, long timeout) {
459        long now = System.nanoTime();
460        long to = now + (timeout * ONE_MILLION);
461        long waitFor;
462        while ((waitFor = (to - System.nanoTime())) >= 0) {
463            if (existAny (keys))
464                return true;
465            synchronized (this) {
466                try {
467                    this.wait(Math.max(waitFor / ONE_MILLION, 1L));
468                } catch (InterruptedException e) { }
469            }
470        }
471        return false;
472    }
473
474    /**
475     * unstandard method (required for space replication) - use with care
476     * @return underlying entry map
477     */
478    public Map getEntries () {
479        return entries;
480    }
481
482    /**
483     * unstandard method (required for space replication) - use with care
484     * @param entries underlying entry map
485     */
486    public void setEntries (Map entries) {
487        this.entries = entries;
488    }
489
490    private List getList (Object key) {
491        List l = (List) entries.get (key);
492        if (l == null) 
493            entries.put (key, l = new LinkedList());
494        return l;
495    }
496
497    private Object getHead (Object key, boolean remove) {
498        Object obj = null;
499        List l = (List) entries.get (key);
500        boolean wasExpirable = false;
501        while (obj == null && l != null && l.size() > 0) {
502            obj = l.get(0);
503            if (obj instanceof Expirable) { 
504                obj = ((Expirable) obj).getValue();
505                wasExpirable = true;
506            }
507            if (obj == null) {
508                l.remove (0);
509                if (l.isEmpty()) {
510                    entries.remove (key);
511                }
512            }
513        }
514        if (l != null) {
515            if (remove && obj != null)
516                l.remove (0);
517            if (l.isEmpty()) {
518                entries.remove (key);
519                if (wasExpirable)
520                    unregisterExpirable(key);
521            }
522        }
523        return obj;
524    }
525
526    private Object getObject (Template tmpl, boolean remove) {
527        Object obj = null;
528        Object key = tmpl.getKey();
529        List l = (List) entries.get (key);
530        if (l != null) {
531            Iterator iter = l.iterator();
532            boolean wasExpirable = false;
533            while (iter.hasNext()) {
534                obj = iter.next();
535                if (obj instanceof Expirable) {
536                    obj = ((Expirable) obj).getValue();
537                    if (obj == null) {
538                        iter.remove();
539                        wasExpirable = true;
540                        continue;
541                    }
542                }
543                if (tmpl.equals (obj)) {
544                    if (remove)
545                        iter.remove();
546                    break;
547                } else
548                    obj = null;
549            }
550            if (l.isEmpty()) {
551                entries.remove (key);
552                if (wasExpirable)
553                    unregisterExpirable(key);
554            }
555        }
556        return obj;
557    }
558
559    private TSpace getSL() {
560        synchronized (this) {
561            if (sl == null)
562                sl = new TSpace();
563        }
564        return sl;
565    }
566
567    private void registerExpirable(K k, long t) {
568        expirables[t > GCLONG ? 1 : 0].add(k);
569    }
570
571    private void unregisterExpirable(Object k) {
572        for (Set<K> s : expirables)
573            s.remove(k);
574    }
575
576    static class Expirable implements Comparable, Serializable {
577
578        private static final long serialVersionUID = 0xA7F22BF5;
579
580        Object value;
581
582        /**
583         *  When to expire, in the future, as given by monotonic System.nanoTime().<br>
584         *  IMPORTANT: always use a nanosec offset from System.nanoTime()!
585         */
586        long expires;
587
588        Expirable (Object value, long expires) {
589            super();
590            this.value = value;
591            this.expires = expires;
592        }
593
594        boolean isExpired () {
595            return (System.nanoTime() - expires) > 0;
596        }
597
598        @Override
599        public String toString() {
600            return getClass().getName() 
601                + "@" + Integer.toHexString(hashCode())
602                + ",value=" + value.toString()
603                + ",expired=" + isExpired ();
604        }
605
606        Object getValue() {
607            return isExpired() ? null : value;
608        }
609
610        @Override
611        public int compareTo (Object other) {
612            long diff = this.expires - ((Expirable)other).expires;
613            return  diff > 0 ?  1 :
614                    diff < 0 ? -1 :
615                    0;
616        }
617    }
618}