blob: e77738b5ca587503a71e91db1bc219f24b0510de [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSED;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.CLOSING;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.DETACHED;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.NEW;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.OPEN;
import static org.apache.qpid.server.protocol.v0_10.ServerSession.State.RESUMING;
import static org.apache.qpid.server.protocol.v0_10.transport.Option.COMPLETED;
import static org.apache.qpid.server.protocol.v0_10.transport.Option.TIMELY_REPLY;
import static org.apache.qpid.server.util.Serial.ge;
import static org.apache.qpid.server.util.Serial.gt;
import static org.apache.qpid.server.util.Serial.le;
import static org.apache.qpid.server.util.Serial.lt;
import static org.apache.qpid.server.util.Serial.max;
import static org.apache.qpid.server.util.Strings.toUTF8;
import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.txn.AsyncCommand;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_10.transport.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
public class ServerSession extends SessionInvoker
implements LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
// Class-wide logger.
private static final Logger LOGGER = LoggerFactory.getLogger(ServerSession.class);
// Sentinel credit value (all bits set, i.e. -1 as an int). enqueue() performs no producer credit
// accounting while _outstandingCredit still equals it.
public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
// Map key substituted for a null destination by register() and getSubscription().
private static final String NULL_DESTINATION = UUID.randomUUID().toString();
// Amount of message credit that enqueue() re-issues once that much credit has been consumed.
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
// completeAsyncCommands() force-completes queued commands while the queue is longer than this.
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
// Entities (queues, or this session itself) currently asking for the session to be blocked.
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<>());
// Commands recorded through recordFuture() that have not been completed yet.
private final Deque<AsyncCommand> _unfinishedCommandsQueue = new ConcurrentLinkedDeque<>();
// Desired blocking state; updateBlockedStateIfNecesssary() reconciles _wireBlockingState with it.
private final AtomicBoolean _blocking = new AtomicBoolean(false);
// Producer credit counter maintained by enqueue() and invokeUnblock().
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
// Held while touching the incoming-command bookkeeping (incomingInit, processed, maxProcessed,
// syncPoint).
private final Object processedLock = new Object();
// Compared against the number of outstanding outgoing commands by isCommandsFull(); read from the
// "qpid.session.command_limit" system property, defaulting to 64 * 1024.
private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
// Held while sending commands and while changing the outgoing-command bookkeeping and the state.
private final Object commandsLock = new Object();
// Only exposed through getStateLock(); not otherwise used in this class.
private final Object stateLock = new Object();
// Consumer targets keyed by destination (NULL_DESTINATION stands in for a null destination).
private final Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<>();
// Log message recorded by close(int, String); onClose() prefers it over the default CLOSE message.
private final AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<>();
// Read once in the constructor from Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT; compared
// against elapsed milliseconds in blockingTimeoutExceeded().
private final long _blockingTimeout;
private final ServerConnection connection;
private final Binary name;
private final ServerSessionDelegate delegate;
// Outgoing commands retained by id until complete() removes them; accessed under commandsLock.
private final Map<Integer,Method> commands = new HashMap<>();
// Compared against commandBytes by isBytesFull(); read from the "qpid.session.byte_limit" system
// property, defaulting to 1024 * 1024.
private final int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024 * 1024);
// Permits are added by addCredit() and removed by drainCredit().
private final Semaphore credit = new Semaphore(0);
// Futures awaiting a result, keyed by command id; also used as the monitor guarding 'exception'.
private final Map<Integer,ResultFuture<?>> results = new HashMap<>();
// Disposition listeners keyed by the id of the transfer they were registered for.
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<>();
private Session_0_10 _modelObject;
// Wall-clock time (ms) at which blocking was last put on the wire, or 0 when not blocked.
private long _blockTime;
// Blocking state most recently sent to the peer by updateBlockedStateIfNecesssary().
private boolean _wireBlockingState;
private boolean closing;
private int channel;
// Becomes true on the first commandPoint() after initReceiver().
private boolean incomingInit;
// incoming command count
private int commandsIn;
// Ranges of incoming command ids that have been processed and not yet removed by knownComplete().
private RangeSet processed;
private int maxProcessed;
private int syncPoint;
// outgoing command count
private int commandsOut = 0;
// Running total of the body sizes of the commands currently held in 'commands'.
private int commandBytes = 0;
// Highest outgoing command id known to be complete (starts one below the first id).
private int maxComplete = commandsOut - 1;
private State state = NEW;
// Thread currently executing resume(); invoke() lets that thread send while not OPEN.
private Thread resumer = null;
private boolean transacted = false;
private SessionDetachCode detachCode;
private boolean _isNoReplay = false;
// Execution exception recorded by setException(); guarded by the 'results' monitor.
private org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exception = null;
private volatile ServerTransaction _transaction;
/**
 * Creates a session on the given connection, starting with an auto-commit transaction.
 *
 * @param connection the owning connection
 * @param delegate   the delegate that incoming methods are dispatched to
 * @param name       the session name
 * @param expiry     not used by this constructor
 */
public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
{
    this.connection = connection;
    this.delegate = delegate;
    this.name = name;
    // 'closing' and '_isNoReplay' keep their declared initial value of false.
    initReceiver();
    _transaction = new AsyncAutoCommitTransaction(getMessageStore(), this);
    _blockingTimeout = connection.getBroker()
                                 .getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
}
/** @return the name this session was created with */
public Binary getName()
{
    return this.name;
}
/**
 * Sets the closing flag.
 *
 * @param close the new value of the closing flag
 */
protected void setClose(boolean close)
{
    closing = close;
}
/** @return the channel number last assigned through {@code setChannel} */
public int getChannel()
{
    return this.channel;
}
/**
 * Assigns the channel number that {@code send} stamps onto outgoing methods.
 *
 * @param channel the channel number
 */
void setChannel(int channel)
{
    this.channel = channel;
}
/** @return the current session state (read without taking any lock) */
protected State getState()
{
    return state;
}
/**
 * Releases permits on the credit semaphore.
 *
 * @param value number of permits to release
 */
void addCredit(int value)
{
    this.credit.release(value);
}
/** Removes every currently available permit from the credit semaphore. */
void drainCredit()
{
    this.credit.drainPermits();
}
/**
 * Resets the incoming-command bookkeeping: installs a fresh processed range set and marks the
 * receiver as not yet initialised, so the next {@code commandPoint} re-seeds the counters.
 */
private void initReceiver()
{
    synchronized (processedLock)
    {
        processed = RangeSetFactory.createRangeSet();
        incomingInit = false;
    }
}
/**
 * Resets the receiver state, then sends session-attach for this session's name followed by a
 * session-request-timeout of zero.
 */
void attach()
{
    initReceiver();
    final byte[] sessionName = name.getBytes();
    sessionAttach(sessionName);
    // use expiry here only if/when session resume is supported
    sessionRequestTimeout(0);
}
/**
 * Re-attaches the session and replays every outgoing command after {@code maxComplete}.
 * <p>
 * Commands that are no longer retained are replaced with an {@code ExecutionSync} carrying the same
 * id. Retained message transfers are flagged as redelivered before being resent. Afterwards the
 * command point is moved to {@code commandsOut}, a COMPLETED flush is requested, the state becomes
 * RESUMING and, for transacted sessions, tx-select is re-issued by the resuming thread.
 */
