blob: 4b69842c491a87366a295af3f72be4e3347f23fe [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private final IoSession _minaProtocolSession;
private AMQShortString _contextKey;
private AMQShortString _clientVersion = null;
private VirtualHost _virtualHost;
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
private AMQCodecFactory _codecFactory;
private AMQProtocolSessionMBean _managedObject;
private SaslServer _saslServer;
private Object _lastReceived;
private Object _lastSent;
protected boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
private org.apache.mina.common.WriteFuture _lastWriteFuture;
public ManagedObject getManagedObject()
{
return _managedObject;
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
_codecFactory = codecFactory;
try
{
IoServiceConfig config = session.getServiceConfig();
ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
}
catch (RuntimeException e)
{
e.printStackTrace();
throw e;
}
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
_codecFactory = codecFactory;
}
private AMQProtocolSessionMBean createMBean() throws AMQException
{
try
{
return new AMQProtocolSessionMBean(this);
}
catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
}
}
public IoSession getIOSession()
{
return _minaProtocolSession;
}
public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
{
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
{
protocolInitiationReceived((ProtocolInitiation) message);
}
else if (message instanceof AMQFrame)
{
AMQFrame frame = (AMQFrame) message;
frameReceived(frame);
}
else
{
throw new UnknnownMessageTypeException(message);
}
}
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
if (_logger.isDebugEnabled())
{
_logger.debug("Frame Received: " + frame);
}
// Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
{
if (_logger.isInfoEnabled())
{
_logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
}
}
else
{
if (_logger.isInfoEnabled())
{
_logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
}
closeProtocolSession();
return;
}
}
try
{
body.handle(channelId, this);
}
catch (AMQException e)
{
closeChannel(channelId);
throw e;
}
}
private void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
try
{
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
setProtocolVersion(pv);
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) getProtocolMinorVersion(),
null,
mechanisms.getBytes(),
locales.getBytes());
_minaProtocolSession.write(responseBody.generateFrame(0));
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
_minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
// TODO: Close connection (but how to wait until message is sent?)
// ritchiem 2006-12-04 will this not do?
// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
// future.join();
// close connection
}
}
public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
try
{
try
{
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if (!_frameListeners.isEmpty())
{
for (AMQMethodListener listener : _frameListeners)
{
wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt);
}
}
catch (AMQChannelException e)
{
if (getChannel(channelId) != null)
{
if (_logger.isInfoEnabled())
{
_logger.info("Closing channel due to: " + e.getMessage());
}
writeFrame(e.getCloseFrame(channelId));
closeChannel(channelId);
}
else
{
if (_logger.isDebugEnabled())
{
_logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
}
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
AMQConnectionException ce =
evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
AMQConstant.CHANNEL_ERROR.getName().toString());
closeConnection(channelId, ce, false);
}
}
catch (AMQConnectionException e)
{
closeConnection(channelId, e, false);
}
}
catch (Exception e)
{
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
}
_logger.error("Unexpected exception while processing frame. Closing connection.", e);
closeProtocolSession();
}
}
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body);
}
public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body);
}
public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
{
// NO - OP
}
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
*
* @param frame the frame to write
*/
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
_lastWriteFuture = _minaProtocolSession.write(frame);
}
public AMQShortString getContextKey()
{
return _contextKey;
}
public void setContextKey(AMQShortString contextKey)
{
_contextKey = contextKey;
}
public List<AMQChannel> getChannels()
{
return new ArrayList<AMQChannel>(_channelMap.values());
}
public AMQChannel getAndAssertChannel(int channelId) throws AMQException
{
AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
}
return channel;
}
public AMQChannel getChannel(int channelId) throws AMQException
{
final AMQChannel channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
return null;
}
else
{
return channel;
}
}
public boolean channelAwaitingClosure(int channelId)
{
return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
{
if (_closed)
{
throw new AMQException("Session is closed");
}
final int channelId = channel.getChannelId();
if (_closingChannelsList.contains(channelId))
{
throw new AMQException("Session is marked awaiting channel close");
}
if (_channelMap.size() == _maxNoOfChannels)
{
String errorMessage =
toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
}
else
{
_channelMap.put(channel.getChannelId(), channel);
}
if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
{
_cachedChannels[channelId] = channel;
}
checkForNotification();
}
private void checkForNotification()
{
int channelsCount = _channelMap.size();
if (channelsCount >= _maxNoOfChannels)
{
_managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
}
}
public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
public void setMaximumNumberOfChannels(Long value)
{
_maxNoOfChannels = value;
}
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
channel.commit();
}
}
public void rollbackTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
channel.rollback();
}
}
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
*
* @param channelId id of the channel to close
*
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
public void closeChannel(int channelId) throws AMQException
{
final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
}
else
{
try
{
channel.close();
markChannelAwaitingCloseOk(channelId);
}
finally
{
removeChannel(channelId);
}
}
}
public void closeChannelOk(int channelId)
{
// todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
// When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
// We do it from the Close Handler as we are sending the OK back to the client.
// While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
// will send a close-ok.. Where we should call removeChannel.
// However, due to the poor exception handling on the client. The client-user will be notified of the
// InvalidArgument and if they then decide to close the session/connection then the there will be time
// for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
//removeChannel(channelId);
_closingChannelsList.remove(new Integer(channelId));
}
private void markChannelAwaitingCloseOk(int channelId)
{
_closingChannelsList.add(channelId);
}
/**
* In our current implementation this is used by the clustering code.
*
* @param channelId The channel to remove
*/
public void removeChannel(int channelId)
{
_channelMap.remove(channelId);
if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
{
_cachedChannels[channelId] = null;
}
}
/**
* Initialise heartbeats on the session.
*
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
}
}
/**
* Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
*
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
{
for (AMQChannel channel : _channelMap.values())
{
channel.close();
}
_channelMap.clear();
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
_cachedChannels[i] = null;
}
}
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
if (!_closed)
{
_closed = true;
if (_virtualHost != null)
{
_virtualHost.getConnectionRegistry().deregisterConnection(this);
}
closeAllChannels();
if (_managedObject != null)
{
_managedObject.unregister();
}
for (Task task : _taskList)
{
task.doTask(this);
}
}
}
public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
{
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
markChannelAwaitingCloseOk(channelId);
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
if (closeProtocolSession)
{
closeProtocolSession();
}
}
public void closeProtocolSession()
{
closeProtocolSession(true);
}
public void closeProtocolSession(boolean waitLast)
{
if (waitLast && (_lastWriteFuture != null))
{
_logger.debug("Waiting for last write to join.");
_lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
_logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
final CloseFuture future = _minaProtocolSession.close();
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
catch (AMQException e)
{
_logger.info(e.getMessage());
}
}
public String toString()
{
return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
{
return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
}
/** @return an object that can be used to identity */
public Object getKey()
{
return _minaProtocolSession.getRemoteAddress();
}
/**
* Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
* be bound to multiple addresses this could vary depending on the acceptor this session was created from.
*
* @return a String FQDN
*/
public String getLocalFQDN()
{
SocketAddress address = _minaProtocolSession.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
public SaslServer getSaslServer()
{
return _saslServer;
}
public void setSaslServer(SaslServer saslServer)
{
_saslServer = saslServer;
}
public FieldTable getClientProperties()
{
return _clientProperties;
}
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
if (_clientProperties != null)
{
if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
{
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
}
}
_sessionIdentifier = new ProtocolSessionIdentifier(this);
}
private void setProtocolVersion(ProtocolVersion pv)
{
_protocolVersion = pv;
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
}
public byte getProtocolMajorVersion()
{
return _protocolVersion.getMajorVersion();
}
public ProtocolVersion getProtocolVersion()
{
return _protocolVersion;
}
public byte getProtocolMinorVersion()
{
return _protocolVersion.getMinorVersion();
}
public boolean isProtocolVersion(byte major, byte minor)
{
return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
}
public MethodRegistry getRegistry()
{
return getMethodRegistry();
}
public Object getClientIdentifier()
{
return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
{
return _virtualHost;
}
public void setVirtualHost(VirtualHost virtualHost) throws AMQException
{
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
_managedObject = createMBean();
_managedObject.register();
}
public void addSessionCloseTask(Task task)
{
_taskList.add(task);
}
public void removeSessionCloseTask(Task task)
{
_taskList.remove(task);
}
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
}
public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
}
public Principal getAuthorizedID()
{
return _authorizedID;
}
public MethodRegistry getMethodRegistry()
{
return MethodRegistry.getMethodRegistry(getProtocolVersion());
}
public MethodDispatcher getMethodDispatcher()
{
return _dispatcher;
}
public ProtocolSessionIdentifier getSessionIdentifier()
{
return _sessionIdentifier;
}
public String getClientVersion()
{
return (_clientVersion == null) ? null : _clientVersion.toString();
}
public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// No-op, interface munging between this and AMQProtocolSession
}
public void init()
{
// No-op, interface munging between this and AMQProtocolSession
}
}