// blob: 8afa6aac073981e5251a993195a8a981afba464e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlow;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageSetFlowMode;
import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.network.Ticker;
public class ServerSession extends Session
implements AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
Deletable<ServerSession>
{
/** Logger for this class. */
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
/** Random key used in the subscription map in place of a {@code null} destination. */
private static final String NULL_DESTINATION = UUID.randomUUID().toString();
/** Amount of producer credit re-issued in one step by {@code enqueue}. */
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
/** Upper bound on the number of commands {@code completeAsyncCommands} leaves pending. */
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id = UUID.randomUUID();
/** Session subject; the constructor adds the connection's principals and a {@code SessionPrincipal}. */
private final Subject _subject = new Subject();
private final AccessControlContext _accessControllerContext;
private final SecurityToken _token;
/** Creation time in milliseconds since the epoch; never reassigned, hence final. */
private final long _createTime = System.currentTimeMillis();
/** Objects (queues, or this session itself) currently asking for the session to be blocked. */
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
/** Desired blocking state; {@code processPending} propagates it to the wire. */
private final AtomicBoolean _blocking = new AtomicBoolean(false);
/** Assigned exactly once in the constructor, hence final. */
private final ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private org.apache.qpid.server.model.Session<?> _modelObject;
/** Time at which blocking was last put on the wire, or 0 while unblocked (see {@code processPending}). */
private long _blockTime;
/** Assigned exactly once in the constructor, hence final. */
private final long _blockingTimeout;
/** Blocking state most recently sent to the peer by {@code processPending}. */
private boolean _wireBlockingState;
private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
private final PublishAuthorisationCache _publishAuthCahe;
/**
 * Callback registered against an outgoing transfer id; the session invokes it when the
 * peer accepts, releases, rejects or acquires that transfer.
 */
public interface MessageDispositionChangeListener
{
    /** Invoked when the transfer is accepted. */
    void onAccept();

    /**
     * Invoked when the transfer is released.
     *
     * @param setRedelivered flag passed through from the release request; the session passes
     *                       {@code true} when it releases outstanding transfers on close
     */
    void onRelease(boolean setRedelivered);

    /** Invoked when the transfer is rejected. */
    void onReject();

    /**
     * Invoked when the peer asks to acquire the transfer.
     *
     * @return {@code true} if the transfer should be reported as acquired
     */
    boolean acquire();
}
/** Listeners for transfers whose disposition is still pending, keyed by transfer id. */
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
        new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
/** Current transaction; replaced by {@code selectTx} / {@code selectDtx}. */
private ServerTransaction _transaction;
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
/** 1 while a transaction has outstanding work, otherwise 0. */
private final AtomicLong _txnCount = new AtomicLong(0);
/** Subscriptions keyed by destination ({@code NULL_DESTINATION} stands in for null); never reassigned, hence final. */
private final Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
/** Tasks run by {@code onClose}. */
private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
/** Log message recorded by a forced close; never reassigned, hence final. */
private final AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
// NOTE(review): volatile gives visibility only; the += in incrementUncommittedMessageSize is not atomic,
// so this presumably relies on single-threaded updates - confirm.
private volatile long _uncommittedMessageSize;
/** Uncommitted messages that are still held in memory (not yet flowed to disk). */
private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
/** Assigned exactly once in the constructor, hence final. */
private final long _maxUncommittedInMemorySize;
/**
 * Creates a session on the given connection.
 *
 * The session starts with an auto-commit transaction, builds its own subject from the
 * connection's authorised principals plus a {@link SessionPrincipal}, and reads its
 * flow-control timeout, uncommitted-size limit and publish-authorisation cache settings
 * from context values.
 *
 * @param connection owning connection; must be a {@link ServerConnection}
 * @param delegate   passed to the superclass
 * @param name       passed to the superclass
 * @param expiry     passed to the superclass
 */
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
    super(connection, delegate, name, expiry);
    _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(), this);
    _logSubject = new ChannelLogSubject(this);

    final ServerConnection owner = (ServerConnection) connection;
    final AMQPConnection_0_10 amqp = owner.getAmqpConnection();

    // The subject must be fully populated before the context and token are derived from it.
    final Set<Principal> principals = _subject.getPrincipals();
    principals.addAll(owner.getAuthorizedSubject().getPrincipals());
    principals.add(new SessionPrincipal(this));
    _accessControllerContext = amqp.getAccessControlContextFromSubject(_subject);

    final NamedAddressSpace space = owner.getAddressSpace();
    _token = space instanceof ConfiguredObject
            ? ((ConfiguredObject) space).newToken(_subject)
            : amqp.getBroker().newToken(_subject);

    _blockingTimeout = owner.getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
    _maxUncommittedInMemorySize = getConnection().getAmqpConnection().getContextProvider().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
    _publishAuthCahe = new PublishAuthorisationCache(_token,
            amqp.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT),
            amqp.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE));
}
/**
 * @return the access control context derived from this session's subject in the constructor
 */
public AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
}
/**
 * Sets the session state, making sure the change is performed as the session's subject.
 * A {@code CREATE} channel message is logged when the new state is {@code OPEN}.
 *
 * @param state the new state
 */
