001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import java.util.LinkedList;
022
023/**
024 * implements a blocking queue 
025 * @see ThreadPool
026 * @since 1.1
027 */
028@SuppressWarnings("unchecked")
029public class BlockingQueue {
030    /** Creates an empty, open queue. */
031    public BlockingQueue() {}
032    private LinkedList queue = new LinkedList();
033    private boolean closed = false;
034    private int consumers = 0;
035
036    /** Thrown by queue operations after the queue has been closed. */
037    public static class Closed extends RuntimeException {
038
039        private static final long serialVersionUID = 3404885702116373450L;
040
041        /** Constructs a new {@code Closed} exception with a default message. */
042        public Closed() {
043            super ("queue-closed");
044        }
045    }
046
047    /**
048     * Appends an object to the tail of the queue and wakes one waiter.
049     *
050     * @param o the object to enqueue
051     * @throws Closed if the queue has been closed
052     */
053    public synchronized void enqueue (Object o) throws Closed {
054        if (closed)
055            throw new Closed();
056        queue.addLast (o);
057        notify();
058    }
059    /**
060     * Inserts an object at the head of the queue and wakes one waiter.
061     *
062     * @param o the object to requeue
063     * @throws Closed if the queue has been closed
064     */
065    public synchronized void requeue (Object o) throws Closed {
066        if (closed)
067            throw new Closed();
068        queue.addFirst (o);
069        notify();
070    }
071
072    /**
073     * Removes and returns the head of the queue, blocking until an object is available.
074     *
075     * @return the object at the head of the queue
076     * @throws InterruptedException if the thread is interrupted while waiting
077     * @throws Closed if the queue is closed while waiting
078     */
079    public synchronized Object dequeue()
080        throws InterruptedException, Closed
081    {
082        consumers++;
083        try {
084            while (queue.size() == 0) {
085                wait();
086                if (closed)
087                    throw new Closed();
088            }
089        } finally {
090            consumers--;
091        }
092        return queue.removeFirst();
093    }
094
095    /**
096     * Removes and returns the head of the queue, blocking up to {@code timeout} ms.
097     *
098     * @param timeout maximum wait in milliseconds; {@code 0} blocks indefinitely
099     * @return the object at the head of the queue, or {@code null} if the wait elapsed
100     * @throws InterruptedException if the thread is interrupted while waiting
101     * @throws Closed if the queue is closed while waiting
102     */
103    public synchronized Object dequeue (long timeout)
104        throws InterruptedException, Closed
105    {
106        if (timeout == 0)
107            return dequeue ();
108
109        consumers++;
110        long maxTime = System.currentTimeMillis() + timeout;
111        try {
112            while (queue.size() == 0 && System.currentTimeMillis() < maxTime) {
113                if (timeout > 0L)
114                    wait (timeout);
115                if (closed)
116                    throw new Closed();
117            }
118        } finally {
119            consumers--;
120        }
121        return queue.size() > 0 ? queue.removeFirst() : null;
122    }
123    /**
124     * Closes the queue and wakes all waiting consumers, who will then receive {@link Closed}.
125     */
126    public synchronized void close() {
127        closed = true;
128        notifyAll();
129    }
130    /**
131     * Returns the number of consumers currently waiting on this queue.
132     *
133     * @return the live consumer count
134     */
135    public synchronized int consumerCount() {
136        return consumers;
137    }
138
139    /**
140     * Returns the difference between queued items and waiting consumers.
141     *
142     * @return queue size minus consumer count (negative when consumers exceed items)
143     */
144    public synchronized int consumerDeficit() {
145        return queue.size() - consumers;
146    }
147
148    /**
149     * Indicates whether the queue is open for new operations.
150     *
151     * @return {@code true} if the queue has not been closed
152     */
153    public synchronized boolean ready() {
154        return !closed;
155    }
156    /**
157     * Returns the number of items currently in the queue.
158     *
159     * @return the queue size
160     */
161    public synchronized int pending() {
162        return queue.size();
163    }
164    /**
165     * Returns the underlying list backing this queue.
166     *
167     * @return the internal list (live reference, not a copy)
168     */
169    public LinkedList getQueue () {
170        return queue;
171    }
172    /**
173     * Replaces the underlying list backing this queue.
174     *
175     * @param queue the list to use as the new backing store
176     */
177    public void setQueue (LinkedList queue) {
178        this.queue = queue;
179    }
180}
181