blob: 51cb0b3dafd4ef0b79de8f3fc20bbe463a517752 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.memory.buffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
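/**
* A {@link MessageBuffer} which evicts from the largest buffers first.
* <p>
* The queues created by this buffer are re-ordered as they grow so that the
* largest one sits first; whenever the total buffered size exceeds the limit,
* messages are evicted from the queue at the head of that ordering.
*/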
public class SizeBasedMessageBuffer implements MessageBuffer {

    /** Limit used by the no-argument constructor. */
    private static final int DEFAULT_LIMIT = 100 * 64 * 1024;

    /** Total size above which messages are evicted. */
    private final int limit;

    /** Queues created by this buffer; re-ordered so that larger queues sit nearer index 0. */
    private final List<MessageQueue> bubbleList = new ArrayList<MessageQueue>();

    /** Running total of the size deltas reported by the queues, minus what has been evicted. */
    private int size;

    /** Guards {@link #bubbleList} and {@link #size}. */
    private final Object lock = new Object();

    /**
     * Creates a buffer with the default limit.
     */
    public SizeBasedMessageBuffer() {
        this(DEFAULT_LIMIT);
    }

    /**
     * Creates a buffer with the given limit.
     *
     * @param limit the total size above which messages are evicted
     */
    public SizeBasedMessageBuffer(int limit) {
        this.limit = limit;
    }

    /**
     * @return the total size currently tracked by this buffer
     */
    public int getSize() {
        synchronized (lock) {
            return size;
        }
    }

    /**
     * Creates a new message queue instance and registers it at the tail of
     * the ordering.
     *
     * @return the newly created queue
     */
    public MessageQueue createMessageQueue() {
        MessageQueue queue = new MessageQueue(this);
        synchronized (lock) {
            queue.setPosition(bubbleList.size());
            bubbleList.add(queue);
        }
        return queue;
    }

    /**
     * After a message queue has changed we may need to perform some evictions.
     * <p>
     * The queue is first moved towards the head of the ordering if it has
     * outgrown its neighbours, then messages are evicted from the head queue
     * until the total size is back within the limit or nothing more can be
     * reclaimed.
     *
     * @param queue the queue whose size changed
     * @param delta the change in size to apply to the buffer total
     * @param queueSize the new size of the queue
     */
    public void onSizeChanged(MessageQueue queue, int delta, int queueSize) {
        synchronized (lock) {
            bubbleUp(queue, queueSize);
            size += delta;
            // Guard against an empty list: there is nothing to evict from.
            while (size > limit && !bubbleList.isEmpty()) {
                MessageQueue biggest = bubbleList.get(0);
                int sizeBefore = size;
                size -= biggest.evictMessage();
                // Re-order first: this may bring a larger queue to the head
                // even when the eviction itself released nothing.
                bubbleDown(biggest, 0);

                boolean reclaimed = size < sizeBefore;
                boolean headUnchanged = bubbleList.get(0) == biggest;
                if (!reclaimed && headUnchanged && biggest.getSize() <= 0) {
                    // The head queue reports no content, released nothing and
                    // is still the head, so another pass would repeat exactly
                    // the same step. Stop instead of spinning forever while
                    // holding the lock.
                    // NOTE(review): assumes a queue whose getSize() is 0 has
                    // nothing left that evictMessage() could release - confirm
                    // against MessageQueue.
                    break;
                }
            }
        }
    }

    /**
     * Clears every queue created by this buffer and resets the total size to
     * zero.
     */
    public void clear() {
        synchronized (lock) {
            for (MessageQueue queue : bubbleList) {
                queue.clear();
            }
            size = 0;
        }
    }

    /**
     * Moves the queue towards the head of the ordering while the queue in
     * front of it is smaller than {@code queueSize}.
     * <p>
     * Does no locking of its own; within this class it is only called while
     * the lock is held.
     *
     * @param queue the queue that may need to move
     * @param queueSize the size to compare against the queues in front
     */
    protected void bubbleUp(MessageQueue queue, int queueSize) {
        // let's bubble up to the head of the list if we need to
        int position = queue.getPosition();
        while (--position >= 0) {
            MessageQueue pivot = bubbleList.get(position);
            if (pivot.getSize() < queueSize) {
                swap(position, pivot, position + 1, queue);
            } else {
                break;
            }
        }
    }

    /**
     * Moves the queue towards the tail of the ordering while the queue behind
     * it is larger than the queue's current size.
     * <p>
     * Does no locking of its own; within this class it is only called while
     * the lock is held.
     *
     * @param biggest the queue that may need to move
     * @param position the index {@code biggest} currently occupies
     */
    protected void bubbleDown(MessageQueue biggest, int position) {
        int queueSize = biggest.getSize();
        int end = bubbleList.size();
        for (int second = position + 1; second < end; second++) {
            MessageQueue pivot = bubbleList.get(second);
            if (pivot.getSize() > queueSize) {
                swap(position, biggest, second, pivot);
            } else {
                break;
            }
            // biggest now lives at 'second'; continue comparing from there
            position = second;
        }
    }

    /**
     * Exchanges two queues in the ordering and updates the position recorded
     * on each of them.
     *
     * @param firstPosition the index currently holding {@code first}
     * @param first the queue to move to {@code secondPosition}
     * @param secondPosition the index currently holding {@code second}
     * @param second the queue to move to {@code firstPosition}
     */
    protected void swap(int firstPosition, MessageQueue first, int secondPosition, MessageQueue second) {
        bubbleList.set(firstPosition, second);
        bubbleList.set(secondPosition, first);
        first.setPosition(secondPosition);
        second.setPosition(firstPosition);
    }
}