protected void setState(final State state)
{
    if (!runningAsSubject())
    {
        // Re-enter this method inside the session's access control context.
        runAsSubject(new PrivilegedAction<Void>()
        {
            @Override
            public Void run()
            {
                setState(state);
                return null;
            }
        });
        return;
    }

    super.setState(state);
    if (state == State.OPEN)
    {
        getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
    }
}
/**
 * Runs the action via {@code AccessController.doPrivileged} using this session's access control context.
 *
 * @param privilegedAction the action to run
 * @return the action's result
 */
private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
{
return AccessController.doPrivileged(privilegedAction, getAccessControllerContext());
}
/**
 * @return {@code true} if the subject of the calling thread's current access control
 *         context equals this session's subject
 */
private boolean runningAsSubject()
{
    final Subject sessionSubject = getAuthorizedSubject();
    final Subject callerSubject = Subject.getSubject(AccessController.getContext());
    return sessionSubject.equals(callerSubject);
}
/**
 * Tells the peer to stop publishing: sets credit flow mode on the empty destination
 * and then sends a message-stop for it.
 */
private void invokeBlock()
{
invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
invoke(new MessageStop(""));
}
/**
 * Lifts a block: resets the locally tracked producer credit to {@code Integer.MAX_VALUE}
 * and grants the peer the same amount of message credit on the empty destination.
 */
private void invokeUnblock()
{
    final MessageFlow creditGrant = new MessageFlow();
    creditGrant.setUnit(MessageCreditUnit.MESSAGE);
    creditGrant.setDestination("");
    _outstandingCredit.set(Integer.MAX_VALUE);
    creditGrant.setValue(Integer.MAX_VALUE);
    invoke(creditGrant);
}
/**
 * Delegates the publish authorisation check to the session's publish authorisation cache.
 *
 * @param destination destination being published to
 * @param routingKey  routing key of the message
 * @param immediate   immediate flag of the message
 * @param currentTime current time, forwarded to the cache
 */
void authorisePublish(final MessageDestination destination,
final String routingKey,
final boolean immediate,
final long currentTime)
{
_publishAuthCahe.authorisePublish(destination, routingKey, immediate, currentTime);
}
/**
 * @param id command id
 * @return the result of {@code isCommandsFull(id)}
 */
@Override
protected boolean isFull(int id)
{
return isCommandsFull(id);
}
/**
 * Routes an incoming message through the given destination under the current transaction,
 * after accounting for (and if necessary topping up) producer credit.
 *
 * @param message            the received message
 * @param instanceProperties properties forwarded to the destination
 * @param exchange           destination the message is sent to
 * @return the value returned by {@code exchange.send(...)}
 */
public int enqueue(final MessageTransferMessage message,
                   final InstanceProperties instanceProperties,
                   final MessageDestination exchange)
{
    topUpProducerCreditIfRequired();

    final int enqueues = exchange.send(message,
                                       message.getInitialRoutingAddress(),
                                       instanceProperties,
                                       _transaction,
                                       _checkCapacityAction);
    getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
    incrementOutstandingTxnsIfNecessary();
    incrementUncommittedMessageSize(message.getStoredMessage());
    return enqueues;
}

/**
 * Consumes one unit of producer credit unless credit is unlimited; when the counter reaches
 * {@code Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD} the threshold amount is added
 * back locally and granted to the peer via a message-flow command.
 */
