blob: bfadaba278ee0b7d6b12c0af5f322d61d9bd7c4b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.util;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
* caller is not obliged to react to the events.
* <p>
* This implementation is <b>only</b> safe where we have a single
* thread adding items and a single (different) thread removing items.
* <p>
* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
*/
public class FlowControllingBlockingQueue<T>
{
private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final Queue<T> _queue = new ConcurrentLinkedQueue<T>();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
private final ThresholdListener _listener;
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
private boolean disableFlowControl;
private volatile boolean _closed;
public boolean isEmpty()
{
return _queue.isEmpty();
}
public void close()
{
synchronized (this)
{
_closed = true;
notifyAll();
}
}
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
void underThreshold(int currentValue);
}
public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
{
this(threshold, threshold, listener);
}
public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
{
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
if (highThreshold <= 0)
{
disableFlowControl = true;
}
else if (lowThreshold > highThreshold)
{
throw new IllegalArgumentException(String.format(
"Invalid low threshold %d : it should be less or equal high threshold %d",
lowThreshold,
highThreshold));
}
else if (lowThreshold < 1)
{
throw new IllegalArgumentException(String.format("Invalid low threshold %d: it should be greater than 0",
lowThreshold));
}
}
public T blockingPeek() throws InterruptedException
{
T o = _queue.peek();
if (o == null)
{
synchronized (this)
{
while (!_closed && (o = _queue.peek()) == null)
{
wait();
}
}
}
return o;
}
public T nonBlockingTake() throws InterruptedException
{
T o = _queue.poll();
if (o != null && !disableFlowControl && _listener != null)
{
reportBelowIfNecessary();
}
return o;
}
public T take() throws InterruptedException
{
T o = _queue.poll();
if(o == null)
{
synchronized(this)
{
while(!_closed && (o = _queue.poll())==null)
{
wait();
}
}
}
if (!_closed && !disableFlowControl && _listener != null)
{
reportBelowIfNecessary();
}
return o;
}
public void add(T o)
{
synchronized(this)
{
_queue.add(o);
notifyAll();
}
if (!disableFlowControl && _listener != null)
{
reportAboveIfNecessary();
}
}
public boolean remove(final T o)
{
final boolean removed = _queue.remove(o);
if (removed && !disableFlowControl && _listener != null)
{
reportBelowIfNecessary();
}
return removed;
}
public Iterator<T> iterator()
{
return _queue.iterator();
}
public void clear()
{
_queue.clear();
if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
int count = _count;
_count = 0;
if (count >= _flowControlLowThreshold)
{
_listener.underThreshold(0);
}
}
}
}
private void reportAboveIfNecessary()
{
synchronized (_listener)
{
if (++_count == _flowControlHighThreshold)
{
_listener.aboveThreshold(_count);
}
}
}
private void reportBelowIfNecessary()
{
synchronized (_listener)
{
if (_count-- == _flowControlLowThreshold)
{
_listener.underThreshold(_count);
}
}
}
}