void resume()
{
    synchronized (commandsLock)
    {
        attach();

        int id = maxComplete + 1;
        while (lt(id, commandsOut))
        {
            Method replay = getCommand(id);
            if (replay == null)
            {
                // Nothing retained for this id: send a sync as a placeholder with the same id.
                replay = new ExecutionSync();
                replay.setId(id);
            }
            else if (replay instanceof MessageTransfer)
            {
                final MessageTransfer transfer = (MessageTransfer) replay;
                final Header existing = transfer.getHeader();
                final DeliveryProperties existingProps =
                        existing == null ? null : existing.getDeliveryProperties();
                if (existingProps != null)
                {
                    existingProps.setRedelivered(true);
                }
                else
                {
                    // No delivery properties to update: build a header that carries the flag.
                    final DeliveryProperties redelivered = new DeliveryProperties();
                    redelivered.setRedelivered(true);
                    if (existing == null)
                    {
                        transfer.setHeader(new Header(redelivered, null, null));
                    }
                    else
                    {
                        transfer.setHeader(new Header(redelivered,
                                                      existing.getMessageProperties(),
                                                      existing.getNonStandardProperties()));
                    }
                }
            }
            sessionCommandPoint(replay.getId(), 0);
            send(replay);
            id++;
        }

        sessionCommandPoint(commandsOut, 0);
        sessionFlush(COMPLETED);

        // From here on only this thread may invoke commands until the state leaves RESUMING.
        resumer = Thread.currentThread();
        state = RESUMING;
        if (isTransacted())
        {
            txSelect();
        }
        resumer = null;
    }
}
/**
 * Looks up a retained outgoing command.
 *
 * @param i the command id
 * @return the retained command, or {@code null} if none is stored under that id
 */
private Method getCommand(int i)
{
    return this.commands.get(i);
}
/**
 * Retains an outgoing command under its id.
 *
 * @param commandId the command id
 * @param command   the command to retain
 */
private void setCommand(int commandId, Method command)
{
    this.commands.put(commandId, command);
}
/**
 * Stops retaining an outgoing command.
 *
 * @param id the command id
 * @return the command that was retained, or {@code null} if there was none
 */
private Method removeCommand(int id)
{
    return this.commands.remove(id);
}
/**
 * Sets the id that the next incoming command will receive. On the first call after the receiver
 * was reset, the processed and sync markers are also seeded to one below that id.
 *
 * @param id the next incoming command id
 */
final void commandPoint(int id)
{
    synchronized (processedLock)
    {
        commandsIn = id;
        if (incomingInit)
        {
            return;
        }
        incomingInit = true;
        maxProcessed = commandsIn - 1;
        syncPoint = maxProcessed;
    }
}
/** @return the id that the next outgoing command will be given */
public int getCommandsOut()
{
    return this.commandsOut;
}
/** @return the id that the next incoming command will be given */
public int getCommandsIn()
{
    return this.commandsIn;
}
/**
 * Hands out the current incoming command id and advances the counter.
 *
 * @return the id before the increment
 */
public int nextCommandId()
{
    final int current = commandsIn;
    commandsIn = current + 1;
    return current;
}
/**
 * Assigns the next incoming command id to the given command. Whenever the low eight bits of the
 * assigned id are zero, the processed set is flushed with TIMELY_REPLY.
 *
 * @param cmd the incoming command to label
 * @throws IllegalStateException if no command point has been received yet
 */
final void identify(Method cmd)
{
    if (!incomingInit)
    {
        throw new IllegalStateException();
    }

    final int commandId = nextCommandId();
    cmd.setId(commandId);

    if (LOGGER.isDebugEnabled())
    {
        LOGGER.debug("identify: ch={}, commandId={}", this.channel, commandId);
    }

    final boolean atFlushBoundary = (commandId & 0xff) == 0;
    if (atFlushBoundary)
    {
        flushProcessed(TIMELY_REPLY);
    }
}
/**
 * Marks a single incoming command as processed.
 *
 * @param command the command whose id is to be recorded
 */
public void processed(Method command)
{
    final int id = command.getId();
    processed(id);
}
/**
 * Marks a single incoming command id as processed.
 *
 * @param command the command id, used as both ends of the range
 */
public void processed(int command)
{
    final int lower = command;
    final int upper = command;
    processed(lower, upper);
}
/**
 * Marks every incoming command id in the given range as processed.
 *
 * @param range the inclusive range of command ids
 */
public void processed(Range range)
{
    final int lower = range.getLower();
    final int upper = range.getUpper();
    processed(lower, upper);
}
/**
 * Records the inclusive range {@code [lower, upper]} of incoming command ids as processed and,
 * when this moves {@code maxProcessed} from below the sync point to at or beyond it, flushes the
 * processed set to the peer.
 *
 * @param lower first command id of the range
 * @param upper last command id of the range
 * @throws IllegalArgumentException if {@code upper} is not below the next incoming command id
 */
public void processed(int lower, int upper)
{
if(LOGGER.isDebugEnabled())
{
// NOTE(review): syncPoint and maxProcessed are read here without holding processedLock.
LOGGER.debug("{} ch={} processed([{},{}]) {} {}", this, channel, lower, upper, syncPoint, maxProcessed);
}
boolean flush;
synchronized (processedLock)
{
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("{} processed: {}", this, processed);
}
// Ids are compared with the Serial helpers rather than plain <, >=.
if (ge(upper, commandsIn))
{
throw new IllegalArgumentException
("range exceeds max received command-id: " + Range.newInstance(lower, upper));
}
processed.add(lower, upper);
Range first = processed.getFirst();
int flower = first.getLower();
int fupper = first.getUpper();
int old = maxProcessed;
// Advance maxProcessed only when the first processed range starts no later than the id
// immediately after the current maximum.
if (le(flower, maxProcessed + 1))
{
maxProcessed = max(maxProcessed, fupper);
}
boolean synced = ge(maxProcessed, syncPoint);
// Flush only on the transition from "behind the sync point" to "caught up".
flush = lt(old, syncPoint) && synced;
if (synced)
{
syncPoint = maxProcessed;
}
}
// The flush is issued after processedLock has been released.
if (flush)
{
flushProcessed();
}
}
/**
 * Sends session-expected. The range set contains the next incoming command id when a command
 * point has been received and is empty otherwise.
 */
void flushExpected()
{
    final RangeSet expected = RangeSetFactory.createRangeSet();
    synchronized (processedLock)
    {
        if (incomingInit)
        {
            expected.add(commandsIn);
        }
    }
    sessionExpected(expected, null);
}
/**
 * Sends session-completed for a snapshot of the processed set. Nothing is sent when the snapshot
 * is empty or when the session is DETACHED, CLOSING or CLOSED.
 *
 * @param options options passed through to session-completed
 */
public void flushProcessed(Option... options)
{
    final RangeSet snapshot;
    synchronized (processedLock)
    {
        snapshot = processed.copy();
    }

    synchronized (commandsLock)
    {
        if (state != DETACHED && state != CLOSING && state != CLOSED && snapshot.size() > 0)
        {
            sessionCompleted(snapshot, options);
        }
    }
}
/**
 * Removes the given ranges from the processed set.
 *
 * @param kc the ranges to subtract; an empty set is ignored
 */
void knownComplete(RangeSet kc)
{
    if (kc.size() <= 0)
    {
        return;
    }
    synchronized (processedLock)
    {
        processed.subtract(kc);
    }
}
/**
 * Moves the sync point to the most recently identified incoming command and flushes the processed
 * set if processing has already reached that command.
 */
void syncPoint()
{
    final int target = getCommandsIn() - 1;
    LOGGER.debug("{} synced to {}", this, target);

    final boolean alreadyReached;
    synchronized (processedLock)
    {
        syncPoint = target;
        alreadyReached = ge(maxProcessed, syncPoint);
    }

    if (alreadyReached)
    {
        flushProcessed();
    }
}
/**
 * Marks the outgoing commands in the inclusive range {@code [lower, upper]} as complete: each
 * retained command in the range is removed, its body size is subtracted from
 * {@code commandBytes} and its {@code complete()} callback is invoked. Threads waiting on
 * {@code commandsLock} are woken afterwards.
 *
 * @param lower first completed command id
 * @param upper last completed command id
 * @return {@code true} if {@code maxComplete} advanced as a result of this call
 */