private void topUpProducerCreditIfRequired()
{
    if (_outstandingCredit.get() == UNLIMITED_CREDIT)
    {
        return;
    }
    if (_outstandingCredit.decrementAndGet() != (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
    {
        return;
    }
    _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
    invoke(new MessageFlow("", MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
/** Forgets all uncommitted-message bookkeeping: empties the in-memory list and zeroes the size counter. */
private void resetUncommittedMessages()
{
    _uncommittedMessageSize = 0L;
    _uncommittedMessages.clear();
}
/**
 * Tracks the content size of messages received in a non-distributed transaction.
 * While the running total stays within the configured limit the message is remembered in
 * memory; once the limit is exceeded the message, and any previously remembered ones, are
 * flowed to disk.
 *
 * @param handle stored message that has just been enqueued
 */
private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
{
    if (!isTransactional() || _transaction instanceof DistributedTransaction)
    {
        return;
    }

    _uncommittedMessageSize += handle.getMetaData().getContentSize();
    if (_uncommittedMessageSize <= getMaxUncommittedInMemorySize())
    {
        _uncommittedMessages.add(handle);
        return;
    }

    handle.flowToDisk();
    final boolean hasBufferedMessages = !_uncommittedMessages.isEmpty();

    // Warn when the limit is first crossed: either earlier messages are still buffered,
    // or this message alone accounts for the whole total.
    if (hasBufferedMessages || _uncommittedMessageSize == handle.getMetaData().getContentSize())
    {
        getConnection().getAmqpConnection().getEventLogger()
                       .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
    }

    if (hasBufferedMessages)
    {
        for (StoredMessage<MessageMetaData_0_10> buffered : _uncommittedMessages)
        {
            buffered.flowToDisk();
        }
        _uncommittedMessages.clear();
    }
}
/**
 * Registers the transfer's body size as delivered on the connection and then invokes the transfer.
 *
 * @param xfr                 transfer to send
 * @param postIdSettingAction action forwarded to {@code invoke} together with the transfer
 */
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
invoke(xfr, postIdSettingAction);
}
/**
 * Registers a disposition listener under the id of the given transfer, replacing any previous entry.
 *
 * @param xfr            transfer whose id is used as the key
 * @param acceptListener listener to notify on accept/release/reject/acquire
 */
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
{
_messageDispositionListenerMap.put(xfr.getId(), acceptListener);
}
private static interface MessageDispositionAction
{
void performAction(MessageDispositionChangeListener listener);
}
/**
 * Accepts every pending transfer whose id falls in the given ranges.
 *
 * @param ranges transfer ids to accept
 */
public void accept(RangeSet ranges)
{
    final MessageDispositionAction acceptAction = new MessageDispositionAction()
    {
        @Override
        public void performAction(MessageDispositionChangeListener listener)
        {
            listener.onAccept();
        }
    };
    dispositionChange(ranges, acceptAction);
}
/**
 * Releases every pending transfer whose id falls in the given ranges.
 *
 * @param ranges         transfer ids to release
 * @param setRedelivered forwarded to each listener's {@code onRelease}
 */
public void release(RangeSet ranges, final boolean setRedelivered)
{
    final MessageDispositionAction releaseAction = new MessageDispositionAction()
    {
        @Override
        public void performAction(MessageDispositionChangeListener listener)
        {
            listener.onRelease(setRedelivered);
        }
    };
    dispositionChange(ranges, releaseAction);
}
/**
 * Rejects every pending transfer whose id falls in the given ranges.
 *
 * @param ranges transfer ids to reject
 */
public void reject(RangeSet ranges)
{
    final MessageDispositionAction rejectAction = new MessageDispositionAction()
    {
        @Override
        public void performAction(MessageDispositionChangeListener listener)
        {
            listener.onReject();
        }
    };
    dispositionChange(ranges, rejectAction);
}
/**
 * Attempts to acquire the pending transfers whose ids fall inside the given ranges.
 * Listeners stay registered; only ids whose listener returns {@code true} from
 * {@code acquire()} are reported.
 *
 * @param transfers ranges of transfer ids requested by the peer
 * @return a new range set holding the ids that were acquired (empty if none)
 */
public RangeSet acquire(RangeSet transfers)
{
RangeSet acquired = RangeSetFactory.createRangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
// Walk the pending ids (map key order) and the requested ranges side by side.
// NOTE(review): this assumes the ranges are iterated in ascending order - confirm against RangeSet.
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = transfers.iterator();
if(rangeIter.hasNext())
{
Range range = rangeIter.next();
while(range != null && unacceptedMessages.hasNext())
{
int next = unacceptedMessages.next();
// Skip ranges that end before the current id (gt is the statically imported Serial.gt).
while(gt(next, range.getUpper()))
{
if(rangeIter.hasNext())
{
range = rangeIter.next();
}
else
{
// No ranges left: clearing range ends the outer loop.
range = null;
break;
}
}
if(range != null && range.includes(next))
{
// The listener may have been removed since the key was iterated, hence the null check.
MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
if(changeListener != null && changeListener.acquire())
{
acquired.add(next);
}
}
}
}
}
return acquired;
}
/**
 * Removes the disposition listener of every pending transfer whose id falls in the given
 * ranges and applies {@code action} to each removed listener.
 *
 * @param ranges transfer ids affected; {@code null} is ignored
 * @param action callback applied to each listener that was actually removed
 */
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
    if (ranges == null)
    {
        return;
    }

    if (ranges.size() == 1)
    {
        final Range r = ranges.getFirst();
        final long upper = r.getUpper();
        // A long index is used because an int index can never exceed Integer.MAX_VALUE,
        // so "i <= upper" would loop forever when upper == Integer.MAX_VALUE.
        for (long id = r.getLower(); id <= upper; id++)
        {
            final MessageDispositionChangeListener changeListener =
                    _messageDispositionListenerMap.remove((int) id);
            if (changeListener != null)
            {
                action.performAction(changeListener);
            }
        }
    }
    else if (!_messageDispositionListenerMap.isEmpty())
    {
        // Walk the pending ids (map key order) and the ranges side by side.
        final Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
        final Iterator<Range> rangeIter = ranges.iterator();
        if (rangeIter.hasNext())
        {
            Range range = rangeIter.next();
            while (range != null && unacceptedMessages.hasNext())
            {
                final int next = unacceptedMessages.next();
                // Skip ranges that end before the current id.
                while (gt(next, range.getUpper()))
                {
                    if (rangeIter.hasNext())
                    {
                        range = rangeIter.next();
                    }
                    else
                    {
                        range = null;
                        break;
                    }
                }
                if (range != null && range.includes(next))
                {
                    final MessageDispositionChangeListener changeListener =
                            _messageDispositionListenerMap.remove(next);
                    // The entry may have been removed since the key was iterated; the
                    // single-range branch above guards against null in the same way.
                    if (changeListener != null)
                    {
                        action.performAction(changeListener);
                    }
                }
            }
        }
    }
}
/**
 * Drops the disposition listener registered under the given method's id, if any, without notifying it.
 *
 * @param method method whose id identifies the listener
 */
public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
/**
 * Cleans up when the session closes: rolls back a local transaction or ends distributed
 * transaction associations, releases all pending transfers as redelivered, runs the
 * registered delete tasks and logs the close message (the forced-close message if one
 * was recorded, otherwise the plain {@code CLOSE} message).
 */
public void onClose()
{
    if (_transaction instanceof LocalTransaction)
    {
        _transaction.rollback();
    }
    else if (_transaction instanceof DistributedTransaction)
    {
        getAddressSpace().getDtxRegistry().endAssociations(this);
    }

    for (MessageDispositionChangeListener pending : _messageDispositionListenerMap.values())
    {
        pending.onRelease(true);
    }
    _messageDispositionListenerMap.clear();

    for (Action<? super ServerSession> deleteTask : _taskList)
    {
        deleteTask.performAction(this);
    }

    final LogMessage forced = _forcedCloseLogMessage.get();
    final LogMessage closeMessage = (forced != null) ? forced : ChannelMessages.CLOSE();
    getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), closeMessage);
}
/** Intentionally a no-op. */
@Override
protected void awaitClose()
{
// The broker must not block while awaiting close, so this override deliberately does nothing.
}
/**
 * Acknowledges a delivered message by dequeuing it within the current transaction.
 * Nothing happens if the acquisition cannot be made unstealable for the consumer.
 *
 * @param consumer consumer that holds the message
 * @param target   consumer target (not used by this method)
 * @param entry    message instance being acknowledged
 */
