001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import java.util.LinkedList; 022 023/** 024 * implements a blocking queue 025 * @see ThreadPool 026 * @since 1.1 027 */ 028@SuppressWarnings("unchecked") 029public class BlockingQueue { 030 /** Creates an empty, open queue. */ 031 public BlockingQueue() {} 032 private LinkedList queue = new LinkedList(); 033 private boolean closed = false; 034 private int consumers = 0; 035 036 /** Thrown by queue operations after the queue has been closed. */ 037 public static class Closed extends RuntimeException { 038 039 private static final long serialVersionUID = 3404885702116373450L; 040 041 /** Constructs a new {@code Closed} exception with a default message. */ 042 public Closed() { 043 super ("queue-closed"); 044 } 045 } 046 047 /** 048 * Appends an object to the tail of the queue and wakes one waiter. 049 * 050 * @param o the object to enqueue 051 * @throws Closed if the queue has been closed 052 */ 053 public synchronized void enqueue (Object o) throws Closed { 054 if (closed) 055 throw new Closed(); 056 queue.addLast (o); 057 notify(); 058 } 059 /** 060 * Inserts an object at the head of the queue and wakes one waiter. 061 * 062 * @param o the object to requeue 063 * @throws Closed if the queue has been closed 064 */ 065 public synchronized void requeue (Object o) throws Closed { 066 if (closed) 067 throw new Closed(); 068 queue.addFirst (o); 069 notify(); 070 } 071 072 /** 073 * Removes and returns the head of the queue, blocking until an object is available. 074 * 075 * @return the object at the head of the queue 076 * @throws InterruptedException if the thread is interrupted while waiting 077 * @throws Closed if the queue is closed while waiting 078 */ 079 public synchronized Object dequeue() 080 throws InterruptedException, Closed 081 { 082 consumers++; 083 try { 084 while (queue.size() == 0) { 085 wait(); 086 if (closed) 087 throw new Closed(); 088 } 089 } finally { 090 consumers--; 091 } 092 return queue.removeFirst(); 093 } 094 095 /** 096 * Removes and returns the head of the queue, blocking up to {@code timeout} ms. 097 * 098 * @param timeout maximum wait in milliseconds; {@code 0} blocks indefinitely 099 * @return the object at the head of the queue, or {@code null} if the wait elapsed 100 * @throws InterruptedException if the thread is interrupted while waiting 101 * @throws Closed if the queue is closed while waiting 102 */ 103 public synchronized Object dequeue (long timeout) 104 throws InterruptedException, Closed 105 { 106 if (timeout == 0) 107 return dequeue (); 108 109 consumers++; 110 long maxTime = System.currentTimeMillis() + timeout; 111 try { 112 while (queue.size() == 0 && System.currentTimeMillis() < maxTime) { 113 if (timeout > 0L) 114 wait (timeout); 115 if (closed) 116 throw new Closed(); 117 } 118 } finally { 119 consumers--; 120 } 121 return queue.size() > 0 ? queue.removeFirst() : null; 122 } 123 /** 124 * Closes the queue and wakes all waiting consumers, who will then receive {@link Closed}. 125 */ 126 public synchronized void close() { 127 closed = true; 128 notifyAll(); 129 } 130 /** 131 * Returns the number of consumers currently waiting on this queue. 132 * 133 * @return the live consumer count 134 */ 135 public synchronized int consumerCount() { 136 return consumers; 137 } 138 139 /** 140 * Returns the difference between queued items and waiting consumers. 141 * 142 * @return queue size minus consumer count (negative when consumers exceed items) 143 */ 144 public synchronized int consumerDeficit() { 145 return queue.size() - consumers; 146 } 147 148 /** 149 * Indicates whether the queue is open for new operations. 150 * 151 * @return {@code true} if the queue has not been closed 152 */ 153 public synchronized boolean ready() { 154 return !closed; 155 } 156 /** 157 * Returns the number of items currently in the queue. 158 * 159 * @return the queue size 160 */ 161 public synchronized int pending() { 162 return queue.size(); 163 } 164 /** 165 * Returns the underlying list backing this queue. 166 * 167 * @return the internal list (live reference, not a copy) 168 */ 169 public LinkedList getQueue () { 170 return queue; 171 } 172 /** 173 * Replaces the underlying list backing this queue. 174 * 175 * @param queue the list to use as the new backing store 176 */ 177 public void setQueue (LinkedList queue) { 178 this.queue = queue; 179 } 180} 181