| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.transport; |
| |
| import java.security.Principal; |
| import java.text.MessageFormat; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.security.auth.Subject; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQStoreException; |
| import org.apache.qpid.protocol.AMQConstant; |
| import org.apache.qpid.server.TransactionTimeoutHelper; |
| import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; |
| import org.apache.qpid.server.logging.LogActor; |
| import org.apache.qpid.server.logging.LogSubject; |
| import org.apache.qpid.server.logging.actors.CurrentActor; |
| import org.apache.qpid.server.logging.actors.GenericActor; |
| import org.apache.qpid.server.logging.messages.ChannelMessages; |
| import org.apache.qpid.server.logging.subjects.ChannelLogSubject; |
| import org.apache.qpid.server.message.InboundMessage; |
| import org.apache.qpid.server.message.MessageMetaData_0_10; |
| import org.apache.qpid.server.message.MessageReference; |
| import org.apache.qpid.server.message.MessageTransferMessage; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.protocol.AMQConnectionModel; |
| import org.apache.qpid.server.protocol.AMQSessionModel; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.queue.BaseQueue; |
| import org.apache.qpid.server.queue.QueueEntry; |
| import org.apache.qpid.server.security.AuthorizationHolder; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoreFuture; |
| import org.apache.qpid.server.subscription.Subscription_0_10; |
| import org.apache.qpid.server.txn.AlreadyKnownDtxException; |
| import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; |
| import org.apache.qpid.server.txn.DistributedTransaction; |
| import org.apache.qpid.server.txn.DtxNotSelectedException; |
| import org.apache.qpid.server.txn.IncorrectDtxStateException; |
| import org.apache.qpid.server.txn.JoinAndResumeDtxException; |
| import org.apache.qpid.server.txn.LocalTransaction; |
| import org.apache.qpid.server.txn.NotAssociatedDtxException; |
| import org.apache.qpid.server.txn.RollbackOnlyDtxException; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.txn.SuspendAndFailDtxException; |
| import org.apache.qpid.server.txn.TimeoutDtxException; |
| import org.apache.qpid.server.txn.UnknownDtxBranchException; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| import org.apache.qpid.transport.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; |
| import static org.apache.qpid.util.Serial.gt; |
| |
| public class ServerSession extends Session |
| implements AuthorizationHolder, |
| AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); |
| |
| private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); |
| private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; |
| private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; |
| |
| private final UUID _id = UUID.randomUUID(); |
| private long _createTime = System.currentTimeMillis(); |
| private LogActor _actor = GenericActor.getInstance(this); |
| |
| private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); |
| |
| private final AtomicBoolean _blocking = new AtomicBoolean(false); |
| private ChannelLogSubject _logSubject; |
| private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); |
| |
| public static interface MessageDispositionChangeListener |
| { |
| public void onAccept(); |
| |
| public void onRelease(boolean setRedelivered); |
| |
| public void onReject(); |
| |
| public boolean acquire(); |
| |
| |
| } |
| |
| public static interface Task |
| { |
| public void doTask(ServerSession session); |
| } |
| |
| |
| private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = |
| new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); |
| |
| private ServerTransaction _transaction; |
| |
| private final AtomicLong _txnStarts = new AtomicLong(0); |
| private final AtomicLong _txnCommits = new AtomicLong(0); |
| private final AtomicLong _txnRejects = new AtomicLong(0); |
| private final AtomicLong _txnCount = new AtomicLong(0); |
| |
| private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); |
| |
| private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); |
| |
| private final TransactionTimeoutHelper _transactionTimeoutHelper; |
| |
| |
| public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) |
| { |
| super(connection, delegate, name, expiry); |
| _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); |
| _logSubject = new ChannelLogSubject(this); |
| |
| _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() |
| { |
| @Override |
| public void doTimeoutAction(String reason) throws AMQException |
| { |
| getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); |
| } |
| }); |
| } |
| |
| protected void setState(State state) |
| { |
| super.setState(state); |
| |
| if (state == State.OPEN) |
| { |
| _actor.message(ChannelMessages.CREATE()); |
| if(_blocking.get()) |
| { |
| invokeBlock(); |
| } |
| } |
| } |
| |
| private void invokeBlock() |
| { |
| invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); |
| invoke(new MessageStop("")); |
| } |
| |
| @Override |
| protected boolean isFull(int id) |
| { |
| return isCommandsFull(id); |
| } |
| |
| public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) |
| { |
| if(_outstandingCredit.get() != UNLIMITED_CREDIT |
| && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) |
| { |
| _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); |
| invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); |
| } |
| getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); |
| PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; |
| _transaction.enqueue(queues,message, postTransactionAction); |
| incrementOutstandingTxnsIfNecessary(); |
| } |
| |
| |
| public void sendMessage(MessageTransfer xfr, |
| Runnable postIdSettingAction) |
| { |
| getConnectionModel().registerMessageDelivered(xfr.getBodySize()); |
| invoke(xfr, postIdSettingAction); |
| } |
| |
| public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) |
| { |
| _messageDispositionListenerMap.put(xfr.getId(), acceptListener); |
| } |
| |
| |
| private static interface MessageDispositionAction |
| { |
| void performAction(MessageDispositionChangeListener listener); |
| } |
| |
| public void accept(RangeSet ranges) |
| { |
| dispositionChange(ranges, new MessageDispositionAction() |
| { |
| public void performAction(MessageDispositionChangeListener listener) |
| { |
| listener.onAccept(); |
| } |
| }); |
| } |
| |
| |
| public void release(RangeSet ranges, final boolean setRedelivered) |
| { |
| dispositionChange(ranges, new MessageDispositionAction() |
| { |
| public void performAction(MessageDispositionChangeListener listener) |
| { |
| listener.onRelease(setRedelivered); |
| } |
| }); |
| } |
| |
| public void reject(RangeSet ranges) |
| { |
| dispositionChange(ranges, new MessageDispositionAction() |
| { |
| public void performAction(MessageDispositionChangeListener listener) |
| { |
| listener.onReject(); |
| } |
| }); |
| } |
| |
| public RangeSet acquire(RangeSet transfers) |
| { |
| RangeSet acquired = RangeSetFactory.createRangeSet(); |
| |
| if(!_messageDispositionListenerMap.isEmpty()) |
| { |
| Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); |
| Iterator<Range> rangeIter = transfers.iterator(); |
| |
| if(rangeIter.hasNext()) |
| { |
| Range range = rangeIter.next(); |
| |
| while(range != null && unacceptedMessages.hasNext()) |
| { |
| int next = unacceptedMessages.next(); |
| while(gt(next, range.getUpper())) |
| { |
| if(rangeIter.hasNext()) |
| { |
| range = rangeIter.next(); |
| } |
| else |
| { |
| range = null; |
| break; |
| } |
| } |
| if(range != null && range.includes(next)) |
| { |
| MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next); |
| if(changeListener != null && changeListener.acquire()) |
| { |
| acquired.add(next); |
| } |
| } |
| |
| |
| } |
| |
| } |
| |
| |
| } |
| |
| return acquired; |
| } |
| |
| public void dispositionChange(RangeSet ranges, MessageDispositionAction action) |
| { |
| if(ranges != null) |
| { |
| |
| if(ranges.size() == 1) |
| { |
| Range r = ranges.getFirst(); |
| for(int i = r.getLower(); i <= r.getUpper(); i++) |
| { |
| MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i); |
| if(changeListener != null) |
| { |
| action.performAction(changeListener); |
| } |
| } |
| } |
| else if(!_messageDispositionListenerMap.isEmpty()) |
| { |
| Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); |
| Iterator<Range> rangeIter = ranges.iterator(); |
| |
| if(rangeIter.hasNext()) |
| { |
| Range range = rangeIter.next(); |
| |
| while(range != null && unacceptedMessages.hasNext()) |
| { |
| int next = unacceptedMessages.next(); |
| while(gt(next, range.getUpper())) |
| { |
| if(rangeIter.hasNext()) |
| { |
| range = rangeIter.next(); |
| } |
| else |
| { |
| range = null; |
| break; |
| } |
| } |
| if(range != null && range.includes(next)) |
| { |
| MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); |
| action.performAction(changeListener); |
| } |
| |
| |
| } |
| |
| } |
| } |
| } |
| } |
| |
| public void removeDispositionListener(Method method) |
| { |
| _messageDispositionListenerMap.remove(method.getId()); |
| } |
| |
| public void onClose() |
| { |
| if(_transaction instanceof LocalTransaction) |
| { |
| _transaction.rollback(); |
| } |
| else if(_transaction instanceof DistributedTransaction) |
| { |
| getVirtualHost().getDtxRegistry().endAssociations(this); |
| } |
| |
| for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) |
| { |
| listener.onRelease(true); |
| } |
| _messageDispositionListenerMap.clear(); |
| |
| for (Task task : _taskList) |
| { |
| task.doTask(this); |
| } |
| |
| CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); |
| } |
| |
| @Override |
| protected void awaitClose() |
| { |
| // Broker shouldn't block awaiting close - thus do override this method to do nothing |
| } |
| |
| public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) |
| { |
| _transaction.dequeue(entry.getQueue(), entry.getMessage(), |
| new ServerTransaction.Action() |
| { |
| |
| public void postCommit() |
| { |
| sub.acknowledge(entry); |
| } |
| |
| public void onRollback() |
| { |
| // The client has acknowledge the message and therefore have seen it. |
| // In the event of rollback, the message must be marked as redelivered. |
| entry.setRedelivered(); |
| entry.release(); |
| } |
| }); |
| } |
| |
| public Collection<Subscription_0_10> getSubscriptions() |
| { |
| return _subscriptions.values(); |
| } |
| |
| public void register(String destination, Subscription_0_10 sub) |
| { |
| _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub); |
| } |
| |
| public Subscription_0_10 getSubscription(String destination) |
| { |
| return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination); |
| } |
| |
| public void unregister(Subscription_0_10 sub) |
| { |
| _subscriptions.remove(sub.getName()); |
| try |
| { |
| sub.getSendLock(); |
| AMQQueue queue = sub.getQueue(); |
| if(queue != null) |
| { |
| queue.unregisterSubscription(sub); |
| } |
| } |
| catch (AMQException e) |
| { |
| // TODO |
| _logger.error("Failed to unregister subscription :" + e.getMessage(), e); |
| } |
| finally |
| { |
| sub.releaseSendLock(); |
| } |
| } |
| |
| public boolean isTransactional() |
| { |
| return _transaction.isTransactional(); |
| } |
| |
| public void selectTx() |
| { |
| _transaction = new LocalTransaction(this.getMessageStore()); |
| _txnStarts.incrementAndGet(); |
| } |
| |
| public void selectDtx() |
| { |
| _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); |
| |
| } |
| |
| |
| public void startDtx(Xid xid, boolean join, boolean resume) |
| throws JoinAndResumeDtxException, |
| UnknownDtxBranchException, |
| AlreadyKnownDtxException, |
| DtxNotSelectedException |
| { |
| DistributedTransaction distributedTransaction = assertDtxTransaction(); |
| distributedTransaction.start(xid, join, resume); |
| } |
| |
| |
| public void endDtx(Xid xid, boolean fail, boolean suspend) |
| throws NotAssociatedDtxException, |
| UnknownDtxBranchException, |
| DtxNotSelectedException, |
| SuspendAndFailDtxException, TimeoutDtxException |
| { |
| DistributedTransaction distributedTransaction = assertDtxTransaction(); |
| distributedTransaction.end(xid, fail, suspend); |
| } |
| |
| |
| public long getTimeoutDtx(Xid xid) |
| throws UnknownDtxBranchException |
| { |
| return getVirtualHost().getDtxRegistry().getTimeout(xid); |
| } |
| |
| |
| public void setTimeoutDtx(Xid xid, long timeout) |
| throws UnknownDtxBranchException |
| { |
| getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); |
| } |
| |
| |
| public void prepareDtx(Xid xid) |
| throws UnknownDtxBranchException, |
| IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException |
| { |
| getVirtualHost().getDtxRegistry().prepare(xid); |
| } |
| |
| public void commitDtx(Xid xid, boolean onePhase) |
| throws UnknownDtxBranchException, |
| IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException |
| { |
| getVirtualHost().getDtxRegistry().commit(xid, onePhase); |
| } |
| |
| |
| public void rollbackDtx(Xid xid) |
| throws UnknownDtxBranchException, |
| IncorrectDtxStateException, AMQStoreException, TimeoutDtxException |
| { |
| getVirtualHost().getDtxRegistry().rollback(xid); |
| } |
| |
| |
| public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException |
| { |
| getVirtualHost().getDtxRegistry().forget(xid); |
| } |
| |
| public List<Xid> recoverDtx() |
| { |
| return getVirtualHost().getDtxRegistry().recover(); |
| } |
| |
| private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException |
| { |
| if(_transaction instanceof DistributedTransaction) |
| { |
| return (DistributedTransaction) _transaction; |
| } |
| else |
| { |
| throw new DtxNotSelectedException(); |
| } |
| } |
| |
| |
| public void commit() |
| { |
| _transaction.commit(); |
| |
| _txnCommits.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| |
| public void rollback() |
| { |
| _transaction.rollback(); |
| |
| _txnRejects.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| |
| |
| private void incrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 1 if 0. |
| _txnCount.compareAndSet(0,1); |
| } |
| } |
| |
| private void decrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 0 if 1. |
| _txnCount.compareAndSet(1,0); |
| } |
| } |
| |
| public Long getTxnCommits() |
| { |
| return _txnCommits.get(); |
| } |
| |
| public Long getTxnRejects() |
| { |
| return _txnRejects.get(); |
| } |
| |
| public int getChannelId() |
| { |
| return getChannel(); |
| } |
| |
| public Long getTxnCount() |
| { |
| return _txnCount.get(); |
| } |
| |
| public Long getTxnStart() |
| { |
| return _txnStarts.get(); |
| } |
| |
| public Principal getAuthorizedPrincipal() |
| { |
| return getConnection().getAuthorizedPrincipal(); |
| } |
| |
| public Subject getAuthorizedSubject() |
| { |
| return getConnection().getAuthorizedSubject(); |
| } |
| |
| public void addSessionCloseTask(Task task) |
| { |
| _taskList.add(task); |
| } |
| |
| public void removeSessionCloseTask(Task task) |
| { |
| _taskList.remove(task); |
| } |
| |
| public Object getReference() |
| { |
| return getConnection().getReference(); |
| } |
| |
| public MessageStore getMessageStore() |
| { |
| return getVirtualHost().getMessageStore(); |
| } |
| |
| public VirtualHost getVirtualHost() |
| { |
| return getConnection().getVirtualHost(); |
| } |
| |
| public boolean isDurable() |
| { |
| return false; |
| } |
| |
| |
| public long getCreateTime() |
| { |
| return _createTime; |
| } |
| |
| @Override |
| public UUID getId() |
| { |
| return _id; |
| } |
| |
| public AMQConnectionModel getConnectionModel() |
| { |
| return getConnection(); |
| } |
| |
| public String getClientID() |
| { |
| return getConnection().getClientId(); |
| } |
| |
| @Override |
| public ServerConnection getConnection() |
| { |
| return (ServerConnection) super.getConnection(); |
| } |
| |
| public LogActor getLogActor() |
| { |
| return _actor; |
| } |
| |
| public LogSubject getLogSubject() |
| { |
| return (LogSubject) this; |
| } |
| |
| public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException |
| { |
| _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); |
| } |
| |
| public void block(AMQQueue queue) |
| { |
| block(queue, queue.getName()); |
| } |
| |
| public void block() |
| { |
| block(this, "** All Queues **"); |
| } |
| |
| |
| private void block(Object queue, String name) |
| { |
| synchronized (_blockingEntities) |
| { |
| if(_blockingEntities.add(queue)) |
| { |
| |
| if(_blocking.compareAndSet(false,true)) |
| { |
| if(getState() == State.OPEN) |
| { |
| invokeBlock(); |
| } |
| _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); |
| } |
| |
| |
| } |
| } |
| } |
| |
| public void unblock(AMQQueue queue) |
| { |
| unblock((Object)queue); |
| } |
| |
| public void unblock() |
| { |
| unblock(this); |
| } |
| |
| private void unblock(Object queue) |
| { |
| synchronized(_blockingEntities) |
| { |
| if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) |
| { |
| if(_blocking.compareAndSet(true,false) && !isClosing()) |
| { |
| |
| _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); |
| MessageFlow mf = new MessageFlow(); |
| mf.setUnit(MessageCreditUnit.MESSAGE); |
| mf.setDestination(""); |
| _outstandingCredit.set(Integer.MAX_VALUE); |
| mf.setValue(Integer.MAX_VALUE); |
| invoke(mf); |
| |
| |
| } |
| } |
| } |
| } |
| |
| public boolean onSameConnection(InboundMessage inbound) |
| { |
| return ((inbound instanceof MessageTransferMessage) |
| && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) |
| || ((inbound instanceof MessageMetaData_0_10) |
| && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); |
| } |
| |
| |
| public String toLogString() |
| { |
| long connectionId = super.getConnection() instanceof ServerConnection |
| ? getConnection().getConnectionId() |
| : -1; |
| |
| String remoteAddress = String.valueOf(getConnection().getRemoteAddress()); |
| return "[" + |
| MessageFormat.format(CHANNEL_FORMAT, |
| connectionId, |
| getClientID(), |
| remoteAddress, |
| getVirtualHost().getName(), |
| getChannel()) |
| + "] "; |
| } |
| |
| @Override |
| public void close() |
| { |
| // unregister subscriptions in order to prevent sending of new messages |
| // to subscriptions with closing session |
| unregisterSubscriptions(); |
| super.close(); |
| } |
| |
| void unregisterSubscriptions() |
| { |
| final Collection<Subscription_0_10> subscriptions = getSubscriptions(); |
| for (Subscription_0_10 subscription_0_10 : subscriptions) |
| { |
| unregister(subscription_0_10); |
| } |
| } |
| |
| void stopSubscriptions() |
| { |
| final Collection<Subscription_0_10> subscriptions = getSubscriptions(); |
| for (Subscription_0_10 subscription_0_10 : subscriptions) |
| { |
| subscription_0_10.stop(); |
| } |
| } |
| |
| |
| public void receivedComplete() |
| { |
| final Collection<Subscription_0_10> subscriptions = getSubscriptions(); |
| for (Subscription_0_10 subscription_0_10 : subscriptions) |
| { |
| subscription_0_10.flushCreditState(false); |
| } |
| awaitCommandCompletion(); |
| } |
| |
| private class PostEnqueueAction implements ServerTransaction.Action |
| { |
| |
| private List<? extends BaseQueue> _queues; |
| private ServerMessage _message; |
| private final boolean _transactional; |
| |
| public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) |
| { |
| _transactional = transactional; |
| setState(queues, message); |
| } |
| |
| public void setState(List<? extends BaseQueue> queues, ServerMessage message) |
| { |
| _message = message; |
| _queues = queues; |
| } |
| |
| public void postCommit() |
| { |
| MessageReference<?> ref = _message.newReference(); |
| for(int i = 0; i < _queues.size(); i++) |
| { |
| try |
| { |
| BaseQueue queue = _queues.get(i); |
| queue.enqueue(_message, _transactional, null); |
| if(queue instanceof AMQQueue) |
| { |
| ((AMQQueue)queue).checkCapacity(ServerSession.this); |
| } |
| |
| } |
| catch (AMQException e) |
| { |
| // TODO |
| throw new RuntimeException(e); |
| } |
| } |
| ref.release(); |
| } |
| |
| public void onRollback() |
| { |
| // NO-OP |
| } |
| } |
| |
| public int getUnacknowledgedMessageCount() |
| { |
| return _messageDispositionListenerMap.size(); |
| } |
| |
| public boolean getBlocking() |
| { |
| return _blocking.get(); |
| } |
| |
| private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); |
| |
| public void completeAsyncCommands() |
| { |
| AsyncCommand cmd; |
| while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) |
| { |
| cmd.complete(); |
| _unfinishedCommandsQueue.poll(); |
| } |
| while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) |
| { |
| cmd = _unfinishedCommandsQueue.poll(); |
| cmd.awaitReadyForCompletion(); |
| cmd.complete(); |
| } |
| } |
| |
| |
| public void awaitCommandCompletion() |
| { |
| AsyncCommand cmd; |
| while((cmd = _unfinishedCommandsQueue.poll()) != null) |
| { |
| cmd.awaitReadyForCompletion(); |
| cmd.complete(); |
| } |
| } |
| |
| |
| public Object getAsyncCommandMark() |
| { |
| return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); |
| } |
| |
| public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) |
| { |
| _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); |
| } |
| |
| private static class AsyncCommand |
| { |
| private final StoreFuture _future; |
| private ServerTransaction.Action _action; |
| |
| public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) |
| { |
| _future = future; |
| _action = action; |
| } |
| |
| void awaitReadyForCompletion() |
| { |
| _future.waitForCompletion(); |
| } |
| |
| void complete() |
| { |
| if(!_future.isComplete()) |
| { |
| _future.waitForCompletion(); |
| } |
| _action.postCommit(); |
| _action = null; |
| } |
| |
| boolean isReadyForCompletion() |
| { |
| return _future.isComplete(); |
| } |
| } |
| |
| protected void setClose(boolean close) |
| { |
| super.setClose(close); |
| } |
| |
| @Override |
| public int getConsumerCount() |
| { |
| return _subscriptions.values().size(); |
| } |
| |
| @Override |
| public int compareTo(AMQSessionModel o) |
| { |
| return getId().compareTo(o.getId()); |
| } |
| |
| } |