public void acknowledge(final ConsumerImpl consumer,
                        final ConsumerTarget_0_10 target,
                        final MessageInstance entry)
{
    if (!entry.makeAcquisitionUnstealable(consumer))
    {
        return;
    }

    final ServerTransaction.Action dequeueAction = new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            entry.delete();
        }

        @Override
        public void onRollback()
        {
            // The client acknowledged the message and has therefore seen it, so on
            // rollback it must be flagged as redelivered before being released.
            entry.setRedelivered();
            entry.release(consumer);
        }
    };
    _transaction.dequeue(entry.getEnqueueRecord(), dequeueAction);
}
/**
 * @return the values view of the subscription map (backed by the map, not a copy)
 */
Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
/**
 * Stores a subscription under its destination; a {@code null} destination is stored
 * under the session's private placeholder key.
 *
 * @param destination subscription destination, may be {@code null}
 * @param sub         the subscription
 */
public void register(String destination, ConsumerTarget_0_10 sub)
{
    final String key = (destination != null) ? destination : NULL_DESTINATION;
    _subscriptions.put(key, sub);
}
/**
 * Tracks a consumer on this session if it is a model {@link Consumer}: records it, attaches
 * the consumer-closed listener and notifies the consumer listeners. Other implementations
 * are ignored.
 *
 * @param consumerImpl consumer to track
 */
public void register(final ConsumerImpl consumerImpl)
{
    if (!(consumerImpl instanceof Consumer<?>))
    {
        return;
    }
    final Consumer<?> modelConsumer = (Consumer<?>) consumerImpl;
    _consumers.add(modelConsumer);
    modelConsumer.addChangeListener(_consumerClosedListener);
    consumerAdded(modelConsumer);
}
/**
 * Looks up the subscription stored for a destination.
 *
 * @param destination destination name; {@code null} maps to the placeholder key
 * @return the subscription, or {@code null} if none is registered
 */
public ConsumerTarget_0_10 getSubscription(String destination)
{
    final String key = (destination != null) ? destination : NULL_DESTINATION;
    return _subscriptions.get(key);
}
/**
 * Removes a subscription from the session and closes it.
 *
 * A {@code null} name is mapped to the same placeholder key that {@code register} and
 * {@code getSubscription} use; passing {@code null} straight to the underlying
 * {@code ConcurrentHashMap} would throw a NullPointerException and leave the
 * subscription unclosed.
 *
 * @param sub subscription to remove; presumably its name is the destination it was
 *            registered under - confirm against callers
 */
public void unregister(ConsumerTarget_0_10 sub)
{
    final String name = sub.getName();
    _subscriptions.remove(name == null ? NULL_DESTINATION : name);
    sub.close();
}
/**
 * @return whether the session's current transaction object reports itself as transactional
 */
public boolean isTransactional()
{
return _transaction.isTransactional();
}
/** Switches the session to a new local transaction and counts a transaction start. */
public void selectTx()
{
_transaction = new LocalTransaction(this.getMessageStore());
_txnStarts.incrementAndGet();
}
/** Switches the session to a new distributed transaction backed by the address space's DTX registry. */
public void selectDtx()
{
_transaction = new DistributedTransaction(this, getAddressSpace().getDtxRegistry());
}
/**
 * Starts work on a distributed transaction branch.
 *
 * @param xid    branch identifier
 * @param join   forwarded to {@code DistributedTransaction.start}
 * @param resume forwarded to {@code DistributedTransaction.start}
 * @throws DtxNotSelectedException if the session has not selected distributed transactions
 */
public void startDtx(Xid xid, boolean join, boolean resume)
        throws JoinAndResumeDtxException,
               UnknownDtxBranchException,
               AlreadyKnownDtxException,
               DtxNotSelectedException
{
    assertDtxTransaction().start(xid, join, resume);
}
/**
 * Ends work on a distributed transaction branch.
 *
 * @param xid     branch identifier
 * @param fail    forwarded to {@code DistributedTransaction.end}
 * @param suspend forwarded to {@code DistributedTransaction.end}
 * @throws DtxNotSelectedException if the session has not selected distributed transactions
 */
public void endDtx(Xid xid, boolean fail, boolean suspend)
        throws NotAssociatedDtxException,
               UnknownDtxBranchException,
               DtxNotSelectedException,
               SuspendAndFailDtxException, TimeoutDtxException
{
    assertDtxTransaction().end(xid, fail, suspend);
}
/**
 * @param xid branch identifier
 * @return the timeout the DTX registry holds for the branch
 * @throws UnknownDtxBranchException declared by the registry lookup
 */