protected boolean complete(int lower, int upper)
{
//avoid autoboxing
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("{} complete({}, {})", this, lower, upper);
}
synchronized (commandsLock)
{
int old = maxComplete;
// Start at whichever of maxComplete and lower is later in serial order; ids are compared with
// the Serial helpers.
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
Method m = removeCommand(id);
if (m != null)
{
commandBytes -= m.getBodySize();
m.complete();
}
}
// maxComplete only advances when the range starts no later than the id right after it.
if (le(lower, maxComplete + 1))
{
maxComplete = max(maxComplete, upper);
}
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("{} commands remaining: {}", this, commandsOut - maxComplete);
}
commandsLock.notifyAll();
return gt(maxComplete, old);
}
}
/**
 * Dispatches an incoming method to this session's delegate.
 *
 * @param m the received method
 */
void received(Method m)
{
    m.delegate(this, this.delegate);
}
/**
 * Stamps the method with this session's channel and hands it to the connection. The connection
 * is flushed straight away unless the method is marked as part of a batch.
 *
 * @param m the method to send
 */
private void send(Method m)
{
    m.setChannel(channel);
    connection.send(m);
    if (m.isBatch())
    {
        return;
    }
    connection.flush();
}
/** @return {@code true} when the retained command bytes have reached the configured byte limit */
protected boolean isBytesFull()
{
    return !(commandBytes < byteLimit);
}
/**
 * @param id an outgoing command id
 * @return {@code true} when the distance from {@code maxComplete} to {@code id} has reached the
 *         configured command limit
 */
protected boolean isCommandsFull(int id)
{
    final int outstanding = id - maxComplete;
    return outstanding >= commandLimit;
}
/**
 * Sends a method without a post-id-setting action.
 *
 * @param m the method to send
 */
@Override
public void invoke(Method m)
{
    final Runnable noAction = null;
    invoke(m, noAction);
}
/**
 * Sends a method to the peer.
 * <p>
 * Methods whose encoded track is not {@code Frame.L4} are sent directly. L4 methods are handled
 * under {@code commandsLock}: the session state is validated, the next outgoing command id is
 * assigned, {@code postIdSettingAction} (if any) is run, the command is optionally retained for
 * completion tracking, and it is then sent. A COMPLETED flush is requested whenever
 * {@code shouldIssueFlush} says so for the assigned id.
 *
 * @param m                   the method to send
 * @param postIdSettingAction run right after the command id has been set on {@code m}; may be
 *                            {@code null}
 * @throws SessionException       if the session is in an unexpected state, an execution exception
 *                                has been recorded, or the command buffer is full
 * @throws SessionClosedException if the session is CLOSING or CLOSED and no execution exception
 *                                has been recorded
 */
public void invoke(Method m, Runnable postIdSettingAction)
{
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commandsLock)
{
// Unreliable commands are silently dropped while detached, unless sent by the resuming thread.
if (state == DETACHED && m.isUnreliable())
{
Thread current = Thread.currentThread();
if (!current.equals(resumer))
{
return;
}
}
// Outside OPEN/CLOSED/CLOSING only the resuming thread may proceed.
if (state != OPEN && state != CLOSED && state != CLOSING)
{
Thread current = Thread.currentThread();
if (!current.equals(resumer) )
{
// Should not happen
throw new SessionException(String.format("Unexpected state %s", state));
}
}
switch (state)
{
case OPEN:
break;
case RESUMING:
Thread current = Thread.currentThread();
if (!current.equals(resumer))
{
throw new SessionException
("timed out waiting for resume to finish");
}
break;
case CLOSING:
case CLOSED:
org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exc = getException();
if (exc != null)
{
throw new SessionException(exc);
}
else
{
throw new SessionClosedException();
}
default:
throw new SessionException
(String.format
("timed out waiting for session to become open " +
"(state=%s)", state));
}
int next;
next = commandsOut++;
m.setId(next);
if(postIdSettingAction != null)
{
postIdSettingAction.run();
}
if (isFull(next))
{
// Should not happen
throw new SessionException(String.format("Command buffer full next: %d", next));
}
// NOTE(review): the switch above already throws for CLOSED, so this check looks unreachable
// unless postIdSettingAction changes the state - confirm before removing.
if (state == CLOSED)
{
org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exc = getException();
if (exc != null)
{
throw new SessionException(exc);
}
else
{
throw new SessionClosedException();
}
}
// NOTE(review): repeats the isFull(next) test made a few lines above - confirm whether this
// second check can ever fire.
if (isFull(next))
{
throw new SessionException("timed out waiting for completion");
}
// The very first command is preceded by a command point of 0.
if (next == 0)
{
sessionCommandPoint(0, 0);
}
// Reliable message transfers are retained for replay unless replay is disabled, the session is
// closing, or the session is transacted.
boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();
if ((replayTransfer) || m.hasCompletionListener())
{
setCommand(next, m);
commandBytes += m.getBodySize();
}
try
{
send(m);
}
catch (SenderException e)
{
if (!closing)
{
// if we are not closing then this will happen
// again on resume
LOGGER.error("error sending command", e);
}
else
{
e.rethrow();
}
}
// flush every 64K commands to avoid ambiguity on
// wraparound
if (shouldIssueFlush(next))
{
try
{
sessionFlush(COMPLETED);
}
catch (SenderException e)
{
if (!closing)
{
// if expiry is > 0 then this will happen
// again on resume
LOGGER.error("error sending flush (periodic)", e);
}
else
{
e.rethrow();
}
}
}
}
}
else
{
// Non-L4 methods bypass command numbering and state checks.
send(m);
}
}
/**
 * @param next the id just assigned to an outgoing command
 * @return {@code true} when {@code next} is a multiple of 65536
 */
protected boolean shouldIssueFlush(int next)
{
    // A multiple of 65536 is exactly a value whose low 16 bits are all zero.
    return (next & 0xFFFF) == 0;
}
/**
 * Delivers a result to the future registered for the given command id. A warning is logged when
 * no future is registered under that id.
 *
 * @param command the command id the result belongs to
 * @param result  the result structure
 */
void result(int command, Struct result)
{
    final ResultFuture<?> pending;
    synchronized (results)
    {
        pending = results.remove(command);
    }

    if (pending == null)
    {
        LOGGER.warn("Received a response to a command" +
                    " that's no longer valid on the client side." +
                    " [ command id : {} , result : {} ]", command, result);
        return;
    }
    pending.set(result);
}
/**
 * Records the execution exception for this session. Only one may ever be recorded.
 *
 * @param exc the exception to record
 * @throws IllegalStateException if an exception has already been recorded
 */
void setException(org.apache.qpid.server.protocol.v0_10.transport.ExecutionException exc)
{
    synchronized (results)
    {
        if (exception == null)
        {
            exception = exc;
            return;
        }
        throw new IllegalStateException(
                String.format("too many exceptions: %s, %s", exception, exc));
    }
}
/** @return the recorded execution exception, or {@code null} if none has been recorded */
org.apache.qpid.server.protocol.v0_10.transport.ExecutionException getException()
{
    final org.apache.qpid.server.protocol.v0_10.transport.ExecutionException recorded;
    synchronized (results)
    {
        recorded = exception;
    }
    return recorded;
}
/**
 * Sends a method that expects a result. A future is registered under the id the command is about
 * to receive, and then the method is sent.
 *
 * @param m     the method to send
 * @param klass the expected result type
 * @return a future that is completed when {@code result} is called for the command's id
 */
