001/* 002 * jPOS Project [http://jpos.org] 003 * Copyright (C) 2000-2026 jPOS Software SRL 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jpos.util; 020 021import java.util.LinkedList; 022 023/** 024 * implements a blocking queue 025 * @see ThreadPool 026 * @since 1.1 027 */ 028@SuppressWarnings("unchecked") 029public class BlockingQueue { 030 private LinkedList queue = new LinkedList(); 031 private boolean closed = false; 032 private int consumers = 0; 033 034 public static class Closed extends RuntimeException { 035 036 private static final long serialVersionUID = 3404885702116373450L; 037 038 public Closed() { 039 super ("queue-closed"); 040 } 041 } 042 043 public synchronized void enqueue (Object o) throws Closed { 044 if (closed) 045 throw new Closed(); 046 queue.addLast (o); 047 notify(); 048 } 049 public synchronized void requeue (Object o) throws Closed { 050 if (closed) 051 throw new Closed(); 052 queue.addFirst (o); 053 notify(); 054 } 055 056 public synchronized Object dequeue() 057 throws InterruptedException, Closed 058 { 059 consumers++; 060 try { 061 while (queue.size() == 0) { 062 wait(); 063 if (closed) 064 throw new Closed(); 065 } 066 } finally { 067 consumers--; 068 } 069 return queue.removeFirst(); 070 } 071 072 public synchronized Object dequeue (long timeout) 073 throws InterruptedException, Closed 074 { 075 if (timeout == 0) 076 return dequeue (); 077 078 consumers++; 079 long maxTime = System.currentTimeMillis() + timeout; 080 try { 081 while (queue.size() == 0 && System.currentTimeMillis() < maxTime) { 082 if (timeout > 0L) 083 wait (timeout); 084 if (closed) 085 throw new Closed(); 086 } 087 } finally { 088 consumers--; 089 } 090 return queue.size() > 0 ? queue.removeFirst() : null; 091 } 092 public synchronized void close() { 093 closed = true; 094 notifyAll(); 095 } 096 public synchronized int consumerCount() { 097 return consumers; 098 } 099 100 public synchronized int consumerDeficit() { 101 return queue.size() - consumers; 102 } 103 104 public synchronized boolean ready() { 105 return !closed; 106 } 107 public synchronized int pending() { 108 return queue.size(); 109 } 110 public LinkedList getQueue () { 111 return queue; 112 } 113 public void setQueue (LinkedList queue) { 114 this.queue = queue; 115 } 116} 117