public long getTimeoutDtx(Xid xid)
throws UnknownDtxBranchException
{
return getAddressSpace().getDtxRegistry().getTimeout(xid);
}
/**
 * Sets the timeout of a branch in the DTX registry.
 *
 * @param xid     branch identifier
 * @param timeout new timeout, forwarded unchanged
 * @throws UnknownDtxBranchException declared by the registry call
 */
public void setTimeoutDtx(Xid xid, long timeout)
throws UnknownDtxBranchException
{
getAddressSpace().getDtxRegistry().setTimeout(xid, timeout);
}
/**
 * Asks the DTX registry to prepare the given branch.
 *
 * @param xid branch identifier
 */
public void prepareDtx(Xid xid)
throws UnknownDtxBranchException,
IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
{
getAddressSpace().getDtxRegistry().prepare(xid);
}
/**
 * Asks the DTX registry to commit the given branch.
 *
 * @param xid      branch identifier
 * @param onePhase forwarded to the registry
 */
public void commitDtx(Xid xid, boolean onePhase)
throws UnknownDtxBranchException,
IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
{
getAddressSpace().getDtxRegistry().commit(xid, onePhase);
}
/**
 * Asks the DTX registry to roll back the given branch.
 *
 * @param xid branch identifier
 */
public void rollbackDtx(Xid xid)
throws UnknownDtxBranchException,
IncorrectDtxStateException, StoreException, TimeoutDtxException
{
getAddressSpace().getDtxRegistry().rollback(xid);
}
/**
 * Asks the DTX registry to forget the given branch.
 *
 * @param xid branch identifier
 */
public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
{
getAddressSpace().getDtxRegistry().forget(xid);
}
/**
 * @return the branch identifiers returned by the DTX registry's {@code recover()}
 */
public List<Xid> recoverDtx()
{
return getAddressSpace().getDtxRegistry().recover();
}
/**
 * @return the current transaction as a {@link DistributedTransaction}
 * @throws DtxNotSelectedException if the current transaction is of any other kind
 */
private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
{
    if (!(_transaction instanceof DistributedTransaction))
    {
        throw new DtxNotSelectedException();
    }
    return (DistributedTransaction) _transaction;
}
/**
 * Commits the current transaction, then updates the statistics (one commit, one new
 * transaction start), clears the outstanding-transaction flag and resets the
 * uncommitted-message bookkeeping.
 */
public void commit()
{
_transaction.commit();
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
resetUncommittedMessages();
}
/**
 * Rolls back the current transaction, then updates the statistics (one reject, one new
 * transaction start), clears the outstanding-transaction flag and resets the
 * uncommitted-message bookkeeping.
 */
public void rollback()
{
_transaction.rollback();
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
resetUncommittedMessages();
}
/** Marks a transaction as outstanding (count 0 -> 1) when the session is transactional. */
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
{
//There can currently only be at most one outstanding transaction
//due to only having LocalTransaction support. Set value to 1 if 0.
_txnCount.compareAndSet(0,1);
}
}
/** Clears the outstanding-transaction marker (count 1 -> 0) when the session is transactional. */
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
{
//There can currently only be at most one outstanding transaction
//due to only having LocalTransaction support. Set value to 0 if 1.
_txnCount.compareAndSet(1,0);
}
}
/** @return number of times {@code commit()} has completed on this session */
public Long getTxnCommits()
{
return _txnCommits.get();
}
/** @return number of times {@code rollback()} has completed on this session */
public Long getTxnRejects()
{
return _txnRejects.get();
}
/** @return the value of {@code getChannel()} */
public int getChannelId()
{
return getChannel();
}
/** @return the outstanding-transaction marker, which this class only ever sets to 0 or 1 */
public Long getTxnCount()
{
return _txnCount.get();
}
/** @return the transaction-start counter (incremented by selectTx, commit and rollback) */
public Long getTxnStart()
{
return _txnStarts.get();
}
/** @return the authorised principal of the owning connection */
public Principal getAuthorizedPrincipal()
{
return getConnection().getAuthorizedPrincipal();
}
/** @return this session's own subject (built in the constructor) */
public Subject getAuthorizedSubject()
{
return _subject;
}
/**
 * Registers a task that {@code onClose()} will run.
 *
 * @param task task to add
 */
public void addDeleteTask(Action<? super ServerSession> task)
{
_taskList.add(task);
}
/**
 * Removes a previously registered delete task.
 *
 * @param task task to remove
 */
public void removeDeleteTask(Action<? super ServerSession> task)
{
_taskList.remove(task);
}
/** @return the owning connection's reference object */
public Object getReference()
{
return getConnection().getReference();
}
/** @return the message store of the connection's address space */
public MessageStore getMessageStore()
{
return getAddressSpace().getMessageStore();
}
/** @return the address space of the owning connection */
public NamedAddressSpace getAddressSpace()
{
return getConnection().getAddressSpace();
}
/** @return always {@code false} */
public boolean isDurable()
{
return false;
}
/** @return the {@code System.currentTimeMillis()} value captured when this object was created */
public long getCreateTime()
{
return _createTime;
}
/** @return the random UUID generated for this session at construction */
@Override
public UUID getId()
{
return _id;
}
/** @return the AMQP connection object of the owning server connection */
@Override
public AMQPConnection_0_10 getAMQPConnection()
{
return getConnection().getAmqpConnection();
}
/** @return the superclass connection, cast to {@link ServerConnection} */
@Override
public ServerConnection getConnection()
{
return (ServerConnection) super.getConnection();
}
/** @return this session itself, which implements {@link LogSubject} */
public LogSubject getLogSubject()
{
return this;
}
/**
 * Requests blocking of this session on behalf of a queue.
 *
 * @param queue the queue asking for the block; its name is used in the log message
 */