@Override
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
    final ResultFuture<T> pending = new ResultFuture<>(klass);
    synchronized (commandsLock)
    {
        // commandsLock is held across both steps so the id read here is the id invoke(m) assigns.
        final int expectedId = commandsOut;
        synchronized (results)
        {
            results.put(expectedId, pending);
        }
        invoke(m);
    }
    return pending;
}
/**
 * Convenience overload that wraps a byte array body in a {@code ByteBuffer} before delegating.
 *
 * @param destination the destination name
 * @param acceptMode  the accept mode
 * @param acquireMode the acquire mode
 * @param header      the message header
 * @param body        the message body
 * @param _options    transfer options
 */
public final void messageTransfer(String destination,
                                  MessageAcceptMode acceptMode,
                                  MessageAcquireMode acquireMode,
                                  Header header,
                                  byte[] body,
                                  Option ... _options)
{
    final ByteBuffer payload = ByteBuffer.wrap(body);
    messageTransfer(destination, acceptMode, acquireMode, header, payload, _options);
}
/**
 * Convenience overload that converts a string body with {@code toUTF8} before delegating.
 *
 * @param destination the destination name
 * @param acceptMode  the accept mode
 * @param acquireMode the acquire mode
 * @param header      the message header
 * @param body        the message body as text
 * @param _options    transfer options
 */
public final void messageTransfer(String destination,
                                  MessageAcceptMode acceptMode,
                                  MessageAcquireMode acquireMode,
                                  Header header,
                                  String body,
                                  Option ... _options)
{
    messageTransfer(destination,
                    acceptMode,
                    acquireMode,
                    header,
                    toUTF8(body),
                    _options);
}
/**
 * Logs the given throwable at error level.
 *
 * @param t the throwable to log
 */
public void exception(Throwable t)
{
    LOGGER.error("caught exception", t);
}
/**
 * Moves the session to its terminal state for this attachment. The new state is CLOSED when the
 * session was closing or an execution exception has been recorded, and DETACHED otherwise.
 * Threads waiting on the command lock or on pending result futures are woken and the delegate is
 * notified. A closed session is finally removed from its connection.
 */
public void closed()
{
    synchronized (commandsLock)
    {
        state = (closing || getException() != null) ? CLOSED : DETACHED;
        commandsLock.notifyAll();

        synchronized (results)
        {
            for (ResultFuture<?> pending : results.values())
            {
                synchronized (pending)
                {
                    pending.notifyAll();
                }
            }
        }

        if (state != CLOSED)
        {
            delegate.detached(this);
        }
        else
        {
            delegate.closed(this);
        }
    }

    // Removal from the connection happens after the command lock has been released.
    if (state == CLOSED)
    {
        connection.removeSession(this);
    }
}
/**
 * @return {@code true} if this session is CLOSED or CLOSING, or if the owning connection reports
 *         that it is closing
 */
public boolean isClosing()
{
    if (state == CLOSED || state == CLOSING)
    {
        return true;
    }
    return connection.isClosing();
}
/** @return {@code "ssn:"} followed by the session name */
@Override
public String toString()
{
    final String description = String.format("ssn:%s", name);
    return description;
}
/**
 * Sets the transacted flag consulted by {@code invoke} and {@code resume}.
 *
 * @param b the new flag value
 */
public void setTransacted(boolean b)
{
    transacted = b;
}
/** @return the transacted flag as last set through {@code setTransacted} */
public boolean isTransacted()
{
    return this.transacted;
}
/**
 * Stores the detach code for this session.
 *
 * @param dtc the detach code
 */
public void setDetachCode(SessionDetachCode dtc)
{
    detachCode = dtc;
}
/** @return the stored detach code, or {@code null} if none has been set */
public SessionDetachCode getDetachCode()
{
    return detachCode;
}
/** @return the session's state lock object */
public Object getStateLock()
{
    return this.stateLock;
}
/**
 * Sends session-attached by delegating to the superclass implementation.
 *
 * @param name    the session name bytes
 * @param options options for the method
 */
protected void sendSessionAttached(final byte[] name, final Option... options)
{
    super.sessionAttached(name, options);
}
/** Lifecycle states of a session. */
public enum State
{
    NEW,
    DETACHED,
    RESUMING,
    OPEN,
    CLOSING,
    CLOSED
}
/**
 * Callbacks registered per outgoing transfer (see onMessageDispositionChange) and invoked when the
 * disposition of that transfer changes.
 */
public interface MessageDispositionChangeListener
{
// Invoked through accept(RangeSet).
void onAccept();
// Invoked through release(RangeSet, boolean) with closing == false, and by onClose() with
// setRedelivered == true and closing == true.
void onRelease(boolean setRedelivered, final boolean closing);
// Invoked through reject(RangeSet).
void onReject();
// Invoked through acquire(RangeSet); a true return adds the transfer id to the acquired set.
boolean acquire();
}
/** Operation that dispositionChange applies to each listener it removes. */
private interface MessageDispositionAction
{
// Applies the operation to a single listener.
void performAction(MessageDispositionChangeListener listener);
}
/** @return the subject of the session's model object */
public Subject getSubject()
{
    final Session_0_10 model = _modelObject;
    return model.getSubject();
}
/** @return the access control context of the session's model object */
public AccessControlContext getAccessControllerContext()
{
    final Session_0_10 model = _modelObject;
    return model.getAccessControllerContext();
}
/**
 * Changes the session state and wakes threads waiting on the command lock. A channel-create
 * message is logged when the new state is OPEN. If the caller is not already running as the
 * session's subject, the call is re-issued inside that subject's access control context.
 *
 * @param state the new state
 */
protected void setState(final State state)
{
    if (!runningAsSubject())
    {
        runAsSubject(() ->
                     {
                         setState(state);
                         return null;
                     });
        return;
    }

    synchronized (commandsLock)
    {
        this.state = state;
        commandsLock.notifyAll();
    }

    if (state == State.OPEN)
    {
        getAMQPConnection().getEventLogger().message(ChannelMessages.CREATE());
    }
}
/**
 * Runs an action inside the session's access control context.
 *
 * @param privilegedAction the action to run
 * @return whatever the action returns
 */
private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
{
    final AccessControlContext context = getAccessControllerContext();
    return AccessController.doPrivileged(privilegedAction, context);
}
/** @return {@code true} if the subject of the calling context equals the session's subject */
private boolean runningAsSubject()
{
    final Subject sessionSubject = getAuthorizedSubject();
    final Subject callerSubject = Subject.getSubject(AccessController.getContext());
    return sessionSubject.equals(callerSubject);
}
/**
 * Tells the peer to stop publishing: switches the empty-named destination to CREDIT flow mode and
 * then sends message-stop for it.
 */
private void invokeBlock()
{
    final MessageSetFlowMode creditMode = new MessageSetFlowMode("", MessageFlowMode.CREDIT);
    invoke(creditMode);
    final MessageStop stop = new MessageStop("");
    invoke(stop);
}
/**
 * Lets the peer publish again: resets the outstanding credit counter to {@code Integer.MAX_VALUE}
 * and sends a message-flow granting the same number of message credits on the empty-named
 * destination.
 */
private void invokeUnblock()
{
    final int grant = Integer.MAX_VALUE;
    final MessageFlow flow = new MessageFlow();
    flow.setUnit(MessageCreditUnit.MESSAGE);
    flow.setDestination("");
    _outstandingCredit.set(grant);
    flow.setValue(grant);
    invoke(flow);
}
/**
 * Checks publish authorisation through the model object's publish authorisation cache.
 *
 * @param destination the destination being published to
 * @param routingKey  the routing key
 * @param immediate   the immediate flag of the publish
 * @param currentTime the time passed on to the cache
 */
void authorisePublish(final MessageDestination destination,
                      final String routingKey,
                      final boolean immediate,
                      final long currentTime)
{
    _modelObject.getPublishAuthCache()
                .authorisePublish(destination, routingKey, immediate, currentTime);
}
/**
 * @param id an outgoing command id
 * @return {@code true} when the command limit has been reached for {@code id}
 */
