blob: 67ddd6ca77ac6b07420b5b9c625bfd5769a1f8ad [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
import java.lang.ref.WeakReference;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
public static interface MessageDispositionChangeListener
{
public void onAccept();
public void onRelease();
public void onReject();
public boolean acquire();
}
public static interface Task
{
public void doTask(ServerSession session);
}
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
private ServerTransaction _transaction;
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
private final WeakReference<Session> _reference;
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
}
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
_reference = new WeakReference<Session>(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
protected void setState(State state)
{
super.setState(state);
if (state == State.OPEN)
{
_actor.message(ChannelMessages.CREATE());
}
}
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
}
@Override
protected boolean isFull(int id)
{
return isCommandsFull(id);
}
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
_transaction.enqueue(queues,message, new ServerTransaction.Action()
{
BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
public void postCommit()
{
for(int i = 0; i < _queues.length; i++)
{
try
{
_queues[i].enqueue(message);
}
catch (AMQException e)
{
// TODO
throw new RuntimeException(e);
}
}
}
public void onRollback()
{
// NO-OP
}
});
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
}
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
getConnectionModel().registerMessageDelivered(xfr.getBodySize());
invoke(xfr, postIdSettingAction);
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
{
_messageDispositionListenerMap.put(xfr.getId(), acceptListener);
}
private static interface MessageDispositionAction
{
void performAction(MessageDispositionChangeListener listener);
}
public void accept(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
{
public void performAction(MessageDispositionChangeListener listener)
{
listener.onAccept();
}
});
}
public void release(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
{
public void performAction(MessageDispositionChangeListener listener)
{
listener.onRelease();
}
});
}
public void reject(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
{
public void performAction(MessageDispositionChangeListener listener)
{
listener.onReject();
}
});
}
public RangeSet acquire(RangeSet transfers)
{
RangeSet acquired = new RangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = transfers.iterator();
if(rangeIter.hasNext())
{
Range range = rangeIter.next();
while(range != null && unacceptedMessages.hasNext())
{
int next = unacceptedMessages.next();
while(gt(next, range.getUpper()))
{
if(rangeIter.hasNext())
{
range = rangeIter.next();
}
else
{
range = null;
break;
}
}
if(range != null && range.includes(next))
{
MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
if(changeListener != null && changeListener.acquire())
{
acquired.add(next);
}
}
}
}
}
return acquired;
}
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
if(ranges != null && !_messageDispositionListenerMap.isEmpty())
{
Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
Iterator<Range> rangeIter = ranges.iterator();
if(rangeIter.hasNext())
{
Range range = rangeIter.next();
while(range != null && unacceptedMessages.hasNext())
{
int next = unacceptedMessages.next();
while(gt(next, range.getUpper()))
{
if(rangeIter.hasNext())
{
range = rangeIter.next();
}
else
{
range = null;
break;
}
}
if(range != null && range.includes(next))
{
MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
action.performAction(changeListener);
}
}
}
}
}
public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
public void onClose()
{
_transaction.rollback();
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
listener.onRelease();
}
_messageDispositionListenerMap.clear();
getConfigStore().removeConfiguredObject(this);
for (Task task : _taskList)
{
task.doTask(this);
}
CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
}
@Override
protected void awaitClose()
{
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
{
_transaction.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
{
public void postCommit()
{
sub.acknowledge(entry);
}
public void onRollback()
{
entry.release();
}
});
updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
{
return _subscriptions.values();
}
public void register(String destination, Subscription_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub);
}
public Subscription_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination);
}
public void unregister(Subscription_0_10 sub)
{
_subscriptions.remove(sub.getConsumerTag().toString());
try
{
sub.getSendLock();
AMQQueue queue = sub.getQueue();
if(queue != null)
{
queue.unregisterSubscription(sub);
}
}
catch (AMQException e)
{
// TODO
_logger.error("Failed to unregister subscription", e);
}
finally
{
sub.releaseSendLock();
}
}
public boolean isTransactional()
{
// this does not look great but there should only be one "non-transactional"
// transactional context, while there could be several transactional ones in
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
}
public void selectTx()
{
_transaction = new LocalTransaction(this.getMessageStore());
_txnStarts.incrementAndGet();
}
public void commit()
{
_transaction.commit();
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
public void rollback()
{
_transaction.rollback();
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
{
//There can currently only be at most one outstanding transaction
//due to only having LocalTransaction support. Set value to 1 if 0.
_txnCount.compareAndSet(0,1);
}
}
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
{
//There can currently only be at most one outstanding transaction
//due to only having LocalTransaction support. Set value to 0 if 1.
_txnCount.compareAndSet(1,0);
}
}
/**
* Update last transaction activity timestamp
*/
public void updateTransactionalActivity()
{
if (isTransactional())
{
_txnUpdateTime.set(System.currentTimeMillis());
}
}
public Long getTxnStarts()
{
return _txnStarts.get();
}
public Long getTxnCommits()
{
return _txnCommits.get();
}
public Long getTxnRejects()
{
return _txnRejects.get();
}
public Long getTxnCount()
{
return _txnCount.get();
}
public Principal getAuthorizedPrincipal()
{
return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
}
public Subject getAuthorizedSubject()
{
return ((ServerConnection) getConnection()).getAuthorizedSubject();
}
public void addSessionCloseTask(Task task)
{
_taskList.add(task);
}
public void removeSessionCloseTask(Task task)
{
_taskList.remove(task);
}
public WeakReference<Session> getReference()
{
return _reference;
}
public MessageStore getMessageStore()
{
return getVirtualHost().getMessageStore();
}
public VirtualHost getVirtualHost()
{
return (VirtualHost) _connectionConfig.getVirtualHost();
}
public UUID getId()
{
return _id;
}
public SessionConfigType getConfigType()
{
return SessionConfigType.getInstance();
}
public ConfiguredObject getParent()
{
return getVirtualHost();
}
public boolean isDurable()
{
return false;
}
public boolean isAttached()
{
return true;
}
public long getDetachedLifespan()
{
return 0;
}
public Long getExpiryTime()
{
return null;
}
public Long getMaxClientRate()
{
return null;
}
public ConnectionConfig getConnectionConfig()
{
return _connectionConfig;
}
public String getSessionName()
{
return getName().toString();
}
public long getCreateTime()
{
return _createTime;
}
public void mgmtClose()
{
close();
}
public Object getID()
{
return getName();
}
public AMQConnectionModel getConnectionModel()
{
return (ServerConnection) getConnection();
}
public String getClientID()
{
return getConnection().getClientId();
}
public LogActor getLogActor()
{
return _actor;
}
public LogSubject getLogSubject()
{
return (LogSubject) this;
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
if (inTransaction())
{
long currentTime = System.currentTimeMillis();
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
// Log a warning on idle or open transactions
if (idleWarn > 0L && idleTime > idleWarn)
{
CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
_logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
}
else if (openWarn > 0L && openTime > openWarn)
{
CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
_logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
}
// Close connection for idle or open transactions that have timed out
if (idleClose > 0L && idleTime > idleClose)
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
}
else if (openClose > 0L && openTime > openClose)
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
}
}
}
public String toLogString()
{
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
((ServerConnection) getConnection()).getConnectionId(),
getClientID(),
((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
getVirtualHost().getName(),
getChannel())
+ "] ";
}
@Override
public void close()
{
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
super.close();
}
}