blob: c8d12142e60dfdfddce6ca12bf78403208998c79 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
* caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
* thread adding items and a single (different) thread removing items.
*
* @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
*/
public class FlowControllingBlockingQueue
{
private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
private final Queue _queue = new ConcurrentLinkedQueue();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
private final ThresholdListener _listener;
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
private boolean disableFlowControl;
public boolean isEmpty()
{
return _queue.isEmpty();
}
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
void underThreshold(int currentValue);
}
public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
{
this(threshold, threshold, listener);
}
public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
{
_flowControlHighThreshold = highThreshold;
_flowControlLowThreshold = lowThreshold;
_listener = listener;
if (highThreshold == 0)
{
disableFlowControl = true;
}
}
public Object take() throws InterruptedException
{
Object o = _queue.poll();
if(o == null)
{
synchronized(this)
{
while((o = _queue.poll())==null)
{
wait();
}
}
}
if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
if (_count-- == _flowControlLowThreshold)
{
_listener.underThreshold(_count);
}
}
}
return o;
}
public void add(Object o)
{
synchronized(this)
{
_queue.add(o);
notifyAll();
}
if (!disableFlowControl && _listener != null)
{
synchronized (_listener)
{
if (++_count == _flowControlHighThreshold)
{
_listener.aboveThreshold(_count);
}
}
}
}
public Iterator iterator()
{
return _queue.iterator();
}
}