protected boolean isFull(int id)
{
    final boolean commandsFull = isCommandsFull(id);
    return commandsFull;
}
/**
 * Routes an incoming message through the given destination and sends the routing result within
 * the current transaction, updating the receive statistics.
 * <p>
 * When producer credit is limited, one credit is consumed per message. Once the counter has
 * dropped by the top-up threshold from {@code Integer.MAX_VALUE}, that amount is added back and a
 * matching message-flow is sent to the peer.
 *
 * @param message            the message to enqueue
 * @param instanceProperties per-instance properties passed to the router
 * @param exchange           the destination that routes the message
 * @return the routing result
 */
RoutingResult<MessageTransferMessage> enqueue(final MessageTransferMessage message,
                                              final InstanceProperties instanceProperties,
                                              final MessageDestination exchange)
{
    if (_outstandingCredit.get() != UNLIMITED_CREDIT)
    {
        final int remaining = _outstandingCredit.decrementAndGet();
        if (remaining == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
        {
            _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
            invoke(new MessageFlow("", MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
        }
    }

    // locally cache arrival time to ensure that we don't reload metadata
    final long arrivalTime = message.getArrivalTime();

    final RoutingResult<MessageTransferMessage> routed =
            exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
    routed.send(_transaction, null);

    final Session_0_10 model = getModelObject();
    model.registerMessageReceived(message.getSize());
    if (isTransactional())
    {
        getModelObject().registerTransactedMessageReceived();
    }
    return routed;
}
/**
 * Sends a message transfer to the peer after updating the delivery statistics.
 *
 * @param xfr                 the transfer to send
 * @param postIdSettingAction action run once the command id has been set on the transfer
 */
public void sendMessage(MessageTransfer xfr,
                        Runnable postIdSettingAction)
{
    final int bodySize = xfr.getBodySize();
    getModelObject().registerMessageDelivered(bodySize);
    final boolean inTransaction = _transaction.isTransactional();
    if (inTransaction)
    {
        getModelObject().registerTransactedMessageDelivered();
    }
    invoke(xfr, postIdSettingAction);
}
/**
 * Registers a disposition listener under the id of the given transfer.
 *
 * @param xfr            the transfer whose id is used as the key
 * @param acceptListener the listener to register
 */
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
{
    final int transferId = xfr.getId();
    _messageDispositionListenerMap.put(transferId, acceptListener);
}
/**
 * Removes the listeners for the given transfer ids and calls {@code onAccept} on each.
 *
 * @param ranges the transfer ids being accepted
 */
public void accept(RangeSet ranges)
{
    dispositionChange(ranges, listener -> listener.onAccept());
}
/**
 * Removes the listeners for the given transfer ids and calls {@code onRelease} on each with the
 * closing flag set to {@code false}.
 *
 * @param ranges         the transfer ids being released
 * @param setRedelivered passed through to each listener
 */
public void release(RangeSet ranges, final boolean setRedelivered)
{
    final boolean closingSession = false;
    dispositionChange(ranges, listener -> listener.onRelease(setRedelivered, closingSession));
}
/**
 * Removes the listeners for the given transfer ids and calls {@code onReject} on each.
 *
 * @param ranges the transfer ids being rejected
 */
public void reject(RangeSet ranges)
{
    dispositionChange(ranges, listener -> listener.onReject());
}
/**
 * Attempts to acquire every registered transfer whose id falls inside {@code transfers}.
 * Listeners are left registered; only those whose {@code acquire()} returns {@code true}
 * contribute their id to the result.
 *
 * @param transfers the transfer ids to acquire
 * @return the ids that were acquired (empty if none)
 */
public RangeSet acquire(RangeSet transfers)
{
RangeSet acquired = RangeSetFactory.createRangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
// Walk the registered ids and the requested ranges side by side.
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = transfers.iterator();
if(rangeIter.hasNext())
{
Range range = rangeIter.next();
while(range != null && unacceptedMessages.hasNext())
{
int next = unacceptedMessages.next();
// Skip ranges that end before the current id; running out of ranges ends the walk.
while(gt(next, range.getUpper()))
{
if(rangeIter.hasNext())
{
range = rangeIter.next();
}
else
{
range = null;
break;
}
}
if(range != null && range.includes(next))
{
// The listener may have been removed since its key was read, hence the null check.
MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
if(changeListener != null && changeListener.acquire())
{
acquired.add(next);
}
}
}
}
}
return acquired;
}
/**
 * Removes the disposition listeners registered for the transfer ids in {@code ranges} and applies
 * {@code action} to each listener removed.
 * <p>
 * A single range is handled by probing every id in it. Multiple ranges are handled by walking the
 * registered ids and the ranges side by side. In both cases a listener that has already been
 * removed (for example by a concurrent disposition change) is skipped.
 *
 * @param ranges the transfer ids whose disposition changed; {@code null} is ignored
 * @param action the operation to apply to each removed listener
 */
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
    if (ranges == null)
    {
        return;
    }

    if (ranges.size() == 1)
    {
        final Range r = ranges.getFirst();
        final int upper = r.getUpper();
        for (int i = r.getLower(); i <= upper; i++)
        {
            MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
            if (changeListener != null)
            {
                action.performAction(changeListener);
            }
            if (i == Integer.MAX_VALUE)
            {
                // i++ would wrap to Integer.MIN_VALUE and the loop would never terminate.
                break;
            }
        }
    }
    else if (!_messageDispositionListenerMap.isEmpty())
    {
        Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
        Iterator<Range> rangeIter = ranges.iterator();
        if (rangeIter.hasNext())
        {
            Range range = rangeIter.next();
            while (range != null && unacceptedMessages.hasNext())
            {
                int next = unacceptedMessages.next();
                // Skip ranges that end before the current id; running out of ranges ends the walk.
                while (gt(next, range.getUpper()))
                {
                    if (rangeIter.hasNext())
                    {
                        range = rangeIter.next();
                    }
                    else
                    {
                        range = null;
                        break;
                    }
                }
                if (range != null && range.includes(next))
                {
                    MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
                    // The key iterator is only weakly consistent, so the entry may already be gone.
                    if (changeListener != null)
                    {
                        action.performAction(changeListener);
                    }
                }
            }
        }
    }
}
/**
 * Drops the disposition listener registered under the given method's id, without invoking it.
 *
 * @param method the method whose id identifies the listener
 */
public void removeDispositionListener(Method method)
{
    final int id = method.getId();
    _messageDispositionListenerMap.remove(id);
}
/**
 * Performs the session's close-time clean-up. The steps run in this order:
 * <ol>
 * <li>a local transaction is rolled back and its tickers unregistered (counting a rollback if it
 *     had outstanding work); a distributed transaction has its associations ended;</li>
 * <li>all remaining disposition listeners are released with redelivered and closing both set,
 *     and the listener map is cleared;</li>
 * <li>the model object's close tasks are run;</li>
 * <li>an operational log message is emitted: the forced-close message if one was recorded,
 *     otherwise one built from the connection's close code and message if present, otherwise
 *     the plain CLOSE message.</li>
 * </ol>
 */
public void onClose()
{
    final AMQPConnection_0_10 amqpConnection = getAMQPConnection();

    if (_transaction instanceof LocalTransaction)
    {
        final boolean hadWork = ((LocalTransaction) _transaction).hasOutstandingWork();
        if (hadWork)
        {
            amqpConnection.incrementTransactionRollbackCounter();
        }
        amqpConnection.decrementTransactionOpenCounter();
        _transaction.rollback();
        amqpConnection.unregisterTransactionTickers(_transaction);
    }
    else if (_transaction instanceof DistributedTransaction)
    {
        getAddressSpace().getDtxRegistry().endAssociations(_modelObject);
    }

    for (MessageDispositionChangeListener pending : _messageDispositionListenerMap.values())
    {
        pending.onRelease(true, true);
    }
    _messageDispositionListenerMap.clear();

    for (Action<? super Session_0_10> closeTask : _modelObject.getTaskList())
    {
        closeTask.performAction(_modelObject);
    }

    LogMessage closeLogMessage = _forcedCloseLogMessage.get();
    if (closeLogMessage == null && getConnection().getConnectionCloseMessage() != null)
    {
        closeLogMessage = ChannelMessages.CLOSE_FORCED(getConnection().getConnectionCloseCode(),
                                                       getConnection().getConnectionCloseMessage());
    }
    if (closeLogMessage == null)
    {
        closeLogMessage = ChannelMessages.CLOSE();
    }
    amqpConnection.getEventLogger().message(getLogSubject(), closeLogMessage);
}
/**
 * Intentionally empty: the broker must not block while waiting for a session to close.
 */
protected void awaitClose()
{
    // Deliberately a no-op.
}
/**
 * Acknowledges a message instance on behalf of a consumer. If the consumer's acquisition can be
 * made unstealable, the instance is dequeued within the current transaction: it is deleted after
 * commit, or marked redelivered and released back to the consumer on rollback.
 *
 * @param consumer the consumer that acquired the message
 * @param target   the consumer target (not used by this method)
 * @param entry    the message instance being acknowledged
 */
public void acknowledge(final MessageInstanceConsumer consumer,
                        final ConsumerTarget_0_10 target,
                        final MessageInstance entry)
{
    if (!entry.makeAcquisitionUnstealable(consumer))
    {
        return;
    }

    final ServerTransaction.Action outcome = new ServerTransaction.Action()
    {
        @Override
        public void postCommit()
        {
            entry.delete();
        }

        @Override
        public void onRollback()
        {
            // The client has acknowledged the message and has therefore seen it, so after a
            // rollback it must be marked as redelivered.
            entry.setRedelivered();
            entry.release(consumer);
        }
    };
    _transaction.dequeue(entry.getEnqueueRecord(), outcome);
}
/** @return a live view of the registered consumer targets */
Collection<ConsumerTarget_0_10> getSubscriptions()
{
    final Collection<ConsumerTarget_0_10> targets = _subscriptions.values();
    return targets;
}
/**
 * Registers a consumer target under a destination name.
 *
 * @param destination the destination name; {@code null} is stored under a private placeholder key
 * @param sub         the consumer target
 */
public void register(String destination, ConsumerTarget_0_10 sub)
{
    final String key;
    if (destination == null)
    {
        key = NULL_DESTINATION;
    }
    else
    {
        key = destination;
    }
    _subscriptions.put(key, sub);
}
/**
 * Looks up the consumer target registered under a destination name.
 *
 * @param destination the destination name; {@code null} maps to the private placeholder key
 * @return the registered consumer target, or {@code null} if there is none
 */
public ConsumerTarget_0_10 getSubscription(String destination)
{
    final String key;
    if (destination == null)
    {
        key = NULL_DESTINATION;
    }
    else
    {
        key = destination;
    }
    return _subscriptions.get(key);
}
/**
 * Removes a consumer target from the subscription map and closes it.
 * <p>
 * The key is derived in the same way as in {@code register} and {@code getSubscription}: a
 * {@code null} name maps to the private placeholder key. Passing {@code null} directly to the
 * concurrent map would throw a {@code NullPointerException} and leave the entry registered.
 * NOTE(review): this assumes {@code sub.getName()} returns the destination the target was
 * registered under - confirm against {@code ConsumerTarget_0_10}.
 *
 * @param sub the consumer target to unregister
 */
public void unregister(ConsumerTarget_0_10 sub)
{
    final String subscriptionName = sub.getName();
    _subscriptions.remove(subscriptionName == null ? NULL_DESTINATION : subscriptionName);
    sub.close();
}
/** @return whether the session's current transaction reports itself as transactional */
public boolean isTransactional()
{
    final ServerTransaction current = _transaction;
    return current.isTransactional();
}
/** @return the session's current transaction */
ServerTransaction getTransaction()
{
    return this._transaction;
}
/**
 * Switches the session to a new local transaction. If the previous transaction was local, its
 * tickers are unregistered first. Tickers are then registered for the new transaction; they close
 * the connection asynchronously with a TRANSACTION_TIMEOUT reason when they fire.
 */
public void selectTx()
{
    final ServerTransaction previous = _transaction;
    final AMQPConnection_0_10 amqpConnection = getAMQPConnection();
    final boolean previousWasLocal = previous instanceof LocalTransaction;
    if (previousWasLocal)
    {
        amqpConnection.unregisterTransactionTickers(_transaction);
    }

    _transaction = amqpConnection.createLocalTransaction();

    final long notificationRepeatPeriod =
            getModelObject().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
    amqpConnection.registerTransactionTickers(
            _transaction,
            message -> amqpConnection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
                                                               (String) message),
            notificationRepeatPeriod);
}
/**
 * Switches the session to a new distributed transaction backed by the address space's DTX
 * registry.
 */