public void block(Queue<?> queue)
{
block(queue, queue.getName());
}
/** Requests blocking of this session with the session itself as the blocking entity. */
public void block()
{
block(this, "** All Queues **");
}
/**
 * Records a blocking entity and, if the session was not already blocked, flips the desired
 * blocking state, logs {@code FLOW_ENFORCED} and (when the session is open) wakes the
 * connection.
 *
 * @param queue entity requesting the block
 * @param name  name reported in the log message
 */
private void block(final Object queue, final String name)
{
    synchronized (_blockingEntities)
    {
        if (!_blockingEntities.add(queue))
        {
            return;
        }
        if (!_blocking.compareAndSet(false, true))
        {
            return;
        }
        getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
        if (getState() == State.OPEN)
        {
            getConnection().notifyWork();
        }
    }
}
/**
 * Withdraws a queue's blocking request.
 *
 * @param queue the queue that no longer requires the session to be blocked
 */
public void unblock(Queue<?> queue)
{
unblock((Object)queue);
}
/** Withdraws the blocking request made with the session itself as the blocking entity. */
public void unblock()
{
unblock(this);
}
/**
 * Removes a blocking entity and, once none remain, clears the desired blocking state,
 * logs {@code FLOW_REMOVED} and wakes the connection (unless the session is closing).
 *
 * The whole sequence runs under the same monitor that {@code block(Object, String)} uses.
 * Without it, a concurrent {@code block} could add an entity between the emptiness check
 * and the state flip, leaving the session unblocked while an entity still requires it to
 * be blocked.
 *
 * @param queue entity withdrawing its block request
 */
private void unblock(final Object queue)
{
    synchronized (_blockingEntities)
    {
        if (_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
        {
            if (_blocking.compareAndSet(true, false) && !isClosing())
            {
                getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
                getConnection().notifyWork();
            }
        }
    }
}
/**
 * @return {@code true} if blocking is currently on the wire, a block time has been
 *         recorded, and more than the configured timeout has elapsed since then
 */
boolean blockingTimeoutExceeded()
{
    final long blockedSince = _blockTime;
    if (!_wireBlockingState || blockedSince == 0)
    {
        return false;
    }
    return (System.currentTimeMillis() - blockedSince) > _blockingTimeout;
}
/** Forwards the transport state change notification to every subscription of this session. */
@Override
public void transportStateChanged()
{
for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
{
consumerTarget.transportStateChanged();
}
}
/** @return the owning connection's reference object */
@Override
public Object getConnectionReference()
{
return getConnection().getReference();
}
/**
 * Builds the log prefix for this session by formatting {@code CHANNEL_FORMAT} with the
 * connection id, principal name, remote address, address space name and channel number,
 * wrapped as {@code "[...] "}.
 *
 * @return the formatted log subject string
 */
public String toLogString()
{
    long connectionId = super.getConnection() instanceof ServerConnection
            ? getConnection().getConnectionId()
            : -1;
    // Read the principal once: the original null-checked one call and dereferenced a second
    // call, which could differ if the connection's principal changes in between.
    final Principal principal = getAuthorizedPrincipal();
    String authorizedPrincipal = (principal == null) ? "?" : principal.getName();
    String remoteAddress = String.valueOf(getConnection().getRemoteSocketAddress());
    return "[" +
           MessageFormat.format(CHANNEL_FORMAT,
                                connectionId,
                                authorizedPrincipal,
                                remoteAddress,
                                getAddressSpace().getName(),
                                getChannel())
           + "] ";
}
/**
 * Closes the session, recording a forced-close log message when a cause is supplied.
 *
 * @param cause   reason for the close, or {@code null} for a plain close
 * @param message text used in the forced-close log message
 */
@Override
public void close(AMQConstant cause, String message)
{
    if (cause != null)
    {
        close(cause.getCode(), message);
        return;
    }
    close();
}
/**
 * Records a {@code CLOSE_FORCED} log message (only if none has been recorded yet) and closes the session.
 *
 * @param cause   numeric cause code
 * @param message text for the log message
 */
void close(int cause, String message)
{
_forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message));
close();
}
/**
 * Closes the session: unregisters all subscriptions, deletes the model object if one is
 * attached, then delegates to the superclass close.
 */
@Override
public void close()
{
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
unregisterSubscriptions();
if(_modelObject != null)
{
_modelObject.delete();
}
super.close();
}
/** Unregisters (and thereby closes) every subscription currently held by the session. */
void unregisterSubscriptions()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        unregister(target);
    }
}
/** Calls {@code stop()} on every subscription currently held by the session. */
void stopSubscriptions()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        target.stop();
    }
}
/**
 * Flushes the credit state of every subscription (with {@code false}) and then completes
 * all outstanding asynchronous commands.
 */
