blob: d7c8cc490118bf3df7a3ecc0c03746cfa3db3a6d [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
* This implementation of {@link SubscriptionRecoveryPolicy} will only keep the last message.
*
* @org.apache.xbean.XBean
*
* @version $Revision$
*/
public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy{
volatile private MessageReference messages[];
private int maximumSize=100;
private int tail=0;
synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception{
messages[tail++]=node;
if(tail>=messages.length)
tail=0;
return true;
}
synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Exception{
// Re-dispatch the last message seen.
int t=tail;
// The buffer may not have rolled over yet..., start from the front
if(messages[t]==null)
t=0;
// Well the buffer is really empty then.
if(messages[t]==null)
return;
// Keep dispatching until t hit's tail again.
MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
do{
MessageReference node=messages[t];
try{
msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
msgContext.setMessageReference(node);
if(sub.matches(node,msgContext)){
sub.add(node);
}
}finally{
msgContext.clear();
}
t++;
if(t>=messages.length)
t=0;
}while(t!=tail);
}
public void start() throws Exception{
messages=new MessageReference[maximumSize];
}
public void stop() throws Exception{
messages=null;
}
public int getMaximumSize(){
return maximumSize;
}
/**
* Sets the maximum number of messages that this destination will hold around in RAM
*/
public void setMaximumSize(int maximumSize){
this.maximumSize=maximumSize;
}
public Message[] browse(ActiveMQDestination destination) throws Exception{
List result=new ArrayList();
DestinationFilter filter=DestinationFilter.parseFilter(destination);
int t=tail;
if(messages[t]==null)
t=0;
if(messages[t]!=null){
do{
MessageReference ref=messages[t];
Message message=ref.getMessage();
if(filter.matches(message.getDestination())){
result.add(message);
}
t++;
if(t>=messages.length)
t=0;
}while(t!=tail);
}
return (Message[]) result.toArray(new Message[result.size()]);
}
}