blob: bf86051b69b10d049b1164ab24f227b413241528 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
/**
 * Test implementation of {@link ConsumerTarget} that records every
 * {@link MessageInstance} handed to {@link #send} in an in-memory list which
 * tests can inspect through {@link #getMessages()}.
 * <p>
 * The session returned by {@link #getSession()} is a Mockito mock whose channel
 * id is stubbed to {@code 0} and whose connection is a mock
 * {@link AMQPConnection}. Most credit/flow related operations are no-ops or
 * return fixed values.
 * <p>
 * The class performs no synchronisation of its own.
 */
public class TestConsumerTarget implements ConsumerTarget<TestConsumerTarget>
{
    private final String tag = "mocktag";
    private final Queue<?> queue = null;
    private final ArrayList<MessageInstance> _messages = new ArrayList<>();
    private final AMQPSession _sessionModel = mock(AMQPSession.class);

    private boolean _closed = false;
    private State _state = State.OPEN;
    private boolean _isActive = true;
    private MessageInstanceConsumer _consumer;
    /** Last value pushed to the consumer via {@code setNotifyWorkDesired}. */
    private boolean _notifyDesired;

    /**
     * Creates the target and stubs the mock session so that it reports
     * channel id {@code 0} and returns a mock {@link AMQPConnection}.
     */
    public TestConsumerTarget()
    {
        when(_sessionModel.getChannelId()).thenReturn(0);
        when(_sessionModel.getAMQPConnection()).thenReturn(mock(AMQPConnection.class));
    }

    /**
     * Marks the target as closed, moves it to {@link State#CLOSED} and
     * refreshes the consumer's notify-work flag.
     *
     * @return always {@code true}
     */
    public boolean close()
    {
        _closed = true;
        _state = State.CLOSED;
        updateNotifyWorkDesired();
        return true;
    }

    /**
     * @return the fixed tag {@code "mocktag"}
     */
    public String getName()
    {
        return tag;
    }

    /**
     * @return always {@code 0}; this target does not track unacknowledged bytes
     */
    public long getUnacknowledgedBytes()
    {
        return 0;
    }

    /**
     * @return always {@code 0}; this target does not track unacknowledged messages
     */
    public long getUnacknowledgedMessages()
    {
        return 0;
    }

    /**
     * @return the queue field, which is never assigned and therefore always {@code null}
     */
    public Queue<?> getQueue()
    {
        return queue;
    }

    /**
     * @return the mock session created for this target
     */
    public AMQPSession getSession()
    {
        return _sessionModel;
    }

    /**
     * @return the flag last set through {@link #setActive(boolean)} ({@code true} initially)
     */
    public boolean isActive()
    {
        return _isActive;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed()
    {
        return _closed;
    }

    /**
     * @return always {@code false}
     */
    public boolean isSuspended()
    {
        return false;
    }

    /**
     * No-op: this target does not model credit.
     *
     * @param message ignored
     */
    public void restoreCredit(ServerMessage message)
    {
    }

    /**
     * Records the given entry. If the same entry has already been recorded it
     * is first flagged via {@link MessageInstance#setRedelivered()}; the entry
     * is appended to the list in either case.
     *
     * @param consumer ignored
     * @param entry    the message instance to record
     * @param batch    ignored
     */
    public void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
    {
        if (_messages.contains(entry))
        {
            entry.setRedelivered();
        }
        _messages.add(entry);
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean sendNextMessage()
    {
        return false;
    }

    /**
     * No-op: nothing is batched by this target.
     */
    public void flushBatched()
    {
    }

    /**
     * No-op.
     *
     * @param node ignored
     */
    @Override
    public void acquisitionRemoved(final MessageInstance node)
    {
    }

    /**
     * @return the current state ({@link State#OPEN} initially)
     */
    public State getState()
    {
        return _state;
    }

    /**
     * @return the same value as {@link #getName()}
     */
    @Override
    public String getTargetAddress()
    {
        return getName();
    }

    /**
     * Remembers the consumer so that {@link #processPending()} and
     * {@link #updateNotifyWorkDesired()} can use it. A later call replaces the
     * previously stored consumer.
     *
     * @param sub the consumer being attached
     */
    @Override
    public void consumerAdded(final MessageInstanceConsumer sub)
    {
        _consumer = sub;
    }

    /**
     * Closes this target.
     *
     * @param sub ignored
     * @return an already-completed future carrying {@code null}
     */
    @Override
    public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
    {
        close();
        return Futures.immediateFuture(null);
    }

    /**
     * Sets the state and refreshes the consumer's notify-work flag.
     *
     * @param state the new state
     */
    public void setState(State state)
    {
        _state = state;
        updateNotifyWorkDesired();
    }

    /**
     * Pulls at most one message from the attached consumer and records it via
     * {@link #send}.
     *
     * @return {@code true} if a message was pulled and recorded; {@code false}
     *         if no consumer has been attached yet or the consumer had nothing
     *         to deliver
     */
    @Override
    public boolean processPending()
    {
        // Guard against use before consumerAdded(); mirrors the null check in
        // updateNotifyWorkDesired() instead of failing with a NullPointerException.
        final MessageInstanceConsumer consumer = _consumer;
        if (consumer == null)
        {
            return false;
        }

        final MessageContainer messageContainer = consumer.pullMessage();
        if (messageContainer == null)
        {
            return false;
        }
        send(consumer, messageContainer.getMessageInstance(), false);
        return true;
    }

    /**
     * @return the live, mutable list of recorded message instances (not a copy)
     */
    public ArrayList<MessageInstance> getMessages()
    {
        return _messages;
    }

    /**
     * No-op.
     */
    public void queueEmpty()
    {
    }

    /**
     * @param msg ignored
     * @return always {@code true}; credit is never exhausted
     */
    @Override
    public boolean allocateCredit(final ServerMessage msg)
    {
        return true;
    }

    /**
     * Sets the value reported by {@link #isActive()}.
     *
     * @param isActive the new active flag
     */
    public void setActive(final boolean isActive)
    {
        _isActive = isActive;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isMultiQueue()
    {
        return false;
    }

    /**
     * No-op.
     */
    @Override
    public void notifyWork()
    {
    }

    /**
     * @return {@code true} exactly when the state is {@link State#OPEN}
     */
    @Override
    public boolean isNotifyWorkDesired()
    {
        return _state == State.OPEN;
    }

    /**
     * Pushes the current {@link #isNotifyWorkDesired()} value to the attached
     * consumer when it differs from the value last pushed. Does nothing while
     * no consumer is attached.
     */
    @Override
    public void updateNotifyWorkDesired()
    {
        final boolean desired = isNotifyWorkDesired();
        if (desired != _notifyDesired && _consumer != null)
        {
            _consumer.setNotifyWorkDesired(desired);
            _notifyDesired = desired;
        }
    }
}