public void receivedComplete()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        target.flushCreditState(false);
    }
    awaitCommandCompletion();
}
/** @return the number of transfers that still have a disposition listener registered */
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
}
/** @return the desired blocking state (which may not yet have been sent to the peer) */
public boolean getBlocking()
{
return _blocking.get();
}
// FIFO of asynchronous commands recorded via recordFuture() whose completion is still pending.
// NOTE(review): LinkedList is not thread-safe; presumably only touched from one thread - confirm.
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
/**
 * Completes queued asynchronous commands in order while the head of the queue is ready,
 * then keeps completing commands until the backlog is no larger than
 * {@code UNFINISHED_COMMAND_QUEUE_THRESHOLD}.
 *
 * Each command is removed from the queue before it is completed, as the backlog loop and
 * {@code awaitCommandCompletion()} do, so that a command whose completion throws is not
 * left at the head of the queue to be completed again on the next call.
 */
public void completeAsyncCommands()
{
    AsyncCommand cmd;
    while ((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
    {
        _unfinishedCommandsQueue.poll();
        cmd.complete();
    }
    while (_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
    {
        cmd = _unfinishedCommandsQueue.poll();
        cmd.complete();
    }
}
/** Drains the queue of asynchronous commands, completing each one in order. */
public void awaitCommandCompletion()
{
    for (AsyncCommand pending = _unfinishedCommandsQueue.poll();
         pending != null;
         pending = _unfinishedCommandsQueue.poll())
    {
        pending.complete();
    }
}
/**
 * @return the most recently recorded asynchronous command, or {@code null} if none is pending
 */
public Object getAsyncCommandMark()
{
    if (_unfinishedCommandsQueue.isEmpty())
    {
        return null;
    }
    return _unfinishedCommandsQueue.getLast();
}
/**
 * Queues an asynchronous command for later completion.
 *
 * @param future future to wait on
 * @param action action whose {@code postCommit()} is run once the future has completed
 */
public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
/**
 * Pairs a store future with the transaction action to run once that future has completed.
 */
private static class AsyncCommand
{
    private final ListenableFuture<Void> _future;
    private ServerTransaction.Action _action;

    /**
     * @param future future to wait on
     * @param action action whose {@code postCommit()} is invoked after the future completes
     */
    public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
    {
        _future = future;
        _action = action;
    }

    /**
     * Waits (uninterruptibly) for the future, then runs the action's {@code postCommit()}
     * and drops the reference to the action.
     *
     * If the future failed, the failure cause is rethrown: runtime exceptions and errors
     * as-is, anything else wrapped in a {@link ServerScopedRuntimeException}. The thread's
     * interrupt status is restored in every case, including when the future failed
     * (previously it was lost on that path).
     */
    void complete()
    {
        boolean interrupted = false;
        try
        {
            while (true)
            {
                try
                {
                    _future.get();
                    break;
                }
                catch (InterruptedException e)
                {
                    // Keep waiting; the interrupt is re-asserted in the finally block.
                    interrupted = true;
                }
            }
        }
        catch (ExecutionException e)
        {
            final Throwable cause = e.getCause();
            if (cause instanceof RuntimeException)
            {
                throw (RuntimeException) cause;
            }
            else if (cause instanceof Error)
            {
                throw (Error) cause;
            }
            else
            {
                throw new ServerScopedRuntimeException(cause);
            }
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
        _action.postCommit();
        _action = null;
    }

    /** @return {@code true} once the future is done */
    boolean isReadyForCompletion()
    {
        return _future.isDone();
    }
}
/**
 * Delegates unchanged to the superclass implementation.
 *
 * @param close value forwarded to the superclass
 */
protected void setClose(boolean close)
{
super.setClose(close);
}
/** @return the number of subscriptions currently registered on this session */
@Override
public int getConsumerCount()
{
    return _subscriptions.size();
}
/** @return a read-only view of the model consumers registered on this session */
@Override
public Collection<Consumer<?>> getConsumers()
{
return Collections.unmodifiableCollection(_consumers);
}
/**
 * @param listener listener to notify when consumers are added to or removed from the session
 */
@Override
public void addConsumerListener(final ConsumerListener listener)
{
_consumerListeners.add(listener);
}
/**
 * @param listener listener to stop notifying
 */
@Override
public void removeConsumerListener(final ConsumerListener listener)
{
_consumerListeners.remove(listener);
}
/**
 * @param session model object representing this session; {@code close()} deletes it if set
 */
@Override
public void setModelObject(final org.apache.qpid.server.model.Session<?> session)
{
_modelObject = session;
}
/** @return the model object set via {@code setModelObject}, or {@code null} if none was set */
@Override
public org.apache.qpid.server.model.Session<?> getModelObject()
{
return _modelObject;
}
/**
 * @return the start time reported by the current transaction, or {@code 0} when the
 *         session is not transactional
 */
@Override
public long getTransactionStartTime()
{
    final ServerTransaction current = _transaction;
    return current.isTransactional() ? current.getTransactionStartTime() : 0L;
}
/**
 * @return the update time reported by the current transaction, or {@code 0} when the
 *         session is not transactional
 */
@Override
public long getTransactionUpdateTime()
{
    final ServerTransaction current = _transaction;
    return current.isTransactional() ? current.getTransactionUpdateTime() : 0L;
}
/**
 * Notifies every registered consumer listener that a consumer was added.
 *
 * @param consumer the new consumer
 */
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
{
l.consumerAdded(consumer);
}
}
/**
 * Stops tracking a deleted consumer and notifies every registered consumer listener.
 *
 * The consumer is removed from the session's consumer list first. Previously nothing ever
 * removed entries from that list, so it grew for the lifetime of the session and
 * {@code getConsumers()} kept returning consumers that had already been deleted.
 *
 * @param consumer the consumer that has been deleted
 */
private void consumerRemoved(Consumer<?> consumer)
{
    _consumers.remove(consumer);
    for (ConsumerListener l : _consumerListeners)
    {
        l.consumerRemoved(consumer);
    }
}
/**
 * Performs one unit of pending work for this session; does nothing unless called on the
 * connection's IO thread.
 *
 * First brings the on-the-wire blocking state in line with the desired one, then lets at
 * most one consumer target with pending work process it.
 *
 * @return {@code true} if a consumer processed work or if the pending list was non-empty
 *         on entry; {@code false} otherwise (including when not on the IO thread)
 */
@Override
public boolean processPending()
{
if (!getAMQPConnection().isIOThread())
{
return false;
}
boolean desiredBlockingState = _blocking.get();
if (desiredBlockingState != _wireBlockingState)
{
_wireBlockingState = desiredBlockingState;
if (desiredBlockingState)
{
invokeBlock();
}
else
{
invokeUnblock();
}
// Remember when the block was sent (read by blockingTimeoutExceeded()); 0 means not blocked.
_blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
}
boolean consumerListNeedsRefreshing;
if(_consumersWithPendingWork.isEmpty())
{
// Start a fresh pass over all current subscriptions.
_consumersWithPendingWork.addAll(getSubscriptions());
consumerListNeedsRefreshing = false;
}
else
{
// Still working through an earlier snapshot; report more work so that a later call refreshes it.
consumerListNeedsRefreshing = true;
}
// QPID-7447: prevent unnecessary allocation of empty iterator
Iterator<ConsumerTarget> iter = _consumersWithPendingWork.isEmpty() ? Collections.<ConsumerTarget>emptyIterator() : _consumersWithPendingWork.iterator();
boolean consumerHasMoreWork = false;
while(iter.hasNext())
{
final ConsumerTarget target = iter.next();
// Each target is removed from the list as soon as it is visited.
iter.remove();
if(target.hasPendingWork())
{
consumerHasMoreWork = true;
target.processPending();
// Only one target does work per call.
break;
}
}
return consumerHasMoreWork || consumerListNeedsRefreshing;
}
/**
 * Adds a ticker to the connection's aggregate ticker and wakes the connection.
 *
 * @param ticker ticker to add
 */
@Override
public void addTicker(final Ticker ticker)
{
getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
// trigger a wakeup to ensure the ticker will be taken into account
getAMQPConnection().notifyWork();
}
/**
 * Removes a ticker from the connection's aggregate ticker.
 *
 * @param ticker ticker to remove
 */
@Override
public void removeTicker(final Ticker ticker)
{
getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
}
/** Calls {@code notifyCurrentState()} on every subscription that is not pull-only. */
@Override
public void notifyConsumerTargetCurrentStates()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        if (target.isPullOnly())
        {
            continue;
        }
        target.notifyCurrentState();
    }
}
/**
 * Takes and immediately releases the send lock of every subscription.
 *
 * The lock is acquired outside any try/finally, so it is released only after it has been
 * obtained. Previously the acquisition sat inside the {@code try}, so a failure while
 * acquiring would still have run the release in {@code finally} on a lock that was never
 * held.
 */
@Override
public void ensureConsumersNoticedStateChange()
{
    for (ConsumerTarget_0_10 consumerTarget : getSubscriptions())
    {
        // NOTE(review): presumably acquiring the lock waits for any in-flight send on this target - confirm.
        consumerTarget.getSendLock();
        consumerTarget.releaseSendLock();
    }
}
/**
 * Asks the connection to close this session asynchronously with a {@code RESOURCE_ERROR} cause.
 *
 * @param reason text passed along with the close request
 */
@Override
public void doTimeoutAction(final String reason)
{
getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
/** @return the uncommitted-message size limit read from the connection context at construction */
public final long getMaxUncommittedInMemorySize()
{
return _maxUncommittedInMemorySize;
}
/**
 * Orders sessions by their UUID.
 *
 * @param o session to compare with
 * @return result of comparing this session's id with the other session's id
 */
@Override
public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
/**
 * Action passed to the destination on enqueue: asks the owning resource of the enqueued
 * entry to check its capacity against this session, if that resource supports it.
 */
private class CheckCapacityAction implements Action<MessageInstance>
{
    @Override
    public void performAction(final MessageInstance entry)
    {
        final TransactionLogResource owner = entry.getOwningResource();
        if (!(owner instanceof CapacityChecker))
        {
            return;
        }
        ((CapacityChecker) owner).checkCapacity(ServerSession.this);
    }
}
/**
 * Change listener attached to every registered consumer; it reacts only to the consumer
 * entering the DELETED state and ignores all other notifications.
 */
private class ConsumerClosedListener implements ConfigurationChangeListener
{
@Override
public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
{
if(newState == org.apache.qpid.server.model.State.DELETED)
{
// The listener is only attached to Consumer instances (see register), hence the cast.
consumerRemoved((Consumer<?>)object);
}
}
@Override
public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
{
// intentionally empty
}
@Override
public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
{
// intentionally empty
}
@Override
public void attributeSet(final ConfiguredObject object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
// intentionally empty
}
@Override
public void bulkChangeStart(final ConfiguredObject<?> object)
{
// intentionally empty
}
@Override
public void bulkChangeEnd(final ConfiguredObject<?> object)
{
// intentionally empty
}
}
}