blob: f270665b2085ce65702554ec4d463751b62e68c2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.transport;
import static org.apache.qpid.transport.Connection.State.CLOSED;
import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
/**
* Connection
*
* @author Rafael H. Schloming
*
* TODO the channels map should probably be replaced with something
* more efficient, e.g. an array or a map implementation that can use
* short instead of Short
*/
public class Connection extends ConnectionInvoker
implements ProtocolEventReceiver, ProtocolEventSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
//Usable channels are numbered 0 to <ChannelMax> - 1
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
private long _lastSendTime;
private long _lastReadTime;
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
private boolean _messageCompressionSupported;
private final AtomicBoolean _redirecting = new AtomicBoolean();
private boolean _virtualHostPropertiesSupported;
private boolean _queueLifetimePolicySupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
public static interface SessionFactory
{
Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
}
private static final class DefaultSessionFactory implements SessionFactory
{
public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
{
return new Session(conn, name, expiry, isNoReplay);
}
}
private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory();
private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY;
private ConnectionDelegate delegate;
private ProtocolEventSender sender;
final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
final private Map<Integer,Session> channels = new ConcurrentHashMap<Integer,Session>();
private State state = NEW;
final private Object lock = new Object();
private long timeout = 60000;
private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
private ConnectionException error = null;
private int channelMax = 1;
private String locale;
private SaslServer saslServer;
private SaslClient saslClient;
private int idleTimeout = 0;
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
private SocketAddress _remoteAddress;
private SocketAddress _localAddress;
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
{
this.delegate = delegate;
}
public void addConnectionListener(ConnectionListener listener)
{
listeners.add(listener);
}
public List<ConnectionListener> getListeners()
{
return Collections.unmodifiableList(listeners);
}
public ProtocolEventSender getSender()
{
return sender;
}
public void setSender(ProtocolEventSender sender)
{
this.sender = sender;
}
protected void setState(State state)
{
synchronized (lock)
{
this.state = state;
lock.notifyAll();
}
}
protected void setLocale(String locale)
{
this.locale = locale;
}
String getLocale()
{
return locale;
}
void setSaslServer(SaslServer saslServer)
{
this.saslServer = saslServer;
}
SaslServer getSaslServer()
{
return saslServer;
}
void setSaslClient(SaslClient saslClient)
{
this.saslClient = saslClient;
}
public SaslClient getSaslClient()
{
return saslClient;
}
public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs)
{
connect(host, port, vhost, username, password, ssl, saslMechs, null);
}
public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, Map<String,Object> clientProps)
{
ConnectionSettings settings = new ConnectionSettings();
settings.setHost(host);
settings.setPort(port);
settings.setVhost(vhost);
settings.setUsername(username);
settings.setPassword(password);
settings.setUseSSL(ssl);
settings.setSaslMechs(saslMechs);
settings.setClientProperties(clientProps);
connect(settings);
}
public void connect(ConnectionSettings settings)
{
synchronized (lock)
{
conSettings = settings;
_redirecting.set(false);
state = OPENING;
connectionLost.set(false);
securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
IoNetworkTransport transport = new IoNetworkTransport();
final InputHandler inputHandler = new InputHandler(new Assembler(this));
addFrameSizeObserver(inputHandler);
ExceptionHandlingByteBufferReceiver secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
}
_networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
setRemoteAddress(_networkConnection.getRemoteAddress());
setLocalAddress(_networkConnection.getLocalAddress());
final ByteBufferSender secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
}
Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
sender = disassembler;
addFrameSizeObserver(disassembler);
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting()))
{
w.await();
}
if (error != null)
{
ConnectionException t = error;
error = null;
try
{
close();
}
catch (ConnectionException ce)
{
if (!(t instanceof ProtocolVersionException))
{
throw ce;
}
}
t.rethrow();
}
switch (state)
{
case OPENING:
close();
throw new ConnectionException("connect() timed out");
case OPEN:
case RESUMING:
connectionLost.set(false);
break;
case CLOSED:
throw new ConnectionException("connect() aborted");
default:
throw new IllegalStateException(String.valueOf(state));
}
}
for (ConnectionListener listener: listeners)
{
listener.opened(this);
}
}
public Session createSession()
{
return createSession(0);
}
public Session createSession(long expiry)
{
return createSession(expiry, false);
}
public Session createSession(long expiry, boolean isNoReplay)
{
return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
}
public Session createSession(String name)
{
return createSession(name, 0);
}
public Session createSession(String name, long expiry)
{
return createSession(Strings.toUTF8(name), expiry);
}
public Session createSession(String name, long expiry,boolean isNoReplay)
{
return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
}
public Session createSession(byte[] name, long expiry)
{
return createSession(new Binary(name), expiry);
}
public Session createSession(Binary name, long expiry)
{
return createSession(name, expiry, false);
}
public Session createSession(Binary name, long expiry, boolean isNoReplay)
{
synchronized (lock)
{
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state != OPEN && error == null)
{
w.await();
}
if (state != OPEN)
{
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
registerSession(ssn);
map(ssn);
ssn.attach();
return ssn;
}
}
public void registerSession(Session ssn)
{
synchronized (lock)
{
sessions.put(ssn.getName(),ssn);
}
}
public void removeSession(Session ssn)
{
synchronized (lock)
{
sessions.remove(ssn.getName());
}
}
public void setSessionFactory(SessionFactory sessionFactory)
{
assert sessionFactory != null;
_sessionFactory = sessionFactory;
}
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
}
public void received(ProtocolEvent event)
{
_lastReadTime = System.currentTimeMillis();
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("RECV: [{}] {}", this, String.valueOf(event));
}
event.delegate(this, delegate);
}
public void send(ProtocolEvent event)
{
_lastSendTime = System.currentTimeMillis();
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("SEND: [{}] {}", this, String.valueOf(event));
}
ProtocolEventSender s = sender;
if (s == null)
{
throw new ConnectionException("connection closed");
}
s.send(event);
}
public void flush()
{
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("FLUSH: [{}]", this);
}
final ProtocolEventSender theSender = sender;
if(theSender != null)
{
theSender.flush();
}
}
protected void invoke(Method method)
{
method.setChannel(0);
send(method);
if (!method.isBatch())
{
flush();
}
}
public void dispatch(Method method)
{
int channel = method.getChannel();
Session ssn = getSession(channel);
if(ssn != null)
{
ssn.received(method);
}
else
{
/*
* A peer receiving any other control on a detached transport MUST discard it and
* send a session.detached with the "not-attached" reason code.
*/
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("Control received on unattached channel : {}", channel);
}
invokeSessionDetached(channel, SessionDetachCode.NOT_ATTACHED);
}
}
public int getChannelMax()
{
return channelMax;
}
protected void setChannelMax(int max)
{
channelMax = max;
}
private int map(Session ssn)
{
synchronized (lock)
{
//For a negotiated channelMax N, there are channels 0 to N-1 available.
for (int i = 0; i < getChannelMax(); i++)
{
if (!channels.containsKey(i))
{
map(ssn, i);
return i;
}
}
throw new RuntimeException("no more channels available");
}
}
protected void map(Session ssn, int channel)
{
synchronized (lock)
{
channels.put(channel, ssn);
ssn.setChannel(channel);
}
}
void unmap(Session ssn)
{
synchronized (lock)
{
channels.remove(ssn.getChannel());
}
}
public Session getSession(int channel)
{
synchronized (lock)
{
return channels.get(channel);
}
}
public void resume()
{
synchronized (lock)
{
for (Session ssn : sessions.values())
{
map(ssn);
ssn.resume();
}
setState(OPEN);
}
}
public void exception(ConnectionException e)
{
connectionLost.set(true);
synchronized (lock)
{
switch (state)
{
case OPENING:
case CLOSING:
error = e;
lock.notifyAll();
return;
}
}
for (ConnectionListener listener: listeners)
{
listener.exception(this, e);
}
}
public void exception(Throwable t)
{
exception(new ConnectionException(t));
}
public void closeCode(ConnectionClose close)
{
synchronized (lock)
{
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
exception(new ConnectionException(close));
}
}
}
public void closed()
{
if (state == OPEN)
{
exception(new ConnectionException("connection aborted"));
}
LOGGER.debug("connection closed: {}", this);
synchronized (lock)
{
List<Session> values = new ArrayList<Session>(channels.values());
for (Session ssn : values)
{
ssn.closed();
}
try
{
sender.close();
}
catch(Exception e)
{
// ignore.
}
sender = null;
setState(CLOSED);
}
for (ConnectionListener listener: listeners)
{
listener.closed(this);
}
}
public void close()
{
close(ConnectionCloseCode.NORMAL, null);
}
protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
connectionClose(replyCode, replyText, _options);
}
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
{
switch (state)
{
case OPEN:
state = CLOSING;
connectionClose(replyCode, replyText, _options);
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state == CLOSING && error == null)
{
w.await();
}
if (error != null)
{
close(replyCode, replyText, _options);
throw new ConnectionException(error);
}
switch (state)
{
case CLOSING:
close(replyCode, replyText, _options);
throw new ConnectionException("close() timed out");
case CLOSED:
break;
default:
throw new IllegalStateException(String.valueOf(state));
}
break;
case CLOSED:
break;
default:
if (sender != null)
{
sender.close();
w = new Waiter(lock, timeout);
while (w.hasTime() && sender != null && error == null)
{
w.await();
}
if (error != null)
{
throw new ConnectionException(error);
}
if (sender != null)
{
throw new ConnectionException("close() timed out");
}
}
break;
}
}
}
public String getUserID()
{
return userID;
}
public void setUserID(String id)
{
userID = id;
}
public void setServerProperties(final Map<String, Object> serverProperties)
{
_serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
_messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
_virtualHostPropertiesSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED)));
_queueLifetimePolicySupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED)));
}
public Map<String, Object> getServerProperties()
{
return _serverProperties;
}
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
}
public ConnectionSettings getConnectionSettings()
{
return conSettings;
}
public SecurityLayer getSecurityLayer()
{
return securityLayer;
}
public boolean isConnectionResuming()
{
return connectionLost.get();
}
protected boolean isConnectionLost()
{
return connectionLost.get();
}
protected Collection<? extends Session> getChannels()
{
return new ArrayList<>(channels.values());
}
public boolean hasSessionWithName(final byte[] name)
{
return sessions.containsKey(new Binary(name));
}
public void notifyFailoverRequired()
{
for (Session ssn : getChannels())
{
ssn.notifyFailoverRequired();
}
}
public SocketAddress getRemoteSocketAddress()
{
return _remoteAddress;
}
public SocketAddress getLocalAddress()
{
return _localAddress;
}
protected void setRemoteAddress(SocketAddress remoteAddress)
{
_remoteAddress = remoteAddress;
}
protected void setLocalAddress(SocketAddress localAddress)
{
_localAddress = localAddress;
}
private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
{
SessionDetached sessionDetached = new SessionDetached();
sessionDetached.setChannel(channel);
sessionDetached.setCode(sessionDetachCode);
invoke(sessionDetached);
}
protected void doHeartBeat()
{
connectionHeartbeat();
}
private class ConnectionActivity implements TransportActivity
{
@Override
public long getLastReadTime()
{
return _lastReadTime;
}
@Override
public long getLastWriteTime()
{
return _lastSendTime;
}
@Override
public void writerIdle()
{
getConnectionDelegate().writerIdle(Connection.this);
}
@Override
public void readerIdle()
{
LOGGER.error("Closing connection as no heartbeat or other activity detected within specified interval");
_networkConnection.close();
}
}
public void setNetworkConnection(NetworkConnection network)
{
_networkConnection = network;
}
public NetworkConnection getNetworkConnection()
{
return _networkConnection;
}
public void setMaxFrameSize(final int maxFrameSize)
{
if(_frameSizeObserver != null)
{
_frameSizeObserver.setMaxFrameSize(maxFrameSize);
}
}
public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
{
if(_frameSizeObserver == null)
{
_frameSizeObserver = frameSizeObserver;
}
else
{
final FrameSizeObserver currentObserver = _frameSizeObserver;
_frameSizeObserver = new FrameSizeObserver()
{
@Override
public void setMaxFrameSize(final int frameSize)
{
currentObserver.setMaxFrameSize(frameSize);
frameSizeObserver.setMaxFrameSize(frameSize);
}
};
}
}
public boolean isMessageCompressionSupported()
{
return _messageCompressionSupported;
}
public boolean isVirtualHostPropertiesSupported()
{
return _virtualHostPropertiesSupported;
}
public boolean isQueueLifetimePolicySupported()
{
return _queueLifetimePolicySupported;
}
public boolean isRedirecting()
{
return _redirecting.get();
}
public void setRedirecting(final boolean redirecting)
{
_redirecting.set(redirecting);
}
public boolean isClosing()
{
synchronized (lock)
{
return state == CLOSING || state == CLOSED;
}
}
protected void sendConnectionSecure(byte[] challenge, Option ... options)
{
super.connectionSecure(challenge, options);
}
protected void sendConnectionTune(int channelMax, int maxFrameSize, int heartbeatMin, int heartbeatMax, Option ... options)
{
super.connectionTune(channelMax, maxFrameSize, heartbeatMin, heartbeatMax, options);
}
protected void sendConnectionStart(final Map<String, Object> clientProperties,
final List<Object> mechanisms,
final List<Object> locales, final Option... options)
{
super.connectionStart(clientProperties, mechanisms, locales, options);
}
}