public void selectDtx()
{
    this._transaction = new DistributedTransaction(_modelObject, getAddressSpace().getDtxRegistry());
}
/**
 * Starts work on a distributed transaction branch.
 *
 * @param xid    the branch identifier
 * @param join   passed through to the distributed transaction
 * @param resume passed through to the distributed transaction
 * @throws DtxNotSelectedException if the session's transaction is not a distributed transaction
 */
public void startDtx(Xid xid, boolean join, boolean resume)
        throws JoinAndResumeDtxException,
               UnknownDtxBranchException,
               AlreadyKnownDtxException,
               DtxNotSelectedException
{
    final DistributedTransaction dtx = assertDtxTransaction();
    final org.apache.qpid.server.txn.Xid branch = toDtxXid(xid);
    dtx.start(branch, join, resume);
}
/**
 * Ends work on a distributed transaction branch.
 *
 * @param xid     the branch identifier
 * @param fail    passed through to the distributed transaction
 * @param suspend passed through to the distributed transaction
 * @throws DtxNotSelectedException if the session's transaction is not a distributed transaction
 */
public void endDtx(Xid xid, boolean fail, boolean suspend)
        throws NotAssociatedDtxException,
               UnknownDtxBranchException,
               DtxNotSelectedException,
               SuspendAndFailDtxException, TimeoutDtxException
{
    final DistributedTransaction dtx = assertDtxTransaction();
    final org.apache.qpid.server.txn.Xid branch = toDtxXid(xid);
    dtx.end(branch, fail, suspend);
}
/**
 * Reads a branch's timeout from the address space's DTX registry.
 *
 * @param xid the branch identifier
 * @return the timeout reported by the registry
 */
public long getTimeoutDtx(Xid xid)
        throws UnknownDtxBranchException
{
    return getAddressSpace()
            .getDtxRegistry()
            .getTimeout(toDtxXid(xid));
}
/**
 * Sets a branch's timeout in the address space's DTX registry.
 *
 * @param xid     the branch identifier
 * @param timeout the timeout value handed to the registry
 */
public void setTimeoutDtx(Xid xid, long timeout)
        throws UnknownDtxBranchException
{
    getAddressSpace()
            .getDtxRegistry()
            .setTimeout(toDtxXid(xid), timeout);
}
/**
 * Prepares a distributed transaction branch through the address space's DTX registry.
 *
 * @param xid the branch identifier
 */
public void prepareDtx(Xid xid)
        throws UnknownDtxBranchException,
               IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
{
    getAddressSpace()
            .getDtxRegistry()
            .prepare(toDtxXid(xid));
}
/**
 * Commits a distributed transaction branch through the address space's DTX registry.
 *
 * @param xid      the branch identifier
 * @param onePhase passed through to the registry
 */
