001/*
002 * jPOS Project [http://jpos.org]
003 * Copyright (C) 2000-2026 jPOS Software SRL
004 *
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jpos.util;
020
021import java.util.LinkedList;
022
/**
 * Implements a simple blocking FIFO queue.
 *
 * <p>All state is guarded by this object's monitor: producers add elements
 * with {@link #enqueue(Object)} / {@link #requeue(Object)} and consumers
 * block in {@link #dequeue()} / {@link #dequeue(long)} until an element is
 * available, the optional timeout elapses, or the queue is {@link #close()
 * closed}.</p>
 *
 * @see ThreadPool
 * @since 1.1
 */
@SuppressWarnings("unchecked")
public class BlockingQueue {
    /** Backing list; only accessed while holding this object's monitor. */
    private LinkedList queue = new LinkedList();
    /** Set once by {@link #close()}; never reset. */
    private boolean closed = false;
    /** Number of threads currently inside one of the dequeue methods. */
    private int consumers = 0;

    /**
     * Thrown when an operation is attempted on (or interrupted by the
     * closing of) a closed queue.
     */
    public static class Closed extends RuntimeException {

        private static final long serialVersionUID = 3404885702116373450L;

        public Closed() {
            super ("queue-closed");
        }
    }

    /**
     * Appends an element at the tail of the queue and wakes up one waiting
     * consumer.
     *
     * @param o element to add
     * @throws Closed if the queue has been closed
     */
    public synchronized void enqueue (Object o) throws Closed {
        if (closed)
            throw new Closed();
        queue.addLast (o);
        notify();
    }

    /**
     * Inserts an element at the head of the queue (so it is the next one
     * to be dequeued) and wakes up one waiting consumer.
     *
     * @param o element to add
     * @throws Closed if the queue has been closed
     */
    public synchronized void requeue (Object o) throws Closed {
        if (closed)
            throw new Closed();
        queue.addFirst (o);
        notify();
    }

    /**
     * Removes and returns the head of the queue, blocking while the queue
     * is empty.
     *
     * <p>Elements that are already queued can still be retrieved after
     * {@link #close()} as long as the caller does not have to wait.</p>
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Closed if the queue is closed and empty, or is found closed after waiting
     */
    public synchronized Object dequeue()
        throws InterruptedException, Closed
    {
        consumers++;
        try {
            while (queue.isEmpty()) {
                // Nothing will ever be enqueued on a closed queue and
                // close() has already issued its notifyAll(), so waiting
                // here would block forever.
                if (closed)
                    throw new Closed();
                wait();
                if (closed)
                    throw new Closed();
            }
        } finally {
            consumers--;
        }
        return queue.removeFirst();
    }

    /**
     * Removes and returns the head of the queue, waiting at most
     * <code>timeout</code> milliseconds for an element to become available.
     *
     * @param timeout maximum time to wait in milliseconds; <code>0</code>
     *        means wait indefinitely (same as {@link #dequeue()}), a
     *        negative value means do not wait at all
     * @return the element at the head of the queue, or <code>null</code>
     *         if the timeout elapsed with the queue still empty
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Closed if the queue is closed and the caller would have to wait,
     *         or is found closed after waiting
     */
    public synchronized Object dequeue (long timeout)
        throws InterruptedException, Closed
    {
        if (timeout == 0)
            return dequeue ();

        consumers++;
        // nanoTime() is monotonic, and measuring elapsed time (instead of
        // computing "now + timeout") cannot overflow for huge timeouts.
        final long start = System.nanoTime();
        try {
            while (queue.isEmpty()) {
                long elapsedMillis = (System.nanoTime() - start) / 1000000L;
                long remaining = timeout - elapsedMillis;
                if (remaining <= 0L)
                    break;
                if (closed)
                    throw new Closed();
                // Wait only for what is left of the timeout so that
                // wake-ups without an available element do not extend the
                // total wait beyond the requested period.
                wait (remaining);
                if (closed)
                    throw new Closed();
            }
        } finally {
            consumers--;
        }
        return queue.isEmpty() ? null : queue.removeFirst();
    }

    /**
     * Closes the queue and wakes up every waiting consumer. Subsequent
     * calls to {@link #enqueue(Object)} and {@link #requeue(Object)}
     * throw {@link Closed}.
     */
    public synchronized void close() {
        closed = true;
        notifyAll();
    }

    /**
     * @return number of threads currently inside a dequeue call
     */
    public synchronized int consumerCount() {
        return consumers;
    }

    /**
     * @return number of queued elements minus the number of threads
     *         currently inside a dequeue call (may be negative)
     */
    public synchronized int consumerDeficit() {
        return queue.size() - consumers;
    }

    /**
     * @return <code>true</code> if the queue has not been closed
     */
    public synchronized boolean ready() {
        return !closed;
    }

    /**
     * @return number of elements currently queued
     */
    public synchronized int pending() {
        return queue.size();
    }

    /**
     * Returns the live backing list (not a copy). Callers that touch the
     * returned list should synchronize on this <code>BlockingQueue</code>
     * instance, which is the monitor used by every other method.
     *
     * @return the backing list
     */
    public synchronized LinkedList getQueue () {
        return queue;
    }

    /**
     * Replaces the backing list and wakes up all waiting consumers so they
     * re-check against the new list, which may already contain elements.
     *
     * @param queue new backing list, must not be <code>null</code>
     * @throws NullPointerException if <code>queue</code> is <code>null</code>
     */
    public synchronized void setQueue (LinkedList queue) {
        if (queue == null)
            throw new NullPointerException ("queue");
        this.queue = queue;
        notifyAll();
    }
}
117