public void commitDtx(Xid xid, boolean onePhase)
        throws UnknownDtxBranchException,
               IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
{
    getAddressSpace()
            .getDtxRegistry()
            .commit(toDtxXid(xid), onePhase);
}
/**
 * Rolls back a distributed transaction branch through the address space's DTX registry.
 *
 * @param xid the branch identifier
 */
public void rollbackDtx(Xid xid)
        throws UnknownDtxBranchException,
               IncorrectDtxStateException, StoreException, TimeoutDtxException
{
    getAddressSpace()
            .getDtxRegistry()
            .rollback(toDtxXid(xid));
}
/**
 * Asks the address space's DTX registry to forget a branch.
 *
 * @param xid the branch identifier
 */
public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
{
    getAddressSpace()
            .getDtxRegistry()
            .forget(toDtxXid(xid));
}
/**
 * Lists the branches returned by the DTX registry's {@code recover()}, converted to protocol
 * {@code Xid} instances.
 *
 * @return a new mutable list of the converted identifiers
 */
public List<Xid> recoverDtx()
{
    final List<Xid> recovered = new ArrayList<>();
    for (Iterator<org.apache.qpid.server.txn.Xid> it = getAddressSpace().getDtxRegistry().recover().iterator();
         it.hasNext(); )
    {
        final org.apache.qpid.server.txn.Xid branch = it.next();
        recovered.add(new Xid(branch.getFormat(), branch.getGlobalId(), branch.getBranchId()));
    }
    return recovered;
}
/**
 * @return the session's transaction, cast to a distributed transaction
 * @throws DtxNotSelectedException if the session's transaction is not a distributed transaction
 */
private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
{
    if (!(_transaction instanceof DistributedTransaction))
    {
        throw new DtxNotSelectedException();
    }
    return (DistributedTransaction) _transaction;
}
/** Commits the current transaction and then increments the connection's transaction-begin counter. */
public void commit()
{
    _transaction.commit();
    final AMQPConnection_0_10 amqpConnection = getAMQPConnection();
    amqpConnection.incrementTransactionBeginCounter();
}
/**
 * Rolls back the current transaction and then increments the connection's transaction-rollback
 * and transaction-begin counters.
 */
public void rollback()
{
    _transaction.rollback();
    final AMQPConnection_0_10 owner = getAMQPConnection();
    owner.incrementTransactionRollbackCounter();
    owner.incrementTransactionBeginCounter();
}
/** @return the channel number; same value as {@code getChannel()} */
public int getChannelId()
{
    final int channelId = getChannel();
    return channelId;
}
/** @return the authorized principal of the owning connection */
public Principal getAuthorizedPrincipal()
{
    final ServerConnection owner = getConnection();
    return owner.getAuthorizedPrincipal();
}
/** @return the session's subject; same value as {@code getSubject()} */
public Subject getAuthorizedSubject()
{
    final Subject subject = getSubject();
    return subject;
}
/** @return the reference object of the owning connection */
public Object getReference()
{
    final ServerConnection owner = getConnection();
    return owner.getReference();
}
/** @return the message store of the session's address space */
public MessageStore getMessageStore()
{
    final NamedAddressSpace addressSpace = getAddressSpace();
    return addressSpace.getMessageStore();
}
/** @return the address space of the owning connection */
public NamedAddressSpace getAddressSpace()
{
    final ServerConnection owner = getConnection();
    return owner.getAddressSpace();
}
/** @return always {@code false} */
public boolean isDurable()
{
    final boolean durable = false;
    return durable;
}
/** @return the id of the session's model object */
public UUID getId()
{
    final Session_0_10 model = _modelObject;
    return model.getId();
}
/** @return the AMQP connection object behind the owning connection */
public AMQPConnection_0_10 getAMQPConnection()
{
    final ServerConnection owner = getConnection();
    return owner.getAmqpConnection();
}
/** @return the connection this session belongs to */
public ServerConnection getConnection()
{
    return this.connection;
}
/** @return the log subject of the session's model object */
public LogSubject getLogSubject()
{
    final Session_0_10 model = _modelObject;
    return model.getLogSubject();
}
/**
 * Requests that the session be blocked on behalf of a queue.
 *
 * @param queue the queue requesting the block; its name appears in the flow-enforced log message
 */
public void block(Queue<?> queue)
{
    final String queueName = queue.getName();
    block(queue, queueName);
}
/** Requests that the session be blocked, using the session itself as the blocking entity. */
public void block()
{
    final Object entity = this;
    block(entity, "** All Queues **");
}
/**
 * Adds an entity to the set of blockers. If the entity is new and the session was not already
 * marked as blocking, a flow-enforced message is logged and, when the session is OPEN, the
 * connection is notified that there is work to do.
 *
 * @param queue the blocking entity
 * @param name  the name used in the log message
 */
private void block(final Object queue, final String name)
{
    synchronized (_blockingEntities)
    {
        final boolean becameBlocked = _blockingEntities.add(queue) && _blocking.compareAndSet(false, true);
        if (!becameBlocked)
        {
            return;
        }

        getAMQPConnection().getEventLogger().message(getLogSubject(), ChannelMessages.FLOW_ENFORCED(name));
        if (getState() == State.OPEN)
        {
            getAMQPConnection().notifyWork(_modelObject);
        }
    }
}
/**
 * Withdraws a queue's request to block the session.
 *
 * @param queue the queue that no longer requires the block
 */
public void unblock(Queue<?> queue)
{
    final Object entity = queue;
    unblock(entity);
}
/** Withdraws the block that was requested with the session itself as the blocking entity. */
public void unblock()
{
    final Object entity = this;
    unblock(entity);
}
/**
 * Removes an entity from the set of blockers. When that leaves the set empty and the session was
 * marked as blocking, the mark is cleared. Unless the session is closing, a flow-removed message
 * is then logged and the connection is notified that there is work to do.
 * <p>
 * The set update and the flag transition are performed while holding the {@code _blockingEntities}
 * monitor, the same monitor {@code block(Object, String)} holds. Without it, a concurrent
 * {@code block} could add an entity between the emptiness check and the flag update, leaving the
 * session unblocked while a blocker is still registered. Logging and notification stay outside
 * the monitor.
 *
 * @param queue the blocking entity to remove
 */
private void unblock(final Object queue)
{
    final boolean becameUnblocked;
    synchronized (_blockingEntities)
    {
        becameUnblocked = _blockingEntities.remove(queue)
                          && _blockingEntities.isEmpty()
                          && _blocking.compareAndSet(true, false);
    }

    if (becameUnblocked && !isClosing())
    {
        getAMQPConnection().getEventLogger().message(getLogSubject(), ChannelMessages.FLOW_REMOVED());
        getAMQPConnection().notifyWork(_modelObject);
    }
}
/**
 * @return {@code true} if blocking is currently in force on the wire, a block time has been
 *         recorded, and more than the configured blocking timeout (in milliseconds) has elapsed
 *         since then
 */
boolean blockingTimeoutExceeded()
{
    final long blockedSince = _blockTime;
    if (!_wireBlockingState || blockedSince == 0)
    {
        return false;
    }
    final long elapsed = System.currentTimeMillis() - blockedSince;
    return elapsed > _blockingTimeout;
}
/**
 * Brings the blocking state sent to the peer in line with the desired blocking state. When the
 * two differ, the matching block or unblock commands are sent. The block time is then set to the
 * current time when blocking, or reset to zero when unblocking.
 */
public void updateBlockedStateIfNecesssary()
{
    final boolean wanted = _blocking.get();
    if (wanted == _wireBlockingState)
    {
        return;
    }

    _wireBlockingState = wanted;
    if (wanted)
    {
        invokeBlock();
        _blockTime = System.currentTimeMillis();
    }
    else
    {
        invokeUnblock();
        _blockTime = 0;
    }
}
/** @return the reference object of the owning connection */
public Object getConnectionReference()
{
    final ServerConnection owner = getConnection();
    return owner.getReference();
}
/** @return the log string of the session's model object */
@Override
public String toLogString()
{
    final Session_0_10 model = _modelObject;
    return model.toLogString();
}
/**
 * Closes the session, recording a forced-close log message for {@code onClose} to emit. Only the
 * first recorded message is kept.
 *
 * @param cause   the close cause code
 * @param message the close message text
 */
public void close(int cause, String message)
{
    final LogMessage forced = ChannelMessages.CLOSE_FORCED(cause, message);
    _forcedCloseLogMessage.compareAndSet(null, forced);
    close();
}
/**
 * Closes the session. Subscriptions are unregistered first so that no new messages are sent to a
 * closing session, and the model object (if set) is deleted. A DETACHED session then becomes
 * CLOSED immediately and is removed from its connection. A CLOSED session is left alone. In any
 * other state the session becomes CLOSING and a detach is sent to the peer.
 */
public void close()
{
    // unregister subscriptions in order to prevent sending of new messages
    // to subscriptions with closing session
    unregisterSubscriptions();

    if (_modelObject != null)
    {
        _modelObject.delete();
    }

    if (LOGGER.isDebugEnabled())
    {
        LOGGER.debug("Closing [{}] in state [{}]", this, state);
    }

    synchronized (commandsLock)
    {
        final State current = state;
        if (current == DETACHED)
        {
            state = CLOSED;
            delegate.closed(this);
            connection.removeSession(this);
        }
        else if (current != CLOSED)
        {
            state = CLOSING;
            setClose(true);
            sessionRequestTimeout(0);
            sessionDetach(name.getBytes());
            awaitClose();
        }
    }
}
/** Unregisters (and thereby closes) every registered consumer target. */
void unregisterSubscriptions()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        unregister(target);
    }
}
/** Calls {@code stop()} on every registered consumer target. */
void stopSubscriptions()
{
    for (ConsumerTarget_0_10 target : getSubscriptions())
    {
        target.stop();
    }
}
/**
 * Runs, inside the session's access control context, a credit-state flush on every registered
 * consumer target followed by completion of all queued asynchronous commands.
 */
void receivedComplete()
{
    runAsSubject(() ->
                 {
                     for (ConsumerTarget_0_10 target : getSubscriptions())
                     {
                         target.flushCreditState(false);
                     }
                     awaitCommandCompletion();
                     return null;
                 });
}
/** @return the number of registered disposition listeners */
public int getUnacknowledgedMessageCount()
{
    final int pending = _messageDispositionListenerMap.size();
    return pending;
}
/** @return the desired blocking state of the session */
public boolean getBlocking()
{
    final boolean blocking = _blocking.get();
    return blocking;
}
/**
 * Completes queued asynchronous commands from the head of the queue for as long as the head is
 * ready for completion. Commands are then completed unconditionally from the head until the queue
 * is no longer than the unfinished-command threshold.
 */
public void completeAsyncCommands()
{
    AsyncCommand head = _unfinishedCommandsQueue.peek();
    while (head != null && head.isReadyForCompletion())
    {
        head.complete();
        _unfinishedCommandsQueue.poll();
        head = _unfinishedCommandsQueue.peek();
    }

    while (_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
    {
        final AsyncCommand forced = _unfinishedCommandsQueue.poll();
        forced.complete();
    }
}
/** Drains the queue of asynchronous commands, completing each one in queue order. */
public void awaitCommandCompletion()
{
    for (AsyncCommand next = _unfinishedCommandsQueue.poll();
         next != null;
         next = _unfinishedCommandsQueue.poll())
    {
        next.complete();
    }
}
/**
 * Returns the most recently queued asynchronous command, for use as a marker.
 * <p>
 * {@code peekLast()} is used rather than an {@code isEmpty()} check followed by
 * {@code getLast()}. The queue is concurrent, and it could be drained between those two calls,
 * making {@code getLast()} throw {@code NoSuchElementException}.
 *
 * @return the last command in the queue, or {@code null} if the queue is empty
 */
public Object getAsyncCommandMark()
{
    return _unfinishedCommandsQueue.peekLast();
}
/**
 * Queues an asynchronous command made of the given future and action.
 *
 * @param future the future handed to the new command
 * @param action the action handed to the new command
 */
@Override
public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action)
{
    final AsyncCommand pending = new AsyncCommand(future, action);
    _unfinishedCommandsQueue.add(pending);
}
/**
 * Sets the model object associated with this session.
 *
 * @param session the model object
 */
public void setModelObject(final Session_0_10 session)
{
    this._modelObject = session;
}
/** @return the model object associated with this session, or {@code null} if none has been set */
public Session_0_10 getModelObject()
{
    return this._modelObject;
}
/**
 * @return the start time reported by the current transaction, or {@code 0} when the current
 *         transaction is not transactional
 */
public long getTransactionStartTimeLong()
{
    final ServerTransaction current = _transaction;
    if (!current.isTransactional())
    {
        return 0L;
    }
    return current.getTransactionStartTime();
}
/**
 * @return the update time reported by the current transaction, or {@code 0} when the current
 *         transaction is not transactional
 */
public long getTransactionUpdateTimeLong()
{
    final ServerTransaction current = _transaction;
    if (!current.isTransactional())
    {
        return 0L;
    }
    return current.getTransactionUpdateTime();
}
/**
 * Future for the result of a command sent through {@code invoke(Method, Class)}. It is completed
 * by {@code set} and counts as done once a non-null result is stored. Waiters are also woken by
 * the enclosing session's {@code closed()}.
 */
private class ResultFuture<T> implements Future<T>
{
    private final Class<T> klass;
    private T result;

    private ResultFuture(Class<T> klass)
    {
        this.klass = klass;
    }

    /** Stores the result, cast to the expected type, and wakes all waiters. */
    private synchronized void set(Struct result)
    {
        this.result = klass.cast(result);
        notifyAll();
    }

    /**
     * Waits until a result is available, the session is CLOSED, or the timeout elapses.
     *
     * @param timeout the timeout handed to the {@code Waiter}
     * @return the result
     * @throws SessionClosedException if the session closed without a recorded execution exception
     * @throws SessionException       if the session closed with a recorded execution exception,
     *                                or the wait timed out
     */
    @Override
    public T get(long timeout)
    {
        synchronized (this)
        {
            final Waiter waiter = new Waiter(this, timeout);
            while (waiter.hasTime() && state != CLOSED && !isDone())
            {
                LOGGER.debug("{} waiting for result: {}", ServerSession.this, this);
                waiter.await();
            }
        }

        if (isDone())
        {
            return result;
        }

        if (state != CLOSED)
        {
            throw new SessionException(
                    String.format("%s timed out waiting for result: %s",
                                  ServerSession.this, this));
        }

        final org.apache.qpid.server.protocol.v0_10.transport.ExecutionException cause = getException();
        if (cause == null)
        {
            throw new SessionClosedException();
        }
        throw new SessionException(cause);
    }

    /** @return {@code true} once a non-null result has been stored */
    @Override
    public boolean isDone()
    {
        return result != null;
    }

    @Override
    public String toString()
    {
        final Object shown = isDone() ? result : klass;
        return String.format("Future(%s)", shown);
    }
}
/**
 * Converts a protocol-level {@code Xid} into the broker's transaction-layer {@code Xid}, copying
 * the format, global id and branch id.
 *
 * @param xid the protocol-level identifier
 * @return the equivalent transaction-layer identifier
 */
public static org.apache.qpid.server.txn.Xid toDtxXid(final Xid xid)
{
    final org.apache.qpid.server.txn.Xid converted =
            new org.apache.qpid.server.txn.Xid(xid.getFormat(), xid.getGlobalId(), xid.getBranchId());
    return converted;
}
}